diff options
| author | Linus Färnstrand <linus@mullvad.net> | 2017-05-05 13:35:24 +0200 |
|---|---|---|
| committer | Linus Färnstrand <linus@mullvad.net> | 2017-05-05 13:35:24 +0200 |
| commit | c6dbac4ce8db20d6e93309ee378c7c4ba7bab51d (patch) | |
| tree | e9514734cc2e8dc6b574f02d5246c805945e0c49 | |
| parent | 835a778d2b879f30070c2db726261704782f0011 (diff) | |
| parent | cb03c311aefbb705b93eebe80456aa8a906690ff (diff) | |
| download | mullvadvpn-c6dbac4ce8db20d6e93309ee378c7c4ba7bab51d.tar.xz mullvadvpn-c6dbac4ce8db20d6e93309ee378c7c4ba7bab51d.zip | |
Merge branch 'child-proc-with-duct'
| -rw-r--r-- | Cargo.lock | 78 | ||||
| -rw-r--r-- | talpid_cli/src/main.rs | 50 | ||||
| -rw-r--r-- | talpid_core/Cargo.toml | 2 | ||||
| -rw-r--r-- | talpid_core/src/lib.rs | 2 | ||||
| -rw-r--r-- | talpid_core/src/process/mod.rs | 26 | ||||
| -rw-r--r-- | talpid_core/src/process/monitor.rs | 291 | ||||
| -rw-r--r-- | talpid_core/src/process/openvpn.rs | 144 |
7 files changed, 198 insertions, 395 deletions
diff --git a/Cargo.lock b/Cargo.lock index 28f04d7f0b..ee987cf14f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -70,6 +70,11 @@ dependencies = [ [[package]] name = "bitflags" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "bitflags" version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -108,14 +113,6 @@ dependencies = [ ] [[package]] -name = "clonablechild" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "libc 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] name = "dbghelp-sys" version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -130,6 +127,17 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] +name = "duct" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "error-chain 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", + "lazycell 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", + "os_pipe 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", + "shared_child 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] name = "env_logger" version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -145,6 +153,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "error-chain" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "backtrace 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "error-chain" version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ @@ -276,6 +292,11 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] +name = "lazycell" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] name = "libc" version = "0.2.21" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -371,6 +392,17 @@ dependencies = [ ] [[package]] +name = "nix" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bitflags 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", + "cfg-if 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)", + "void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] name = "num-traits" version = "0.1.37" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -394,6 +426,16 @@ dependencies = [ ] [[package]] +name = "os_pipe" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "nix 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] name = "pkg-config" version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -491,6 +533,16 @@ dependencies = [ ] [[package]] +name = "shared_child" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] name = "slab" version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -544,7 +596,7 @@ name = "talpid_core" version = "0.0.0" dependencies = [ "assert_matches 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", - "clonablechild 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "duct 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)", "error-chain 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "openvpn_ffi 0.1.0", @@ -773,17 +825,19 @@ dependencies = [ "checksum backtrace 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f551bc2ddd53aea015d453ef0b635af89444afa5ed2405dd0b2062ad5d600d80" "checksum backtrace-sys 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "d192fd129132fbc97497c1f2ec2c2c5174e376b95f535199ef4fe0a293d33842" "checksum base64 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9892882c3bd89ed02dec391c128984c772b663a29700c32b5de0b33861cdf2bd" +"checksum bitflags 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "aad18937a628ec6abcd26d1489012cc0e18c21798210f491af69ded9b881106d" "checksum bitflags 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1370e9fc2a6ae53aea8b7a5110edbd08836ed87c88736dfabccade1c2b44bff4" "checksum byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c40977b0ee6b9885c9013cd41d9feffdd22deb3bb4dc3a71d901cc7a77de18c8" "checksum bytes 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "3941933da81d8717b427c2ddc2d73567cd15adb6c57514a2726d9ee598a5439a" "checksum cfg-if 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "de1e760d7b6535af4241fca8bd8adf68e2e7edacc6b29f5d399050c5e48cf88c" "checksum clap 2.23.2 (registry+https://github.com/rust-lang/crates.io-index)" = "cbf1114886d7cde2d6448517161d7db8d681a9a1c09f7d210f0b0864e48195f6" -"checksum clonablechild 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4a4946a850c1e921fbdd9a1f92bf1298c41a301c0f6e9bacbabf95ea7d6d0225" "checksum dbghelp-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "97590ba53bcb8ac28279161ca943a924d1fd4a8fb3fa63302591647c4fc5b850" "checksum dtoa 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "80c8b71fd71146990a9742fc06dcbbde19161a267e0ad4e572c35162f4578c90" +"checksum duct 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e45aa15fe0a8a8f511e6d834626afd55e49b62e5c8802e18328a87e8a8f6065c" "checksum env_logger 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e3856f1697098606fc6cb97a93de88ca3f3bc35bb878c725920e6e82ecf05e83" "checksum error-chain 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d9435d864e017c3c6afeac1654189b06cdb491cf2ff73dbf0d73b0f292f42ff8" "checksum error-chain 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "318cb3c71ee4cdea69fdc9e15c173b245ed6063e1709029e8fd32525a881120f" +"checksum error-chain 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "6930e04918388a9a2e41d518c25cf679ccafe26733fb4127dbf21993f2575d46" "checksum futures 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)" = "55f0008e13fc853f79ea8fc86e931486860d4c4c156cdffb59fa5f7fa833660a" "checksum futures-cpupool 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "a283c84501e92cade5ea673a2a7ca44f71f209ccdd302a3e0896f50083d2c5ff" "checksum gcc 0.3.45 (registry+https://github.com/rust-lang/crates.io-index)" = "40899336fb50db0c78710f53e87afc54d8c7266fb76262fecc78ca1a7f09deae" @@ -798,6 +852,7 @@ dependencies = [ "checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" "checksum language-tags 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a91d884b6667cd606bb5a69aa0c99ba811a115fc68915e7056ec08a46e93199a" "checksum lazycell 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ce12306c4739d86ee97c23139f3a34ddf0387bbf181bc7929d287025a8c3ef6b" +"checksum lazycell 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3b585b7a6811fb03aa10e74b278a0f00f8dd9b45dc681f148bb29fa5cb61859b" "checksum libc 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)" = "88ee81885f9f04bff991e306fea7c1c60a5f0f9e409e99f6b40e3311a3363135" "checksum log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)" = "5141eca02775a762cc6cd564d8d2c50f67c0ea3a372cbf1c51592b3e029e10ad" "checksum matches 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "efd7622e3022e1a6eaa602c4cea8912254e5582c9c692e9167714182244801b1" @@ -807,8 +862,10 @@ dependencies = [ "checksum mio 0.6.6 (registry+https://github.com/rust-lang/crates.io-index)" = "f27d38f824a0d267d55b29b171e9e99269a53812e385fa75c1fe700ae254a6a4" "checksum miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919" "checksum net2 0.2.27 (registry+https://github.com/rust-lang/crates.io-index)" = "18b9642ad6222faf5ce46f6966f59b71b9775ad5758c9e09fcf0a6c8061972b4" +"checksum nix 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "47e49f6982987135c5e9620ab317623e723bd06738fd85377e8d55f57c8b6487" "checksum num-traits 0.1.37 (registry+https://github.com/rust-lang/crates.io-index)" = "e1cbfa3781f3fe73dc05321bed52a06d2d491eaa764c52335cf4399f046ece99" "checksum num_cpus 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a18c392466409c50b87369414a2680c93e739aedeb498eb2bff7d7eb569744e2" +"checksum os_pipe 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "998bfbb3042e715190fe2a41abfa047d7e8cb81374d2977d7f100eacd8619cb1" "checksum pkg-config 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "3a8b4c6b8165cd1a1cd4b9b120978131389f64bdaf456435caa41e630edba903" "checksum quote 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "7a6e920b65c65f10b2ae65c831a81a073a89edd28c7cce89475bff467ab4167a" "checksum rand 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "022e0636ec2519ddae48154b028864bdce4eaf7d35226ab8e65c611be97b189d" @@ -823,6 +880,7 @@ dependencies = [ "checksum serde_codegen_internals 0.14.2 (registry+https://github.com/rust-lang/crates.io-index)" = "bc888bd283bd2420b16ad0d860e35ad8acb21941180a83a189bb2046f9d00400" "checksum serde_derive 0.9.13 (registry+https://github.com/rust-lang/crates.io-index)" = "d75c72ef4dd193d89eb652b73890fe2489996c9ead8b37980f57a1078f96ed50" "checksum serde_json 0.9.10 (registry+https://github.com/rust-lang/crates.io-index)" = "ad8bcf487be7d2e15d3d543f04312de991d631cfe1b43ea0ade69e6a8a5b16a1" +"checksum shared_child 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "099b38928dbe4a0a01fcd8c233183072f14a7d126a34bed05880869be66e14cc" "checksum slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b4fcaed89ab08ef143da37bc52adbcc04d4a69014f4c1208d6b51f0c47bc23" "checksum smallvec 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4c8cbcd6df1e117c2210e13ab5109635ad68a929fcbb8964dc965b76cb5ee013" "checksum strsim 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b4d15c810519a91cf877e7e36e63fe068815c678181439f2f29e2562147c3694" diff --git a/talpid_cli/src/main.rs b/talpid_cli/src/main.rs index 0a857e05d3..cc236677fc 100644 --- a/talpid_cli/src/main.rs +++ b/talpid_cli/src/main.rs @@ -9,12 +9,10 @@ extern crate error_chain; extern crate log; extern crate env_logger; -use std::io::{self, Read, Write}; -use std::result::Result as StdResult; +use std::path::Path; use std::sync::mpsc::{self, Receiver}; -use std::thread; -use talpid_core::process::openvpn::{self, OpenVpnCommand, OpenVpnEvent, OpenVpnMonitor}; +use talpid_core::process::openvpn::{OpenVpnCommand, OpenVpnEvent, OpenVpnMonitor}; mod cli; @@ -29,8 +27,7 @@ fn run() -> Result<()> { init_logger()?; let args = cli::parse_args_or_exit(); let command = create_openvpn_command(&args); - let monitor = OpenVpnMonitor::new(command, args.plugin_path); - main_loop(monitor) + main_loop(command, args.plugin_path.as_path()) } pub fn init_logger() -> Result<()> { @@ -42,19 +39,20 @@ fn create_openvpn_command(args: &Args) -> OpenVpnCommand { command .config(&args.config) .remotes(&args.remotes[..]) - .unwrap() - .pipe_output(args.verbosity > 0); - + .unwrap(); command } -fn main_loop(mut monitor: OpenVpnMonitor) -> Result<()> { +fn main_loop(command: OpenVpnCommand, plugin_path: &Path) -> Result<()> { loop { - let rx = start_monitor(&mut monitor).chain_err(|| "Unable to start OpenVPN")?; + let (_monitor, rx) = start_monitor(command.clone(), plugin_path)?; while let Ok(msg) = rx.recv() { match msg { - OpenVpnEvent::Shutdown(clean) => { - println!("Monitored process exited. clean: {}", clean); + OpenVpnEvent::Shutdown(result) => { + println!( + "Monitored process exited. clean: {}", + result.map(|s| s.success()).unwrap_or(false) + ); break; } OpenVpnEvent::PluginEvent(Ok((event, env))) => { @@ -67,24 +65,12 @@ fn main_loop(mut monitor: OpenVpnMonitor) -> Result<()> { } } -fn start_monitor(monitor: &mut OpenVpnMonitor) - -> StdResult<Receiver<OpenVpnEvent>, openvpn::Error> { +fn start_monitor(command: OpenVpnCommand, + plugin_path: &Path) + -> Result<(OpenVpnMonitor, Receiver<OpenVpnEvent>)> { let (tx, rx) = mpsc::channel(); - let callback = move |clean| tx.send(clean).unwrap(); - monitor - .start(callback) - .map( - |(stdout, stderr)| { - stdout.map(|stream| pass_io(stream, io::stdout())); - stderr.map(|stream| pass_io(stream, io::stderr())); - rx - }, - ) -} - -fn pass_io<I, O>(mut input: I, mut output: O) - where I: Read + Send + 'static, - O: Write + Send + 'static -{ - thread::spawn(move || { io::copy(&mut input, &mut output).unwrap(); }); + let listener = move |event: OpenVpnEvent| tx.send(event).unwrap(); + OpenVpnMonitor::start(command, plugin_path, listener) + .map(|m| (m, rx)) + .chain_err(|| "Unable to start OpenVPN") } diff --git a/talpid_core/Cargo.toml b/talpid_core/Cargo.toml index efaf6838af..c098d204d9 100644 --- a/talpid_core/Cargo.toml +++ b/talpid_core/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Linus Färnstrand <linus@mullvad.net>", "Erik Larkö <erik@mullvad.n description = "Core backend functionality of the Mullvad VPN client" [dependencies] -clonablechild = "0.1" +duct = "0.8" error-chain = "0.10" log = "0.3" diff --git a/talpid_core/src/lib.rs b/talpid_core/src/lib.rs index 8fe76a5a69..1a10532787 100644 --- a/talpid_core/src/lib.rs +++ b/talpid_core/src/lib.rs @@ -6,7 +6,7 @@ #[macro_use] extern crate assert_matches; -extern crate clonablechild; +extern crate duct; #[macro_use] extern crate log; diff --git a/talpid_core/src/process/mod.rs b/talpid_core/src/process/mod.rs index 3259f90524..a7d76ea9f4 100644 --- a/talpid_core/src/process/mod.rs +++ b/talpid_core/src/process/mod.rs @@ -1,31 +1,5 @@ -use std::io; - -use std::process::{ChildStderr, ChildStdout}; - /// A module for monitoring child processes and get notified of events on them. pub mod monitor; -use self::monitor::MonitoredChild; /// A module for all OpenVPN related process management. pub mod openvpn; - -use clonablechild::ClonableChild; - - -impl MonitoredChild for ClonableChild { - fn wait(&self) -> io::Result<bool> { - ClonableChild::wait(self).map(|exit_status| exit_status.success()) - } - - fn kill(&self) -> io::Result<()> { - ClonableChild::kill(self) - } - - fn stdout(&mut self) -> Option<ChildStdout> { - self.stdout() - } - - fn stderr(&mut self) -> Option<ChildStderr> { - self.stderr() - } -} diff --git a/talpid_core/src/process/monitor.rs b/talpid_core/src/process/monitor.rs index c4e8b402d4..bab2876e05 100644 --- a/talpid_core/src/process/monitor.rs +++ b/talpid_core/src/process/monitor.rs @@ -1,144 +1,50 @@ +use duct; + use std::io; -use std::process::{ChildStderr, ChildStdout}; -use std::sync::{Arc, Mutex}; +use std::process; +use std::sync::Arc; use std::thread; - -error_chain! { - errors { - /// The transition could not be made because the state machine was not in a state that - /// could transition to the desired state. - InvalidState { - description("Invalid state for desired transition") - } - /// Error representing a failure in spawning the child process - Spawn { - description("Unable to spawn child process") - } - /// Error representing a failure in sending a kill signal to the child process - Kill { - description("Unable to send kill signal to process") - } - } -} - -/// Trait for objects that represent child processes that `ChildMonitor` can monitor -pub trait MonitoredChild: Clone + Send + 'static { - /// Waits for the child to exit completely, returning if the child exited cleanly or not. - fn wait(&self) -> io::Result<bool>; - - /// Forces the child to exit. - fn kill(&self) -> io::Result<()>; - - /// Retreives the stdout stream for the child. - fn stdout(&mut self) -> Option<ChildStdout>; - - /// Retreives the stderr stream for the child. - fn stderr(&mut self) -> Option<ChildStderr>; -} - -/// Trait for objects that can spawn any type of child process object implementing `MonitoredChild`. -pub trait ChildSpawner: Send + 'static { - /// The type of child being spawned. - type Child: MonitoredChild; - - /// Spawns the child process, returning a handle to it on success. - fn spawn(&mut self) -> io::Result<Self::Child>; +/// A child process monitor. Takes care of starting and monitoring a child process and calls the +/// listener on child exit. If the child is still running when a `ChildMonitor` instance goes out +/// of scope it will kill the child and wait for it to exit properly. +pub struct ChildMonitor { + child: Arc<duct::Handle>, + thread: Option<thread::JoinHandle<()>>, } - -enum State<C: MonitoredChild> { - Stopped, - Running(RunningState<C>), -} - -struct RunningState<C: MonitoredChild> { - child: C, - thread_handle: Option<thread::JoinHandle<()>>, -} - -/// A child process monitor. Takes care of starting and monitoring a child process and runs the -/// listener on child exit. -pub struct ChildMonitor<S: ChildSpawner> { - spawner: S, - state: Arc<Mutex<State<S::Child>>>, -} - -impl<S: ChildSpawner> ChildMonitor<S> { - /// Creates a new `ChildMonitor` that spawns processes with the given `spawner`. The new - /// `ChildMonitor` will be in the stopped state and not start any process until you call - /// `start()`. - pub fn new(spawner: S) -> Self { - ChildMonitor { - spawner: spawner, - state: Arc::new(Mutex::new(State::Stopped)), - } - } - - /// Starts the child process and begins to monitor it. `listener` will be called as soon as the +impl ChildMonitor { + /// Starts the child process and begins to monitor it. `on_exit` will be called as soon as the /// child process exits. - pub fn start<L>(&mut self, listener: L) -> Result<(Option<ChildStdout>, Option<ChildStderr>)> - where L: FnMut(bool) + Send + 'static + pub fn start<L>(expression: &duct::Expression, mut on_exit: L) -> io::Result<Self> + where L: FnMut(io::Result<&process::Output>) + Send + 'static { - let mut state_lock = self.state.lock().unwrap(); - if let State::Stopped = *state_lock { - let mut child = self.spawner.spawn().chain_err(|| ErrorKind::Spawn)?; - let io = (child.stdout(), child.stderr()); - let thread_handle = self.spawn_monitor(child.clone(), listener); - *state_lock = State::Running( - RunningState { - child: child, - thread_handle: Some(thread_handle), - }, - ); - Ok(io) - } else { - bail!(ErrorKind::InvalidState); - } + let child = Arc::new(expression.start()?); + let child_clone = child.clone(); + let thread = Some(thread::spawn(move || on_exit(child_clone.wait()))); + Ok(ChildMonitor { child, thread }) } - fn spawn_monitor<L>(&self, child: S::Child, mut listener: L) -> thread::JoinHandle<()> - where L: FnMut(bool) + Send + 'static - { - let state_mutex = self.state.clone(); - thread::spawn( - move || { - let success = child.wait().unwrap_or(false); - { - let mut state_lock = state_mutex.lock().unwrap(); - *state_lock = State::Stopped; - } - listener(success); - }, - ) + /// Wait for the child to exit. Blocking the current thread. The `on_exit` callback is + /// guaranteed to fire before this method returns. + pub fn wait(&mut self) -> io::Result<&process::Output> { + if let Some(thread) = self.thread.take() { + let _ = thread.join(); + } + self.child.wait() } - /// Sends a kill signal to the child process. - pub fn stop(&self) -> Result<()> { - let state_lock = self.state.lock().unwrap(); - if let State::Running(ref running_state) = *state_lock { - running_state.child.kill().chain_err(|| ErrorKind::Kill)?; - Ok(()) - } else { - bail!(ErrorKind::InvalidState); - } + /// Send a kill signal to the child. No need to call `wait` after to free the PID. The monitor + /// will wait for the process for you. + pub fn kill(&self) -> io::Result<()> { + self.child.kill() } } -impl<S: ChildSpawner> Drop for ChildMonitor<S> { +impl Drop for ChildMonitor { fn drop(&mut self) { - let thread_handle = { - let mut state_lock = self.state.lock().unwrap(); - if let State::Running(ref mut state) = *state_lock { - let _ = state.child.kill(); - state.thread_handle.take() - } else { - None - } - }; - if let Some(thread_handle) = thread_handle { - let _ = thread_handle.join(); - } + let _ = self.kill(); + let _ = self.wait(); } } @@ -146,77 +52,11 @@ impl<S: ChildSpawner> Drop for ChildMonitor<S> { #[cfg(test)] mod child_monitor_tests { use super::*; - use std::io; - use std::process::{ChildStderr, ChildStdout}; - use std::sync::{Arc, Mutex}; + use duct::{Expression, cmd}; + use std::sync::mpsc; - use std::thread; use std::time::Duration; - #[derive(Clone)] - struct MockChild { - died: Arc<Mutex<bool>>, - } - - impl MockChild { - pub fn instant_exit() -> Self { - Self::new(true) - } - - pub fn alive_until_kill() -> Self { - Self::new(false) - } - - fn new(died: bool) -> Self { - MockChild { died: Arc::new(Mutex::new(died)) } - } - } - - impl MonitoredChild for MockChild { - fn wait(&self) -> io::Result<bool> { - loop { - if *self.died.lock().unwrap() { - break; - } - thread::sleep(Duration::new(0, 1_000_000)); - } - Ok(true) - } - - fn kill(&self) -> io::Result<()> { - *self.died.lock().unwrap() = true; - Ok(()) - } - - fn stdout(&mut self) -> Option<ChildStdout> { - None - } - - fn stderr(&mut self) -> Option<ChildStderr> { - None - } - } - - struct MockChildSpawner { - spawn_result: Option<MockChild>, - } - - impl MockChildSpawner { - pub fn new(spawn_result: Option<MockChild>) -> Self { - MockChildSpawner { spawn_result: spawn_result } - } - } - - impl ChildSpawner for MockChildSpawner { - type Child = MockChild; - - fn spawn(&mut self) -> io::Result<MockChild> { - self.spawn_result - .clone() - .ok_or(io::Error::new(io::ErrorKind::Other, "Mocking a failed process spawn"),) - } - } - /// Tries to recv a message from the given `$rx` for one second and tries to match it with the /// given expected value, `$expected` macro_rules! assert_event { @@ -226,46 +66,55 @@ mod child_monitor_tests { }} } - #[test] - fn normal_start() { - let spawner = MockChildSpawner::new(Some(MockChild::instant_exit())); - let mut testee = ChildMonitor::new(spawner); + fn echo_cmd(s: &str) -> Expression { + cmd("echo", &[s]).stdout_capture().unchecked() + } - let (tx, rx) = mpsc::channel(); - assert!(testee.start(move |success| tx.send(success).unwrap()).is_ok()); - assert_event!(rx, Ok(true)); + fn invalid_cmd() -> Expression { + cmd("this command does not exist", &[""]).unchecked() } - #[test] - fn start_failed() { - let spawner = MockChildSpawner::new(None); - let mut testee = ChildMonitor::new(spawner); + fn sleep_cmd(secs: u32) -> Expression { + if cfg!(windows) { + cmd("ping", &["127.0.0.1", "-n", &(secs + 1).to_string()]).unchecked() + } else { + cmd("sleep", &[secs.to_string()]).unchecked() + } + } + fn spawn(cmd: &Expression) -> (ChildMonitor, mpsc::Receiver<io::Result<process::Output>>) { let (tx, rx) = mpsc::channel(); - assert!(testee.start(move |success| tx.send(success).unwrap()).is_err()); - // Make sure that the listener is not kept anywhere. Failing to start should drop the - // listener - assert_event!(rx, Err(mpsc::RecvTimeoutError::Disconnected)); + let child = + ChildMonitor::start(cmd, move |res| tx.send(res.map(|out| out.clone())).unwrap()) + .expect("Unable to start process"); + (child, rx) } #[test] - fn normal_stop() { - let spawner = MockChildSpawner::new(Some(MockChild::alive_until_kill())); - let mut testee = ChildMonitor::new(spawner); + fn echo() { + let (mut child, rx) = spawn(&echo_cmd("foobar")); + let wait_output = child.wait().unwrap(); + let callback_output = rx.try_recv().unwrap().unwrap(); - let (tx, rx) = mpsc::channel(); - assert!(testee.start(move |success| tx.send(success).unwrap()).is_ok()); - assert_event!(rx, Err(mpsc::RecvTimeoutError::Timeout)); + assert!(callback_output.status.success()); + assert_eq!("foobar\n".as_bytes(), &callback_output.stdout[..]); + assert_eq!(wait_output.status, callback_output.status); + assert_eq!(wait_output.stdout, callback_output.stdout); + } - assert!(testee.stop().is_ok()); - assert_event!(rx, Ok(true)); + #[test] + fn invalid_command() { + assert!(ChildMonitor::start(&invalid_cmd(), |_| {}).is_err()); } #[test] - fn stop_without_start() { - let spawner = MockChildSpawner::new(Some(MockChild::alive_until_kill())); - let testee = ChildMonitor::new(spawner); + fn callback_after_kill() { + let (child, rx) = spawn(&sleep_cmd(100000)); + // Make sure on_exit is not triggered within the first second. It should not be called + // until we kill the process. + assert!(rx.recv_timeout(Duration::from_secs(1)).is_err()); - assert_matches!(testee.stop(), Err(Error(ErrorKind::InvalidState, _))); + child.kill().unwrap(); + assert!(!rx.recv_timeout(Duration::from_secs(10)).unwrap().unwrap().status.success()); } } diff --git a/talpid_core/src/process/openvpn.rs b/talpid_core/src/process/openvpn.rs index 231898c6f8..d565b085a8 100644 --- a/talpid_core/src/process/openvpn.rs +++ b/talpid_core/src/process/openvpn.rs @@ -1,8 +1,8 @@ extern crate openvpn_ffi; -use super::monitor::{ChildMonitor, ChildSpawner}; +use super::monitor::ChildMonitor; -use clonablechild::{ChildExt, ClonableChild}; +use duct; use net::{RemoteAddr, ToRemoteAddrs}; @@ -12,19 +12,21 @@ use std::fmt; use std::io; use std::ops::DerefMut; use std::path::{Path, PathBuf}; -use std::process::{Child, ChildStderr, ChildStdout, Command, Stdio}; +use std::process; use std::sync::{Arc, Mutex}; use talpid_ipc; error_chain!{ errors { - /// Error while communicating with the OpenVPN plugin - PluginCommunicationError - } - links { - ChildMonitorError(::process::monitor::Error, ::process::monitor::ErrorKind) - #[doc="Something went wrong in the underlying ChildMonitor"]; + /// Error while communicating with the OpenVPN plugin. + PluginCommunicationError { + description("Error while communicating with the OpenVPN plugin") + } + /// Error while trying to spawn OpenVPN process. + ChildSpawnError { + description("Error while trying to spawn OpenVPN process") + } } } @@ -36,7 +38,6 @@ pub struct OpenVpnCommand { config: Option<PathBuf>, remotes: Vec<RemoteAddr>, plugin: Option<(PathBuf, Vec<String>)>, - pipe_output: bool, } impl OpenVpnCommand { @@ -48,7 +49,6 @@ impl OpenVpnCommand { config: None, remotes: vec![], plugin: None, - pipe_output: true, } } @@ -71,38 +71,10 @@ impl OpenVpnCommand { self } - /// If piping the standard streams, stdout and stderr will be available to the parent process. - /// This is the default behavior. If you want the equivalence of attaching the child streams to - /// /dev/null, invoke this method with false. - pub fn pipe_output(&mut self, pipe_output: bool) -> &mut Self { - self.pipe_output = pipe_output; - self - } - - /// Executes the OpenVPN process as a child process, returning a handle to it. - pub fn spawn(&self) -> io::Result<Child> { - let mut command = self.create_command(); - let args = self.get_arguments(); - command.args(&args); - debug!("Spawning: {}", &self); - command.spawn() - } - - fn create_command(&self) -> Command { - let mut command = Command::new(&self.openvpn_bin); - command - .stdin(Stdio::null()) - .stdout(self.get_output_pipe_policy()) - .stderr(self.get_output_pipe_policy()); - command - } - - fn get_output_pipe_policy(&self) -> Stdio { - if self.pipe_output { - Stdio::piped() - } else { - Stdio::null() - } + /// Build a runnable expression from the current state of the command. + pub fn build(&self) -> duct::Expression { + debug!("Building expression: {}", &self); + duct::cmd(&self.openvpn_bin, self.get_arguments()) } /// Returns all arguments that the subprocess would be spawned with. @@ -152,81 +124,60 @@ fn write_argument(fmt: &mut fmt::Formatter, arg: &str) -> fmt::Result { } -impl ChildSpawner for OpenVpnCommand { - type Child = ClonableChild; - - fn spawn(&mut self) -> io::Result<ClonableChild> { - OpenVpnCommand::spawn(self).map(|child| child.into_clonable()) - } -} - - /// Possible events from OpenVPN pub enum OpenVpnEvent { /// An event from the plugin loaded into OpenVPN. - PluginEvent(Result<(openvpn_ffi::OpenVpnPluginEvent, HashMap<String, String>)>), - /// The OpenVPN process exited. The bool indicates if the process exited cleanly. - Shutdown(bool), + PluginEvent(talpid_ipc::Result<(openvpn_ffi::OpenVpnPluginEvent, HashMap<String, String>)>), + /// The OpenVPN process exited. Containing the result of waiting for the process. + Shutdown(io::Result<process::ExitStatus>), } /// A struct able to start and monitor OpenVPN processes. pub struct OpenVpnMonitor { - command: OpenVpnCommand, - plugin_path: PathBuf, - monitor: ChildMonitor<OpenVpnCommand>, + child: ChildMonitor, } impl OpenVpnMonitor { - /// Creates a new `OpenVpnMonitor` based on the given command - pub fn new<P: AsRef<Path>>(command: OpenVpnCommand, plugin_path: P) -> Self { - OpenVpnMonitor { - command: command.clone(), - plugin_path: plugin_path.as_ref().to_path_buf(), - monitor: ChildMonitor::new(command), - } - } - - /// Starts OpenVPN and begins to monitor it. - pub fn start<L>(&mut self, listener: L) -> Result<(Option<ChildStdout>, Option<ChildStderr>)> - where L: FnMut(OpenVpnEvent) + Send + 'static + /// Spawns a new OpenVPN process and monitors it for exit and events. + pub fn start<P, L>(mut cmd: OpenVpnCommand, plugin_path: P, listener: L) -> Result<Self> + where P: AsRef<Path>, + L: FnMut(OpenVpnEvent) + Send + 'static { let shared_listener = Arc::new(Mutex::new(listener)); - self.start_plugin_listener(shared_listener.clone())?; - self.start_child_monitor(shared_listener) + let server_id = Self::start_plugin_listener(shared_listener.clone())?; + cmd.plugin(plugin_path, vec![server_id]); + let child = Self::start_child_monitor(&cmd, shared_listener)?; + Ok(OpenVpnMonitor { child }) } - fn start_plugin_listener<L>(&mut self, shared_listener: Arc<Mutex<L>>) -> Result<()> + fn start_plugin_listener<L>(shared_listener: Arc<Mutex<L>>) -> Result<String> where L: FnMut(OpenVpnEvent) + Send + 'static { - let server_id = talpid_ipc::start_new_server( + talpid_ipc::start_new_server( move |msg| { - let chained_msg = msg.chain_err(|| ErrorKind::PluginCommunicationError); let mut listener = shared_listener.lock().unwrap(); - (listener.deref_mut())(OpenVpnEvent::PluginEvent(chained_msg)); + (listener.deref_mut())(OpenVpnEvent::PluginEvent(msg)); }, ) - .chain_err(|| ErrorKind::PluginCommunicationError)?; - self.command.plugin(&self.plugin_path, vec![server_id]); - Ok(()) + .chain_err(|| ErrorKind::PluginCommunicationError) } - fn start_child_monitor<L>(&mut self, + fn start_child_monitor<L>(cmd: &OpenVpnCommand, shared_listener: Arc<Mutex<L>>) - -> Result<(Option<ChildStdout>, Option<ChildStderr>)> + -> Result<ChildMonitor> where L: FnMut(OpenVpnEvent) + Send + 'static { - let callback = move |clean_exit| { + let on_exit = move |result: io::Result<&process::Output>| { + let status = result.map(|out: &process::Output| out.status.clone()); let mut listener = shared_listener.lock().unwrap(); - (listener.deref_mut())(OpenVpnEvent::Shutdown(clean_exit)); + (listener.deref_mut())(OpenVpnEvent::Shutdown(status)); }; - - self.monitor = ChildMonitor::new(self.command.clone()); - Ok(self.monitor.start(callback)?) + ChildMonitor::start(&cmd.build(), on_exit).chain_err(|| ErrorKind::ChildSpawnError) } - /// Forwards a stop call to the underlying `ChildMonitor`. - pub fn stop(&self) -> Result<()> { - Ok(self.monitor.stop()?) + /// Send a kill signal to the OpenVPN process. + pub fn kill(&self) -> io::Result<()> { + self.child.kill() } } @@ -300,18 +251,3 @@ mod openvpn_command_tests { assert!(testee_args.contains(&OsString::from("cde"))); } } - - -#[cfg(test)] -mod openvpn_monitor_tests { - use super::*; - - #[test] - fn stop_without_start() { - let command = OpenVpnCommand::new(""); - let testee = OpenVpnMonitor::new(command, ""); - - use super::super::monitor::ErrorKind::InvalidState as MInvalidState; - assert_matches!(testee.stop(), Err(Error(ErrorKind::ChildMonitorError(MInvalidState), _))); - } -} |
