diff options
| -rw-r--r-- | .travis.yml | 14 | ||||
| -rw-r--r-- | Cargo.lock | 51 | ||||
| -rwxr-xr-x | build-deps/install-build-deps.sh | 23 | ||||
| -rw-r--r-- | build-deps/zmq/.gitignore | 2 | ||||
| -rwxr-xr-x | build-deps/zmq/build.sh | 60 | ||||
| -rwxr-xr-x | build-deps/zmq/install-build-deps-apt.sh | 6 | ||||
| -rwxr-xr-x | build-deps/zmq/install-build-deps-osx.sh | 6 | ||||
| -rw-r--r-- | mullvad_daemon/src/mock_ipc.rs | 4 | ||||
| -rw-r--r-- | talpid_cli/src/main.rs | 31 | ||||
| -rw-r--r-- | talpid_core/src/process/openvpn.rs | 79 | ||||
| -rw-r--r-- | talpid_core/src/tunnel/openvpn.rs | 117 | ||||
| -rw-r--r-- | talpid_ipc/Cargo.toml | 3 | ||||
| -rw-r--r-- | talpid_ipc/src/lib.rs | 10 | ||||
| -rw-r--r-- | talpid_ipc/src/nop_ipc.rs | 34 | ||||
| -rw-r--r-- | talpid_ipc/src/zmq_ipc.rs | 113 | ||||
| -rw-r--r-- | talpid_ipc/tests/zmq_integration_tests.rs | 37 |
16 files changed, 136 insertions, 454 deletions
diff --git a/.travis.yml b/.travis.yml index cb2de1142d..8762ae3a03 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,9 +1,5 @@ language: rust -cache: - cargo: true - directories: - - $HOME/build-deps - +cache: cargo rust: - stable - beta @@ -16,8 +12,6 @@ os: - osx before_script: - - if [[ "$TRAVIS_OS_NAME" == "osx" ]]; then mkdir -p $HOME/.local/bin && ln -s /usr/local/bin/greadlink $HOME/.local/bin/readlink ; fi - - $(./build-deps/install-build-deps.sh $HOME/build-deps) - export PATH=$HOME/.cargo/bin:$HOME/.local/bin:$PATH - env @@ -26,12 +20,6 @@ script: - cargo test --all --verbose - ./format.sh --write-mode=diff -before_cache: - ## zmq-sys caches the location of `libzmq`, if that location ever changes e.g. - ## if the version changes we will get into trouble. This rm will make us - ## rebuild it every time instead - - rm -rf target/debug/build/zmq* - notifications: email: on_success: never diff --git a/Cargo.lock b/Cargo.lock index 5ee0297d2f..9e86a409c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -140,11 +140,6 @@ dependencies = [ [[package]] name = "error-chain" -version = "0.7.2" -source = "registry+https://github.com/rust-lang/crates.io-index" - -[[package]] -name = "error-chain" version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ @@ -313,16 +308,6 @@ dependencies = [ ] [[package]] -name = "metadeps" -version = "1.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "error-chain 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", - "pkg-config 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", - "toml 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] name = "mio" version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -445,11 +430,6 @@ dependencies = [ ] [[package]] -name = "pkg-config" -version = "0.3.9" -source = "registry+https://github.com/rust-lang/crates.io-index" - -[[package]] name = "quote" version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -617,7 +597,6 @@ dependencies = [ "serde_json 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "url 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "ws 0.7.1 (git+https://github.com/tomusdrw/ws-rs)", - "zmq 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -674,11 +653,6 @@ dependencies = [ ] [[package]] -name = "toml" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" - -[[package]] name = "unicode-bidi" version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -773,25 +747,6 @@ dependencies = [ "winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "zmq" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "libc 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", - "zmq-sys 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] -name = "zmq-sys" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "libc 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)", - "metadeps 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)", -] - [metadata] "checksum aho-corasick 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)" = "500909c4f87a9e52355b26626d890833e9e1d53ac566db76c36faa984b889699" "checksum ansi_term 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "23ac7c30002a5accbf7e8987d0632fa6de155b7c3d39d0067317a391e00a2ef6" @@ -810,7 +765,6 @@ dependencies = [ "checksum duct 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e45aa15fe0a8a8f511e6d834626afd55e49b62e5c8802e18328a87e8a8f6065c" "checksum env_logger 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e3856f1697098606fc6cb97a93de88ca3f3bc35bb878c725920e6e82ecf05e83" "checksum error-chain 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d9435d864e017c3c6afeac1654189b06cdb491cf2ff73dbf0d73b0f292f42ff8" -"checksum error-chain 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "318cb3c71ee4cdea69fdc9e15c173b245ed6063e1709029e8fd32525a881120f" "checksum error-chain 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "6930e04918388a9a2e41d518c25cf679ccafe26733fb4127dbf21993f2575d46" "checksum fnv 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "6cc484842f1e2884faf56f529f960cc12ad8c71ce96cc7abba0a067c98fee344" "checksum futures 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)" = "55f0008e13fc853f79ea8fc86e931486860d4c4c156cdffb59fa5f7fa833660a" @@ -832,7 +786,6 @@ dependencies = [ "checksum log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)" = "5141eca02775a762cc6cd564d8d2c50f67c0ea3a372cbf1c51592b3e029e10ad" "checksum matches 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "efd7622e3022e1a6eaa602c4cea8912254e5582c9c692e9167714182244801b1" "checksum memchr 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1dbccc0e46f1ea47b9f17e6d67c5a96bd27030519c519c9c91327e31275a47b4" -"checksum metadeps 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "829fffe7ea1d747e23f64be972991bc516b2f1ac2ae4a3b33d8bea150c410151" "checksum mio 0.6.6 (registry+https://github.com/rust-lang/crates.io-index)" = "f27d38f824a0d267d55b29b171e9e99269a53812e385fa75c1fe700ae254a6a4" "checksum miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919" "checksum net2 0.2.27 (registry+https://github.com/rust-lang/crates.io-index)" = "18b9642ad6222faf5ce46f6966f59b71b9775ad5758c9e09fcf0a6c8061972b4" @@ -842,7 +795,6 @@ dependencies = [ "checksum owning_ref 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "cdf84f41639e037b484f93433aa3897863b561ed65c6e59c7073d7c561710f37" "checksum parking_lot 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "8b45855d69c6ad53fdbd2f163b33506cf015befef980f7c13a9d60c12a111241" "checksum parking_lot_core 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "56a19dcbb5d1e32b6cccb8a9aa1fc2a38418c8699652e735e2bf391a3dc0aa16" -"checksum pkg-config 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "3a8b4c6b8165cd1a1cd4b9b120978131389f64bdaf456435caa41e630edba903" "checksum quote 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "7a6e920b65c65f10b2ae65c831a81a073a89edd28c7cce89475bff467ab4167a" "checksum rand 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "022e0636ec2519ddae48154b028864bdce4eaf7d35226ab8e65c611be97b189d" "checksum regex 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4278c17d0f6d62dfef0ab00028feb45bd7d2102843f80763474eeb1be8a10c01" @@ -866,7 +818,6 @@ dependencies = [ "checksum thread_local 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c85048c6260d17cf486ceae3282d9fb6b90be220bf5b28c400f5485ffc29f0c7" "checksum tokio-core 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "99e958104a67877907c1454386d5482fe8e965a55d60be834a15a44328e7dc76" "checksum tokio-io 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "48f55df1341bb92281f229a6030bc2abffde2c7a44c6d6b802b7687dd8be0775" -"checksum toml 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "736b60249cb25337bc196faa43ee12c705e426f3d55c214d73a4e7be06f92cb4" "checksum unicode-bidi 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "d3a078ebdd62c0e71a709c3d53d2af693fe09fe93fbff8344aebe289b78f9032" "checksum unicode-normalization 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "e28fa37426fceeb5cf8f41ee273faa7c82c47dc8fba5853402841e665fcd86ff" "checksum unicode-segmentation 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "18127285758f0e2c6cf325bb3f3d138a12fee27de4f23e146cd6a179f26c2cf3" @@ -881,5 +832,3 @@ dependencies = [ "checksum winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc" "checksum ws 0.7.1 (git+https://github.com/tomusdrw/ws-rs)" = "<none>" "checksum ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e" -"checksum zmq 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9623581b345b8a85fb72cae9b2cb67116d73ccb141e6c02f689e311962b74f9c" -"checksum zmq-sys 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "ecf3aafc58984279d9b8f776703ef569d32ae386353192a267cd6e711da70dde" diff --git a/build-deps/install-build-deps.sh b/build-deps/install-build-deps.sh deleted file mode 100755 index fd72129b69..0000000000 --- a/build-deps/install-build-deps.sh +++ /dev/null @@ -1,23 +0,0 @@ -#! /usr/bin/env bash -set -eu - -## Everything printed to stdout will be run by travis. So only print -## stuff that needs to be set, e.g. environment variables. - -SCRIPT_DIR=$(readlink -f $(dirname $0)) -INSTALL_DIR=$(readlink -f ${1:-"$SCRIPT_DIR"}) - -################################################################################ -################################## ZMQ ######################################### - -mkdir -p $INSTALL_DIR/zmq/unix -$(dirname $0)/zmq/build.sh $INSTALL_DIR/zmq/unix >&2 - -echo "# ZeroMQ is installed, now run" >&2 -echo "export LIBZMQ_PREFIX=$INSTALL_DIR/zmq/unix" | tee /dev/stderr -set +u -echo "export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$INSTALL_DIR/zmq/unix/lib" | tee /dev/stderr -set -u - -################################## ZMQ ######################################### -################################################################################ diff --git a/build-deps/zmq/.gitignore b/build-deps/zmq/.gitignore deleted file mode 100644 index d5ef630613..0000000000 --- a/build-deps/zmq/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -libzmq-git -linux diff --git a/build-deps/zmq/build.sh b/build-deps/zmq/build.sh deleted file mode 100755 index 363d0df446..0000000000 --- a/build-deps/zmq/build.sh +++ /dev/null @@ -1,60 +0,0 @@ -#! /usr/bin/env bash -set -eu - -WD=$(pwd)/$(dirname $0) -GIT_DIR=libzmq-git -INSTALL_DIR=$(readlink -f $1) -USE_CACHE=${USE_CACHE:-"1"} - - - - -### TEST IF ZMQ IS BUILT ### -F1=$INSTALL_DIR/include/zmq.h -F2=$INSTALL_DIR/lib/libzmq.so -if [[ -f $F1 && -f $F2 ]]; then - ZMQ_IS_BUILT=1 -else - ZMQ_IS_BUILT=0 - echo "Cannot find $F1 or $F2, will rebuild ZeroMQ" -fi - -if [[ $USE_CACHE == 1 && $ZMQ_IS_BUILT == 1 ]]; then - echo "Using a cached version of ZeroMQ" - exit 0 -fi - - -echo "If this fails, make sure that you have installed the packages needed to \ -build zmq, for ubuntu and OS X they can be found in \ -$WD/install-build-deps-{apt|osx}.sh" -echo "" - - - - -### Get the code ### -if [ -e "$WD/$GIT_DIR/.git" ]; then - ( - cd "$WD/$GIT_DIR" - git fetch - git checkout origin/master - ) -else - git clone --depth=1 git@github.com:zeromq/zeromq4-1.git "$WD/$GIT_DIR" -fi - -### Build ### -## We skip running the tests here as we trust the zmq maintainers to publish -## working code :) Living life on the wild side -trap "cd -" EXIT -cd "$WD/$GIT_DIR" - -set -x -./autogen.sh -./configure --prefix="$INSTALL_DIR" # add other options here -make -make install -set +x - -echo "WOOWZ, it's built now. All the good stuff is in $INSTALL_DIR" diff --git a/build-deps/zmq/install-build-deps-apt.sh b/build-deps/zmq/install-build-deps-apt.sh deleted file mode 100755 index 6a762f603c..0000000000 --- a/build-deps/zmq/install-build-deps-apt.sh +++ /dev/null @@ -1,6 +0,0 @@ -#! /usr/bin/env bash - -set -eux - -apt-get update -apt-get -y install git build-essential libtool autoconf automake uuid-dev pkg-config diff --git a/build-deps/zmq/install-build-deps-osx.sh b/build-deps/zmq/install-build-deps-osx.sh deleted file mode 100755 index c52dc026e8..0000000000 --- a/build-deps/zmq/install-build-deps-osx.sh +++ /dev/null @@ -1,6 +0,0 @@ -#! /usr/bin/env bash - -set -eux - -brew update -brew install libtool autoconf automake pkg-config diff --git a/mullvad_daemon/src/mock_ipc.rs b/mullvad_daemon/src/mock_ipc.rs index dd76f8f8c0..90859401c2 100644 --- a/mullvad_daemon/src/mock_ipc.rs +++ b/mullvad_daemon/src/mock_ipc.rs @@ -31,7 +31,7 @@ impl IpcServer { } pub fn address(&self) -> &str { - &self.server.address() + self.server.address() } pub fn wait(self) -> talpid_ipc::Result<()> { @@ -81,7 +81,7 @@ fn meta_extractor(context: &jsonrpc_ws_server::RequestContext) -> Meta { /// A mock implementation of the Mullvad frontend API. A very simplified explanation is that for /// the real implementation `tunnel_is_up` should be replaced with some kind of handle (or proxy to -/// a handle) to the jsonrpc client talking with talpid_core. +/// a handle) to the jsonrpc client talking with `talpid_core`. pub struct MockIpcApi { next_subscription_id: atomic::AtomicUsize, active: ActiveSubscriptions, diff --git a/talpid_cli/src/main.rs b/talpid_cli/src/main.rs index cc236677fc..8433fa03e8 100644 --- a/talpid_cli/src/main.rs +++ b/talpid_cli/src/main.rs @@ -10,9 +10,11 @@ extern crate log; extern crate env_logger; use std::path::Path; +use std::sync::Mutex; use std::sync::mpsc::{self, Receiver}; -use talpid_core::process::openvpn::{OpenVpnCommand, OpenVpnEvent, OpenVpnMonitor}; +use talpid_core::process::openvpn::OpenVpnCommand; +use talpid_core::tunnel::openvpn::{OpenVpnEvent, OpenVpnMonitor}; mod cli; @@ -27,7 +29,7 @@ fn run() -> Result<()> { init_logger()?; let args = cli::parse_args_or_exit(); let command = create_openvpn_command(&args); - main_loop(command, args.plugin_path.as_path()) + main_loop(&command, args.plugin_path.as_path()) } pub fn init_logger() -> Result<()> { @@ -43,9 +45,10 @@ fn create_openvpn_command(args: &Args) -> OpenVpnCommand { command } -fn main_loop(command: OpenVpnCommand, plugin_path: &Path) -> Result<()> { +fn main_loop(command: &OpenVpnCommand, plugin_path: &Path) -> Result<()> { + let (monitor, rx) = create_openvpn_monitor(plugin_path)?; loop { - let (_monitor, rx) = start_monitor(command.clone(), plugin_path)?; + monitor.start(command.clone()).chain_err(|| "Unable to start OpenVPN")?; while let Ok(msg) = rx.recv() { match msg { OpenVpnEvent::Shutdown(result) => { @@ -55,22 +58,22 @@ fn main_loop(command: OpenVpnCommand, plugin_path: &Path) -> Result<()> { ); break; } - OpenVpnEvent::PluginEvent(Ok((event, env))) => { + OpenVpnEvent::PluginEvent(event, env) => { println!("OpenVPN event:\nEvent: {:?}\nENV: {:?}", event, env); } - OpenVpnEvent::PluginEvent(Err(e)) => println!("Read error from plugin: {:?}", e), } } std::thread::sleep(std::time::Duration::from_millis(500)); } } -fn start_monitor(command: OpenVpnCommand, - plugin_path: &Path) - -> Result<(OpenVpnMonitor, Receiver<OpenVpnEvent>)> { - let (tx, rx) = mpsc::channel(); - let listener = move |event: OpenVpnEvent| tx.send(event).unwrap(); - OpenVpnMonitor::start(command, plugin_path, listener) - .map(|m| (m, rx)) - .chain_err(|| "Unable to start OpenVPN") +fn create_openvpn_monitor(plugin_path: &Path) -> Result<(OpenVpnMonitor, Receiver<OpenVpnEvent>)> { + let (event_tx, event_rx) = mpsc::channel(); + let event_tx_mutex = Mutex::new(event_tx); + let on_event = move |event: OpenVpnEvent| { + event_tx_mutex.lock().unwrap().send(event).expect("Unable to send on tx_lock"); + }; + let monitor = OpenVpnMonitor::new(on_event, plugin_path) + .chain_err(|| "Unable to start OpenVPN monitor")?; + Ok((monitor, event_rx)) } diff --git a/talpid_core/src/process/openvpn.rs b/talpid_core/src/process/openvpn.rs index 5d40d8e42c..df1e4c7682 100644 --- a/talpid_core/src/process/openvpn.rs +++ b/talpid_core/src/process/openvpn.rs @@ -1,7 +1,5 @@ extern crate openvpn_ffi; -use super::monitor::ChildMonitor; - use duct; use net::{RemoteAddr, ToRemoteAddrs}; @@ -9,25 +7,8 @@ use net::{RemoteAddr, ToRemoteAddrs}; use std::ffi::{OsStr, OsString}; use std::fmt; use std::io; -use std::ops::DerefMut; use std::path::{Path, PathBuf}; -use std::process; -use std::sync::{Arc, Mutex}; - -use talpid_ipc; -error_chain!{ - errors { - /// Error while communicating with the OpenVPN plugin. - PluginCommunicationError { - description("Error while communicating with the OpenVPN plugin") - } - /// Error while trying to spawn OpenVPN process. - ChildSpawnError { - description("Error while trying to spawn OpenVPN process") - } - } -} /// An OpenVPN process builder, providing control over the different arguments that the OpenVPN /// binary accepts. @@ -123,66 +104,8 @@ fn write_argument(fmt: &mut fmt::Formatter, arg: &str) -> fmt::Result { } -/// Possible events from OpenVPN -pub enum OpenVpnEvent { - /// An event from the plugin loaded into OpenVPN. - PluginEvent(talpid_ipc::Result<(openvpn_ffi::OpenVpnPluginEvent, openvpn_ffi::OpenVpnEnv)>), - /// The OpenVPN process exited. Containing the result of waiting for the process. - Shutdown(io::Result<process::ExitStatus>), -} - -/// A struct able to start and monitor OpenVPN processes. -pub struct OpenVpnMonitor { - child: ChildMonitor, -} - -impl OpenVpnMonitor { - /// Spawns a new OpenVPN process and monitors it for exit and events. - pub fn start<P, L>(mut cmd: OpenVpnCommand, plugin_path: P, listener: L) -> Result<Self> - where P: AsRef<Path>, - L: FnMut(OpenVpnEvent) + Send + 'static - { - let shared_listener = Arc::new(Mutex::new(listener)); - let server_id = Self::start_plugin_listener(shared_listener.clone())?; - cmd.plugin(plugin_path, vec![server_id]); - let child = Self::start_child_monitor(&cmd, shared_listener)?; - Ok(OpenVpnMonitor { child }) - } - - fn start_plugin_listener<L>(shared_listener: Arc<Mutex<L>>) -> Result<String> - where L: FnMut(OpenVpnEvent) + Send + 'static - { - talpid_ipc::start_new_server( - move |msg| { - let mut listener = shared_listener.lock().unwrap(); - (listener.deref_mut())(OpenVpnEvent::PluginEvent(msg)); - }, - ) - .chain_err(|| ErrorKind::PluginCommunicationError) - } - - fn start_child_monitor<L>(cmd: &OpenVpnCommand, - shared_listener: Arc<Mutex<L>>) - -> Result<ChildMonitor> - where L: FnMut(OpenVpnEvent) + Send + 'static - { - let on_exit = move |result: io::Result<&process::Output>| { - let status = result.map(|out: &process::Output| out.status.clone()); - let mut listener = shared_listener.lock().unwrap(); - (listener.deref_mut())(OpenVpnEvent::Shutdown(status)); - }; - ChildMonitor::start(&cmd.build(), on_exit).chain_err(|| ErrorKind::ChildSpawnError) - } - - /// Send a kill signal to the OpenVPN process. - pub fn kill(&self) -> io::Result<()> { - self.child.kill() - } -} - - #[cfg(test)] -mod openvpn_command_tests { +mod tests { use super::OpenVpnCommand; use net::RemoteAddr; use std::ffi::OsString; diff --git a/talpid_core/src/tunnel/openvpn.rs b/talpid_core/src/tunnel/openvpn.rs index c0d65ce462..bcf89e3eb0 100644 --- a/talpid_core/src/tunnel/openvpn.rs +++ b/talpid_core/src/tunnel/openvpn.rs @@ -1,8 +1,121 @@ use jsonrpc_core::{Error, IoHandler}; use openvpn_ffi; +use process::monitor::ChildMonitor; +use process::openvpn::OpenVpnCommand; +use std::io; + +use std::path::{Path, PathBuf}; +use std::process; +use std::result::Result as StdResult; +use std::sync::{Arc, Mutex}; use talpid_ipc; +mod errors { + error_chain!{ + errors { + /// The `OpenVpnMonitor` is in an invalid state for the requested operation. + InvalidState { + description("Invalid state. OpenVPN is already running") + } + /// Unable to start or kill the OpenVPN process. + ChildProcessError { + description("Unable to start or kill the OpenVPN process") + } + /// Unable to start or manage the IPC server listening for events from OpenVPN + IpcServerError { + description("Unable to start or manage the IPC server") + } + } + } +} +pub use self::errors::*; + + +/// Possible events from OpenVPN +pub enum OpenVpnEvent { + /// An event from the plugin loaded into OpenVPN. + PluginEvent(openvpn_ffi::OpenVpnPluginEvent, openvpn_ffi::OpenVpnEnv), + /// The OpenVPN process exited. Containing the result of waiting for the process. + Shutdown(io::Result<process::ExitStatus>), +} + +/// Struct for monitoring OpenVPN processes. +pub struct OpenVpnMonitor { + on_event: Arc<Fn(OpenVpnEvent) + Send + Sync + 'static>, + plugin_path: PathBuf, + child: Arc<Mutex<Option<ChildMonitor>>>, + event_dispatcher: OpenVpnEventDispatcher, +} + +impl OpenVpnMonitor { + /// Creates a new `OpenVpnMonitor` with the given listener and using the plugin at the given + /// path. + pub fn new<L, P>(on_event: L, plugin_path: P) -> Result<Self> + where L: Fn(OpenVpnEvent) + Send + Sync + 'static, + P: AsRef<Path> + { + let on_event = Arc::new(on_event); + let event_dispatcher = Self::start_event_dispatcher(on_event.clone())?; + Ok( + OpenVpnMonitor { + on_event, + plugin_path: plugin_path.as_ref().to_owned(), + child: Arc::new(Mutex::new(None)), + event_dispatcher, + }, + ) + } + + fn start_event_dispatcher(on_event: Arc<Fn(OpenVpnEvent) + Send + Sync + 'static>) + -> Result<OpenVpnEventDispatcher> { + let on_plugin_event = move |event, env| (*on_event)(OpenVpnEvent::PluginEvent(event, env)); + OpenVpnEventDispatcher::start(on_plugin_event).chain_err(|| ErrorKind::IpcServerError) + } + + /// Tries to start a new OpenVPN process if one is not already running. + /// If this `OpenVpnMonitor is already monitoring a running process it will return an + /// `InvalidState` error. + pub fn start(&self, cmd: OpenVpnCommand) -> Result<()> { + let mut child_lock = self.child.lock().unwrap(); + if child_lock.is_some() { + bail!(ErrorKind::InvalidState); + } + *child_lock = Some(self.start_child_monitor(cmd)?); + Ok(()) + } + + fn start_child_monitor(&self, mut cmd: OpenVpnCommand) -> Result<ChildMonitor> { + self.set_plugin(&mut cmd); + + let child = self.child.clone(); + let on_event = self.on_event.clone(); + + let on_exit = move |exit_status: io::Result<&process::Output>| { + *child.lock().unwrap() = None; + (*on_event)(OpenVpnEvent::Shutdown(exit_status.map(|output| output.status)),) + }; + ChildMonitor::start(&cmd.build(), on_exit).chain_err(|| ErrorKind::ChildProcessError) + } + + fn set_plugin(&self, cmd: &mut OpenVpnCommand) { + let event_dispatcher_address = self.event_dispatcher.address().to_string(); + cmd.plugin(&self.plugin_path, vec![event_dispatcher_address]); + } + + /// Tries to kill the OpenVPN process if it is running. If it is already dead, this does + /// nothing. + pub fn kill(&self) -> Result<()> { + if let Some(ref child) = *self.child.lock().unwrap() { + child.kill().chain_err(|| ErrorKind::ChildProcessError)?; + } + Ok(()) + } +} + + + + /// IPC server for listening to events coming from plugin loaded into OpenVPN. pub struct OpenVpnEventDispatcher { server: talpid_ipc::IpcServer, @@ -41,7 +154,7 @@ mod api { fn openvpn_event(&self, openvpn_ffi::OpenVpnPluginEvent, openvpn_ffi::OpenVpnEnv) - -> Result<(), Error>; + -> StdResult<(), Error>; } } } @@ -59,7 +172,7 @@ impl<L> OpenVpnEventApi for OpenVpnEventApiImpl<L> fn openvpn_event(&self, event: openvpn_ffi::OpenVpnPluginEvent, env: openvpn_ffi::OpenVpnEnv) - -> Result<(), Error> { + -> StdResult<(), Error> { debug!("OpenVPN event {:?}", event); (self.on_event)(event, env); Ok(()) diff --git a/talpid_ipc/Cargo.toml b/talpid_ipc/Cargo.toml index f7ccfef19f..d1a2bb13e2 100644 --- a/talpid_ipc/Cargo.toml +++ b/talpid_ipc/Cargo.toml @@ -14,9 +14,6 @@ jsonrpc-ws-server = { git = "https://github.com/faern/jsonrpc", branch = "bind-z ws = { git = "https://github.com/tomusdrw/ws-rs" } url = "1.4" -[target.'cfg(not(windows))'.dependencies] -zmq = "0.8" - [dev-dependencies] assert_matches = "1.0" env_logger = "0.4" diff --git a/talpid_ipc/src/lib.rs b/talpid_ipc/src/lib.rs index f0f50e5f2b..f6d4acea19 100644 --- a/talpid_ipc/src/lib.rs +++ b/talpid_ipc/src/lib.rs @@ -18,16 +18,6 @@ use jsonrpc_ws_server::{MetaExtractor, NoopExtractor, Server, ServerBuilder}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; -#[cfg(windows)] -#[path = "nop_ipc.rs"] -mod ipc_impl; - -#[cfg(not(windows))] -#[path = "zmq_ipc.rs"] -mod ipc_impl; - -pub use self::ipc_impl::*; - mod client; pub use client::*; diff --git a/talpid_ipc/src/nop_ipc.rs b/talpid_ipc/src/nop_ipc.rs deleted file mode 100644 index 8c7a24bc63..0000000000 --- a/talpid_ipc/src/nop_ipc.rs +++ /dev/null @@ -1,34 +0,0 @@ -use super::{ErrorKind, IpcServerId, Result}; - -use serde; - -/// This implementation only exists because we cannot get ZeroMQ to work on -/// Windows. This is not a valid IPC implementation and us using -/// it on Windows will result in a non-functioning client. -/// -/// We plan on trying with ZMQ again in the future. -/// Erik, 2017-02-09 -pub fn start_new_server<T, F>(_on_message: F) -> Result<IpcServerId> - where for<'de> T: serde::Deserialize<'de> + 'static, - F: FnMut(Result<T>) + Send + 'static -{ - bail!(ErrorKind::CouldNotStartServer); -} - -pub struct IpcClient<T> - where T: serde::Serialize -{ - _phantom: ::std::marker::PhantomData<T>, -} - -impl<T> IpcClient<T> - where T: serde::Serialize -{ - pub fn new(_server_id: IpcServerId) -> Self { - IpcClient { _phantom: ::std::marker::PhantomData } - } - - pub fn send(&mut self, _message: &T) -> Result<()> { - bail!(ErrorKind::SendError); - } -} diff --git a/talpid_ipc/src/zmq_ipc.rs b/talpid_ipc/src/zmq_ipc.rs deleted file mode 100644 index 35fe5e59a9..0000000000 --- a/talpid_ipc/src/zmq_ipc.rs +++ /dev/null @@ -1,113 +0,0 @@ -extern crate zmq; -extern crate serde_json; - -use super::{ErrorKind, IpcServerId, Result, ResultExt}; - -use serde; - -use std::thread; - -/// Starts the server end of an IPC channel. The returned `IpcServerId` is the unique identifier -/// allowing an `IpcClient` to connect to this server instance. Returns an error if unable to set -/// up the server. -/// -/// Incoming messages are sent as `Ok` results to the `on_message` callback. IO errors will be sent -/// as `Err` results to the `on_message` callback. -/// -/// This function is non-blocking and thus spawns a thread where it listens to messages. -pub fn start_new_server<T, F>(on_message: F) -> Result<IpcServerId> - where for<'de> T: serde::Deserialize<'de> + 'static, - F: FnMut(Result<T>) + Send + 'static -{ - for port in 5000..5010 { - let connection_string = format!("tcp://127.0.0.1:{}", port); - if let Ok(socket) = start_zmq_server(&connection_string) { - let _ = start_receive_loop(socket, on_message); - debug!("Listening on {}", connection_string); - return Ok(connection_string); - } - } - bail!(ErrorKind::CouldNotStartServer); -} - -fn start_zmq_server(connection_string: &str) -> zmq::Result<zmq::Socket> { - let ctx = zmq::Context::new(); - - let socket = ctx.socket(zmq::PULL)?; - socket.bind(connection_string)?; - - Ok(socket) -} - -fn start_receive_loop<T, F>(socket: zmq::Socket, mut on_message: F) -> thread::JoinHandle<()> - where for<'de> T: serde::Deserialize<'de> + 'static, - F: FnMut(Result<T>) + Send + 'static -{ - thread::spawn( - move || loop { - let read_res = socket - .recv_bytes(0) - .chain_err(|| ErrorKind::ReadFailure) - .and_then(|a| parse_message(&a)); - on_message(read_res); - }, - ) -} - -fn parse_message<'a, T>(message: &'a [u8]) -> Result<T> - where T: serde::Deserialize<'a> + 'static -{ - serde_json::from_slice(message).chain_err(|| ErrorKind::ParseFailure) -} - - -pub struct IpcClient<T> - where T: serde::Serialize -{ - server_id: IpcServerId, - socket: Option<zmq::Socket>, - _phantom: ::std::marker::PhantomData<T>, -} - -impl<T> IpcClient<T> - where T: serde::Serialize -{ - pub fn new(server_id: IpcServerId) -> Self { - IpcClient { - server_id: server_id, - socket: None, - _phantom: ::std::marker::PhantomData, - } - } - - pub fn send(&mut self, message: &T) -> Result<()> { - let bytes = Self::serialize(message)?; - self.send_bytes(bytes.as_slice()) - } - - fn serialize(t: &T) -> Result<Vec<u8>> { - serde_json::to_vec(t).chain_err(|| ErrorKind::ParseFailure) - } - - fn send_bytes(&mut self, message: &[u8]) -> Result<()> { - if self.socket.is_none() { - self.connect().chain_err(|| ErrorKind::SendError)?; - } - - let socket = self.socket.as_ref().unwrap(); - socket.send(message, 0).chain_err(|| ErrorKind::SendError) - } - - fn connect(&mut self) -> Result<()> { - debug!("Trying to establish connection to {}", self.server_id); - let ctx = zmq::Context::new(); - let socket = ctx.socket(zmq::PUSH) - .chain_err(|| "Could not create ZeroMQ PUSH socket".to_owned())?; - socket - .connect(&self.server_id) - .chain_err(|| format!("Could not connect to {:?}", self.server_id))?; - - self.socket = Some(socket); - Ok(()) - } -} diff --git a/talpid_ipc/tests/zmq_integration_tests.rs b/talpid_ipc/tests/zmq_integration_tests.rs deleted file mode 100644 index db9650f7a0..0000000000 --- a/talpid_ipc/tests/zmq_integration_tests.rs +++ /dev/null @@ -1,37 +0,0 @@ -#[cfg(all(test, not(windows)))] -mod zmq_integration_tests { - extern crate serde; - extern crate talpid_ipc; - - use self::talpid_ipc::{IpcClient, IpcServerId, Result}; - - use std::sync::mpsc::{self, Receiver}; - use std::time::Duration; - - #[test] - fn can_connect_and_send_and_receive_messages() { - let (connection_string, new_messages_rx) = start_server::<String>(); - - let mut ipc_client = IpcClient::new(connection_string); - let msg = "Hello".to_owned(); - ipc_client.send(&msg).expect("Could not send message"); - - let message = new_messages_rx - .recv_timeout(Duration::from_millis(1000)) - .expect("Did not receive a message"); - - assert_eq!(message.unwrap(), "Hello", "Got wrong message"); - } - - fn start_server<T>() -> (IpcServerId, Receiver<Result<T>>) - where for<'de> T: serde::Deserialize<'de> + Send + 'static - { - let (tx, rx) = mpsc::channel(); - - let callback = move |message: Result<T>| { let _ = tx.send(message); }; - let connection_string = - talpid_ipc::start_new_server(callback).expect("Could not start the server"); - - (connection_string, rx) - } -} |
