//! Manage OpenVPN tunnels. #![deny(missing_docs)] use crate::proxy::ProxyMonitor; use process::openvpn::{OpenVpnCommand, OpenVpnProcHandle}; #[cfg(target_os = "linux")] use std::collections::{HashMap, HashSet}; #[cfg(windows)] use std::sync::LazyLock; #[cfg(target_os = "windows")] use std::{ffi::OsString, sync::Arc}; use std::{ fs, io::{self, Write}, path::{Path, PathBuf}, process::ExitStatus, time::Duration, }; #[cfg(target_os = "linux")] use talpid_routing::RequiredRoute; use talpid_tunnel::EventHook; use talpid_types::{ ErrorExt, net::{openvpn, proxy::CustomProxy}, }; use tokio::task; #[cfg(windows)] use widestring::U16CString; #[cfg(windows)] use windows_sys::{Win32::NetworkManagement::Ndis::NET_LUID_LH, core::GUID}; #[cfg(windows)] mod wintun; mod mktemp; mod process; mod proxy; #[cfg(windows)] static ADAPTER_ALIAS: LazyLock = LazyLock::new(|| U16CString::from_str("Mullvad").unwrap()); #[cfg(windows)] static ADAPTER_TUNNEL_TYPE: LazyLock = LazyLock::new(|| U16CString::from_str("Mullvad").unwrap()); #[cfg(windows)] const ADAPTER_GUID: GUID = GUID { data1: 0xAFE43773, data2: 0xE1F8, data3: 0x4EBB, data4: [0x85, 0x36, 0x57, 0x6A, 0xB8, 0x6A, 0xFE, 0x9A], }; /// Results from fallible operations on the OpenVPN tunnel. pub type Result = std::result::Result; /// Errors that can happen when using the OpenVPN tunnel. #[derive(thiserror::Error, Debug)] pub enum Error { /// Failed to initialize the tokio runtime. #[error("Failed to initialize the tokio runtime")] RuntimeError(#[source] io::Error), /// Unable to start, wait for or kill the OpenVPN process. #[error("Error in OpenVPN process management: {0}")] ChildProcessError(&'static str, #[source] io::Error), /// Unable to start the IPC server. #[error("Unable to start the event dispatcher IPC server")] EventDispatcherError(#[source] event_server::Error), /// The OpenVPN event dispatcher exited unexpectedly #[error("The OpenVPN event dispatcher exited unexpectedly")] EventDispatcherExited, /// cannot load wintun.dll #[cfg(windows)] #[error("Failed to load wintun.dll")] WintunDllError(#[source] io::Error), /// cannot create a wintun interface #[cfg(windows)] #[error("Failed to create Wintun adapter")] WintunCreateAdapterError(#[source] io::Error), /// OpenVPN process died unexpectedly #[error("OpenVPN process died unexpectedly")] ChildProcessDied, /// Failed before OpenVPN started #[error("Failed to start OpenVPN")] StartProcessError, /// The OpenVPN binary was not found. #[error("No OpenVPN binary found at {0}")] OpenVpnNotFound(String), /// The OpenVPN plugin was not found. #[error("No OpenVPN plugin found at {0}")] PluginNotFound(String), /// Error while writing credentials to temporary file. #[error("Error while writing credentials to temporary file")] CredentialsWriteError(#[source] io::Error), /// Failures related to the proxy service. #[error("Proxy service failed")] ProxyError(#[source] proxy::Error), /// The map is missing 'dev' #[cfg(target_os = "linux")] #[error("Failed to obtain tunnel interface name")] MissingTunnelInterface, /// The map has no 'route_n' entries #[cfg(target_os = "linux")] #[error("Failed to obtain OpenVPN server")] MissingRemoteHost, /// Cannot parse the remote_n in the provided map #[cfg(target_os = "linux")] #[error("Cannot parse remote host string")] ParseRemoteHost(#[source] std::net::AddrParseError), } impl Error { /// Return whether retrying the operation that caused this error is likely to succeed. pub fn is_recoverable(&self) -> bool { match self { #[cfg(windows)] _ => self.get_tunnel_device_error().is_some(), #[cfg(not(windows))] _ => false, } } /// Get the inner tunnel device error, if there is one #[cfg(target_os = "windows")] pub fn get_tunnel_device_error(&self) -> Option<&io::Error> { match self { Error::WintunCreateAdapterError(error) => Some(error), _ => None, } } } #[cfg(unix)] static OPENVPN_DIE_TIMEOUT: Duration = Duration::from_secs(4); #[cfg(windows)] static OPENVPN_DIE_TIMEOUT: Duration = Duration::from_secs(30); #[cfg(target_os = "macos")] const OPENVPN_PLUGIN_FILENAME: &str = "libtalpid_openvpn_plugin.dylib"; #[cfg(target_os = "linux")] const OPENVPN_PLUGIN_FILENAME: &str = "libtalpid_openvpn_plugin.so"; #[cfg(windows)] const OPENVPN_PLUGIN_FILENAME: &str = "talpid_openvpn_plugin.dll"; #[cfg(unix)] const OPENVPN_BIN_FILENAME: &str = "openvpn"; #[cfg(windows)] const OPENVPN_BIN_FILENAME: &str = "openvpn.exe"; /// Struct for monitoring an OpenVPN process. #[derive(Debug)] pub struct OpenVpnMonitor { prepare_task: tokio::task::JoinHandle>, proxy_monitor: Option>, /// Keep the `TempFile` for the user-pass file in the struct, so it's removed on drop. _user_pass_file: mktemp::TempFile, /// Keep the 'TempFile' for the proxy user-pass file in the struct, so it's removed on drop. _proxy_auth_file: Option, event_server_abort_tx: triggered::Trigger, server_join_handle: task::JoinHandle>, monitor_abort_tx: triggered::Trigger, monitor_abort_rx: triggered::Listener, #[cfg(windows)] _wintun: Arc>, } #[cfg(windows)] #[async_trait::async_trait] trait WintunContext: Send + Sync { fn luid(&self) -> NET_LUID_LH; fn ipv6(&self) -> bool; async fn wait_for_interfaces(&self) -> io::Result<()>; fn prepare_interface(&self) {} } #[cfg(windows)] impl std::fmt::Debug for dyn WintunContext { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, "WintunContext {{ luid: {}, ipv6: {} }}", // SAFETY: It's always safe to interpet a LUID as an u64 unsafe { self.luid().Value }, self.ipv6() ) } } #[cfg(windows)] #[derive(Debug)] struct WintunContextImpl { adapter: wintun::WintunAdapter, wait_v6_interface: bool, _logger: wintun::WintunLoggerHandle, } #[cfg(windows)] #[async_trait::async_trait] impl WintunContext for WintunContextImpl { fn luid(&self) -> NET_LUID_LH { self.adapter.luid() } fn ipv6(&self) -> bool { self.wait_v6_interface } async fn wait_for_interfaces(&self) -> io::Result<()> { let luid = self.adapter.luid(); talpid_windows::net::wait_for_interfaces(luid, true, self.wait_v6_interface).await } fn prepare_interface(&self) { self.adapter.prepare_interface(); } } #[cfg(windows)] impl WintunContextImpl { fn alias(&self) -> U16CString { self.adapter.name() } } impl OpenVpnMonitor { /// Creates a new `OpenVpnMonitor` with the given listener and using the plugin at the given /// path. pub async fn start( event_hook: EventHook, params: &openvpn::TunnelParameters, log_path: Option, resource_dir: &Path, route_manager: talpid_routing::RouteManagerHandle, ) -> Result { let user_pass_file = Self::create_credentials_file(¶ms.config.username, ¶ms.config.password) .map_err(Error::CredentialsWriteError)?; let proxy_auth_file = Self::create_proxy_auth_file(¶ms.proxy).map_err(Error::CredentialsWriteError)?; let user_pass_file_path = user_pass_file.to_path_buf(); let proxy_auth_file_path = proxy_auth_file.as_ref().map(|file| file.to_path_buf()); let proxy_monitor = Self::start_proxy( ¶ms.proxy, #[cfg(target_os = "linux")] params.fwmark, ) .await?; #[cfg(windows)] let wintun = Self::new_wintun_context(params, resource_dir)?; let cmd = Self::create_openvpn_cmd( params, user_pass_file.as_ref(), proxy_auth_file.as_ref().map(AsRef::as_ref), resource_dir, &proxy_monitor, #[cfg(windows)] wintun.alias().to_os_string(), )?; let plugin_path = Self::get_plugin_path(resource_dir)?; #[cfg(target_os = "linux")] let ipv6_enabled = params.generic_options.enable_ipv6; let (event_server_abort_tx, event_server_abort_rx) = triggered::trigger(); let openvpn_init_args = OpenVpnTunnelInitArgs { event_server_abort_tx: event_server_abort_tx.clone(), event_server_abort_rx, plugin_path, log_path, user_pass_file, proxy_auth_file, proxy_monitor, #[cfg(target_os = "linux")] fwmark: params.fwmark, }; Self::new_internal( cmd, openvpn_init_args, event_server::OpenvpnEventProxyImpl { event_hook, user_pass_file_path: user_pass_file_path.clone(), proxy_auth_file_path: proxy_auth_file_path.clone(), abort_server_tx: event_server_abort_tx, #[cfg(any(target_os = "macos", target_os = "windows"))] proxy: params.proxy.clone(), route_manager, #[cfg(target_os = "linux")] ipv6_enabled, }, #[cfg(windows)] Box::new(wintun), ) } #[cfg(windows)] fn new_wintun_context( params: &openvpn::TunnelParameters, resource_dir: &Path, ) -> Result { let dll = wintun::WintunDll::instance(resource_dir).map_err(Error::WintunDllError)?; let wintun_logger = dll.activate_logging(); let wintun_adapter = wintun::WintunAdapter::create( dll, &ADAPTER_ALIAS, &ADAPTER_TUNNEL_TYPE, Some(ADAPTER_GUID), ) .map_err(Error::WintunCreateAdapterError)?; Ok(WintunContextImpl { adapter: wintun_adapter, wait_v6_interface: params.generic_options.enable_ipv6, _logger: wintun_logger, }) } } #[cfg(target_os = "linux")] fn extract_routes(env: &HashMap) -> Result> { let tun_interface = env.get("dev").ok_or(Error::MissingTunnelInterface)?; let tun_node = talpid_routing::Node::device(tun_interface.clone()); let mut routes = HashSet::new(); for network in &["0.0.0.0/0".parse().unwrap(), "::/0".parse().unwrap()] { routes.insert(RequiredRoute::new(*network, tun_node.clone()).use_main_table(false)); } Ok(routes) } struct OpenVpnTunnelInitArgs { event_server_abort_tx: triggered::Trigger, event_server_abort_rx: triggered::Listener, plugin_path: PathBuf, log_path: Option, user_pass_file: mktemp::TempFile, proxy_auth_file: Option, proxy_monitor: Option>, #[cfg(target_os = "linux")] fwmark: u32, } impl OpenVpnMonitor { fn new_internal( mut cmd: C, init_args: OpenVpnTunnelInitArgs, on_event: L, #[cfg(windows)] wintun: Box, ) -> Result> where L: event_server::OpenvpnEventProxy + Send + Sync + 'static, { let event_server_abort_tx = init_args.event_server_abort_tx; let event_server_abort_rx = init_args.event_server_abort_rx; let plugin_path = init_args.plugin_path; let log_path = init_args.log_path; let user_pass_file = init_args.user_pass_file; let proxy_auth_file = init_args.proxy_auth_file; let proxy_monitor = init_args.proxy_monitor; let (server_join_handle, ipc_path) = event_server::start(on_event, event_server_abort_rx) .map_err(Error::EventDispatcherError)?; #[cfg(windows)] let wintun = Arc::new(wintun); #[cfg(target_os = "linux")] cmd.fwmark(init_args.fwmark); cmd.plugin(plugin_path, vec![ipc_path]) .log(log_path.as_deref()); let prepare_task = tokio::spawn(Self::prepare_process( cmd, #[cfg(windows)] wintun.clone(), )); let (monitor_abort_tx, monitor_abort_rx) = triggered::trigger(); let monitor = OpenVpnMonitor { prepare_task, proxy_monitor, _user_pass_file: user_pass_file, _proxy_auth_file: proxy_auth_file, event_server_abort_tx, server_join_handle, monitor_abort_tx, monitor_abort_rx, #[cfg(windows)] _wintun: wintun, }; Ok(monitor) } #[cfg_attr(not(windows), allow(clippy::unused_async))] async fn prepare_process( cmd: C, #[cfg(windows)] wintun: Arc>, ) -> io::Result { #[cfg(windows)] { log::debug!("Wait for IP interfaces"); wintun.wait_for_interfaces().await?; wintun.prepare_interface(); } cmd.start() } /// Creates a handle to this monitor, allowing the tunnel to be closed while some other /// thread is blocked in `wait`. pub fn close_handle(&self) -> OpenVpnCloseHandle { OpenVpnCloseHandle { monitor_abort_tx: self.monitor_abort_tx.clone(), prepare_task: self.prepare_task.abort_handle(), } } /// Consumes the monitor and waits for both proxy and tunnel, as applicable. pub async fn wait(mut self) -> Result<()> { if let Some(mut proxy_monitor) = self.proxy_monitor.take() { let tunnel_close_handle = self.close_handle(); let proxy_close_handle = proxy_monitor.close_handle(); let tunnel_task = async move { let result = self.wait_tunnel().await; let _ = proxy_close_handle.close(); result }; let proxy_task = async move { let result = proxy_monitor.wait().await; tunnel_close_handle.close(); result.map_err(Error::ProxyError) }; join_return_first(tunnel_task, proxy_task).await } else { // No proxy active, wait only for the tunnel. self.wait_tunnel().await } } /// Supplement `inner_wait_tunnel()` with logging and error handling. async fn wait_tunnel(self) -> Result<()> { match self.inner_wait_tunnel().await { WaitResult::Preparation(result) => match result { Err(error) => { log::debug!( "{}", error.display_chain_with_msg("Failed to start OpenVPN") ); Err(Error::StartProcessError) } _ => Ok(()), }, WaitResult::Child(Ok(exit_status)) => { if exit_status.success() { log::debug!( "OpenVPN exited, as expected, with exit status: {}", exit_status ); Ok(()) } else { log::error!("OpenVPN died unexpectedly with status: {}", exit_status); Err(Error::ChildProcessDied) } } WaitResult::Child(Err(e)) => { log::error!("OpenVPN process wait error: {}", e); Err(Error::ChildProcessError("Error when waiting", e)) } WaitResult::EventDispatcher => { log::error!("OpenVPN Event server exited unexpectedly"); Err(Error::EventDispatcherExited) } } } /// Waits for both the child process and the event dispatcher in parallel. After both have /// returned this returns the earliest result. async fn inner_wait_tunnel(self) -> WaitResult { let mut child = match self.prepare_task.await { Ok(Ok(child)) => child, Ok(Err(error)) => { return WaitResult::Preparation(Err(error)); } Err(_) => return WaitResult::Preparation(Ok(())), }; let kill_child = async move { let result = tokio::select! { result = child.wait() => { log::debug!("OpenVPN process exited"); result } _ = self.monitor_abort_rx => { log::debug!("Killing OpenVPN process"); child.kill(); child.wait().await } }; self.event_server_abort_tx.trigger(); WaitResult::Child(result) }; let kill_event_dispatcher = async move { let _ = self.server_join_handle.await; WaitResult::EventDispatcher }; join_return_first(kill_child, kill_event_dispatcher).await } fn create_proxy_auth_file( proxy_settings: &Option, ) -> std::result::Result, io::Error> { if let Some(CustomProxy::Socks5Remote(remote_proxy)) = proxy_settings && let Some(ref proxy_auth) = remote_proxy.auth { let credentials_file = Self::create_credentials_file(proxy_auth.username(), proxy_auth.password())?; return Ok(Some(credentials_file)); } Ok(None) } /// Starts a proxy service, as applicable. async fn start_proxy( proxy_settings: &Option, #[cfg(target_os = "linux")] fwmark: u32, ) -> Result>> { if let Some(settings) = proxy_settings { let proxy_monitor = proxy::start_proxy( settings, #[cfg(target_os = "linux")] fwmark, ) .await .map_err(Error::ProxyError)?; return Ok(Some(proxy_monitor)); } Ok(None) } fn create_credentials_file(username: &str, password: &str) -> io::Result { let temp_file = mktemp::TempFile::new(); log::debug!("Writing credentials to {}", temp_file.as_ref().display()); let mut file = fs::File::create(&temp_file)?; Self::set_user_pass_file_permissions(&file)?; write!(file, "{username}\n{password}\n")?; Ok(temp_file) } #[cfg(unix)] fn set_user_pass_file_permissions(file: &fs::File) -> io::Result<()> { use std::os::unix::fs::PermissionsExt; file.set_permissions(PermissionsExt::from_mode(0o400)) } #[cfg(windows)] fn set_user_pass_file_permissions(_file: &fs::File) -> io::Result<()> { // TODO(linus): Lock permissions correctly on Windows. Ok(()) } fn get_plugin_path(resource_dir: &Path) -> Result { let path = resource_dir.join(OPENVPN_PLUGIN_FILENAME); if path.exists() { log::trace!("Using OpenVPN plugin at {}", path.display()); Ok(path) } else { Err(Error::PluginNotFound(path.display().to_string())) } } fn create_openvpn_cmd( params: &openvpn::TunnelParameters, user_pass_file: &Path, proxy_auth_file: Option<&Path>, resource_dir: &Path, proxy_monitor: &Option>, #[cfg(windows)] alias: OsString, ) -> Result { let mut cmd = OpenVpnCommand::new(Self::get_openvpn_bin(resource_dir)?); if let Some(config) = Self::get_config_path(resource_dir) { cmd.config(config); } cmd.remote(params.config.endpoint) .user_pass(user_pass_file) .tunnel_options(¶ms.options) .enable_ipv6(params.generic_options.enable_ipv6) .ca(resource_dir.join("ca.crt")); #[cfg(windows)] cmd.tunnel_alias(Some(alias)); if let Some(proxy_settings) = params.proxy.clone() { cmd.proxy_settings(proxy_settings); } if let Some(proxy_auth_file) = proxy_auth_file { cmd.proxy_auth(proxy_auth_file); } if let Some(proxy) = proxy_monitor { cmd.proxy_port(proxy.port()); } Ok(cmd) } fn get_openvpn_bin(resource_dir: &Path) -> Result { let path = resource_dir.join(OPENVPN_BIN_FILENAME); if path.exists() { log::trace!("Using OpenVPN at {}", path.display()); Ok(path) } else { Err(Error::OpenVpnNotFound(path.display().to_string())) } } fn get_config_path(resource_dir: &Path) -> Option { let path = resource_dir.join("openvpn.conf"); if path.exists() { Some(path) } else { None } } } /// A handle to an `OpenVpnMonitor` for closing it. #[derive(Debug)] pub struct OpenVpnCloseHandle { monitor_abort_tx: triggered::Trigger, prepare_task: tokio::task::AbortHandle, } impl OpenVpnCloseHandle { /// Begin killing the OpenVPN monitor, making the `OpenVpnMonitor::wait` method return. pub fn close(self) { self.prepare_task.abort(); self.monitor_abort_tx.trigger(); } } /// Internal enum to differentiate between if the child process or the event dispatcher died first. #[derive(Debug)] enum WaitResult { Preparation(io::Result<()>), Child(io::Result), EventDispatcher, } /// Trait for types acting as OpenVPN process starters for `OpenVpnMonitor`. pub trait OpenVpnBuilder { /// The type of handles to subprocesses this builder produces. type ProcessHandle: ProcessHandle; /// Set the OpenVPN plugin to the given values. fn plugin(&mut self, path: impl AsRef, args: Vec) -> &mut Self; /// Set the OpenVPN log file path to use. fn log(&mut self, log_path: Option>) -> &mut Self; /// Spawn the subprocess and return a handle. fn start(&self) -> io::Result; /// Sets the firewall mark for the connection. #[cfg(target_os = "linux")] fn fwmark(&mut self, fwmark: u32) -> &mut Self; } /// Trait for types acting as handles to subprocesses for `OpenVpnMonitor` #[async_trait::async_trait] pub trait ProcessHandle: Send + Sync + 'static { /// Block until the subprocess exits or there is an error in the wait syscall. async fn wait(&mut self) -> io::Result; /// Kill the subprocess without waiting for it to complete. fn kill(&mut self); } impl OpenVpnBuilder for OpenVpnCommand { type ProcessHandle = OpenVpnProcHandle; fn plugin(&mut self, path: impl AsRef, args: Vec) -> &mut Self { self.plugin(path, args) } fn log(&mut self, log_path: Option>) -> &mut Self { if let Some(log_path) = log_path { self.log(log_path) } else { self } } fn start(&self) -> io::Result { OpenVpnProcHandle::new(&mut self.build()) } #[cfg(target_os = "linux")] fn fwmark(&mut self, fwmark: u32) -> &mut Self { self.fwmark(Some(fwmark)); self } } #[async_trait::async_trait] impl ProcessHandle for OpenVpnProcHandle { async fn wait(&mut self) -> io::Result { OpenVpnProcHandle::wait(self).await } fn kill(&mut self) { OpenVpnProcHandle::kill(self, OPENVPN_DIE_TIMEOUT) } } /// Join two futures and return the result of the first one to complete. async fn join_return_first( future1: impl std::future::Future, future2: impl std::future::Future, ) -> R { futures::pin_mut!(future1); futures::pin_mut!(future2); match futures::future::select(future1, future2).await { futures::future::Either::Left((result, other)) => { let _ = other.await; result } futures::future::Either::Right((result, other)) => { let _ = other.await; result } } } mod event_server { use futures::stream::TryStreamExt; use parity_tokio_ipc::Endpoint as IpcEndpoint; use std::{ collections::{HashMap, HashSet}, pin::Pin, task::{Context, Poll}, }; use talpid_tunnel::{EventHook, TunnelMetadata}; use talpid_types::ErrorExt; #[cfg(any(target_os = "macos", target_os = "windows"))] use talpid_types::net::proxy::CustomProxy; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tonic::{ Request, Response, transport::{Server, server::Connected}, }; #[allow(clippy::derive_partial_eq_without_eq)] mod proto { tonic::include_proto!("talpid_openvpn_plugin"); } pub use proto::{ EventDetails, openvpn_event_proxy_server::{OpenvpnEventProxy, OpenvpnEventProxyServer}, }; #[derive(thiserror::Error, Debug)] pub enum Error { /// Failure to set up the IPC server. #[error("Failed to create pipe or Unix socket")] StartServer(#[from] std::io::Error), /// An error occurred while the server was running. #[error("Tonic error")] TonicError(#[from] tonic::transport::Error), } /// Implements a gRPC service used to process events sent to by OpenVPN. pub struct OpenvpnEventProxyImpl { pub event_hook: EventHook, pub user_pass_file_path: super::PathBuf, pub proxy_auth_file_path: Option, pub abort_server_tx: triggered::Trigger, #[cfg(any(target_os = "macos", target_os = "windows"))] pub proxy: Option, pub route_manager: talpid_routing::RouteManagerHandle, #[cfg(target_os = "linux")] pub ipv6_enabled: bool, } impl OpenvpnEventProxyImpl { async fn up_inner( &self, request: Request, ) -> std::result::Result, tonic::Status> { let env = request.into_inner().env; self.event_hook .clone() .on_event(talpid_tunnel::TunnelEvent::InterfaceUp( Self::get_tunnel_metadata(&env)?, talpid_types::net::AllowedTunnelTraffic::All, )) .await; Ok(Response::new(())) } async fn route_up_inner( &self, request: Request, ) -> std::result::Result, tonic::Status> { let env = request.into_inner().env; let _ = tokio::fs::remove_file(&self.user_pass_file_path).await; if let Some(file_path) = &self.proxy_auth_file_path { let _ = tokio::fs::remove_file(file_path).await; } let mut routes = HashSet::new(); #[cfg(not(target_os = "linux"))] if let Some(CustomProxy::Socks5Local(proxy_settings)) = &self.proxy { let network = proxy_settings.remote_endpoint.address.ip().into(); let node = talpid_routing::NetNode::DefaultNode; let route = talpid_routing::RequiredRoute::new(network, node); routes.insert(route); } let route_handle = self.route_manager.clone(); #[cfg(target_os = "linux")] { let ipv6_enabled = self.ipv6_enabled; if let Err(error) = route_handle.create_routing_rules(ipv6_enabled).await { log::error!("{}", error.display_chain()); return Err(tonic::Status::failed_precondition("Failed to add routes")); } let extracted_routes = super::extract_routes(&env) .map_err(|err| { log::error!("{}", err.display_chain_with_msg("Failed to obtain routes")); tonic::Status::failed_precondition("Failed to obtain routes") })? .into_iter() .filter(|route| route.prefix.is_ipv4() || ipv6_enabled); routes.extend(extracted_routes); } let metadata = Self::get_tunnel_metadata(&env)?; #[cfg(windows)] { let tunnel_device = metadata.interface.clone(); let luid = talpid_windows::net::luid_from_alias(tunnel_device).map_err(|error| { log::error!("{}", error.display_chain_with_msg("luid_from_alias failed")); tonic::Status::unavailable("failed to obtain interface luid") })?; talpid_windows::net::wait_for_addresses(luid) .await .map_err(|error| { log::error!( "{}", error.display_chain_with_msg("wait_for_addresses failed") ); tonic::Status::unavailable("wait_for_addresses failed") })?; } if let Err(error) = route_handle.add_routes(routes).await { log::error!("{}", error.display_chain()); return Err(tonic::Status::failed_precondition("Failed to add routes")); } self.event_hook .clone() .on_event(talpid_tunnel::TunnelEvent::Up(metadata)) .await; Ok(Response::new(())) } #[allow(clippy::result_large_err)] fn get_tunnel_metadata( env: &HashMap, ) -> std::result::Result { let tunnel_alias = env .get("dev") .ok_or_else(|| tonic::Status::invalid_argument("missing tunnel alias"))? .clone(); let mut ips = vec![ env.get("ifconfig_local") .ok_or_else(|| { tonic::Status::invalid_argument("missing \"ifconfig_local\" in up event") })? .parse() .map_err(|_| tonic::Status::invalid_argument("Invalid tunnel IPv4 address"))?, ]; if let Some(ipv6_address) = env.get("ifconfig_ipv6_local") { ips.push( ipv6_address.parse().map_err(|_| { tonic::Status::invalid_argument("Invalid tunnel IPv6 address") })?, ); } let ipv4_gateway = env .get("route_vpn_gateway") .ok_or_else(|| { tonic::Status::invalid_argument("No \"route_vpn_gateway\" in tunnel up event") })? .parse() .map_err(|_| { tonic::Status::invalid_argument("Invalid tunnel gateway IPv4 address") })?; let ipv6_gateway = if let Some(ipv6_address) = env.get("route_ipv6_gateway_1") { Some(ipv6_address.parse().map_err(|_| { tonic::Status::invalid_argument("Invalid tunnel gateway IPv6 address") })?) } else { None }; Ok(TunnelMetadata { interface: tunnel_alias, ips, ipv4_gateway, ipv6_gateway, }) } } #[tonic::async_trait] impl OpenvpnEventProxy for OpenvpnEventProxyImpl { async fn auth_failed( &self, request: Request, ) -> std::result::Result, tonic::Status> { let env = request.into_inner().env; self.event_hook .clone() .on_event(talpid_tunnel::TunnelEvent::AuthFailed( env.get("auth_failed_reason").cloned(), )) .await; Ok(Response::new(())) } async fn up( &self, request: Request, ) -> std::result::Result, tonic::Status> { self.up_inner(request).await.inspect_err(|_| { self.abort_server_tx.trigger(); }) } async fn route_up( &self, request: Request, ) -> std::result::Result, tonic::Status> { self.route_up_inner(request).await.inspect_err(|_| { self.abort_server_tx.trigger(); }) } async fn route_predown( &self, _request: Request, ) -> std::result::Result, tonic::Status> { self.event_hook .clone() .on_event(talpid_tunnel::TunnelEvent::Down) .await; Ok(Response::new(())) } } pub fn start( event_proxy: L, abort_rx: triggered::Listener, ) -> std::result::Result<(tokio::task::JoinHandle>, String), Error> where L: OpenvpnEventProxy + Sync + Send + 'static, { let uuid = uuid::Uuid::new_v4().to_string(); let ipc_path = if cfg!(windows) { format!("//./pipe/talpid-openvpn-{uuid}") } else { format!("/tmp/talpid-openvpn-{uuid}") }; let endpoint = IpcEndpoint::new(ipc_path.clone()); let incoming = endpoint.incoming().map_err(Error::StartServer)?; Ok(( tokio::spawn(async move { Server::builder() .add_service(OpenvpnEventProxyServer::new(event_proxy)) .serve_with_incoming_shutdown(incoming.map_ok(StreamBox), abort_rx) .await .map_err(Error::TonicError) }), ipc_path, )) } #[derive(Debug)] pub struct StreamBox(pub T); impl Connected for StreamBox { type ConnectInfo = Option<()>; fn connect_info(&self) -> Self::ConnectInfo { None } } impl AsyncRead for StreamBox { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { Pin::new(&mut self.0).poll_read(cx, buf) } } impl AsyncWrite for StreamBox { fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { Pin::new(&mut self.0).poll_write(cx, buf) } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(&mut self.0).poll_flush(cx) } fn poll_shutdown( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { Pin::new(&mut self.0).poll_shutdown(cx) } } } #[cfg(test)] mod tests { use super::*; use crate::mktemp::TempFile; use std::sync::{Arc, Mutex}; #[cfg(windows)] #[derive(Debug)] struct TestWintunContext {} #[cfg(windows)] #[async_trait::async_trait] impl WintunContext for TestWintunContext { fn luid(&self) -> NET_LUID_LH { NET_LUID_LH { Value: 0u64 } } fn ipv6(&self) -> bool { false } async fn wait_for_interfaces(&self) -> io::Result<()> { Ok(()) } } struct TestOpenvpnEventProxy {} #[async_trait::async_trait] impl event_server::OpenvpnEventProxy for TestOpenvpnEventProxy { async fn auth_failed( &self, _request: tonic::Request, ) -> std::result::Result, tonic::Status> { Ok(tonic::Response::new(())) } async fn up( &self, _request: tonic::Request, ) -> std::result::Result, tonic::Status> { Ok(tonic::Response::new(())) } async fn route_up( &self, _request: tonic::Request, ) -> std::result::Result, tonic::Status> { Ok(tonic::Response::new(())) } async fn route_predown( &self, _request: tonic::Request, ) -> std::result::Result, tonic::Status> { Ok(tonic::Response::new(())) } } #[derive(Debug, Default, Clone)] struct TestOpenVpnBuilder { pub plugin: Arc>>, pub log: Arc>>, pub process_handle: Option, } impl OpenVpnBuilder for TestOpenVpnBuilder { type ProcessHandle = TestProcessHandle; fn plugin(&mut self, path: impl AsRef, _args: Vec) -> &mut Self { *self.plugin.lock().unwrap() = Some(path.as_ref().to_path_buf()); self } fn log(&mut self, log: Option>) -> &mut Self { *self.log.lock().unwrap() = log.as_ref().map(|path| path.as_ref().to_path_buf()); self } #[cfg(target_os = "linux")] fn fwmark(&mut self, _fwmark: u32) -> &mut Self { self } fn start(&self) -> io::Result { self.process_handle .ok_or_else(|| io::Error::other("failed to start")) } } #[derive(Debug, Copy, Clone)] struct TestProcessHandle { exit_code: i32, forever: bool, } impl TestProcessHandle { pub fn immediate(exit_code: i32) -> Self { Self { exit_code, forever: false, } } pub fn run_forever() -> Self { Self { exit_code: 0, forever: true, } } fn status(&self) -> ExitStatus { #[cfg(windows)] { use std::os::windows::process::ExitStatusExt; ExitStatus::from_raw(self.exit_code as u32) } #[cfg(unix)] { use std::os::unix::process::ExitStatusExt; ExitStatus::from_raw(self.exit_code) } } } #[async_trait::async_trait] impl ProcessHandle for TestProcessHandle { async fn wait(&mut self) -> io::Result { if self.forever { let _: () = futures::future::pending().await; } Ok(self.status()) } fn kill(&mut self) {} } fn create_init_args_plugin_log( plugin_path: PathBuf, log_path: Option, ) -> OpenVpnTunnelInitArgs { let (event_server_abort_tx, event_server_abort_rx) = triggered::trigger(); OpenVpnTunnelInitArgs { event_server_abort_tx, event_server_abort_rx, plugin_path, log_path, user_pass_file: TempFile::new(), proxy_auth_file: None, proxy_monitor: None, #[cfg(target_os = "linux")] fwmark: 0, } } fn create_init_args() -> OpenVpnTunnelInitArgs { create_init_args_plugin_log("".into(), None) } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn sets_plugin() { let builder = TestOpenVpnBuilder::default(); let openvpn_init_args = create_init_args_plugin_log("./my_test_plugin".into(), None); let _ = OpenVpnMonitor::new_internal( builder.clone(), openvpn_init_args, TestOpenvpnEventProxy {}, #[cfg(windows)] Box::new(TestWintunContext {}), ); assert_eq!( Some(PathBuf::from("./my_test_plugin")), *builder.plugin.lock().unwrap() ); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn sets_log() { let builder = TestOpenVpnBuilder::default(); let openvpn_init_args = create_init_args_plugin_log("".into(), Some(PathBuf::from("./my_test_log_file"))); let _ = OpenVpnMonitor::new_internal( builder.clone(), openvpn_init_args, TestOpenvpnEventProxy {}, #[cfg(windows)] Box::new(TestWintunContext {}), ); assert_eq!( Some(PathBuf::from("./my_test_log_file")), *builder.log.lock().unwrap() ); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn exit_successfully() { let builder = TestOpenVpnBuilder { process_handle: Some(TestProcessHandle::immediate(0)), ..Default::default() }; let openvpn_init_args = create_init_args(); let testee = OpenVpnMonitor::new_internal( builder, openvpn_init_args, TestOpenvpnEventProxy {}, #[cfg(windows)] Box::new(TestWintunContext {}), ) .unwrap(); assert!(testee.wait().await.is_ok()); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn exit_error() { let builder = TestOpenVpnBuilder { process_handle: Some(TestProcessHandle::immediate(1)), ..Default::default() }; let openvpn_init_args = create_init_args(); let testee = OpenVpnMonitor::new_internal( builder, openvpn_init_args, TestOpenvpnEventProxy {}, #[cfg(windows)] Box::new(TestWintunContext {}), ) .unwrap(); assert!(testee.wait().await.is_err()); } /// Test that the `OpenVpnMonitor` stops when the close handle closes it. #[tokio::test(flavor = "current_thread", start_paused = true)] async fn wait_closed() { let builder = TestOpenVpnBuilder { process_handle: Some(TestProcessHandle::run_forever()), ..Default::default() }; let openvpn_init_args = create_init_args(); let testee = OpenVpnMonitor::new_internal( builder, openvpn_init_args, TestOpenvpnEventProxy {}, #[cfg(windows)] Box::new(TestWintunContext {}), ) .unwrap(); testee.close_handle().close(); tokio::time::timeout(std::time::Duration::from_secs(10), testee.wait()) .await .expect("expected close handle to stop monitor") .expect("expected successful result"); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn failed_process_start() { let builder = TestOpenVpnBuilder::default(); let openvpn_init_args = create_init_args(); let result = OpenVpnMonitor::new_internal( builder, openvpn_init_args, TestOpenvpnEventProxy {}, #[cfg(windows)] Box::new(TestWintunContext {}), ) .unwrap(); match result.wait().await { Err(Error::StartProcessError) => (), _ => panic!("Wrong error"), } } }