diff options
Diffstat (limited to 'talpid-core/src')
| -rw-r--r-- | talpid-core/src/lib.rs | 36 | ||||
| -rw-r--r-- | talpid-core/src/mpsc.rs | 76 | ||||
| -rw-r--r-- | talpid-core/src/net.rs | 219 | ||||
| -rw-r--r-- | talpid-core/src/process/mod.rs | 6 | ||||
| -rw-r--r-- | talpid-core/src/process/openvpn.rs | 223 | ||||
| -rw-r--r-- | talpid-core/src/process/unix.rs | 33 | ||||
| -rw-r--r-- | talpid-core/src/tunnel/mod.rs | 171 | ||||
| -rw-r--r-- | talpid-core/src/tunnel/openvpn.rs | 258 |
8 files changed, 1022 insertions, 0 deletions
diff --git a/talpid-core/src/lib.rs b/talpid-core/src/lib.rs new file mode 100644 index 0000000000..1fc92f8fe4 --- /dev/null +++ b/talpid-core/src/lib.rs @@ -0,0 +1,36 @@ +#![deny(missing_docs)] + +//! The core components of the talpidaemon VPN client. + +#[cfg(test)] +#[macro_use] +extern crate assert_matches; + +extern crate duct; + +#[macro_use] +extern crate lazy_static; +#[macro_use] +extern crate log; +extern crate mktemp; + +#[macro_use] +extern crate error_chain; +extern crate jsonrpc_core; +#[macro_use] +extern crate jsonrpc_macros; + +extern crate talpid_ipc; +extern crate openvpn_ffi; + +/// Working with processes. +pub mod process; + +/// Network primitives. +pub mod net; + +/// Abstracts over different VPN tunnel technologies +pub mod tunnel; + +/// Abstractions and extra features on `std::mpsc` +pub mod mpsc; diff --git a/talpid-core/src/mpsc.rs b/talpid-core/src/mpsc.rs new file mode 100644 index 0000000000..d63956c452 --- /dev/null +++ b/talpid-core/src/mpsc.rs @@ -0,0 +1,76 @@ +use std::marker::PhantomData; +use std::sync::mpsc; + +/// Abstraction over an `mpsc::Sender` that first converts the value to another type before sending. +#[derive(Debug, Clone)] +pub struct IntoSender<T, U> { + sender: mpsc::Sender<U>, + _marker: PhantomData<T>, +} + +impl<T, U> IntoSender<T, U> + where T: Into<U> +{ + /// Converts the `T` into a `U` and sends it on the channel. + pub fn send(&self, t: T) -> Result<(), mpsc::SendError<U>> { + self.sender.send(t.into()) + } +} + +impl<T, U> From<mpsc::Sender<U>> for IntoSender<T, U> + where T: Into<U> +{ + fn from(sender: mpsc::Sender<U>) -> Self { + IntoSender { + sender: sender, + _marker: PhantomData, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::mpsc; + use std::thread; + + #[derive(Debug, Eq, PartialEq)] + enum Inner { + One, + Two, + } + + #[derive(Debug, Eq, PartialEq)] + enum Outer { + Inner(Inner), + Other, + } + + impl From<Inner> for Outer { + fn from(o: Inner) -> Self { + Outer::Inner(o) + } + } + + #[test] + fn sender() { + let (tx, rx) = mpsc::channel::<Outer>(); + let inner_tx: IntoSender<Inner, Outer> = tx.clone().into(); + + tx.send(Outer::Other).unwrap(); + inner_tx.send(Inner::Two).unwrap(); + + assert_eq!(Outer::Other, rx.recv().unwrap()); + assert_eq!(Outer::Inner(Inner::Two), rx.recv().unwrap()); + } + + #[test] + fn send_between_thread() { + let (tx, rx) = mpsc::channel::<Outer>(); + let inner_tx: IntoSender<Inner, Outer> = tx.clone().into(); + + thread::spawn(move || { inner_tx.send(Inner::One).unwrap(); }); + + assert_eq!(Outer::Inner(Inner::One), rx.recv().unwrap()); + } +} diff --git a/talpid-core/src/net.rs b/talpid-core/src/net.rs new file mode 100644 index 0000000000..310adee6ca --- /dev/null +++ b/talpid-core/src/net.rs @@ -0,0 +1,219 @@ +use std::fmt; +use std::net::SocketAddr; +use std::str::FromStr; + + +error_chain! { + errors { + /// Error indicating parsing the address failed + AddrParse(s: String) { + description("Invalid address format") + display("Unable to parse address. {}", s) + } + } +} + + +/// Represents a network layer IP address together with the transport layer protocol and port. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct Endpoint { + /// The address part of this endpoint, contains the IP and port. + pub address: RemoteAddr, + /// The protocol part of this endpoint. + pub protocol: TransportProtocol, +} + +impl Endpoint { + /// Constructs a new `Endpoint` from the given parameters. + pub fn new(address: &str, port: u16, protocol: TransportProtocol) -> Self { + Endpoint { + address: RemoteAddr::new(address, port), + protocol: protocol, + } + } +} + +/// Representation of a transport protocol, either UDP or TCP. +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +pub enum TransportProtocol { + /// Represents the UDP transport protocol. + Udp, + /// Represents the TCP transport protocol. + Tcp, +} + +/// Representation of a TCP or UDP endpoint. The IP level address is represented by either an IP +/// directly or a hostname/domain. The IP level address together with a port becomes a socket +/// address. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum RemoteAddr { + /// Endpoint represented by an IP and a port. + SocketAddr(SocketAddr), + /// Endpoint represented by a hostname or domain and a port. + Domain(String, u16), +} + +impl RemoteAddr { + /// Constructs a new `RemoteAddr` from the given address (hostname or domain) and port. To + /// construct a `RemoteAddr` based on IP rather than domain, use the From<SocketAddr> impl. + pub fn new(address: &str, port: u16) -> Self { + RemoteAddr::Domain(address.to_owned(), port) + } + + /// Returns the address associated with this `RemoteAddr`. If it is backed by an IP that will + /// be formatted as a string. + pub fn address(&self) -> String { + match *self { + RemoteAddr::SocketAddr(ref addr) => addr.ip().to_string(), + RemoteAddr::Domain(ref address, _) => address.to_owned(), + } + } + + /// Returns the port associated with this `RemoteAddr`. + pub fn port(&self) -> u16 { + match *self { + RemoteAddr::SocketAddr(addr) => addr.port(), + RemoteAddr::Domain(_, port) => port, + } + } + + fn from_domain_str(s: &str) -> Result<Self> { + let (address, port_str) = Self::split_at_last_colon(s)?; + let port = + u16::from_str(port_str) + .chain_err(|| ErrorKind::AddrParse(format!("Invalid port: \"{}\"", port_str)),)?; + if address.is_empty() || address.contains(':') { + let msg = format!("Invalid IP or domain: \"{}\"", address); + bail!(ErrorKind::AddrParse(msg)); + } + Ok(RemoteAddr::Domain(address.to_owned(), port)) + } + + fn split_at_last_colon(s: &str) -> Result<(&str, &str)> { + let mut iter = s.rsplitn(2, ':'); + let port = iter.next().unwrap(); + let address = iter.next() + .ok_or_else(|| Error::from(ErrorKind::AddrParse("No colon".to_owned())))?; + Ok((address, port)) + } +} + +impl From<SocketAddr> for RemoteAddr { + fn from(socket_addr: SocketAddr) -> Self { + RemoteAddr::SocketAddr(socket_addr) + } +} + +impl FromStr for RemoteAddr { + type Err = Error; + fn from_str(s: &str) -> Result<Self> { + if let Ok(addr) = SocketAddr::from_str(s) { + Ok(RemoteAddr::from(addr)) + } else { + Self::from_domain_str(s) + } + } +} + +impl fmt::Display for RemoteAddr { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + match *self { + RemoteAddr::SocketAddr(ref addr) => addr.fmt(fmt), + RemoteAddr::Domain(ref address, ref port) => write!(fmt, "{}:{}", address, port), + } + } +} + + +#[cfg(test)] +mod remote_addr_tests { + use super::*; + + use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6}; + use std::str::FromStr; + + #[test] + fn new_and_getters() { + let testee = RemoteAddr::new("a_domain", 543); + assert_eq!("a_domain", testee.address()); + assert_eq!(543, testee.port()); + } + + #[test] + fn from_socket_addr() { + let socket_addr = SocketAddr::from_str("10.0.1.1:76").unwrap(); + let testee: RemoteAddr = socket_addr.into(); + assert_eq!("10.0.1.1", testee.address()); + assert_eq!(76, testee.port()); + } + + #[test] + fn from_str() { + let testee = RemoteAddr::from_str("example.com:3333").unwrap(); + assert_eq!("example.com", testee.address()); + assert_eq!(3333, testee.port()); + } + + #[test] + fn from_ipv6_str_without_brackets() { + let result = RemoteAddr::from_str("fe80::1:1337"); + assert_matches!(result, Err(Error(ErrorKind::AddrParse(_), _))); + } + + #[test] + fn from_ipv6_str_with_brackets() { + let testee = RemoteAddr::from_str("[fe80::1]:1337").unwrap(); + assert_eq!("fe80::1", testee.address()); + assert_eq!(1337, testee.port()); + } + + #[test] + fn from_ipv6_str_without_port() { + let result = RemoteAddr::from_str("fe80::1"); + assert_matches!(result, Err(Error(ErrorKind::AddrParse(_), _))); + } + + #[test] + fn from_str_no_colon() { + let result = RemoteAddr::from_str("example.com"); + assert_matches!(result, Err(Error(ErrorKind::AddrParse(_), _))); + } + + #[test] + fn from_str_invalid_port_large() { + let result = RemoteAddr::from_str("example.com:99999"); + assert_matches!(result, Err(Error(ErrorKind::AddrParse(_), _))); + } + + #[test] + fn from_str_empty_address() { + let result = RemoteAddr::from_str(":100"); + assert_matches!(result, Err(Error(ErrorKind::AddrParse(_), _))); + } + + #[test] + fn from_str_empty_port() { + let result = RemoteAddr::from_str("example.com:"); + assert_matches!(result, Err(Error(ErrorKind::AddrParse(_), _))); + } + + #[test] + fn to_string_domain() { + let testee = RemoteAddr::new("example.com", 3333); + assert_eq!("example.com:3333", testee.to_string()); + } + + #[test] + fn to_string_ipv4() { + let socket_addr = SocketAddr::V4(SocketAddrV4::from_str("127.1.2.3:1337").unwrap()); + let testee = RemoteAddr::from(socket_addr); + assert_eq!("127.1.2.3:1337", testee.to_string()); + } + + #[test] + fn to_string_ipv6() { + let socket_addr = SocketAddr::V6(SocketAddrV6::from_str("[2001:beef::1]:9876").unwrap()); + let testee = RemoteAddr::from(socket_addr); + assert_eq!("[2001:beef::1]:9876", testee.to_string()); + } +} diff --git a/talpid-core/src/process/mod.rs b/talpid-core/src/process/mod.rs new file mode 100644 index 0000000000..88bdc12ea4 --- /dev/null +++ b/talpid-core/src/process/mod.rs @@ -0,0 +1,6 @@ +/// A module for all OpenVPN related process management. +pub mod openvpn; + +/// Unix specific process management features. +#[cfg(unix)] +pub mod unix; diff --git a/talpid-core/src/process/openvpn.rs b/talpid-core/src/process/openvpn.rs new file mode 100644 index 0000000000..ea66ef894c --- /dev/null +++ b/talpid-core/src/process/openvpn.rs @@ -0,0 +1,223 @@ +extern crate openvpn_ffi; + +use duct; + +use net; + +use std::ffi::{OsStr, OsString}; +use std::fmt; +use std::path::{Path, PathBuf}; + +static BASE_ARGUMENTS: &[&[&str]] = &[ + &["--client"], + &["--nobind"], + &["--dev", "tun"], + &["--ping", "3"], + &["--ping-exit", "15"], + &["--connect-retry", "0", "0"], + &["--connect-retry-max", "1"], + &["--comp-lzo"], +]; + +static ALLOWED_TLS_CIPHERS: &[&str] = &[ + "TLS-DHE-RSA-WITH-AES-256-GCM-SHA384", + "TLS-DHE-RSA-WITH-AES-256-CBC-SHA", + "TLS-DHE-RSA-WITH-CAMELLIA-256-CBC-SHA", + "TLS-DHE-RSA-WITH-AES-128-CBC-SHA", + "TLS-DHE-RSA-WITH-SEED-CBC-SHA", + "TLS-DHE-RSA-WITH-CAMELLIA-128-CBC-SHA", +]; + +/// An OpenVPN process builder, providing control over the different arguments that the OpenVPN +/// binary accepts. +#[derive(Clone)] +pub struct OpenVpnCommand { + openvpn_bin: OsString, + config: Option<PathBuf>, + remote: Option<net::Endpoint>, + user_pass_path: Option<PathBuf>, + ca: Option<PathBuf>, + plugin: Option<(PathBuf, Vec<String>)>, +} + +impl OpenVpnCommand { + /// Constructs a new `OpenVpnCommand` for launching OpenVPN processes from the binary at + /// `openvpn_bin`. + pub fn new<P: AsRef<OsStr>>(openvpn_bin: P) -> Self { + OpenVpnCommand { + openvpn_bin: OsString::from(openvpn_bin.as_ref()), + config: None, + remote: None, + user_pass_path: None, + ca: None, + plugin: None, + } + } + + /// Sets what configuration file will be given to OpenVPN + pub fn config<P: AsRef<Path>>(&mut self, path: P) -> &mut Self { + self.config = Some(path.as_ref().to_path_buf()); + self + } + + /// Sets the address and protocol that OpenVPN will connect to. + pub fn remote(&mut self, remote: net::Endpoint) -> &mut Self { + self.remote = Some(remote); + self + } + + /// Sets the path to the file where the username and password for user-pass authentication is + /// stored. See the `--auth-user-pass` OpenVPN documentation for details. + pub fn user_pass<P: AsRef<Path>>(&mut self, path: P) -> &mut Self { + self.user_pass_path = Some(path.as_ref().to_path_buf()); + self + } + + /// Sets the path to the CA certificate file. + pub fn ca<P: AsRef<Path>>(&mut self, path: P) -> &mut Self { + self.ca = Some(path.as_ref().to_path_buf()); + self + } + + /// Sets a plugin and its arguments that OpenVPN will be started with. + pub fn plugin<P: AsRef<Path>>(&mut self, path: P, args: Vec<String>) -> &mut Self { + self.plugin = Some((path.as_ref().to_path_buf(), args)); + self + } + + /// Build a runnable expression from the current state of the command. + pub fn build(&self) -> duct::Expression { + debug!("Building expression: {}", &self); + duct::cmd(&self.openvpn_bin, self.get_arguments()).unchecked() + } + + /// Returns all arguments that the subprocess would be spawned with. + pub fn get_arguments(&self) -> Vec<OsString> { + let mut args: Vec<OsString> = Self::base_arguments().iter().map(OsString::from).collect(); + + if let Some(ref config) = self.config { + args.push(OsString::from("--config")); + args.push(OsString::from(config.as_os_str())); + } + + args.extend(self.remote_arguments().iter().map(OsString::from)); + args.extend(self.authentication_arguments()); + + if let Some(ref ca) = self.ca { + args.push(OsString::from("--ca")); + args.push(OsString::from(ca.as_os_str())); + } + + if let Some((ref path, ref plugin_args)) = self.plugin { + args.push(OsString::from("--plugin")); + args.push(OsString::from(path)); + args.extend(plugin_args.iter().map(OsString::from)); + } + + args.extend(Self::security_arguments().iter().map(OsString::from)); + + args + } + + fn base_arguments() -> Vec<&'static str> { + let mut args = vec![]; + for arglist in BASE_ARGUMENTS.iter() { + for arg in arglist.iter() { + args.push(*arg); + } + } + args + } + + fn security_arguments() -> Vec<String> { + let mut args = vec![]; + args.push("--tls-cipher".to_owned()); + args.push(ALLOWED_TLS_CIPHERS.join(":")); + args + } + + fn remote_arguments(&self) -> Vec<String> { + let mut args: Vec<String> = vec![]; + if let Some(ref endpoint) = self.remote { + args.push("--proto".to_owned()); + args.push( + match endpoint.protocol { + net::TransportProtocol::Udp => "udp".to_owned(), + net::TransportProtocol::Tcp => "tcp-client".to_owned(), + }, + ); + args.push("--remote".to_owned()); + args.push(endpoint.address.address()); + args.push(endpoint.address.port().to_string()); + } + args + } + + fn authentication_arguments(&self) -> Vec<OsString> { + let mut args = vec![]; + if let Some(ref user_pass_path) = self.user_pass_path { + args.push(OsString::from("--auth-user-pass")); + args.push(OsString::from(user_pass_path)); + } + args + } +} + +impl fmt::Display for OpenVpnCommand { + /// Format the program and arguments of an `OpenVpnCommand` for display. Any non-utf8 data is + /// lossily converted using the utf8 replacement character. + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.write_str(&self.openvpn_bin.to_string_lossy())?; + for arg in self.get_arguments().iter().map(|arg| arg.to_string_lossy()) { + write_argument(fmt, &arg)?; + } + Ok(()) + } +} + +fn write_argument(fmt: &mut fmt::Formatter, arg: &str) -> fmt::Result { + fmt.write_str(" ")?; + let quote = arg.contains(char::is_whitespace); + if quote { + fmt.write_str("\"")?; + } + fmt.write_str(arg)?; + if quote { + fmt.write_str("\"")?; + } + Ok(()) +} + + +#[cfg(test)] +mod tests { + use super::OpenVpnCommand; + use net::{Endpoint, TransportProtocol}; + use std::ffi::OsString; + + #[test] + fn passes_one_remote() { + let remote = Endpoint::new("example.com", 3333, TransportProtocol::Udp); + + let testee_args = OpenVpnCommand::new("").remote(remote).get_arguments(); + + assert!(testee_args.contains(&OsString::from("udp"))); + assert!(testee_args.contains(&OsString::from("example.com"))); + assert!(testee_args.contains(&OsString::from("3333"))); + } + + #[test] + fn passes_plugin_path() { + let path = "./a/path"; + let testee_args = OpenVpnCommand::new("").plugin(path, vec![]).get_arguments(); + assert!(testee_args.contains(&OsString::from("./a/path"))); + } + + #[test] + fn passes_plugin_args() { + let args = vec![String::from("123"), String::from("cde")]; + let testee_args = OpenVpnCommand::new("").plugin("", args).get_arguments(); + assert!(testee_args.contains(&OsString::from("123"))); + assert!(testee_args.contains(&OsString::from("cde"))); + } +} diff --git a/talpid-core/src/process/unix.rs b/talpid-core/src/process/unix.rs new file mode 100644 index 0000000000..64599a6930 --- /dev/null +++ b/talpid-core/src/process/unix.rs @@ -0,0 +1,33 @@ +extern crate libc; + +use duct; +use duct::unix::HandleExt; + +use std::io; +use std::sync::{Arc, mpsc}; +use std::thread; +use std::time::Duration; + +/// Kills a process by first sending it the `SIGTERM` signal and then wait up to `timeout`. If the +/// process has not died after the timeout has expired it is killed. +pub fn nice_kill(handle: Arc<duct::Handle>, timeout: Duration) -> io::Result<()> { + trace!("Sending SIGTERM to child process"); + handle.send_signal(libc::SIGTERM)?; + + if wait_timeout(handle.clone(), timeout) { + debug!("Child process exited from SIGTERM"); + Ok(()) + } else { + debug!("Child process did not exit from SIGTERM, sending SIGKILL"); + handle.kill() + } +} + +/// Wait for a process to die for a maximum of `timeout`. Returns true if the process died within +/// the timeout. Warning, if the process does not exit in the given time, this function will leave +/// a thread running until it does exit. +fn wait_timeout(handle: Arc<duct::Handle>, timeout: Duration) -> bool { + let (stop, stopped) = mpsc::channel(); + thread::spawn(move || { let _ = stop.send(handle.wait().is_ok()); }); + stopped.recv_timeout(timeout).unwrap_or(false) +} diff --git a/talpid-core/src/tunnel/mod.rs b/talpid-core/src/tunnel/mod.rs new file mode 100644 index 0000000000..b05f1283f3 --- /dev/null +++ b/talpid-core/src/tunnel/mod.rs @@ -0,0 +1,171 @@ +use mktemp; +use net; +use openvpn_ffi::OpenVpnPluginEvent; +use process::openvpn::OpenVpnCommand; +use std::fs; +use std::io::{self, Write}; +use std::path::{Path, PathBuf}; + +/// A module for all OpenVPN related tunnel management. +pub mod openvpn; + +use self::openvpn::{OpenVpnCloseHandle, OpenVpnMonitor}; + +mod errors { + error_chain!{ + errors { + /// An error indicating there was an error listening for events from the VPN tunnel. + TunnelMonitoringError { + description("Error while setting up or processing events from the VPN tunnel") + } + /// The OpenVPN plugin was not found. + PluginNotFound { + description("No OpenVPN plugin found") + } + /// There was an error when writing authentication credentials to temporary file. + CredentialsWriteError { + description("Error while writing credentials to temporary file") + } + } + } +} +pub use self::errors::*; + + +/// Possible events from the VPN tunnel and the child process managing it. +#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] +pub enum TunnelEvent { + /// Sent when the tunnel comes up and is ready for traffic. + Up, + /// Sent when the tunnel goes down. + Down, +} + +impl TunnelEvent { + /// Converts an `OpenVpnPluginEvent` to a `TunnelEvent`. + /// Returns `None` if there is no corresponding `TunnelEvent`. + fn from_openvpn_event(event: &OpenVpnPluginEvent) -> Option<TunnelEvent> { + match *event { + OpenVpnPluginEvent::Up => Some(TunnelEvent::Up), + OpenVpnPluginEvent::RoutePredown => Some(TunnelEvent::Down), + _ => None, + } + } +} + + +/// Abstraction for monitoring a generic VPN tunnel. +pub struct TunnelMonitor { + monitor: OpenVpnMonitor, + _user_pass_file: mktemp::Temp, +} + +impl TunnelMonitor { + /// Creates a new `TunnelMonitor` that connects to the given remote and notifies `on_event` + /// on tunnel state changes. + pub fn new<L>(remote: net::Endpoint, account_token: &str, on_event: L) -> Result<Self> + where L: Fn(TunnelEvent) + Send + Sync + 'static + { + let on_openvpn_event = move |event, _env| match TunnelEvent::from_openvpn_event(&event) { + Some(tunnel_event) => on_event(tunnel_event), + None => debug!("Ignoring OpenVpnEvent {:?}", event), + }; + let user_pass_file = Self::create_user_pass_file(account_token) + .chain_err(|| ErrorKind::CredentialsWriteError)?; + let cmd = Self::create_openvpn_cmd(remote, user_pass_file.as_ref()); + let monitor = openvpn::OpenVpnMonitor::new(cmd, on_openvpn_event, get_plugin_path()?) + .chain_err(|| ErrorKind::TunnelMonitoringError)?; + Ok( + TunnelMonitor { + monitor, + _user_pass_file: user_pass_file, + }, + ) + } + + fn create_openvpn_cmd(remote: net::Endpoint, user_pass_file: &Path) -> OpenVpnCommand { + let mut cmd = OpenVpnCommand::new("openvpn"); + if let Some(config) = get_config_path() { + cmd.config(config); + } + cmd.remote(remote).user_pass(user_pass_file).ca("ca.crt"); + cmd + } + + fn create_user_pass_file(account_token: &str) -> io::Result<mktemp::Temp> { + let path = mktemp::Temp::new_file()?; + debug!( + "Writing user-pass credentials to {}", + path.as_ref().to_string_lossy() + ); + let mut file = fs::File::create(&path)?; + Self::set_user_pass_file_permissions(&file)?; + write!(file, "{}\n-\n", account_token)?; + Ok(path) + } + + #[cfg(unix)] + fn set_user_pass_file_permissions(file: &fs::File) -> io::Result<()> { + use std::os::unix::fs::PermissionsExt; + file.set_permissions(PermissionsExt::from_mode(0o400)) + } + + #[cfg(windows)] + fn set_user_pass_file_permissions(file: &fs::File) -> io::Result<()> { + // TODO(linus): Lock permissions correctly on Windows. + Ok(()) + } + + /// Creates a handle to this monitor, allowing the tunnel to be closed while some other thread + /// is blocked in `wait`. + pub fn close_handle(&self) -> CloseHandle { + CloseHandle(self.monitor.close_handle()) + } + + /// Consumes the monitor and block until the tunnel exits or there is an error. + pub fn wait(self) -> Result<()> { + self.monitor.wait().chain_err(|| ErrorKind::TunnelMonitoringError) + } +} + + +/// A handle to a `TunnelMonitor` +pub struct CloseHandle(OpenVpnCloseHandle); + +impl CloseHandle { + /// Closes the underlying tunnel, making the `TunnelMonitor::wait` method return. + pub fn close(self) -> io::Result<()> { + self.0.close() + } +} + + +// TODO(linus): Temporary implementation for getting plugin path during development. +fn get_plugin_path() -> Result<PathBuf> { + let dirs = &["./target/debug", "."]; + let filename = if cfg!(target_os = "macos") { + "libtalpid_openvpn_plugin.dylib" + } else if cfg!(unix) { + "libtalpid_openvpn_plugin.so" + } else if cfg!(windows) { + "libtalpid_openvpn_plugin.dll" + } else { + bail!(ErrorKind::PluginNotFound); + }; + + for dir in dirs { + let path = Path::new(dir).join(filename); + if path.exists() { + debug!("Using OpenVPN plugin at {}", path.to_string_lossy()); + return Ok(path); + } + } + Err(ErrorKind::PluginNotFound.into()) +} + +// TODO(linus): Temporary implementation for getting hold of a config location. +// Manually place a working config here or change this string in order to test +fn get_config_path() -> Option<&'static Path> { + let path = Path::new("./openvpn.conf"); + if path.exists() { Some(path) } else { None } +} diff --git a/talpid-core/src/tunnel/openvpn.rs b/talpid-core/src/tunnel/openvpn.rs new file mode 100644 index 0000000000..08c2c14a32 --- /dev/null +++ b/talpid-core/src/tunnel/openvpn.rs @@ -0,0 +1,258 @@ +use duct; +use jsonrpc_core::{Error, IoHandler}; +use openvpn_ffi::{OpenVpnEnv, OpenVpnPluginEvent}; +use process::openvpn::OpenVpnCommand; + +use std::io; +use std::path::Path; +use std::result::Result as StdResult; +use std::sync::{Arc, mpsc}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::thread; +use std::time::Duration; + +use talpid_ipc; + +mod errors { + error_chain!{ + errors { + /// Unable to start, wait for or kill the OpenVPN process. + ChildProcessError(msg: &'static str) { + description("Unable to start, wait for or kill the OpenVPN process") + display("OpenVPN process error: {}", msg) + } + /// Unable to start or manage the IPC server listening for events from OpenVPN. + EventDispatcherError { + description("Unable to start or manage the event dispatcher IPC server") + } + } + } +} +pub use self::errors::*; + + +lazy_static!{ + static ref OPENVPN_DIE_TIMEOUT: Duration = Duration::from_secs(2); +} + + +/// Struct for monitoring an OpenVPN process. +pub struct OpenVpnMonitor { + child: Arc<duct::Handle>, + event_dispatcher: Option<OpenVpnEventDispatcher>, + closed: Arc<AtomicBool>, +} + +impl OpenVpnMonitor { + /// Creates a new `OpenVpnMonitor` with the given listener and using the plugin at the given + /// path. + pub fn new<L, P>(mut cmd: OpenVpnCommand, on_event: L, plugin_path: P) -> Result<Self> + where L: Fn(OpenVpnPluginEvent, OpenVpnEnv) + Send + Sync + 'static, + P: AsRef<Path> + { + let event_dispatcher = OpenVpnEventDispatcher::start(on_event) + .chain_err(|| ErrorKind::EventDispatcherError)?; + + cmd.plugin(plugin_path, vec![event_dispatcher.address().to_owned()]); + let child = cmd.build() + .start() + .chain_err(|| ErrorKind::ChildProcessError("Failed to start"))?; + + Ok( + OpenVpnMonitor { + child: Arc::new(child), + event_dispatcher: Some(event_dispatcher), + closed: Arc::new(AtomicBool::new(false)), + }, + ) + } + + /// Creates a handle to this monitor, allowing the tunnel to be closed while some other thread + /// is blocked in `wait`. + pub fn close_handle(&self) -> OpenVpnCloseHandle { + OpenVpnCloseHandle { + child: self.child.clone(), + closed: self.closed.clone(), + } + } + + /// Consumes the monitor and blocks until OpenVPN exits or there is an error in either waiting + /// for the process or in the event dispatcher. + pub fn wait(mut self) -> Result<()> { + match self.wait_result() { + WaitResult::Child(Ok(exit_status)) => { + if exit_status.success() || self.closed.load(Ordering::SeqCst) { + debug!( + "OpenVPN exited, as expected, with exit status: {}", + exit_status + ); + Ok(()) + } else { + error!("OpenVPN died unexpectedly with status: {}", exit_status); + Err(ErrorKind::ChildProcessError("Died unexpectedly").into()) + } + } + WaitResult::Child(Err(e)) => { + error!("OpenVPN process wait error: {}", e); + Err(e).chain_err(|| ErrorKind::ChildProcessError("Error when waiting")) + } + WaitResult::EventDispatcher(result) => { + error!("OpenVpnEventDispatcher exited unexpectedly: {:?}", result); + match result { + Ok(()) => Err(ErrorKind::EventDispatcherError.into()), + Err(e) => Err(e).chain_err(|| ErrorKind::EventDispatcherError), + } + } + } + } + + /// Waits for both the child process and the event dispatcher in parallel. After both have + /// returned this returns the earliest result. + fn wait_result(&mut self) -> WaitResult { + let child_wait_handle = self.child.clone(); + let child_close_handle = self.close_handle(); + let event_dispatcher = self.event_dispatcher.take().unwrap(); + let dispatcher_handle = event_dispatcher.close_handle(); + + let (child_tx, rx) = mpsc::channel(); + let dispatcher_tx = child_tx.clone(); + + thread::spawn( + move || { + let result = child_wait_handle.wait().map(|output| output.status); + child_tx.send(WaitResult::Child(result)).unwrap(); + dispatcher_handle.close(); + }, + ); + thread::spawn( + move || { + let result = event_dispatcher.wait(); + dispatcher_tx.send(WaitResult::EventDispatcher(result)).unwrap(); + let _ = child_close_handle.close(); + }, + ); + + let result = rx.recv().unwrap(); + let _ = rx.recv().unwrap(); + result + } +} + +/// A handle to an `OpenVpnMonitor` for closing it. +pub struct OpenVpnCloseHandle { + child: Arc<duct::Handle>, + closed: Arc<AtomicBool>, +} + +impl OpenVpnCloseHandle { + /// Kills the underlying OpenVPN process, making the `OpenVpnMonitor::wait` method return. + pub fn close(self) -> io::Result<()> { + if !self.closed.swap(true, Ordering::SeqCst) { + self.kill_openvpn() + } else { + Ok(()) + } + } + + #[cfg(unix)] + fn kill_openvpn(self) -> io::Result<()> { + ::process::unix::nice_kill(self.child, *OPENVPN_DIE_TIMEOUT) + } + + #[cfg(not(unix))] + fn kill_openvpn(self) -> io::Result<()> { + self.child.kill() + } +} + +/// Internal enum to differentiate between if the child process or the event dispatcher died first. +enum WaitResult { + Child(io::Result<::std::process::ExitStatus>), + EventDispatcher(talpid_ipc::Result<()>), +} + + +/// IPC server for listening to events coming from plugin loaded into OpenVPN. +pub struct OpenVpnEventDispatcher { + server: talpid_ipc::IpcServer, +} + +impl OpenVpnEventDispatcher { + /// Construct and start the IPC server with the given event listener callback. + pub fn start<L>(on_event: L) -> talpid_ipc::Result<Self> + where L: Fn(OpenVpnPluginEvent, OpenVpnEnv) + Send + Sync + 'static + { + let rpc = OpenVpnEventApiImpl { on_event }; + let mut io = IoHandler::new(); + io.extend_with(rpc.to_delegate()); + let server = talpid_ipc::IpcServer::start(io.into())?; + Ok(OpenVpnEventDispatcher { server }) + } + + /// Returns the local address this server is listening on. + pub fn address(&self) -> &str { + self.server.address() + } + + /// Creates a handle to this event dispatcher, allowing the listening server to be closed while + /// some other thread is blocked in `wait`. + pub fn close_handle(&self) -> talpid_ipc::CloseHandle { + self.server.close_handle() + } + + /// Consumes the server and waits for it to finish. Returns an error if the server exited + /// due to an error. + pub fn wait(self) -> talpid_ipc::Result<()> { + self.server.wait() + } +} + + +mod api { + use super::*; + build_rpc_trait! { + pub trait OpenVpnEventApi { + #[rpc(name = "openvpn_event")] + fn openvpn_event(&self, + OpenVpnPluginEvent, + OpenVpnEnv) + -> StdResult<(), Error>; + } + } +} +use self::api::*; + +struct OpenVpnEventApiImpl<L> + where L: Fn(OpenVpnPluginEvent, OpenVpnEnv) + Send + Sync + 'static +{ + on_event: L, +} + +impl<L> OpenVpnEventApi for OpenVpnEventApiImpl<L> + where L: Fn(OpenVpnPluginEvent, OpenVpnEnv) + Send + Sync + 'static +{ + fn openvpn_event(&self, event: OpenVpnPluginEvent, env: OpenVpnEnv) -> StdResult<(), Error> { + debug!("OpenVPN event {:?}", event); + (self.on_event)(event, env); + Ok(()) + } +} + + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + #[ignore] + fn openvpn_event_dispatcher_server() { + let server = OpenVpnEventDispatcher::start( + |event, env| { + println!("event: {:?}. env: {:?}", event, env); + }, + ) + .unwrap(); + println!("plugin server listening on {}", server.address()); + server.wait().unwrap(); + } +} |
