use super::{config::TEST_CONFIG, Error, TestContext, WAIT_FOR_TUNNEL_STATE_TIMEOUT}; use crate::{ mullvad_daemon::RpcClientProvider, network_monitor::{ self, start_packet_monitor, MonitorOptions, MonitorUnexpectedlyStopped, PacketMonitor, }, tests::{ account::{clear_devices, new_device_client}, helpers, }, }; use anyhow::{anyhow, bail, ensure, Context}; use futures::StreamExt; use mullvad_management_interface::{client::DaemonEvent, MullvadProxyClient}; use mullvad_relay_selector::{ query::RelayQuery, GetRelay, RelaySelector, SelectorConfig, WireguardConfig, }; use mullvad_types::{ constraints::Constraint, relay_constraints::{ GeographicLocationConstraint, LocationConstraint, RelayConstraints, RelaySettings, }, relay_list::Relay, states::TunnelState, }; use pcap::Direction; use pnet_packet::ip::IpNextHeaderProtocols; use std::{ collections::HashMap, net::{IpAddr, Ipv4Addr, SocketAddr}, path::Path, time::{Duration, Instant}, }; use talpid_types::net::wireguard::{PeerConfig, PrivateKey, TunnelConfig}; use test_rpc::{ meta::Os, mullvad_daemon::ServiceStatus, package::Package, AmIMullvad, ServiceClient, SpawnOpts, }; use tokio::time::sleep; pub const THROTTLE_RETRY_DELAY: Duration = Duration::from_secs(120); const CHECKER_FILENAME_WINDOWS: &str = "connection-checker.exe"; const CHECKER_FILENAME_UNIX: &str = "connection-checker"; const AM_I_MULLVAD_TIMEOUT_S: u64 = 10; const LEAK_TIMEOUT_S: u64 = 1; /// Timeout of [ConnCheckerHandle::check_connection]. const CONN_CHECKER_TIMEOUT: Duration = Duration::from_secs( AM_I_MULLVAD_TIMEOUT_S // https://am.i.mullvad.net timeout + LEAK_TIMEOUT_S // leak-tcp timeout + LEAK_TIMEOUT_S // leak-icmp timeout + 1, // plus some extra grace time ); #[macro_export] macro_rules! assert_tunnel_state { ($mullvad_client:expr, $pattern:pat) => {{ let state = $mullvad_client.get_tunnel_state().await?; assert!(matches!(state, $pattern), "state: {:?}", state); }}; } /// Install the app cleanly, failing if the installer doesn't succeed /// or if the VPN service is not running afterwards. pub async fn install_app( rpc: &ServiceClient, app_filename: &str, rpc_provider: &RpcClientProvider, ) -> anyhow::Result { // install package log::info!("Installing app '{}'", app_filename); rpc.install_app(get_package_desc(app_filename)).await?; // verify that daemon is running if rpc.mullvad_daemon_get_status().await? != ServiceStatus::Running { bail!(Error::DaemonNotRunning); } // Set the log level to trace rpc.set_daemon_log_level(test_rpc::mullvad_daemon::Verbosity::Trace) .await?; replace_openvpn_certificate(rpc).await?; // Override env vars rpc.set_daemon_environment(get_app_env().await?).await?; // Wait for the relay list to be updated let mut mullvad_client = rpc_provider.new_client().await; helpers::ensure_updated_relay_list(&mut mullvad_client) .await .context("Failed to update relay list")?; Ok(mullvad_client) } /// Replace the OpenVPN CA certificate which is currently used by the installed Mullvad App. /// This needs to be invoked after reach (re)installation to use the custom OpenVPN certificate. async fn replace_openvpn_certificate(rpc: &ServiceClient) -> Result<(), Error> { const DEST_CERT_FILENAME: &str = "ca.crt"; let dest_dir = match TEST_CONFIG.os { Os::Windows => "C:\\Program Files\\Mullvad VPN\\resources", Os::Linux => "/opt/Mullvad VPN/resources", Os::Macos => "/Applications/Mullvad VPN.app/Contents/Resources", }; let dest = Path::new(dest_dir) .join(DEST_CERT_FILENAME) .as_os_str() .to_string_lossy() .into_owned(); rpc.write_file(dest, TEST_CONFIG.openvpn_certificate.to_vec()) .await .map_err(Error::Rpc) } pub fn get_package_desc(name: &str) -> Package { Package { path: Path::new(&TEST_CONFIG.artifacts_dir).join(name), } } /// Reboot the guest virtual machine. /// /// # macOS /// The tunnel must be reconfigured after the virtual machine is up, /// or macOS refuses to assign an IP. The reasons for this are poorly understood. pub async fn reboot(rpc: &mut ServiceClient) -> Result<(), Error> { rpc.reboot().await?; #[cfg(target_os = "macos")] crate::vm::network::macos::configure_tunnel() .await .map_err(|error| Error::Other(format!("Failed to recreate custom wg tun: {error}")))?; Ok(()) } #[derive(Debug, Default)] pub struct ProbeResult { tcp: usize, udp: usize, icmp: usize, } impl ProbeResult { pub fn all(&self) -> bool { self.tcp > 0 && self.udp > 0 && self.icmp > 0 } pub fn none(&self) -> bool { !self.any() } pub fn any(&self) -> bool { self.tcp > 0 || self.udp > 0 || self.icmp > 0 } } /// Return whether the guest exit IP is a Mullvad relay pub async fn using_mullvad_exit(rpc: &ServiceClient) -> bool { log::info!("Test whether exit IP is a mullvad relay"); geoip_lookup_with_retries(rpc) .await .unwrap() .mullvad_exit_ip } /// Get VPN tunnel interface name pub async fn get_tunnel_interface(client: &mut MullvadProxyClient) -> anyhow::Result { match client.get_tunnel_state().await? { TunnelState::Connecting { endpoint, .. } | TunnelState::Connected { endpoint, .. } => { let Some(tunnel_interface) = endpoint.tunnel_interface else { bail!("Unknown tunnel interface"); }; Ok(tunnel_interface) } _ => bail!("Tunnel is not up"), } } /// Sends a number of probes and returns the number of observed packets (UDP, TCP, or ICMP) pub async fn send_guest_probes( rpc: ServiceClient, interface: String, destination: SocketAddr, ) -> ProbeResult { const MONITOR_DURATION: Duration = Duration::from_secs(8); let pktmon = start_packet_monitor( move |packet| packet.destination.ip() == destination.ip(), MonitorOptions { direction: Some(network_monitor::Direction::In), timeout: Some(MONITOR_DURATION), ..Default::default() }, ) .await; let send_handle = tokio::spawn(send_guest_probes_without_monitor( rpc, Some(interface), destination, )); let monitor_result = pktmon.wait().await.unwrap(); send_handle.abort(); let _ = send_handle.await; let mut result = ProbeResult::default(); for pkt in monitor_result.packets { match pkt.protocol { IpNextHeaderProtocols::Tcp => { result.tcp = result.tcp.saturating_add(1); } IpNextHeaderProtocols::Udp => { result.udp = result.udp.saturating_add(1); } IpNextHeaderProtocols::Icmp => { result.icmp = result.icmp.saturating_add(1); } _ => (), } } result } /// Send one probe per transport protocol to `destination` without running a packet monitor pub async fn send_guest_probes_without_monitor( rpc: ServiceClient, interface: Option, destination: SocketAddr, ) { let bind_addr = if let Some(ref interface) = interface { SocketAddr::new( rpc.get_interface_ip(interface.clone()) .await .expect("failed to obtain interface IP"), 0, ) } else { "0.0.0.0:0".parse().unwrap() }; let tcp_rpc = rpc.clone(); let tcp_interface = interface.clone(); let tcp_send = async move { tcp_rpc .send_tcp(tcp_interface, bind_addr, destination) .await }; let udp_rpc = rpc.clone(); let udp_interface = interface.clone(); let udp_send = async move { udp_rpc .send_udp(udp_interface, bind_addr, destination) .await }; let icmp = async move { ping_with_timeout(&rpc, destination.ip(), interface).await }; let _ = tokio::join!(tcp_send, udp_send, icmp); } pub async fn ping_with_timeout( rpc: &ServiceClient, dest: IpAddr, interface: Option, ) -> Result<(), Error> { const DEFAULT_PING_SIZE: usize = 64; rpc.send_ping(dest, interface, DEFAULT_PING_SIZE) .await .map_err(Error::Rpc) } pub async fn ping_sized_with_timeout( rpc: &ServiceClient, dest: IpAddr, interface: Option, size: usize, ) -> Result<(), Error> { rpc.send_ping(dest, interface, size) .await .map_err(Error::Rpc) } /// Return the first address that `host` resolves to pub async fn resolve_hostname_with_retries( host: impl tokio::net::ToSocketAddrs, ) -> std::io::Result { const MAX_ATTEMPTS: usize = 10; const RETRY_DELAY: Duration = Duration::from_secs(5); let mut last_error = None; let mut attempt = 0; loop { if attempt >= MAX_ATTEMPTS { break Err(last_error.unwrap_or(std::io::Error::other("lookup timed out"))); } attempt += 1; match tokio::net::lookup_host(&host).await { Ok(mut addrs) => { if let Some(addr) = addrs.next() { // done break Ok(addr); } } Err(err) => last_error = Some(err), } tokio::time::sleep(RETRY_DELAY).await; } } /// Get the mac address (if any) of a network interface (on the test-manager machine). #[cfg(target_os = "linux")] // not used on macos pub fn get_interface_mac(interface: &str) -> anyhow::Result> { let addrs = nix::ifaddrs::getifaddrs().map_err(|error| { log::error!("Failed to obtain interfaces: {}", error); test_rpc::Error::Syscall })?; let mut interface_exists = false; let mac_addr = addrs .filter(|addr| addr.interface_name == interface) .find_map(|addr| { // sadly, the only way of distinguishing between "iface doesn't exist" and // "iface has no mac addr" is to check if the interface appears anywhere in the list. interface_exists = true; let addr = addr.address.as_ref()?; let link_addr = addr.as_link_addr()?; let mac_addr = link_addr.addr()?; Some(mac_addr) }); if interface_exists { Ok(mac_addr) } else { bail!("Interface not found: {interface:?}") } } /// Get the index of a network interface (on the test-manager machine). #[cfg(target_os = "linux")] // not used on macos pub fn get_interface_index(interface: &str) -> anyhow::Result { use nix::errno::Errno; use std::ffi::CString; let interface = CString::new(interface).context(anyhow!( "Failed to turn interface name {interface:?} into cstr" ))?; match unsafe { libc::if_nametoindex(interface.as_ptr()) } { 0 => { let err = Errno::last(); Err(err).context("Failed to get interface index") } i => Ok(i), } } /// Log in and retry if it fails due to throttling pub async fn login_with_retries( mullvad_client: &mut MullvadProxyClient, ) -> Result<(), mullvad_management_interface::Error> { log::debug!("Logging in/generating device"); loop { match mullvad_client .login_account(TEST_CONFIG.account_number.clone()) .await { Err(mullvad_management_interface::Error::Rpc(status)) if status.message().to_uppercase().contains("THROTTLED") => { // Work around throttling errors by sleeping log::debug!( "Login failed due to throttling. Sleeping for {} seconds", THROTTLE_RETRY_DELAY.as_secs() ); tokio::time::sleep(THROTTLE_RETRY_DELAY).await; } Err(err) => break Err(err), Ok(_) => break Ok(()), } } } /// Ensure that the test runner is logged in to an account. /// /// This will first check whether we are logged in. If not, it will also try to login /// on your behalf. If this function returns without any errors, we are logged in to a valid /// account. pub async fn ensure_logged_in(mullvad_client: &mut MullvadProxyClient) -> anyhow::Result<()> { if !matches!( mullvad_client.update_device().await, Err(mullvad_management_interface::Error::DeviceNotFound) ) && mullvad_client.get_device().await?.is_logged_in() { return Ok(()); } log::info!("Current device not logged in. Clearing devices and logging in."); // We are apparently not logged in already.. Try to log in. clear_devices( &new_device_client() .await .context("Failed to create device client")?, ) .await .context("failed to clear devices")?; login_with_retries(mullvad_client) .await .context("Failed to log in")?; Ok(()) } /// Try to connect to a Mullvad Tunnel. /// /// # Returns /// - `Result::Ok(new_state)` if the daemon successfully connected to a tunnel /// - `Result::Err` if: /// - The daemon failed to even begin connecting. Then [`Error::Rpc`] is returned. /// - The daemon started to connect but ended up in the [`TunnelState::Error`] state. Then /// [`Error::UnexpectedErrorState`] is returned pub async fn connect_and_wait( mullvad_client: &mut MullvadProxyClient, ) -> Result { log::info!("Connecting"); let initial_time = Instant::now(); mullvad_client.connect_tunnel().await?; let new_state = wait_for_tunnel_state(mullvad_client.clone(), |state| { matches!( state, TunnelState::Connected { .. } | TunnelState::Error(..) ) }) .await?; if let TunnelState::Error(error_state) = new_state { return Err(Error::UnexpectedErrorState(error_state)); } log::info!( "Connected after {} seconds", initial_time.elapsed().as_secs() ); Ok(new_state) } pub async fn disconnect_and_wait(mullvad_client: &mut MullvadProxyClient) -> Result<(), Error> { log::debug!("Disconnecting"); mullvad_client.disconnect_tunnel().await?; wait_for_tunnel_state(mullvad_client.clone(), |state| { matches!(state, TunnelState::Disconnected { .. }) }) .await?; log::debug!("Disconnected"); Ok(()) } pub async fn wait_for_tunnel_state( mut rpc: MullvadProxyClient, accept_state_fn: impl Fn(&mullvad_types::states::TunnelState) -> bool, ) -> Result { let events = rpc .events_listen() .await .map_err(|status| Error::Daemon(format!("Failed to get event stream: {}", status)))?; let state = rpc .get_tunnel_state() .await .map_err(|error| Error::Daemon(format!("Failed to get tunnel state: {:?}", error)))?; if accept_state_fn(&state) { return Ok(state); } find_next_tunnel_state(events, accept_state_fn).await } pub async fn find_next_tunnel_state( stream: impl futures::Stream> + Unpin, accept_state_fn: impl Fn(&mullvad_types::states::TunnelState) -> bool, ) -> Result { tokio::time::timeout( WAIT_FOR_TUNNEL_STATE_TIMEOUT, find_daemon_event(stream, |daemon_event| match daemon_event { DaemonEvent::TunnelState(state) if accept_state_fn(&state) => Some(state), _ => None, }), ) .await .map_err(|_error| Error::Daemon(String::from("Tunnel event listener timed out")))? } pub async fn find_daemon_event( mut event_stream: impl futures::Stream> + Unpin, accept_event: Accept, ) -> Result where Accept: Fn(DaemonEvent) -> Option, { loop { match event_stream.next().await { Some(Ok(daemon_event)) => match accept_event(daemon_event) { Some(accepted_event) => break Ok(accepted_event), None => continue, }, Some(Err(status)) => { break Err(Error::Daemon(format!( "Failed to get next event: {}", status ))) } None => break Err(Error::Daemon(String::from("Lost daemon event stream"))), } } } /// Set environment variables specified by `env` and restart the Mullvad daemon. /// Returns a new [rpc client][`MullvadProxyClient`], since the old client *probably* /// can't communicate with the new daemon. /// /// # Note /// This is just a thin wrapper around [`ServiceClient::set_daemon_environment`] which also /// invalidates the old [`MullvadProxyClient`]. pub async fn restart_daemon_with( rpc: &ServiceClient, test_context: &TestContext, _: MullvadProxyClient, // Just consume the old proxy client env: Env, ) -> Result where Env: IntoIterator, K: Into, V: Into, { rpc.set_daemon_environment(env).await?; // Need to create a new `mullvad_client` here after the restart // otherwise we *probably* can't communicate with the daemon. Ok(test_context.rpc_provider.new_client().await) } pub async fn geoip_lookup_with_retries(rpc: &ServiceClient) -> Result { const MAX_ATTEMPTS: usize = 5; const BEFORE_RETRY_DELAY: Duration = Duration::from_secs(2); let mut attempt = 0; loop { let result = rpc .geoip_lookup(TEST_CONFIG.mullvad_host.to_owned()) .await .map_err(Error::GeoipLookup); attempt += 1; if result.is_ok() || attempt >= MAX_ATTEMPTS { return result; } tokio::time::sleep(BEFORE_RETRY_DELAY).await; } } pub struct AbortOnDrop(Option>); impl AbortOnDrop { pub fn new(inner: tokio::task::JoinHandle) -> AbortOnDrop { AbortOnDrop(Some(inner)) } } impl Drop for AbortOnDrop { fn drop(&mut self) { if let Some(task) = self.0.take() { task.abort(); } } } pub async fn apply_settings_from_relay_query( mullvad_client: &mut MullvadProxyClient, query: RelayQuery, ) -> Result<(), Error> { let (constraints, bridge_state, bridge_settings, obfuscation) = query.into_settings(); mullvad_client .set_relay_settings(constraints.into()) .await .map_err(|error| Error::Daemon(format!("Failed to set relay settings: {}", error)))?; mullvad_client .set_bridge_state(bridge_state) .await .map_err(|error| Error::Daemon(format!("Failed to set bridge state: {}", error)))?; mullvad_client .set_bridge_settings(bridge_settings) .await .map_err(|error| Error::Daemon(format!("Failed to set bridge settings: {}", error)))?; mullvad_client .set_obfuscation_settings(obfuscation) .await .map_err(|error| Error::Daemon(format!("Failed to set obfuscation settings: {}", error))) } pub async fn set_relay_settings( mullvad_client: &mut MullvadProxyClient, relay_settings: impl Into, ) -> Result<(), Error> { mullvad_client .set_relay_settings(relay_settings.into()) .await .map_err(|error| Error::Daemon(format!("Failed to set relay settings: {}", error))) } /// Wait for the relay list to be updated, to make sure we have the overridden one. /// Time out after a while. pub async fn ensure_updated_relay_list( mullvad_client: &mut MullvadProxyClient, ) -> Result<(), mullvad_management_interface::Error> { let mut events = mullvad_client.events_listen().await?; mullvad_client.update_relay_locations().await?; let _ = tokio::time::timeout(std::time::Duration::from_secs(3), async move { while let Some(Ok(event)) = events.next().await { if matches!(event, DaemonEvent::RelayList(_)) { log::debug!("Received new relay list"); break; } } }) .await; Ok(()) } pub fn unreachable_wireguard_tunnel() -> talpid_types::net::wireguard::ConnectionConfig { talpid_types::net::wireguard::ConnectionConfig { tunnel: TunnelConfig { private_key: PrivateKey::new_from_random(), addresses: vec![IpAddr::V4(Ipv4Addr::new(10, 64, 10, 1))], }, peer: PeerConfig { public_key: PrivateKey::new_from_random().public_key(), allowed_ips: vec![ "0.0.0.0/0".parse().expect("Failed to parse ipv6 network"), "::0/0".parse().expect("Failed to parse ipv6 network"), ], endpoint: "1.3.3.7:1234".parse().unwrap(), psk: None, constant_packet_size: false, }, exit_peer: None, ipv4_gateway: Ipv4Addr::new(10, 64, 10, 1), ipv6_gateway: None, #[cfg(target_os = "linux")] fwmark: None, } } /// Return the current `MULLVAD_API_HOST` et al. /// /// # Note /// This is independent of the running daemon's environment. /// It is solely dependant on the current value of [`TEST_CONFIG`]. pub async fn get_app_env() -> anyhow::Result> { use mullvad_api::env; let api_host = format!("api.{}", TEST_CONFIG.mullvad_host); let api_host_with_port = format!("{api_host}:443"); let api_addr = resolve_hostname_with_retries(api_host_with_port) .await .context("failed to resolve API host")?; Ok(HashMap::from_iter(vec![ (env::API_HOST_VAR.to_string(), api_host), (env::API_ADDR_VAR.to_string(), api_addr.to_string()), ])) } /// Constrain the daemon to only select the relay selected with `query` when establishing all /// future tunnels (until relay settings are updated, see [`set_relay_settings`]). Returns the /// selected [`Relay`] for future reference. /// /// # Note /// This function does not handle bridges and multihop configurations (currently). There is no /// particular reason for this other than it not being needed at the time, so feel free to extend /// this function :). pub async fn constrain_to_relay( mullvad_client: &mut MullvadProxyClient, query: RelayQuery, ) -> anyhow::Result { /// Convert the result of invoking the relay selector to a relay constraint. fn convert_to_relay_constraints( query: RelayQuery, selected_relay: GetRelay, ) -> anyhow::Result<(Relay, RelayConstraints)> { match selected_relay { GetRelay::Wireguard { inner: WireguardConfig::Singlehop { exit }, .. } | GetRelay::OpenVpn { exit, .. } => { let location = into_constraint(&exit); let (mut relay_constraints, ..) = query.into_settings(); relay_constraints.location = location; Ok((exit, relay_constraints)) } unsupported => bail!("Can not constrain to a {unsupported:?}"), } } let settings = mullvad_client.get_settings().await?; // Construct a relay selector with up-to-date information from the runnin daemon's relay list let relay_list = mullvad_client.get_relay_locations().await?; let relay_selector = get_daemon_relay_selector(&settings, relay_list); // Select an(y) appropriate relay for the given query and constrain the daemon to only connect // to that specific relay (when connecting). let relay = relay_selector.get_relay_by_query(query.clone())?; let (exit, relay_constraints) = convert_to_relay_constraints(query, relay)?; set_relay_settings(mullvad_client, RelaySettings::Normal(relay_constraints)).await?; Ok(exit) } /// Get a mirror of the relay selector used by the daemon. /// /// This can be used to query the relay selector without triggering a tunnel state change in the /// daemon. pub fn get_daemon_relay_selector( settings: &mullvad_types::settings::Settings, relay_list: mullvad_types::relay_list::RelayList, ) -> RelaySelector { RelaySelector::from_list(SelectorConfig::from_settings(settings), relay_list) } /// Convenience function for constructing a constraint from a given [`Relay`]. /// /// # Panics /// /// The relay must have a location set. pub fn into_constraint(relay: &Relay) -> Constraint { let constraint = GeographicLocationConstraint::hostname( relay.location.country_code.clone(), relay.location.city_code.clone(), &relay.hostname, ); Constraint::Only(LocationConstraint::Location(constraint)) } /// Ping monitoring made easy! /// /// Continuously ping some destination while monitoring to detect diverging /// packets. /// /// To customize [`Pinger`] before the pinging and network monitoring starts, /// see [`PingerBuilder`]. Call [`start`](Pinger::start) to start pinging, and /// [`stop`](Pinger::stop) to get the leak test results. #[allow(dead_code)] pub struct Pinger { // These values can be configured with [`PingerBuilder`]. destination: SocketAddr, interval: tokio::time::Interval, // Run-time specific values pub guest_ip: IpAddr, ping_task: AbortOnDrop>, monitor: PacketMonitor, } impl Pinger { /// Create a [`Pinger`] with a default configuration. /// /// See [`PingerBuilder`] for details. pub async fn start(rpc: &test_rpc::ServiceClient) -> Pinger { let defaults = PingerBuilder::default(); Self::start_with(defaults, rpc).await } /// Create a [`Pinger`] using the configuration of `builder`. /// /// See [`PingerBuilder`] for details on how to configure a [`Pinger`] /// before starting it. pub async fn start_with(builder: PingerBuilder, rpc: &test_rpc::ServiceClient) -> Pinger { // Get the associated IP address of the test runner on the default, non-tunnel interface. let guest_ip = obtain_guest_ip(rpc).await; log::debug!("Guest IP: {guest_ip}"); // Start a network monitor log::debug!("Monitoring outgoing traffic"); let monitor = start_packet_monitor( move |packet| { // NOTE: Many packets will likely be observed for API traffic. Rather than filtering // all of those specifically, simply fail if our probes are // observed. packet.source.ip() == guest_ip && packet.destination.ip() == builder.destination.ip() }, MonitorOptions::default(), ) .await; // Start pinging // // Create some network activity for the network monitor to sniff. let ping_rpc = rpc.clone(); let mut interval = tokio::time::interval(builder.interval.period()); #[allow(clippy::async_yields_async)] let ping_task = AbortOnDrop::new(tokio::spawn(async move { loop { send_guest_probes_without_monitor(ping_rpc.clone(), None, builder.destination) .await; interval.tick().await; } })); Pinger { destination: builder.destination, interval: builder.interval, guest_ip, ping_task, monitor, } } /// Stop pinging and extract the result of the network monitor. pub async fn stop(self) -> Result { // Abort the inner probe sender, which is accomplished by dropping the // join handle to the running task. drop(self.ping_task); self.monitor.into_result().await } /// Return the time period determining the cadence of pings that are sent. pub fn period(&self) -> tokio::time::Duration { self.interval.period() } } /// Returns the [`IpAddr`] of the default non-tunnel interface. async fn obtain_guest_ip(rpc: &ServiceClient) -> IpAddr { let guest_iface = rpc .get_default_interface() .await .expect("failed to obtain default interface"); rpc.get_interface_ip(guest_iface) .await .expect("failed to obtain non-tun IP") } /// Configure a [`Pinger`] before starting it. pub struct PingerBuilder { destination: SocketAddr, interval: tokio::time::Interval, } #[allow(dead_code)] impl PingerBuilder { /// Create a default [`PingerBuilder`]. /// /// This is probably good enough for checking network traffic leaks when the /// test-runner is supposed to be blocked from sending or receiving *any* /// packets outside of localhost. pub fn default() -> PingerBuilder { PingerBuilder { destination: "1.1.1.1:1337".parse().unwrap(), interval: tokio::time::interval(Duration::from_secs(1)), } } /// Set the target to ping. pub fn destination(mut self, destination: SocketAddr) -> Self { self.destination = destination; self } /// How often a ping should be sent. pub fn interval(mut self, period: Duration) -> Self { self.interval = tokio::time::interval(period); self } } /// This helper spawns a separate process which checks if we are connected to Mullvad, and tries to /// leak traffic outside the tunnel by sending TCP, UDP, and ICMP packets to [LEAK_DESTINATION]. pub struct ConnChecker { rpc: ServiceClient, mullvad_client: MullvadProxyClient, leak_destination: SocketAddr, /// Path to the process binary. executable_path: String, /// Whether the process should be split when spawned. Needed on Linux. split: bool, /// Some arbitrary payload payload: Option, } pub struct ConnCheckerHandle<'a> { checker: &'a mut ConnChecker, /// ID of the spawned process. pid: u32, } pub struct ConnectionStatus { /// True if reported we are connected. am_i_mullvad: bool, /// True if we sniffed TCP packets going outside the tunnel. leaked_tcp: bool, /// True if we sniffed UDP packets going outside the tunnel. leaked_udp: bool, /// True if we sniffed ICMP packets going outside the tunnel. leaked_icmp: bool, } impl ConnChecker { pub fn new( rpc: ServiceClient, mullvad_client: MullvadProxyClient, leak_destination: SocketAddr, ) -> Self { let artifacts_dir = &TEST_CONFIG.artifacts_dir; let executable_path = match TEST_CONFIG.os { Os::Linux | Os::Macos => format!("{artifacts_dir}/{CHECKER_FILENAME_UNIX}"), Os::Windows => format!("{artifacts_dir}\\{CHECKER_FILENAME_WINDOWS}"), }; Self { rpc, mullvad_client, leak_destination, split: false, executable_path, payload: None, } } /// Set a custom magic payload that the connection checker binary should use when leak-testing. pub fn payload(&mut self, payload: impl Into) { self.payload = Some(payload.into()) } /// Spawn the connection checker process and return a handle to it. /// /// Dropping the handle will stop the process. /// **NOTE**: The handle must be dropped from a tokio runtime context. pub async fn spawn(&mut self) -> anyhow::Result> { log::debug!("spawning connection checker"); let opts = { let mut args = [ "--interactive", "--timeout", &AM_I_MULLVAD_TIMEOUT_S.to_string(), // try to leak traffic to LEAK_DESTINATION "--leak", &self.leak_destination.to_string(), "--leak-timeout", &LEAK_TIMEOUT_S.to_string(), "--leak-tcp", "--leak-udp", "--leak-icmp", "--url", &format!("https://am.i.{}/json", TEST_CONFIG.mullvad_host), ] .map(String::from) .to_vec(); if let Some(payload) = &self.payload { args.push("--payload".to_string()); args.push(payload.clone()); }; SpawnOpts { attach_stdin: true, attach_stdout: true, args, ..SpawnOpts::new(&self.executable_path) } }; let pid = self.rpc.spawn(opts).await?; if self.split && TEST_CONFIG.os == Os::Linux { self.mullvad_client .add_split_tunnel_process(pid as i32) .await?; } // TODO: The ST process monitor is a bit racy on macOS, such that processes aren't // immediately recognized. This is a workaround until fixed. tokio::time::sleep(std::time::Duration::from_secs(1)).await; Ok(ConnCheckerHandle { pid, checker: self }) } /// Enable split tunneling for the connection checker. pub async fn split(&mut self) -> anyhow::Result<()> { log::debug!("enable split tunnel"); self.split = true; match TEST_CONFIG.os { Os::Linux => { /* linux programs can't be split until they are spawned */ } Os::Macos | Os::Windows => { self.mullvad_client .add_split_tunnel_app(&self.executable_path) .await?; self.mullvad_client.set_split_tunnel_state(true).await?; } } Ok(()) } /// Disable split tunneling for the connection checker. pub async fn unsplit(&mut self) -> anyhow::Result<()> { log::debug!("disable split tunnel"); self.split = false; match TEST_CONFIG.os { Os::Linux => {} Os::Macos | Os::Windows => { self.mullvad_client.set_split_tunnel_state(false).await?; self.mullvad_client .remove_split_tunnel_app(&self.executable_path) .await?; } } Ok(()) } } impl ConnCheckerHandle<'_> { pub async fn split(&mut self) -> anyhow::Result<()> { if TEST_CONFIG.os == Os::Linux { self.checker .mullvad_client .add_split_tunnel_process(self.pid as i32) .await?; } self.checker.split().await } pub async fn unsplit(&mut self) -> anyhow::Result<()> { if TEST_CONFIG.os == Os::Linux { self.checker .mullvad_client .remove_split_tunnel_process(self.pid as i32) .await?; } self.checker.unsplit().await } /// Assert that traffic is flowing through the Mullvad tunnel and that no packets are leaked. pub async fn assert_secure(&mut self) -> anyhow::Result<()> { log::info!("checking that connection is secure"); let status = self.check_connection().await?; ensure!(status.am_i_mullvad); ensure!(!status.leaked_tcp); ensure!(!status.leaked_udp); ensure!(!status.leaked_icmp); Ok(()) } /// Assert that traffic is NOT flowing through the Mullvad tunnel and that packets ARE leaked. pub async fn assert_insecure(&mut self) -> anyhow::Result<()> { log::info!("checking that connection is not secure"); let status = self.check_connection().await?; ensure!(!status.am_i_mullvad); ensure!(status.leaked_tcp); ensure!(status.leaked_udp); ensure!(status.leaked_icmp); Ok(()) } pub async fn check_connection(&mut self) -> anyhow::Result { // Monitor all packets going to LEAK_DESTINATION during the check. let leak_destination = self.checker.leak_destination; let monitor = start_packet_monitor( move |packet| packet.destination.ip() == leak_destination.ip(), MonitorOptions { direction: Some(Direction::In), ..MonitorOptions::default() }, ) .await; // Write a newline to the connection checker to prompt it to perform the check. self.checker .rpc .write_child_stdin(self.pid, "Say the line, Bart!\r\n".into()) .await?; // The checker responds when the check is complete. let line = self.read_stdout_line().await?; let monitor_result = monitor .into_result() .await .map_err(|_e| anyhow!("Packet monitor unexpectedly stopped"))?; Ok(ConnectionStatus { am_i_mullvad: parse_am_i_mullvad(line)?, leaked_tcp: (monitor_result.packets.iter()) .any(|pkt| pkt.protocol == IpNextHeaderProtocols::Tcp), leaked_udp: (monitor_result.packets.iter()) .any(|pkt| pkt.protocol == IpNextHeaderProtocols::Udp), leaked_icmp: (monitor_result.packets.iter()) .any(|pkt| pkt.protocol == IpNextHeaderProtocols::Icmp), }) } /// Try to a single line of output from the spawned process async fn read_stdout_line(&mut self) -> anyhow::Result { // Add a timeout to avoid waiting forever. tokio::time::timeout(CONN_CHECKER_TIMEOUT, async { let mut line = String::new(); // tarpc doesn't support streams, so we poll the checker process in a loop instead loop { let Some(output) = self.checker.rpc.read_child_stdout(self.pid).await? else { bail!("got EOF from connection checker process"); }; if output.is_empty() { sleep(Duration::from_millis(500)).await; continue; } line.push_str(&output); if line.contains('\n') { log::info!("output from child process: {output:?}"); return Ok(line); } } }) .await .with_context(|| "Timeout reading stdout from connection checker")? } } impl Drop for ConnCheckerHandle<'_> { fn drop(&mut self) { let rpc = self.checker.rpc.clone(); let pid = self.pid; let Ok(runtime_handle) = tokio::runtime::Handle::try_current() else { log::error!("ConnCheckerHandle dropped outside of a tokio runtime."); return; }; runtime_handle.spawn(async move { // Make sure child process is stopped when this handle is dropped. // Closing stdin does the trick. let _ = rpc.close_child_stdin(pid).await; }); } } /// Parse output from connection-checker. Returns true if connected to Mullvad. fn parse_am_i_mullvad(result: String) -> anyhow::Result { Ok(if result.contains("You are connected") { true } else if result.contains("You are not connected") { false } else { bail!("Unexpected output from connection-checker: {result:?}") }) } pub mod custom_lists { use super::*; use mullvad_types::custom_list::{CustomList, Id}; use std::sync::{LazyLock, Mutex}; // Expose all custom list variants as a shorthand. pub use List::*; /// The default custom list to use as location for all tests. pub const DEFAULT_LIST: List = List::Nordic; /// Mapping between [List] to daemon custom lists. Since custom list ids are assigned by the /// daemon at the creation of the custom list settings object, we can't map a custom list /// name to a specific list before runtime. static IDS: LazyLock>> = LazyLock::new(|| Mutex::new(HashMap::new())); /// Pre-defined (well-typed) custom lists which may be useful in different test scenarios. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum List { /// A selection of Nordic servers Nordic, /// A selection of European servers Europe, /// This custom list contains relays which are close geographically to the computer running /// the test scenarios, which hopefully means there will be little latency between the test /// machine and these relays LowLatency, /// Antithesis of [List::LowLatency], these relays are located far away from the test /// server. Use this custom list if you want to simulate scenarios where the probability /// of experiencing high latencies is desirable. HighLatency, } impl List { pub fn name(self) -> String { use List::*; match self { Nordic => "Nordic".to_string(), Europe => "Europe".to_string(), LowLatency => "Low Latency".to_string(), HighLatency => "High Latency".to_string(), } } /// Iterator over all custom lists. pub fn all() -> impl Iterator { use List::*; [Nordic, Europe, LowLatency, HighLatency].into_iter() } pub fn locations(self) -> impl Iterator { use List::*; let country = GeographicLocationConstraint::country; let city = GeographicLocationConstraint::city; match self { Nordic => { vec![country("no"), country("se"), country("fi"), country("dk")].into_iter() } Europe => vec![ // North country("se"), // West country("fr"), // East country("ro"), // South country("it"), ] .into_iter(), LowLatency => { // Assumption: Test server is located in Gothenburg, Sweden. vec![city("se", "got")].into_iter() } HighLatency => { // Assumption: Test server is located in Gothenburg, Sweden. vec![country("au"), country("ca"), country("za")].into_iter() } } } pub fn to_constraint(self) -> Option { let ids = IDS.lock().unwrap(); let id = ids.get(&self)?; Some(LocationConstraint::CustomList { list_id: *id }) } } impl From for LocationConstraint { fn from(custom_list: List) -> Self { // TODO: Is this _too_ unsound ?? custom_list.to_constraint().unwrap() } } /// Add a set of custom lists which can be used in different test scenarios. /// /// See [`List`] for available custom lists. pub async fn add_default_lists(mullvad_client: &mut MullvadProxyClient) -> anyhow::Result<()> { for custom_list in List::all() { let id = mullvad_client .create_custom_list(custom_list.name()) .await?; let mut daemon_dito = find_custom_list(mullvad_client, &custom_list.name()).await?; assert_eq!(id, daemon_dito.id); for locations in custom_list.locations() { daemon_dito.locations.insert(locations); } mullvad_client.update_custom_list(daemon_dito).await?; // Associate this custom list variant with a specific, runtime custom list id. IDS.lock().unwrap().insert(custom_list, id); } Ok(()) } /// Set the default location to the custom list specified by `DEFAULT_LIST`. This also includes /// entry location for multihop. It does not, however, affect bridge location for OpenVPN. /// This is for simplify, as bridges default to using the server closest to the exit anyway, and /// OpenVPN is slated for removal. pub async fn set_default_location( mullvad_client: &mut MullvadProxyClient, ) -> anyhow::Result<()> { let constraints = get_custom_list_location_relay_constraints(DEFAULT_LIST); mullvad_client .set_relay_settings(constraints.into()) .await .context("Failed to set relay settings") } fn get_custom_list_location_relay_constraints(custom_list: List) -> RelayConstraints { let wireguard_constraints = mullvad_types::relay_constraints::WireguardConstraints { entry_location: Constraint::Only(custom_list.into()), ..Default::default() }; RelayConstraints { location: Constraint::Only(custom_list.into()), wireguard_constraints, ..Default::default() } } /// Dig out a custom list from the daemon settings based on the custom list's name. /// There should be an rpc for this. async fn find_custom_list( rpc: &mut MullvadProxyClient, name: &str, ) -> anyhow::Result { rpc.get_settings() .await? .custom_lists .into_iter() .find(|list| list.name == name) .ok_or(anyhow!("List '{name}' not found")) } }