diff options
| author | Linus Färnstrand <linus@mullvad.net> | 2017-09-21 08:11:28 +0200 |
|---|---|---|
| committer | Linus Färnstrand <linus@mullvad.net> | 2017-09-21 08:11:28 +0200 |
| commit | f617dd08e8aaa5747b6a1c6b884af4c37d00344f (patch) | |
| tree | f4e58dba9d575774ce4840f9db46400cfd073fb1 | |
| parent | c7199bd140f4769dde66dc6044ab0921d1330378 (diff) | |
| parent | 300055f81a3d32ecda0029a86f9dc2c8bfc19290 (diff) | |
| download | mullvadvpn-f617dd08e8aaa5747b6a1c6b884af4c37d00344f.tar.xz mullvadvpn-f617dd08e8aaa5747b6a1c6b884af4c37d00344f.zip | |
Merge branch 'socket-relay'
| -rw-r--r-- | Cargo.lock | 22 | ||||
| -rw-r--r-- | Cargo.toml | 1 | ||||
| -rw-r--r-- | socket-relay/Cargo.toml | 16 | ||||
| -rw-r--r-- | socket-relay/src/lib.rs | 8 | ||||
| -rw-r--r-- | socket-relay/src/main.rs | 53 | ||||
| -rw-r--r-- | socket-relay/src/udp.rs | 230 | ||||
| -rw-r--r-- | socket-relay/tests/forward_self.rs | 55 |
7 files changed, 385 insertions, 0 deletions
diff --git a/Cargo.lock b/Cargo.lock index fc8263b78d..1a42817892 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1097,6 +1097,18 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] +name = "socket-relay" +version = "0.1.0" +dependencies = [ + "env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", + "error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-core 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-timer 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] name = "stable_deref_trait" version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1335,6 +1347,15 @@ dependencies = [ ] [[package]] +name = "tokio-timer" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", + "slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] name = "tokio-tls" version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1616,6 +1637,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum tokio-io 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c2c3ce9739f7387a0fa65b5421e81feae92e04d603f008898f4257790ce8c2db" "checksum tokio-proto 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8fbb47ae81353c63c487030659494b295f6cb6576242f907f203473b191b0389" "checksum tokio-service 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "24da22d077e0f15f55162bdbdc661228c1581892f52074fb242678d015b45162" +"checksum tokio-timer 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6131e780037787ff1b3f8aad9da83bca02438b72277850dd6ad0d455e0e20efc" "checksum tokio-tls 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "d88e411cac1c87e405e4090be004493c5d8072a370661033b1a64ea205ec2e13" "checksum toml 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "b0601da6c97135c8d330c7a13a013ca6cd4143221b01de2f8d4edc50a9e551c7" "checksum unicase 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2e01da42520092d0cd2d6ac3ae69eb21a22ad43ff195676b86f8c37f487d6b80" diff --git a/Cargo.toml b/Cargo.toml index a1d53676c5..e5bf17fc21 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,4 +6,5 @@ members = [ "talpid-openvpn-plugin", "talpid-core", "talpid-ipc", + "socket-relay", ] diff --git a/socket-relay/Cargo.toml b/socket-relay/Cargo.toml new file mode 100644 index 0000000000..b40c72f6f2 --- /dev/null +++ b/socket-relay/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "socket-relay" +version = "0.1.0" +authors = ["Mullvad VPN <admin@mullvad.net>", "Linus Färnstrand <linus@mullvad.net>"] +description = "A relay for bidirectional sockets. Listens to one address and relays all traffic to another." +license = "MIT/Apache-2.0" +keywords = ["socket", "network", "forward", "relay", "proxy"] +categories = ["network-programming", "asynchronous"] + +[dependencies] +env_logger = "0.4.3" +error-chain = "0.11" +futures = "0.1.15" +log = "0.3" +tokio-core = "0.1" +tokio-timer = "0.1" diff --git a/socket-relay/src/lib.rs b/socket-relay/src/lib.rs new file mode 100644 index 0000000000..c0b930e2dc --- /dev/null +++ b/socket-relay/src/lib.rs @@ -0,0 +1,8 @@ +extern crate futures; +#[macro_use] +extern crate log; +extern crate tokio_core; +extern crate tokio_timer; + +/// UDP based relaying. +pub mod udp; diff --git a/socket-relay/src/main.rs b/socket-relay/src/main.rs new file mode 100644 index 0000000000..851904b499 --- /dev/null +++ b/socket-relay/src/main.rs @@ -0,0 +1,53 @@ +extern crate env_logger; +#[macro_use] +extern crate error_chain; +extern crate tokio_core; + +extern crate socket_relay; + +use std::env; +use std::thread; +use std::time::Duration; + +use tokio_core::reactor::Core; + +error_chain!{} + +quick_main!(run); +fn run() -> Result<()> { + env_logger::init().chain_err(|| "Failed to init logging")?; + + let listen_addr = env::args() + .nth(1) + .expect("Listen address as first argument") + .parse() + .expect("Invalid listen address format"); + let destination = env::args() + .nth(2) + .expect("Relay destination address as second argument") + .parse() + .expect("Invalid destination address format"); + let forward_bind_ip = env::args() + .nth(3) + .unwrap_or(String::from("0.0.0.0")) + .parse() + .unwrap(); + + let mut core = Core::new().chain_err(|| "Unable to create Tokio core")?; + let handle = core.handle(); + + let relay = socket_relay::udp::Relay::new(listen_addr, forward_bind_ip, destination, handle) + .chain_err(|| "Unable to init forwarder")?; + println!("Forwarder listening on {}", relay.listen_addr()); + + let close_handle = relay.close_handle(); + thread::spawn(move || { + thread::sleep(Duration::from_secs(20)); + println!("Closing relay"); + close_handle.close(); + }); + + let result = core.run(relay); + println!("result: {:?}", result); + Ok(()) +} diff --git a/socket-relay/src/udp.rs b/socket-relay/src/udp.rs new file mode 100644 index 0000000000..4e3a2ba600 --- /dev/null +++ b/socket-relay/src/udp.rs @@ -0,0 +1,230 @@ +use futures; +use futures::future::{self, Future}; +use futures::sink::Sink; +use futures::stream::Stream; +use futures::sync::mpsc::{channel as sync_channel, Sender as SyncSender}; +use futures::unsync::mpsc::{channel as unsync_channel, Sender}; + +use std::io; +use std::net::{IpAddr, SocketAddr}; +use std::result::Result as StdResult; +use std::time::Duration; + +use tokio_core::net::{UdpCodec, UdpSocket}; +use tokio_core::reactor::Handle; +use tokio_timer::Timer; + +/// The amount of idle (no replies) time needed for the forwarding socket to close. +pub static FORWARD_TIMEOUT_MS: u64 = 60000; + +/// Number of slots in internal channel transfering responses back to clients. +pub static CLIENT_SINK_CHANNEL_SIZE: usize = 10; + + +pub struct Relay { + listen_addr: SocketAddr, + forwarding_future: Box<Future<Item = (), Error = io::Error>>, + close_handle: SyncSender<()>, +} + +impl Relay { + /// Sets up relaying from `listen_addr` to `destination_addr`. + /// + /// `forward_bind_ip` is the local IP the socket that sends to the destination binds to. + pub fn new( + listen_addr: SocketAddr, + forward_bind_ip: IpAddr, + destination_addr: SocketAddr, + handle: Handle, + ) -> Result<Relay, io::Error> { + let listen_socket = UdpSocket::bind(&listen_addr, &handle)?; + let listen_addr = listen_socket.local_addr()?; + debug!("Bound relay listening socket to {}", listen_addr); + + // Split the listening socket into a stream of incoming and a sink for outgoing datagrams. + let (client_sink, client_stream) = listen_socket.framed(ServerCodec).split(); + let (closable_client_stream, close_handle) = closable_stream(client_stream); + let client_sink_channel = create_client_sink_channel(client_sink, &handle); + + let forwarding_future = closable_client_stream.for_each(move |(client_addr, data)| { + let response_sink = client_sink_channel.clone().sink_map_err(|_| ()); + + if let Err(e) = Self::forward( + client_addr, + forward_bind_ip, + destination_addr, + data, + response_sink, + &handle, + ) { + error!("Unable to perform forwarding for {}: {}", client_addr, e); + }; + future::ok(()) + }); + + Ok(Relay { + listen_addr, + forwarding_future: Box::new(forwarding_future), + close_handle, + }) + } + + /// Forwards `data` to `destination` and streams all replies into `response_sink`. + fn forward<S>( + client_addr: SocketAddr, + bind_ip: IpAddr, + destination: SocketAddr, + data: Vec<u8>, + response_sink: S, + handle: &Handle, + ) -> io::Result<()> + where + S: Sink<SinkItem = (SocketAddr, Vec<u8>), SinkError = ()> + 'static, + { + let bind_addr = SocketAddr::new(bind_ip, 0); + let socket = UdpSocket::bind(&bind_addr, &handle)?; + trace!( + "Relaying {} byte datagram from {} to {}", + data.len(), + client_addr, + destination + ); + + let (forward_sink, forward_stream) = socket.framed(ServerCodec).split(); + + let send_future = forward_sink.send((destination, data)).map_err(|e| { + error!("Error while forwarding to destination addr: {}", e); + }); + + let recv_stream = forward_stream + .filter_map(move |(addr, data)| if addr == destination { + trace!( + "Returning {} byte response from {} to {}", + data.len(), + addr, + client_addr + ); + Some((client_addr, data)) + } else { + trace!( + "Discarding data from {}, expecting data from {}", + addr, + destination + ); + None + }) + .map_err(|e| { + error!("Error reading datagrams from forward socket: {}", e) + }); + + let timeout_recv_future = Timer::default() + .timeout_stream(recv_stream, Duration::from_millis(FORWARD_TIMEOUT_MS)) + .forward(response_sink) + .map(|_| ()); + + handle.spawn(send_future.and_then(|_| timeout_recv_future)); + Ok(()) + } + + pub fn listen_addr(&self) -> SocketAddr { + self.listen_addr + } + + pub fn close_handle(&self) -> RelayCloseHandle { + RelayCloseHandle(self.close_handle.clone()) + } +} + +impl Future for Relay { + type Item = (); + type Error = io::Error; + + fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> { + self.forwarding_future.poll() + } +} + +impl Drop for Relay { + fn drop(&mut self) { + self.close_handle().close(); + } +} + +pub struct RelayCloseHandle(SyncSender<()>); + +impl RelayCloseHandle { + pub fn close(self) { + if self.0.send(()).wait().is_err() { + warn!("Relay already closed"); + } + } +} + + +fn closable_stream<S: Stream + 'static>( + stream: S, +) -> ( + Box<Stream<Item = S::Item, Error = S::Error>>, + SyncSender<()>, +) { + let (close_handle_sink, close_signal_stream) = sync_channel(0); + let close_signal_stream = close_signal_stream.map(|_| None).map_err(|_| None); + let mapped_stream = stream.map(|t| Some(t)).map_err(|e| Some(e)); + + let output_stream = mapped_stream + .select(close_signal_stream) + // Map close_signal_stream error as Ok(None) and stream error back to S::Error + .then(|element| match element { + Err(None) => Ok(None), + Err(Some(e)) => Err(e), + Ok(item) => Ok(item), + }) + // Make the stream end when signaled by close_signal_stream. + .take_while(|item| Ok(item.is_some())) + // Map Option<S::Item> to S::Item, we know it is a Some from the take_while above. + .map(|item| item.unwrap()); + (Box::new(output_stream), close_handle_sink) +} + + +/// Create a channel accepting tuples of `SocketAddr` and binary data and forward anything coming +/// on this channel to the `client_sink`. Returns the sender half of the channel. +fn create_client_sink_channel<S>(client_sink: S, handle: &Handle) -> Sender<(SocketAddr, Vec<u8>)> +where + S: Sink<SinkItem = (SocketAddr, Vec<u8>), SinkError = io::Error> + 'static, +{ + let (channel_sink, channel_stream) = unsync_channel(CLIENT_SINK_CHANNEL_SIZE); + + let forward_future = channel_stream + .map_err(|_| None) + .forward(client_sink.sink_map_err(|e| Some(e))) + .and_then(|_| Ok(())) + .map_err(|error: Option<io::Error>| match error { + Some(sink_error) => { + error!("Error sending response back to client: {}", sink_error); + } + None => debug!("Closing relay socket sink"), + }); + handle.spawn(forward_future); + + channel_sink +} + + +/// Internal struct implementing `Codec`. Just so it becomes possible to split a `UdpSocket` into +/// a sink and a stream. +struct ServerCodec; + +impl UdpCodec for ServerCodec { + type In = (SocketAddr, Vec<u8>); + type Out = (SocketAddr, Vec<u8>); + + fn decode(&mut self, addr: &SocketAddr, buf: &[u8]) -> StdResult<Self::In, io::Error> { + Ok((*addr, buf.to_vec())) + } + + fn encode(&mut self, (addr, buf): Self::Out, into: &mut Vec<u8>) -> SocketAddr { + into.extend(buf); + addr + } +} diff --git a/socket-relay/tests/forward_self.rs b/socket-relay/tests/forward_self.rs new file mode 100644 index 0000000000..4714e52404 --- /dev/null +++ b/socket-relay/tests/forward_self.rs @@ -0,0 +1,55 @@ +extern crate env_logger; +extern crate socket_relay; +extern crate tokio_core; + +use std::net::{SocketAddr, UdpSocket}; +use std::sync::mpsc; +use std::thread; +use std::time::Duration; + +use socket_relay::udp::Relay; +use tokio_core::reactor::Core; + +#[test] +fn test() { + env_logger::init().unwrap(); + + let socket = UdpSocket::bind("127.0.0.1:0").unwrap(); + socket + .set_read_timeout(Some(Duration::from_secs(1))) + .unwrap(); + let mut buffer = [0; 100]; + + let relay_listen_addr = spawn_relay(socket.local_addr().unwrap()); + + let test_data = [9, 88, 5, 2]; + socket.send_to(&test_data, relay_listen_addr).unwrap(); + let (len1, relay_src1) = socket.recv_from(&mut buffer).unwrap(); + assert_eq!(&buffer[..len1], &test_data); + + let reply_test_data = [1, 2, 6, 100]; + socket.send_to(&reply_test_data, relay_src1).unwrap(); + let (len2, relay_src2) = socket.recv_from(&mut buffer).unwrap(); + assert_eq!(relay_src2, relay_listen_addr); + assert_eq!(&buffer[..len2], &reply_test_data); +} + +fn spawn_relay(destination_addr: SocketAddr) -> SocketAddr { + let (tx, rx) = mpsc::channel(); + thread::spawn(move || { + let mut core = Core::new().unwrap(); + let handle = core.handle(); + + let relay = Relay::new( + "127.0.0.1:0".parse().unwrap(), + "127.0.0.1".parse().unwrap(), + destination_addr, + handle, + ).unwrap(); + println!("Relay listening on {}", relay.listen_addr()); + tx.send(relay.listen_addr()).unwrap(); + let _ = core.run(relay); + println!("Relay exiting") + }); + rx.recv().unwrap() +} |
