summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorDavid Lönnhager <david.l@mullvad.net>2024-02-07 17:50:24 +0100
committerDavid Lönnhager <david.l@mullvad.net>2024-02-07 17:50:24 +0100
commit0d4ee241b523a7d024cb2aebfcbbf3924a8f3bb5 (patch)
treec095e7b898d4d736322d267965948b30fde3bf50
parent20d9c98f5ec44166b461730fec9ca292b622265f (diff)
parenteed7234599253f3d742be8bb4b6b1ecbf1299dc3 (diff)
downloadmullvadvpn-0d4ee241b523a7d024cb2aebfcbbf3924a8f3bb5.tar.xz
mullvadvpn-0d4ee241b523a7d024cb2aebfcbbf3924a8f3bb5.zip
Merge branch 'testing-add-socks-server'
-rw-r--r--test/Cargo.lock26
-rw-r--r--test/Cargo.toml1
-rw-r--r--test/socks-server/Cargo.toml18
-rw-r--r--test/socks-server/src/lib.rs58
-rw-r--r--test/test-manager/Cargo.toml1
-rw-r--r--test/test-manager/src/main.rs9
-rw-r--r--test/test-manager/src/vm/network/mod.rs3
-rw-r--r--test/test-rpc/src/client.rs10
-rw-r--r--test/test-rpc/src/lib.rs12
-rw-r--r--test/test-rpc/src/net.rs57
-rw-r--r--test/test-runner/src/forward.rs127
-rw-r--r--test/test-runner/src/main.rs19
12 files changed, 340 insertions, 1 deletions
diff --git a/test/Cargo.lock b/test/Cargo.lock
index f48fec100d..64d3552540 100644
--- a/test/Cargo.lock
+++ b/test/Cargo.lock
@@ -828,6 +828,20 @@ dependencies = [
]
[[package]]
+name = "fast-socks5"
+version = "0.9.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cbcc731f3c17a5053e07e6a2290918da75cd8b9b1217b419721f715674ac520c"
+dependencies = [
+ "anyhow",
+ "async-trait",
+ "log",
+ "thiserror",
+ "tokio",
+ "tokio-stream",
+]
+
+[[package]]
name = "fastrand"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2876,6 +2890,17 @@ dependencies = [
]
[[package]]
+name = "socks-server"
+version = "0.0.0"
+dependencies = [
+ "err-derive",
+ "fast-socks5",
+ "futures",
+ "log",
+ "tokio",
+]
+
+[[package]]
name = "spin"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -3111,6 +3136,7 @@ dependencies = [
"regex",
"serde",
"serde_json",
+ "socks-server",
"ssh2",
"talpid-types",
"tarpc",
diff --git a/test/Cargo.toml b/test/Cargo.toml
index a6653530b5..27c0f94469 100644
--- a/test/Cargo.toml
+++ b/test/Cargo.toml
@@ -11,6 +11,7 @@ members = [
"test-manager",
"test-runner",
"test-rpc",
+ "socks-server",
]
[workspace.lints.rust]
diff --git a/test/socks-server/Cargo.toml b/test/socks-server/Cargo.toml
new file mode 100644
index 0000000000..ba6d1ba4f8
--- /dev/null
+++ b/test/socks-server/Cargo.toml
@@ -0,0 +1,18 @@
+[package]
+name = "socks-server"
+description = "Contains a simple SOCKS5 server"
+authors.workspace = true
+repository.workspace = true
+license.workspace = true
+edition.workspace = true
+rust-version.workspace = true
+
+[lints]
+workspace = true
+
+[dependencies]
+fast-socks5 = "0.9.5"
+err-derive = { workspace = true }
+tokio = { workspace = true }
+log = { workspace = true }
+futures = { workspace = true }
diff --git a/test/socks-server/src/lib.rs b/test/socks-server/src/lib.rs
new file mode 100644
index 0000000000..eed676ac8e
--- /dev/null
+++ b/test/socks-server/src/lib.rs
@@ -0,0 +1,58 @@
+use futures::StreamExt;
+use std::io;
+use std::net::SocketAddr;
+
+#[derive(err_derive::Error, Debug)]
+#[error(no_from)]
+pub enum Error {
+ #[error(display = "Failed to start SOCKS5 server")]
+ StartSocksServer(#[error(source)] io::Error),
+}
+
+pub struct Handle {
+ handle: tokio::task::JoinHandle<()>,
+}
+
+/// Spawn a SOCKS server bound to `bind_addr`
+pub async fn spawn(bind_addr: SocketAddr) -> Result<Handle, Error> {
+ let socks_server: fast_socks5::server::Socks5Server =
+ fast_socks5::server::Socks5Server::bind(bind_addr)
+ .await
+ .map_err(Error::StartSocksServer)?;
+
+ let handle = tokio::spawn(async move {
+ let mut incoming = socks_server.incoming();
+
+ while let Some(new_client) = incoming.next().await {
+ match new_client {
+ Ok(socket) => {
+ let fut = socket.upgrade_to_socks5();
+
+ // Act as normal SOCKS server
+ tokio::spawn(async move {
+ match fut.await {
+ Ok(_socket) => log::info!("socks client disconnected"),
+ Err(error) => log::error!("socks client failed: {error}"),
+ }
+ });
+ }
+ Err(error) => {
+ log::error!("failed to accept socks client: {error}");
+ }
+ }
+ }
+ });
+ Ok(Handle { handle })
+}
+
+impl Handle {
+ pub fn close(&self) {
+ self.handle.abort();
+ }
+}
+
+impl Drop for Handle {
+ fn drop(&mut self) {
+ self.close();
+ }
+}
diff --git a/test/test-manager/Cargo.toml b/test/test-manager/Cargo.toml
index 53c1c355f1..72230a2edd 100644
--- a/test/test-manager/Cargo.toml
+++ b/test/test-manager/Cargo.toml
@@ -42,6 +42,7 @@ pcap = { version = "0.10.1", features = ["capture-stream"] }
pnet_packet = "0.31.0"
test-rpc = { path = "../test-rpc" }
+socks-server = { path = "../socks-server" }
env_logger = { workspace = true }
diff --git a/test/test-manager/src/main.rs b/test/test-manager/src/main.rs
index f5e7a70d9f..f81bf77594 100644
--- a/test/test-manager/src/main.rs
+++ b/test/test-manager/src/main.rs
@@ -14,6 +14,7 @@ use std::path::PathBuf;
use anyhow::Context;
use anyhow::Result;
use clap::Parser;
+use std::net::SocketAddr;
use tests::config::DEFAULT_MULLVAD_HOST;
/// Test manager for Mullvad VPN app
@@ -248,6 +249,13 @@ async fn main() -> Result<()> {
.await
.context("Failed to run provisioning for VM")?;
+ // For convenience, spawn a SOCKS5 server that is reachable for tests that need it
+ let socks = socks_server::spawn(SocketAddr::new(
+ crate::vm::network::NON_TUN_GATEWAY.into(),
+ crate::vm::network::SOCKS5_PORT,
+ ))
+ .await?;
+
let skip_wait = vm_config.provisioner != config::Provisioner::Noop;
let result = run_tests::run(
@@ -291,6 +299,7 @@ async fn main() -> Result<()> {
if display {
instance.wait().await;
}
+ socks.close();
result
}
Commands::FormatTestReports { reports } => {
diff --git a/test/test-manager/src/vm/network/mod.rs b/test/test-manager/src/vm/network/mod.rs
index e12a95c713..944e241013 100644
--- a/test/test-manager/src/vm/network/mod.rs
+++ b/test/test-manager/src/vm/network/mod.rs
@@ -15,3 +15,6 @@ pub use platform::{
CUSTOM_TUN_REMOTE_REAL_PORT, CUSTOM_TUN_REMOTE_TUN_ADDR, DUMMY_LAN_INTERFACE_IP,
NON_TUN_GATEWAY,
};
+
+/// Port on NON_TUN_GATEWAY that hosts a SOCKS5 server
+pub const SOCKS5_PORT: u16 = 54321;
diff --git a/test/test-rpc/src/client.rs b/test/test-rpc/src/client.rs
index 2c47328e00..4d103ed44e 100644
--- a/test/test-rpc/src/client.rs
+++ b/test/test-rpc/src/client.rs
@@ -213,6 +213,16 @@ impl ServiceClient {
.await?
}
+ /// Start forwarding TCP from a server listening on `bind_addr` to the given address, and return a handle that closes the
+ /// server when dropped
+ pub async fn start_tcp_forward(
+ &self,
+ bind_addr: SocketAddr,
+ via_addr: SocketAddr,
+ ) -> Result<crate::net::SockHandle, Error> {
+ crate::net::SockHandle::start_tcp_forward(self.client.clone(), bind_addr, via_addr).await
+ }
+
/// Restarts the app.
///
/// Shuts down a running app, making it disconnect from any current tunnel
diff --git a/test/test-rpc/src/lib.rs b/test/test-rpc/src/lib.rs
index 5919a894d1..d2bee40dbb 100644
--- a/test/test-rpc/src/lib.rs
+++ b/test/test-rpc/src/lib.rs
@@ -53,6 +53,8 @@ pub enum Error {
InvalidUrl,
#[error(display = "Timeout")]
Timeout,
+ #[error(display = "TCP forward error")]
+ TcpForward,
}
/// Response from am.i.mullvad.net
@@ -148,6 +150,16 @@ mod service {
/// Perform DNS resolution.
async fn resolve_hostname(hostname: String) -> Result<Vec<SocketAddr>, Error>;
+ /// Start forwarding TCP bound to the given address. Return an ID that can be used with
+ /// `stop_tcp_forward`, and the address that the listening socket was actually bound to.
+ async fn start_tcp_forward(
+ bind_addr: SocketAddr,
+ via_addr: SocketAddr,
+ ) -> Result<(net::SockHandleId, SocketAddr), Error>;
+
+ /// Stop forwarding TCP that was previously started with `start_tcp_forward`.
+ async fn stop_tcp_forward(id: net::SockHandleId) -> Result<(), Error>;
+
/// Restart the Mullvad VPN application.
async fn restart_mullvad_daemon() -> Result<(), Error>;
diff --git a/test/test-rpc/src/net.rs b/test/test-rpc/src/net.rs
index b4e114ea47..77aa5c938a 100644
--- a/test/test-rpc/src/net.rs
+++ b/test/test-rpc/src/net.rs
@@ -1,6 +1,8 @@
+use futures::channel::oneshot;
use hyper::{Client, Uri};
use once_cell::sync::Lazy;
-use serde::de::DeserializeOwned;
+use serde::{de::DeserializeOwned, Deserialize, Serialize};
+use std::net::SocketAddr;
use tokio_rustls::rustls::ClientConfig;
use crate::{AmIMullvad, Error};
@@ -17,6 +19,59 @@ static CLIENT_CONFIG: Lazy<ClientConfig> = Lazy::new(|| {
.with_no_client_auth()
});
+#[derive(Debug, Serialize, Deserialize, Clone, Copy, Hash, PartialEq, Eq)]
+pub struct SockHandleId(pub usize);
+
+pub struct SockHandle {
+ stop_tx: Option<oneshot::Sender<()>>,
+ bind_addr: SocketAddr,
+}
+
+impl SockHandle {
+ pub(crate) async fn start_tcp_forward(
+ client: crate::service::ServiceClient,
+ bind_addr: SocketAddr,
+ via_addr: SocketAddr,
+ ) -> Result<Self, Error> {
+ let (stop_tx, stop_rx) = oneshot::channel();
+
+ let (id, bind_addr) = client
+ .start_tcp_forward(tarpc::context::current(), bind_addr, via_addr)
+ .await??;
+
+ tokio::spawn(async move {
+ let _ = stop_rx.await;
+
+ log::trace!("Stopping TCP forward");
+
+ if let Err(error) = client.stop_tcp_forward(tarpc::context::current(), id).await {
+ log::error!("Failed to stop TCP forward: {error}");
+ }
+ });
+
+ Ok(SockHandle {
+ stop_tx: Some(stop_tx),
+ bind_addr,
+ })
+ }
+
+ pub fn stop(&mut self) {
+ if let Some(stop_tx) = self.stop_tx.take() {
+ let _ = stop_tx.send(());
+ }
+ }
+
+ pub fn bind_addr(&self) -> SocketAddr {
+ self.bind_addr
+ }
+}
+
+impl Drop for SockHandle {
+ fn drop(&mut self) {
+ self.stop()
+ }
+}
+
pub async fn geoip_lookup(mullvad_host: String) -> Result<AmIMullvad, Error> {
let uri = Uri::try_from(format!("https://ipv4.am.i.{mullvad_host}/json"))
.map_err(|_| Error::InvalidUrl)?;
diff --git a/test/test-runner/src/forward.rs b/test/test-runner/src/forward.rs
new file mode 100644
index 0000000000..ec9e8a98f1
--- /dev/null
+++ b/test/test-runner/src/forward.rs
@@ -0,0 +1,127 @@
+use once_cell::sync::Lazy;
+use std::collections::HashMap;
+use std::net::SocketAddr;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex};
+use test_rpc::net::SockHandleId;
+use tokio::net::TcpListener;
+use tokio::net::TcpStream;
+
+static SERVERS: Lazy<Mutex<HashMap<SockHandleId, Handle>>> =
+ Lazy::new(|| Mutex::new(HashMap::new()));
+
+/// Spawn a TCP forwarder that sends TCP via `via_addr`
+pub async fn start_server(
+ bind_addr: SocketAddr,
+ via_addr: SocketAddr,
+) -> Result<(SockHandleId, SocketAddr), test_rpc::Error> {
+ let next_nonce = {
+ static NONCE: AtomicUsize = AtomicUsize::new(0);
+ || NONCE.fetch_add(1, Ordering::Relaxed)
+ };
+ let id = SockHandleId(next_nonce());
+
+ let handle = tcp_forward(bind_addr, via_addr).await.map_err(|error| {
+ log::error!("Failed to start TCP forwarder listener: {error}");
+ test_rpc::Error::TcpForward
+ })?;
+
+ let bind_addr = handle.local_addr();
+
+ let mut servers = SERVERS.lock().unwrap();
+ servers.insert(id, handle);
+
+ Ok((id, bind_addr))
+}
+
+/// Stop TCP forwarder given some ID returned by `start_server`
+pub fn stop_server(id: SockHandleId) -> Result<(), test_rpc::Error> {
+ let handle = {
+ let mut servers = SERVERS.lock().unwrap();
+ servers.remove(&id)
+ };
+
+ if let Some(handle) = handle {
+ handle.close();
+ }
+ Ok(())
+}
+
+struct Handle {
+ handle: tokio::task::JoinHandle<()>,
+ bind_addr: SocketAddr,
+ clients: Arc<Mutex<Vec<tokio::task::JoinHandle<()>>>>,
+}
+
+impl Handle {
+ pub fn close(&self) {
+ self.handle.abort();
+
+ let mut clients = self.clients.lock().unwrap();
+ for client in clients.drain(..) {
+ client.abort();
+ }
+ }
+
+ pub fn local_addr(&self) -> SocketAddr {
+ self.bind_addr
+ }
+}
+
+impl Drop for Handle {
+ fn drop(&mut self) {
+ self.close();
+ }
+}
+
+/// Forward TCP traffic via `proxy_addr`
+async fn tcp_forward(
+ bind_addr: SocketAddr,
+ proxy_addr: SocketAddr,
+) -> Result<Handle, test_rpc::Error> {
+ let listener = TcpListener::bind(&bind_addr).await.map_err(|error| {
+ log::error!("Failed to bind TCP forward socket: {error}");
+ test_rpc::Error::TcpForward
+ })?;
+ let bind_addr = listener.local_addr().map_err(|error| {
+ log::error!("Failed to get TCP socket addr: {error}");
+ test_rpc::Error::TcpForward
+ })?;
+
+ let clients = Arc::new(Mutex::new(vec![]));
+
+ let clients_copy = clients.clone();
+
+ let handle = tokio::spawn(async move {
+ loop {
+ match listener.accept().await {
+ Ok((mut client, _addr)) => {
+ let client_handle = tokio::spawn(async move {
+ let mut proxy = match TcpStream::connect(proxy_addr).await {
+ Ok(proxy) => proxy,
+ Err(error) => {
+ log::error!("failed to connect to TCP proxy: {error}");
+ return;
+ }
+ };
+
+ if let Err(error) =
+ tokio::io::copy_bidirectional(&mut client, &mut proxy).await
+ {
+ log::error!("copy_directional failed: {error}");
+ }
+ });
+ clients_copy.lock().unwrap().push(client_handle);
+ }
+ Err(error) => {
+ log::error!("failed to accept TCP client: {error}");
+ }
+ }
+ }
+ });
+ Ok(Handle {
+ handle,
+ bind_addr,
+ clients,
+ })
+}
diff --git a/test/test-runner/src/main.rs b/test/test-runner/src/main.rs
index 1c2c301b27..74f7761cc2 100644
--- a/test/test-runner/src/main.rs
+++ b/test/test-runner/src/main.rs
@@ -10,6 +10,7 @@ use tarpc::context;
use tarpc::server::Channel;
use test_rpc::{
mullvad_daemon::{ServiceStatus, SOCKET_PATH},
+ net::SockHandleId,
package::Package,
transport::GrpcForwarder,
AppTrace, Service,
@@ -22,6 +23,7 @@ use tokio::{
use tokio_util::codec::{Decoder, LengthDelimitedCodec};
mod app;
+mod forward;
mod logging;
mod net;
mod package;
@@ -167,6 +169,23 @@ impl Service for TestServer {
.collect())
}
+ async fn start_tcp_forward(
+ self,
+ _: context::Context,
+ bind_addr: SocketAddr,
+ via_addr: SocketAddr,
+ ) -> Result<(SockHandleId, SocketAddr), test_rpc::Error> {
+ forward::start_server(bind_addr, via_addr).await
+ }
+
+ async fn stop_tcp_forward(
+ self,
+ _: context::Context,
+ id: SockHandleId,
+ ) -> Result<(), test_rpc::Error> {
+ forward::stop_server(id)
+ }
+
async fn get_interface_ip(
self,
_: context::Context,