summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorLinus Färnstrand <linus@mullvad.net>2017-05-05 13:35:24 +0200
committerLinus Färnstrand <linus@mullvad.net>2017-05-05 13:35:24 +0200
commitc6dbac4ce8db20d6e93309ee378c7c4ba7bab51d (patch)
treee9514734cc2e8dc6b574f02d5246c805945e0c49
parent835a778d2b879f30070c2db726261704782f0011 (diff)
parentcb03c311aefbb705b93eebe80456aa8a906690ff (diff)
downloadmullvadvpn-c6dbac4ce8db20d6e93309ee378c7c4ba7bab51d.tar.xz
mullvadvpn-c6dbac4ce8db20d6e93309ee378c7c4ba7bab51d.zip
Merge branch 'child-proc-with-duct'
-rw-r--r--Cargo.lock78
-rw-r--r--talpid_cli/src/main.rs50
-rw-r--r--talpid_core/Cargo.toml2
-rw-r--r--talpid_core/src/lib.rs2
-rw-r--r--talpid_core/src/process/mod.rs26
-rw-r--r--talpid_core/src/process/monitor.rs291
-rw-r--r--talpid_core/src/process/openvpn.rs144
7 files changed, 198 insertions, 395 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 28f04d7f0b..ee987cf14f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -70,6 +70,11 @@ dependencies = [
[[package]]
name = "bitflags"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
+name = "bitflags"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -108,14 +113,6 @@ dependencies = [
]
[[package]]
-name = "clonablechild"
-version = "0.1.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-dependencies = [
- "libc 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)",
-]
-
-[[package]]
name = "dbghelp-sys"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -130,6 +127,17 @@ version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
+name = "duct"
+version = "0.8.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "error-chain 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "lazycell 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "os_pipe 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "shared_child 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
name = "env_logger"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -145,6 +153,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "error-chain"
+version = "0.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "backtrace 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "error-chain"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
@@ -276,6 +292,11 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
+name = "lazycell"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
name = "libc"
version = "0.2.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -371,6 +392,17 @@ dependencies = [
]
[[package]]
+name = "nix"
+version = "0.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "bitflags 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "cfg-if 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "libc 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)",
+ "void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
name = "num-traits"
version = "0.1.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -394,6 +426,16 @@ dependencies = [
]
[[package]]
+name = "os_pipe"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "nix 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
name = "pkg-config"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -491,6 +533,16 @@ dependencies = [
]
[[package]]
+name = "shared_child"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "libc 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)",
+ "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
name = "slab"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -544,7 +596,7 @@ name = "talpid_core"
version = "0.0.0"
dependencies = [
"assert_matches 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "clonablechild 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "duct 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)",
"error-chain 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
"openvpn_ffi 0.1.0",
@@ -773,17 +825,19 @@ dependencies = [
"checksum backtrace 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f551bc2ddd53aea015d453ef0b635af89444afa5ed2405dd0b2062ad5d600d80"
"checksum backtrace-sys 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "d192fd129132fbc97497c1f2ec2c2c5174e376b95f535199ef4fe0a293d33842"
"checksum base64 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9892882c3bd89ed02dec391c128984c772b663a29700c32b5de0b33861cdf2bd"
+"checksum bitflags 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "aad18937a628ec6abcd26d1489012cc0e18c21798210f491af69ded9b881106d"
"checksum bitflags 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1370e9fc2a6ae53aea8b7a5110edbd08836ed87c88736dfabccade1c2b44bff4"
"checksum byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c40977b0ee6b9885c9013cd41d9feffdd22deb3bb4dc3a71d901cc7a77de18c8"
"checksum bytes 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "3941933da81d8717b427c2ddc2d73567cd15adb6c57514a2726d9ee598a5439a"
"checksum cfg-if 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "de1e760d7b6535af4241fca8bd8adf68e2e7edacc6b29f5d399050c5e48cf88c"
"checksum clap 2.23.2 (registry+https://github.com/rust-lang/crates.io-index)" = "cbf1114886d7cde2d6448517161d7db8d681a9a1c09f7d210f0b0864e48195f6"
-"checksum clonablechild 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4a4946a850c1e921fbdd9a1f92bf1298c41a301c0f6e9bacbabf95ea7d6d0225"
"checksum dbghelp-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "97590ba53bcb8ac28279161ca943a924d1fd4a8fb3fa63302591647c4fc5b850"
"checksum dtoa 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "80c8b71fd71146990a9742fc06dcbbde19161a267e0ad4e572c35162f4578c90"
+"checksum duct 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e45aa15fe0a8a8f511e6d834626afd55e49b62e5c8802e18328a87e8a8f6065c"
"checksum env_logger 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e3856f1697098606fc6cb97a93de88ca3f3bc35bb878c725920e6e82ecf05e83"
"checksum error-chain 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d9435d864e017c3c6afeac1654189b06cdb491cf2ff73dbf0d73b0f292f42ff8"
"checksum error-chain 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "318cb3c71ee4cdea69fdc9e15c173b245ed6063e1709029e8fd32525a881120f"
+"checksum error-chain 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "6930e04918388a9a2e41d518c25cf679ccafe26733fb4127dbf21993f2575d46"
"checksum futures 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)" = "55f0008e13fc853f79ea8fc86e931486860d4c4c156cdffb59fa5f7fa833660a"
"checksum futures-cpupool 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "a283c84501e92cade5ea673a2a7ca44f71f209ccdd302a3e0896f50083d2c5ff"
"checksum gcc 0.3.45 (registry+https://github.com/rust-lang/crates.io-index)" = "40899336fb50db0c78710f53e87afc54d8c7266fb76262fecc78ca1a7f09deae"
@@ -798,6 +852,7 @@ dependencies = [
"checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
"checksum language-tags 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a91d884b6667cd606bb5a69aa0c99ba811a115fc68915e7056ec08a46e93199a"
"checksum lazycell 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ce12306c4739d86ee97c23139f3a34ddf0387bbf181bc7929d287025a8c3ef6b"
+"checksum lazycell 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3b585b7a6811fb03aa10e74b278a0f00f8dd9b45dc681f148bb29fa5cb61859b"
"checksum libc 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)" = "88ee81885f9f04bff991e306fea7c1c60a5f0f9e409e99f6b40e3311a3363135"
"checksum log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)" = "5141eca02775a762cc6cd564d8d2c50f67c0ea3a372cbf1c51592b3e029e10ad"
"checksum matches 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "efd7622e3022e1a6eaa602c4cea8912254e5582c9c692e9167714182244801b1"
@@ -807,8 +862,10 @@ dependencies = [
"checksum mio 0.6.6 (registry+https://github.com/rust-lang/crates.io-index)" = "f27d38f824a0d267d55b29b171e9e99269a53812e385fa75c1fe700ae254a6a4"
"checksum miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919"
"checksum net2 0.2.27 (registry+https://github.com/rust-lang/crates.io-index)" = "18b9642ad6222faf5ce46f6966f59b71b9775ad5758c9e09fcf0a6c8061972b4"
+"checksum nix 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "47e49f6982987135c5e9620ab317623e723bd06738fd85377e8d55f57c8b6487"
"checksum num-traits 0.1.37 (registry+https://github.com/rust-lang/crates.io-index)" = "e1cbfa3781f3fe73dc05321bed52a06d2d491eaa764c52335cf4399f046ece99"
"checksum num_cpus 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a18c392466409c50b87369414a2680c93e739aedeb498eb2bff7d7eb569744e2"
+"checksum os_pipe 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "998bfbb3042e715190fe2a41abfa047d7e8cb81374d2977d7f100eacd8619cb1"
"checksum pkg-config 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "3a8b4c6b8165cd1a1cd4b9b120978131389f64bdaf456435caa41e630edba903"
"checksum quote 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "7a6e920b65c65f10b2ae65c831a81a073a89edd28c7cce89475bff467ab4167a"
"checksum rand 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "022e0636ec2519ddae48154b028864bdce4eaf7d35226ab8e65c611be97b189d"
@@ -823,6 +880,7 @@ dependencies = [
"checksum serde_codegen_internals 0.14.2 (registry+https://github.com/rust-lang/crates.io-index)" = "bc888bd283bd2420b16ad0d860e35ad8acb21941180a83a189bb2046f9d00400"
"checksum serde_derive 0.9.13 (registry+https://github.com/rust-lang/crates.io-index)" = "d75c72ef4dd193d89eb652b73890fe2489996c9ead8b37980f57a1078f96ed50"
"checksum serde_json 0.9.10 (registry+https://github.com/rust-lang/crates.io-index)" = "ad8bcf487be7d2e15d3d543f04312de991d631cfe1b43ea0ade69e6a8a5b16a1"
+"checksum shared_child 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "099b38928dbe4a0a01fcd8c233183072f14a7d126a34bed05880869be66e14cc"
"checksum slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b4fcaed89ab08ef143da37bc52adbcc04d4a69014f4c1208d6b51f0c47bc23"
"checksum smallvec 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4c8cbcd6df1e117c2210e13ab5109635ad68a929fcbb8964dc965b76cb5ee013"
"checksum strsim 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b4d15c810519a91cf877e7e36e63fe068815c678181439f2f29e2562147c3694"
diff --git a/talpid_cli/src/main.rs b/talpid_cli/src/main.rs
index 0a857e05d3..cc236677fc 100644
--- a/talpid_cli/src/main.rs
+++ b/talpid_cli/src/main.rs
@@ -9,12 +9,10 @@ extern crate error_chain;
extern crate log;
extern crate env_logger;
-use std::io::{self, Read, Write};
-use std::result::Result as StdResult;
+use std::path::Path;
use std::sync::mpsc::{self, Receiver};
-use std::thread;
-use talpid_core::process::openvpn::{self, OpenVpnCommand, OpenVpnEvent, OpenVpnMonitor};
+use talpid_core::process::openvpn::{OpenVpnCommand, OpenVpnEvent, OpenVpnMonitor};
mod cli;
@@ -29,8 +27,7 @@ fn run() -> Result<()> {
init_logger()?;
let args = cli::parse_args_or_exit();
let command = create_openvpn_command(&args);
- let monitor = OpenVpnMonitor::new(command, args.plugin_path);
- main_loop(monitor)
+ main_loop(command, args.plugin_path.as_path())
}
pub fn init_logger() -> Result<()> {
@@ -42,19 +39,20 @@ fn create_openvpn_command(args: &Args) -> OpenVpnCommand {
command
.config(&args.config)
.remotes(&args.remotes[..])
- .unwrap()
- .pipe_output(args.verbosity > 0);
-
+ .unwrap();
command
}
-fn main_loop(mut monitor: OpenVpnMonitor) -> Result<()> {
+fn main_loop(command: OpenVpnCommand, plugin_path: &Path) -> Result<()> {
loop {
- let rx = start_monitor(&mut monitor).chain_err(|| "Unable to start OpenVPN")?;
+ let (_monitor, rx) = start_monitor(command.clone(), plugin_path)?;
while let Ok(msg) = rx.recv() {
match msg {
- OpenVpnEvent::Shutdown(clean) => {
- println!("Monitored process exited. clean: {}", clean);
+ OpenVpnEvent::Shutdown(result) => {
+ println!(
+ "Monitored process exited. clean: {}",
+ result.map(|s| s.success()).unwrap_or(false)
+ );
break;
}
OpenVpnEvent::PluginEvent(Ok((event, env))) => {
@@ -67,24 +65,12 @@ fn main_loop(mut monitor: OpenVpnMonitor) -> Result<()> {
}
}
-fn start_monitor(monitor: &mut OpenVpnMonitor)
- -> StdResult<Receiver<OpenVpnEvent>, openvpn::Error> {
+fn start_monitor(command: OpenVpnCommand,
+ plugin_path: &Path)
+ -> Result<(OpenVpnMonitor, Receiver<OpenVpnEvent>)> {
let (tx, rx) = mpsc::channel();
- let callback = move |clean| tx.send(clean).unwrap();
- monitor
- .start(callback)
- .map(
- |(stdout, stderr)| {
- stdout.map(|stream| pass_io(stream, io::stdout()));
- stderr.map(|stream| pass_io(stream, io::stderr()));
- rx
- },
- )
-}
-
-fn pass_io<I, O>(mut input: I, mut output: O)
- where I: Read + Send + 'static,
- O: Write + Send + 'static
-{
- thread::spawn(move || { io::copy(&mut input, &mut output).unwrap(); });
+ let listener = move |event: OpenVpnEvent| tx.send(event).unwrap();
+ OpenVpnMonitor::start(command, plugin_path, listener)
+ .map(|m| (m, rx))
+ .chain_err(|| "Unable to start OpenVPN")
}
diff --git a/talpid_core/Cargo.toml b/talpid_core/Cargo.toml
index efaf6838af..c098d204d9 100644
--- a/talpid_core/Cargo.toml
+++ b/talpid_core/Cargo.toml
@@ -5,7 +5,7 @@ authors = ["Linus Färnstrand <linus@mullvad.net>", "Erik Larkö <erik@mullvad.n
description = "Core backend functionality of the Mullvad VPN client"
[dependencies]
-clonablechild = "0.1"
+duct = "0.8"
error-chain = "0.10"
log = "0.3"
diff --git a/talpid_core/src/lib.rs b/talpid_core/src/lib.rs
index 8fe76a5a69..1a10532787 100644
--- a/talpid_core/src/lib.rs
+++ b/talpid_core/src/lib.rs
@@ -6,7 +6,7 @@
#[macro_use]
extern crate assert_matches;
-extern crate clonablechild;
+extern crate duct;
#[macro_use]
extern crate log;
diff --git a/talpid_core/src/process/mod.rs b/talpid_core/src/process/mod.rs
index 3259f90524..a7d76ea9f4 100644
--- a/talpid_core/src/process/mod.rs
+++ b/talpid_core/src/process/mod.rs
@@ -1,31 +1,5 @@
-use std::io;
-
-use std::process::{ChildStderr, ChildStdout};
-
/// A module for monitoring child processes and get notified of events on them.
pub mod monitor;
-use self::monitor::MonitoredChild;
/// A module for all OpenVPN related process management.
pub mod openvpn;
-
-use clonablechild::ClonableChild;
-
-
-impl MonitoredChild for ClonableChild {
- fn wait(&self) -> io::Result<bool> {
- ClonableChild::wait(self).map(|exit_status| exit_status.success())
- }
-
- fn kill(&self) -> io::Result<()> {
- ClonableChild::kill(self)
- }
-
- fn stdout(&mut self) -> Option<ChildStdout> {
- self.stdout()
- }
-
- fn stderr(&mut self) -> Option<ChildStderr> {
- self.stderr()
- }
-}
diff --git a/talpid_core/src/process/monitor.rs b/talpid_core/src/process/monitor.rs
index c4e8b402d4..bab2876e05 100644
--- a/talpid_core/src/process/monitor.rs
+++ b/talpid_core/src/process/monitor.rs
@@ -1,144 +1,50 @@
+use duct;
+
use std::io;
-use std::process::{ChildStderr, ChildStdout};
-use std::sync::{Arc, Mutex};
+use std::process;
+use std::sync::Arc;
use std::thread;
-
-error_chain! {
- errors {
- /// The transition could not be made because the state machine was not in a state that
- /// could transition to the desired state.
- InvalidState {
- description("Invalid state for desired transition")
- }
- /// Error representing a failure in spawning the child process
- Spawn {
- description("Unable to spawn child process")
- }
- /// Error representing a failure in sending a kill signal to the child process
- Kill {
- description("Unable to send kill signal to process")
- }
- }
-}
-
-/// Trait for objects that represent child processes that `ChildMonitor` can monitor
-pub trait MonitoredChild: Clone + Send + 'static {
- /// Waits for the child to exit completely, returning if the child exited cleanly or not.
- fn wait(&self) -> io::Result<bool>;
-
- /// Forces the child to exit.
- fn kill(&self) -> io::Result<()>;
-
- /// Retreives the stdout stream for the child.
- fn stdout(&mut self) -> Option<ChildStdout>;
-
- /// Retreives the stderr stream for the child.
- fn stderr(&mut self) -> Option<ChildStderr>;
-}
-
-/// Trait for objects that can spawn any type of child process object implementing `MonitoredChild`.
-pub trait ChildSpawner: Send + 'static {
- /// The type of child being spawned.
- type Child: MonitoredChild;
-
- /// Spawns the child process, returning a handle to it on success.
- fn spawn(&mut self) -> io::Result<Self::Child>;
+/// A child process monitor. Takes care of starting and monitoring a child process and calls the
+/// listener on child exit. If the child is still running when a `ChildMonitor` instance goes out
+/// of scope it will kill the child and wait for it to exit properly.
+pub struct ChildMonitor {
+ child: Arc<duct::Handle>,
+ thread: Option<thread::JoinHandle<()>>,
}
-
-enum State<C: MonitoredChild> {
- Stopped,
- Running(RunningState<C>),
-}
-
-struct RunningState<C: MonitoredChild> {
- child: C,
- thread_handle: Option<thread::JoinHandle<()>>,
-}
-
-/// A child process monitor. Takes care of starting and monitoring a child process and runs the
-/// listener on child exit.
-pub struct ChildMonitor<S: ChildSpawner> {
- spawner: S,
- state: Arc<Mutex<State<S::Child>>>,
-}
-
-impl<S: ChildSpawner> ChildMonitor<S> {
- /// Creates a new `ChildMonitor` that spawns processes with the given `spawner`. The new
- /// `ChildMonitor` will be in the stopped state and not start any process until you call
- /// `start()`.
- pub fn new(spawner: S) -> Self {
- ChildMonitor {
- spawner: spawner,
- state: Arc::new(Mutex::new(State::Stopped)),
- }
- }
-
- /// Starts the child process and begins to monitor it. `listener` will be called as soon as the
+impl ChildMonitor {
+ /// Starts the child process and begins to monitor it. `on_exit` will be called as soon as the
/// child process exits.
- pub fn start<L>(&mut self, listener: L) -> Result<(Option<ChildStdout>, Option<ChildStderr>)>
- where L: FnMut(bool) + Send + 'static
+ pub fn start<L>(expression: &duct::Expression, mut on_exit: L) -> io::Result<Self>
+ where L: FnMut(io::Result<&process::Output>) + Send + 'static
{
- let mut state_lock = self.state.lock().unwrap();
- if let State::Stopped = *state_lock {
- let mut child = self.spawner.spawn().chain_err(|| ErrorKind::Spawn)?;
- let io = (child.stdout(), child.stderr());
- let thread_handle = self.spawn_monitor(child.clone(), listener);
- *state_lock = State::Running(
- RunningState {
- child: child,
- thread_handle: Some(thread_handle),
- },
- );
- Ok(io)
- } else {
- bail!(ErrorKind::InvalidState);
- }
+ let child = Arc::new(expression.start()?);
+ let child_clone = child.clone();
+ let thread = Some(thread::spawn(move || on_exit(child_clone.wait())));
+ Ok(ChildMonitor { child, thread })
}
- fn spawn_monitor<L>(&self, child: S::Child, mut listener: L) -> thread::JoinHandle<()>
- where L: FnMut(bool) + Send + 'static
- {
- let state_mutex = self.state.clone();
- thread::spawn(
- move || {
- let success = child.wait().unwrap_or(false);
- {
- let mut state_lock = state_mutex.lock().unwrap();
- *state_lock = State::Stopped;
- }
- listener(success);
- },
- )
+ /// Wait for the child to exit. Blocking the current thread. The `on_exit` callback is
+ /// guaranteed to fire before this method returns.
+ pub fn wait(&mut self) -> io::Result<&process::Output> {
+ if let Some(thread) = self.thread.take() {
+ let _ = thread.join();
+ }
+ self.child.wait()
}
- /// Sends a kill signal to the child process.
- pub fn stop(&self) -> Result<()> {
- let state_lock = self.state.lock().unwrap();
- if let State::Running(ref running_state) = *state_lock {
- running_state.child.kill().chain_err(|| ErrorKind::Kill)?;
- Ok(())
- } else {
- bail!(ErrorKind::InvalidState);
- }
+ /// Send a kill signal to the child. No need to call `wait` after to free the PID. The monitor
+ /// will wait for the process for you.
+ pub fn kill(&self) -> io::Result<()> {
+ self.child.kill()
}
}
-impl<S: ChildSpawner> Drop for ChildMonitor<S> {
+impl Drop for ChildMonitor {
fn drop(&mut self) {
- let thread_handle = {
- let mut state_lock = self.state.lock().unwrap();
- if let State::Running(ref mut state) = *state_lock {
- let _ = state.child.kill();
- state.thread_handle.take()
- } else {
- None
- }
- };
- if let Some(thread_handle) = thread_handle {
- let _ = thread_handle.join();
- }
+ let _ = self.kill();
+ let _ = self.wait();
}
}
@@ -146,77 +52,11 @@ impl<S: ChildSpawner> Drop for ChildMonitor<S> {
#[cfg(test)]
mod child_monitor_tests {
use super::*;
- use std::io;
- use std::process::{ChildStderr, ChildStdout};
- use std::sync::{Arc, Mutex};
+ use duct::{Expression, cmd};
+
use std::sync::mpsc;
- use std::thread;
use std::time::Duration;
- #[derive(Clone)]
- struct MockChild {
- died: Arc<Mutex<bool>>,
- }
-
- impl MockChild {
- pub fn instant_exit() -> Self {
- Self::new(true)
- }
-
- pub fn alive_until_kill() -> Self {
- Self::new(false)
- }
-
- fn new(died: bool) -> Self {
- MockChild { died: Arc::new(Mutex::new(died)) }
- }
- }
-
- impl MonitoredChild for MockChild {
- fn wait(&self) -> io::Result<bool> {
- loop {
- if *self.died.lock().unwrap() {
- break;
- }
- thread::sleep(Duration::new(0, 1_000_000));
- }
- Ok(true)
- }
-
- fn kill(&self) -> io::Result<()> {
- *self.died.lock().unwrap() = true;
- Ok(())
- }
-
- fn stdout(&mut self) -> Option<ChildStdout> {
- None
- }
-
- fn stderr(&mut self) -> Option<ChildStderr> {
- None
- }
- }
-
- struct MockChildSpawner {
- spawn_result: Option<MockChild>,
- }
-
- impl MockChildSpawner {
- pub fn new(spawn_result: Option<MockChild>) -> Self {
- MockChildSpawner { spawn_result: spawn_result }
- }
- }
-
- impl ChildSpawner for MockChildSpawner {
- type Child = MockChild;
-
- fn spawn(&mut self) -> io::Result<MockChild> {
- self.spawn_result
- .clone()
- .ok_or(io::Error::new(io::ErrorKind::Other, "Mocking a failed process spawn"),)
- }
- }
-
/// Tries to recv a message from the given `$rx` for one second and tries to match it with the
/// given expected value, `$expected`
macro_rules! assert_event {
@@ -226,46 +66,55 @@ mod child_monitor_tests {
}}
}
- #[test]
- fn normal_start() {
- let spawner = MockChildSpawner::new(Some(MockChild::instant_exit()));
- let mut testee = ChildMonitor::new(spawner);
+ fn echo_cmd(s: &str) -> Expression {
+ cmd("echo", &[s]).stdout_capture().unchecked()
+ }
- let (tx, rx) = mpsc::channel();
- assert!(testee.start(move |success| tx.send(success).unwrap()).is_ok());
- assert_event!(rx, Ok(true));
+ fn invalid_cmd() -> Expression {
+ cmd("this command does not exist", &[""]).unchecked()
}
- #[test]
- fn start_failed() {
- let spawner = MockChildSpawner::new(None);
- let mut testee = ChildMonitor::new(spawner);
+ fn sleep_cmd(secs: u32) -> Expression {
+ if cfg!(windows) {
+ cmd("ping", &["127.0.0.1", "-n", &(secs + 1).to_string()]).unchecked()
+ } else {
+ cmd("sleep", &[secs.to_string()]).unchecked()
+ }
+ }
+ fn spawn(cmd: &Expression) -> (ChildMonitor, mpsc::Receiver<io::Result<process::Output>>) {
let (tx, rx) = mpsc::channel();
- assert!(testee.start(move |success| tx.send(success).unwrap()).is_err());
- // Make sure that the listener is not kept anywhere. Failing to start should drop the
- // listener
- assert_event!(rx, Err(mpsc::RecvTimeoutError::Disconnected));
+ let child =
+ ChildMonitor::start(cmd, move |res| tx.send(res.map(|out| out.clone())).unwrap())
+ .expect("Unable to start process");
+ (child, rx)
}
#[test]
- fn normal_stop() {
- let spawner = MockChildSpawner::new(Some(MockChild::alive_until_kill()));
- let mut testee = ChildMonitor::new(spawner);
+ fn echo() {
+ let (mut child, rx) = spawn(&echo_cmd("foobar"));
+ let wait_output = child.wait().unwrap();
+ let callback_output = rx.try_recv().unwrap().unwrap();
- let (tx, rx) = mpsc::channel();
- assert!(testee.start(move |success| tx.send(success).unwrap()).is_ok());
- assert_event!(rx, Err(mpsc::RecvTimeoutError::Timeout));
+ assert!(callback_output.status.success());
+ assert_eq!("foobar\n".as_bytes(), &callback_output.stdout[..]);
+ assert_eq!(wait_output.status, callback_output.status);
+ assert_eq!(wait_output.stdout, callback_output.stdout);
+ }
- assert!(testee.stop().is_ok());
- assert_event!(rx, Ok(true));
+ #[test]
+ fn invalid_command() {
+ assert!(ChildMonitor::start(&invalid_cmd(), |_| {}).is_err());
}
#[test]
- fn stop_without_start() {
- let spawner = MockChildSpawner::new(Some(MockChild::alive_until_kill()));
- let testee = ChildMonitor::new(spawner);
+ fn callback_after_kill() {
+ let (child, rx) = spawn(&sleep_cmd(100000));
+ // Make sure on_exit is not triggered within the first second. It should not be called
+ // until we kill the process.
+ assert!(rx.recv_timeout(Duration::from_secs(1)).is_err());
- assert_matches!(testee.stop(), Err(Error(ErrorKind::InvalidState, _)));
+ child.kill().unwrap();
+ assert!(!rx.recv_timeout(Duration::from_secs(10)).unwrap().unwrap().status.success());
}
}
diff --git a/talpid_core/src/process/openvpn.rs b/talpid_core/src/process/openvpn.rs
index 231898c6f8..d565b085a8 100644
--- a/talpid_core/src/process/openvpn.rs
+++ b/talpid_core/src/process/openvpn.rs
@@ -1,8 +1,8 @@
extern crate openvpn_ffi;
-use super::monitor::{ChildMonitor, ChildSpawner};
+use super::monitor::ChildMonitor;
-use clonablechild::{ChildExt, ClonableChild};
+use duct;
use net::{RemoteAddr, ToRemoteAddrs};
@@ -12,19 +12,21 @@ use std::fmt;
use std::io;
use std::ops::DerefMut;
use std::path::{Path, PathBuf};
-use std::process::{Child, ChildStderr, ChildStdout, Command, Stdio};
+use std::process;
use std::sync::{Arc, Mutex};
use talpid_ipc;
error_chain!{
errors {
- /// Error while communicating with the OpenVPN plugin
- PluginCommunicationError
- }
- links {
- ChildMonitorError(::process::monitor::Error, ::process::monitor::ErrorKind)
- #[doc="Something went wrong in the underlying ChildMonitor"];
+ /// Error while communicating with the OpenVPN plugin.
+ PluginCommunicationError {
+ description("Error while communicating with the OpenVPN plugin")
+ }
+ /// Error while trying to spawn OpenVPN process.
+ ChildSpawnError {
+ description("Error while trying to spawn OpenVPN process")
+ }
}
}
@@ -36,7 +38,6 @@ pub struct OpenVpnCommand {
config: Option<PathBuf>,
remotes: Vec<RemoteAddr>,
plugin: Option<(PathBuf, Vec<String>)>,
- pipe_output: bool,
}
impl OpenVpnCommand {
@@ -48,7 +49,6 @@ impl OpenVpnCommand {
config: None,
remotes: vec![],
plugin: None,
- pipe_output: true,
}
}
@@ -71,38 +71,10 @@ impl OpenVpnCommand {
self
}
- /// If piping the standard streams, stdout and stderr will be available to the parent process.
- /// This is the default behavior. If you want the equivalence of attaching the child streams to
- /// /dev/null, invoke this method with false.
- pub fn pipe_output(&mut self, pipe_output: bool) -> &mut Self {
- self.pipe_output = pipe_output;
- self
- }
-
- /// Executes the OpenVPN process as a child process, returning a handle to it.
- pub fn spawn(&self) -> io::Result<Child> {
- let mut command = self.create_command();
- let args = self.get_arguments();
- command.args(&args);
- debug!("Spawning: {}", &self);
- command.spawn()
- }
-
- fn create_command(&self) -> Command {
- let mut command = Command::new(&self.openvpn_bin);
- command
- .stdin(Stdio::null())
- .stdout(self.get_output_pipe_policy())
- .stderr(self.get_output_pipe_policy());
- command
- }
-
- fn get_output_pipe_policy(&self) -> Stdio {
- if self.pipe_output {
- Stdio::piped()
- } else {
- Stdio::null()
- }
+ /// Build a runnable expression from the current state of the command.
+ pub fn build(&self) -> duct::Expression {
+ debug!("Building expression: {}", &self);
+ duct::cmd(&self.openvpn_bin, self.get_arguments())
}
/// Returns all arguments that the subprocess would be spawned with.
@@ -152,81 +124,60 @@ fn write_argument(fmt: &mut fmt::Formatter, arg: &str) -> fmt::Result {
}
-impl ChildSpawner for OpenVpnCommand {
- type Child = ClonableChild;
-
- fn spawn(&mut self) -> io::Result<ClonableChild> {
- OpenVpnCommand::spawn(self).map(|child| child.into_clonable())
- }
-}
-
-
/// Possible events from OpenVPN
pub enum OpenVpnEvent {
/// An event from the plugin loaded into OpenVPN.
- PluginEvent(Result<(openvpn_ffi::OpenVpnPluginEvent, HashMap<String, String>)>),
- /// The OpenVPN process exited. The bool indicates if the process exited cleanly.
- Shutdown(bool),
+ PluginEvent(talpid_ipc::Result<(openvpn_ffi::OpenVpnPluginEvent, HashMap<String, String>)>),
+ /// The OpenVPN process exited. Containing the result of waiting for the process.
+ Shutdown(io::Result<process::ExitStatus>),
}
/// A struct able to start and monitor OpenVPN processes.
pub struct OpenVpnMonitor {
- command: OpenVpnCommand,
- plugin_path: PathBuf,
- monitor: ChildMonitor<OpenVpnCommand>,
+ child: ChildMonitor,
}
impl OpenVpnMonitor {
- /// Creates a new `OpenVpnMonitor` based on the given command
- pub fn new<P: AsRef<Path>>(command: OpenVpnCommand, plugin_path: P) -> Self {
- OpenVpnMonitor {
- command: command.clone(),
- plugin_path: plugin_path.as_ref().to_path_buf(),
- monitor: ChildMonitor::new(command),
- }
- }
-
- /// Starts OpenVPN and begins to monitor it.
- pub fn start<L>(&mut self, listener: L) -> Result<(Option<ChildStdout>, Option<ChildStderr>)>
- where L: FnMut(OpenVpnEvent) + Send + 'static
+ /// Spawns a new OpenVPN process and monitors it for exit and events.
+ pub fn start<P, L>(mut cmd: OpenVpnCommand, plugin_path: P, listener: L) -> Result<Self>
+ where P: AsRef<Path>,
+ L: FnMut(OpenVpnEvent) + Send + 'static
{
let shared_listener = Arc::new(Mutex::new(listener));
- self.start_plugin_listener(shared_listener.clone())?;
- self.start_child_monitor(shared_listener)
+ let server_id = Self::start_plugin_listener(shared_listener.clone())?;
+ cmd.plugin(plugin_path, vec![server_id]);
+ let child = Self::start_child_monitor(&cmd, shared_listener)?;
+ Ok(OpenVpnMonitor { child })
}
- fn start_plugin_listener<L>(&mut self, shared_listener: Arc<Mutex<L>>) -> Result<()>
+ fn start_plugin_listener<L>(shared_listener: Arc<Mutex<L>>) -> Result<String>
where L: FnMut(OpenVpnEvent) + Send + 'static
{
- let server_id = talpid_ipc::start_new_server(
+ talpid_ipc::start_new_server(
move |msg| {
- let chained_msg = msg.chain_err(|| ErrorKind::PluginCommunicationError);
let mut listener = shared_listener.lock().unwrap();
- (listener.deref_mut())(OpenVpnEvent::PluginEvent(chained_msg));
+ (listener.deref_mut())(OpenVpnEvent::PluginEvent(msg));
},
)
- .chain_err(|| ErrorKind::PluginCommunicationError)?;
- self.command.plugin(&self.plugin_path, vec![server_id]);
- Ok(())
+ .chain_err(|| ErrorKind::PluginCommunicationError)
}
- fn start_child_monitor<L>(&mut self,
+ fn start_child_monitor<L>(cmd: &OpenVpnCommand,
shared_listener: Arc<Mutex<L>>)
- -> Result<(Option<ChildStdout>, Option<ChildStderr>)>
+ -> Result<ChildMonitor>
where L: FnMut(OpenVpnEvent) + Send + 'static
{
- let callback = move |clean_exit| {
+ let on_exit = move |result: io::Result<&process::Output>| {
+ let status = result.map(|out: &process::Output| out.status.clone());
let mut listener = shared_listener.lock().unwrap();
- (listener.deref_mut())(OpenVpnEvent::Shutdown(clean_exit));
+ (listener.deref_mut())(OpenVpnEvent::Shutdown(status));
};
-
- self.monitor = ChildMonitor::new(self.command.clone());
- Ok(self.monitor.start(callback)?)
+ ChildMonitor::start(&cmd.build(), on_exit).chain_err(|| ErrorKind::ChildSpawnError)
}
- /// Forwards a stop call to the underlying `ChildMonitor`.
- pub fn stop(&self) -> Result<()> {
- Ok(self.monitor.stop()?)
+ /// Send a kill signal to the OpenVPN process.
+ pub fn kill(&self) -> io::Result<()> {
+ self.child.kill()
}
}
@@ -300,18 +251,3 @@ mod openvpn_command_tests {
assert!(testee_args.contains(&OsString::from("cde")));
}
}
-
-
-#[cfg(test)]
-mod openvpn_monitor_tests {
- use super::*;
-
- #[test]
- fn stop_without_start() {
- let command = OpenVpnCommand::new("");
- let testee = OpenVpnMonitor::new(command, "");
-
- use super::super::monitor::ErrorKind::InvalidState as MInvalidState;
- assert_matches!(testee.stop(), Err(Error(ErrorKind::ChildMonitorError(MInvalidState), _)));
- }
-}