summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorLinus Färnstrand <linus@mullvad.net>2017-09-21 08:11:28 +0200
committerLinus Färnstrand <linus@mullvad.net>2017-09-21 08:11:28 +0200
commitf617dd08e8aaa5747b6a1c6b884af4c37d00344f (patch)
treef4e58dba9d575774ce4840f9db46400cfd073fb1
parentc7199bd140f4769dde66dc6044ab0921d1330378 (diff)
parent300055f81a3d32ecda0029a86f9dc2c8bfc19290 (diff)
downloadmullvadvpn-f617dd08e8aaa5747b6a1c6b884af4c37d00344f.tar.xz
mullvadvpn-f617dd08e8aaa5747b6a1c6b884af4c37d00344f.zip
Merge branch 'socket-relay'
-rw-r--r--Cargo.lock22
-rw-r--r--Cargo.toml1
-rw-r--r--socket-relay/Cargo.toml16
-rw-r--r--socket-relay/src/lib.rs8
-rw-r--r--socket-relay/src/main.rs53
-rw-r--r--socket-relay/src/udp.rs230
-rw-r--r--socket-relay/tests/forward_self.rs55
7 files changed, 385 insertions, 0 deletions
diff --git a/Cargo.lock b/Cargo.lock
index fc8263b78d..1a42817892 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1097,6 +1097,18 @@ version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
+name = "socket-relay"
+version = "0.1.0"
+dependencies = [
+ "env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
+ "error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)",
+ "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio-core 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio-timer 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
name = "stable_deref_trait"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1335,6 +1347,15 @@ dependencies = [
]
[[package]]
+name = "tokio-timer"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "futures 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)",
+ "slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
name = "tokio-tls"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1616,6 +1637,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum tokio-io 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c2c3ce9739f7387a0fa65b5421e81feae92e04d603f008898f4257790ce8c2db"
"checksum tokio-proto 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8fbb47ae81353c63c487030659494b295f6cb6576242f907f203473b191b0389"
"checksum tokio-service 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "24da22d077e0f15f55162bdbdc661228c1581892f52074fb242678d015b45162"
+"checksum tokio-timer 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6131e780037787ff1b3f8aad9da83bca02438b72277850dd6ad0d455e0e20efc"
"checksum tokio-tls 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "d88e411cac1c87e405e4090be004493c5d8072a370661033b1a64ea205ec2e13"
"checksum toml 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "b0601da6c97135c8d330c7a13a013ca6cd4143221b01de2f8d4edc50a9e551c7"
"checksum unicase 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2e01da42520092d0cd2d6ac3ae69eb21a22ad43ff195676b86f8c37f487d6b80"
diff --git a/Cargo.toml b/Cargo.toml
index a1d53676c5..e5bf17fc21 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -6,4 +6,5 @@ members = [
"talpid-openvpn-plugin",
"talpid-core",
"talpid-ipc",
+ "socket-relay",
]
diff --git a/socket-relay/Cargo.toml b/socket-relay/Cargo.toml
new file mode 100644
index 0000000000..b40c72f6f2
--- /dev/null
+++ b/socket-relay/Cargo.toml
@@ -0,0 +1,16 @@
+[package]
+name = "socket-relay"
+version = "0.1.0"
+authors = ["Mullvad VPN <admin@mullvad.net>", "Linus Färnstrand <linus@mullvad.net>"]
+description = "A relay for bidirectional sockets. Listens to one address and relays all traffic to another."
+license = "MIT/Apache-2.0"
+keywords = ["socket", "network", "forward", "relay", "proxy"]
+categories = ["network-programming", "asynchronous"]
+
+[dependencies]
+env_logger = "0.4.3"
+error-chain = "0.11"
+futures = "0.1.15"
+log = "0.3"
+tokio-core = "0.1"
+tokio-timer = "0.1"
diff --git a/socket-relay/src/lib.rs b/socket-relay/src/lib.rs
new file mode 100644
index 0000000000..c0b930e2dc
--- /dev/null
+++ b/socket-relay/src/lib.rs
@@ -0,0 +1,8 @@
+extern crate futures;
+#[macro_use]
+extern crate log;
+extern crate tokio_core;
+extern crate tokio_timer;
+
+/// UDP based relaying.
+pub mod udp;
diff --git a/socket-relay/src/main.rs b/socket-relay/src/main.rs
new file mode 100644
index 0000000000..851904b499
--- /dev/null
+++ b/socket-relay/src/main.rs
@@ -0,0 +1,53 @@
+extern crate env_logger;
+#[macro_use]
+extern crate error_chain;
+extern crate tokio_core;
+
+extern crate socket_relay;
+
+use std::env;
+use std::thread;
+use std::time::Duration;
+
+use tokio_core::reactor::Core;
+
+error_chain!{}
+
+quick_main!(run);
+fn run() -> Result<()> {
+ env_logger::init().chain_err(|| "Failed to init logging")?;
+
+ let listen_addr = env::args()
+ .nth(1)
+ .expect("Listen address as first argument")
+ .parse()
+ .expect("Invalid listen address format");
+ let destination = env::args()
+ .nth(2)
+ .expect("Relay destination address as second argument")
+ .parse()
+ .expect("Invalid destination address format");
+ let forward_bind_ip = env::args()
+ .nth(3)
+ .unwrap_or(String::from("0.0.0.0"))
+ .parse()
+ .unwrap();
+
+ let mut core = Core::new().chain_err(|| "Unable to create Tokio core")?;
+ let handle = core.handle();
+
+ let relay = socket_relay::udp::Relay::new(listen_addr, forward_bind_ip, destination, handle)
+ .chain_err(|| "Unable to init forwarder")?;
+ println!("Forwarder listening on {}", relay.listen_addr());
+
+ let close_handle = relay.close_handle();
+ thread::spawn(move || {
+ thread::sleep(Duration::from_secs(20));
+ println!("Closing relay");
+ close_handle.close();
+ });
+
+ let result = core.run(relay);
+ println!("result: {:?}", result);
+ Ok(())
+}
diff --git a/socket-relay/src/udp.rs b/socket-relay/src/udp.rs
new file mode 100644
index 0000000000..4e3a2ba600
--- /dev/null
+++ b/socket-relay/src/udp.rs
@@ -0,0 +1,230 @@
+use futures;
+use futures::future::{self, Future};
+use futures::sink::Sink;
+use futures::stream::Stream;
+use futures::sync::mpsc::{channel as sync_channel, Sender as SyncSender};
+use futures::unsync::mpsc::{channel as unsync_channel, Sender};
+
+use std::io;
+use std::net::{IpAddr, SocketAddr};
+use std::result::Result as StdResult;
+use std::time::Duration;
+
+use tokio_core::net::{UdpCodec, UdpSocket};
+use tokio_core::reactor::Handle;
+use tokio_timer::Timer;
+
+/// The amount of idle (no replies) time needed for the forwarding socket to close.
+pub static FORWARD_TIMEOUT_MS: u64 = 60000;
+
+/// Number of slots in internal channel transfering responses back to clients.
+pub static CLIENT_SINK_CHANNEL_SIZE: usize = 10;
+
+
+pub struct Relay {
+ listen_addr: SocketAddr,
+ forwarding_future: Box<Future<Item = (), Error = io::Error>>,
+ close_handle: SyncSender<()>,
+}
+
+impl Relay {
+ /// Sets up relaying from `listen_addr` to `destination_addr`.
+ ///
+ /// `forward_bind_ip` is the local IP the socket that sends to the destination binds to.
+ pub fn new(
+ listen_addr: SocketAddr,
+ forward_bind_ip: IpAddr,
+ destination_addr: SocketAddr,
+ handle: Handle,
+ ) -> Result<Relay, io::Error> {
+ let listen_socket = UdpSocket::bind(&listen_addr, &handle)?;
+ let listen_addr = listen_socket.local_addr()?;
+ debug!("Bound relay listening socket to {}", listen_addr);
+
+ // Split the listening socket into a stream of incoming and a sink for outgoing datagrams.
+ let (client_sink, client_stream) = listen_socket.framed(ServerCodec).split();
+ let (closable_client_stream, close_handle) = closable_stream(client_stream);
+ let client_sink_channel = create_client_sink_channel(client_sink, &handle);
+
+ let forwarding_future = closable_client_stream.for_each(move |(client_addr, data)| {
+ let response_sink = client_sink_channel.clone().sink_map_err(|_| ());
+
+ if let Err(e) = Self::forward(
+ client_addr,
+ forward_bind_ip,
+ destination_addr,
+ data,
+ response_sink,
+ &handle,
+ ) {
+ error!("Unable to perform forwarding for {}: {}", client_addr, e);
+ };
+ future::ok(())
+ });
+
+ Ok(Relay {
+ listen_addr,
+ forwarding_future: Box::new(forwarding_future),
+ close_handle,
+ })
+ }
+
+ /// Forwards `data` to `destination` and streams all replies into `response_sink`.
+ fn forward<S>(
+ client_addr: SocketAddr,
+ bind_ip: IpAddr,
+ destination: SocketAddr,
+ data: Vec<u8>,
+ response_sink: S,
+ handle: &Handle,
+ ) -> io::Result<()>
+ where
+ S: Sink<SinkItem = (SocketAddr, Vec<u8>), SinkError = ()> + 'static,
+ {
+ let bind_addr = SocketAddr::new(bind_ip, 0);
+ let socket = UdpSocket::bind(&bind_addr, &handle)?;
+ trace!(
+ "Relaying {} byte datagram from {} to {}",
+ data.len(),
+ client_addr,
+ destination
+ );
+
+ let (forward_sink, forward_stream) = socket.framed(ServerCodec).split();
+
+ let send_future = forward_sink.send((destination, data)).map_err(|e| {
+ error!("Error while forwarding to destination addr: {}", e);
+ });
+
+ let recv_stream = forward_stream
+ .filter_map(move |(addr, data)| if addr == destination {
+ trace!(
+ "Returning {} byte response from {} to {}",
+ data.len(),
+ addr,
+ client_addr
+ );
+ Some((client_addr, data))
+ } else {
+ trace!(
+ "Discarding data from {}, expecting data from {}",
+ addr,
+ destination
+ );
+ None
+ })
+ .map_err(|e| {
+ error!("Error reading datagrams from forward socket: {}", e)
+ });
+
+ let timeout_recv_future = Timer::default()
+ .timeout_stream(recv_stream, Duration::from_millis(FORWARD_TIMEOUT_MS))
+ .forward(response_sink)
+ .map(|_| ());
+
+ handle.spawn(send_future.and_then(|_| timeout_recv_future));
+ Ok(())
+ }
+
+ pub fn listen_addr(&self) -> SocketAddr {
+ self.listen_addr
+ }
+
+ pub fn close_handle(&self) -> RelayCloseHandle {
+ RelayCloseHandle(self.close_handle.clone())
+ }
+}
+
+impl Future for Relay {
+ type Item = ();
+ type Error = io::Error;
+
+ fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
+ self.forwarding_future.poll()
+ }
+}
+
+impl Drop for Relay {
+ fn drop(&mut self) {
+ self.close_handle().close();
+ }
+}
+
+pub struct RelayCloseHandle(SyncSender<()>);
+
+impl RelayCloseHandle {
+ pub fn close(self) {
+ if self.0.send(()).wait().is_err() {
+ warn!("Relay already closed");
+ }
+ }
+}
+
+
+fn closable_stream<S: Stream + 'static>(
+ stream: S,
+) -> (
+ Box<Stream<Item = S::Item, Error = S::Error>>,
+ SyncSender<()>,
+) {
+ let (close_handle_sink, close_signal_stream) = sync_channel(0);
+ let close_signal_stream = close_signal_stream.map(|_| None).map_err(|_| None);
+ let mapped_stream = stream.map(|t| Some(t)).map_err(|e| Some(e));
+
+ let output_stream = mapped_stream
+ .select(close_signal_stream)
+ // Map close_signal_stream error as Ok(None) and stream error back to S::Error
+ .then(|element| match element {
+ Err(None) => Ok(None),
+ Err(Some(e)) => Err(e),
+ Ok(item) => Ok(item),
+ })
+ // Make the stream end when signaled by close_signal_stream.
+ .take_while(|item| Ok(item.is_some()))
+ // Map Option<S::Item> to S::Item, we know it is a Some from the take_while above.
+ .map(|item| item.unwrap());
+ (Box::new(output_stream), close_handle_sink)
+}
+
+
+/// Create a channel accepting tuples of `SocketAddr` and binary data and forward anything coming
+/// on this channel to the `client_sink`. Returns the sender half of the channel.
+fn create_client_sink_channel<S>(client_sink: S, handle: &Handle) -> Sender<(SocketAddr, Vec<u8>)>
+where
+ S: Sink<SinkItem = (SocketAddr, Vec<u8>), SinkError = io::Error> + 'static,
+{
+ let (channel_sink, channel_stream) = unsync_channel(CLIENT_SINK_CHANNEL_SIZE);
+
+ let forward_future = channel_stream
+ .map_err(|_| None)
+ .forward(client_sink.sink_map_err(|e| Some(e)))
+ .and_then(|_| Ok(()))
+ .map_err(|error: Option<io::Error>| match error {
+ Some(sink_error) => {
+ error!("Error sending response back to client: {}", sink_error);
+ }
+ None => debug!("Closing relay socket sink"),
+ });
+ handle.spawn(forward_future);
+
+ channel_sink
+}
+
+
+/// Internal struct implementing `Codec`. Just so it becomes possible to split a `UdpSocket` into
+/// a sink and a stream.
+struct ServerCodec;
+
+impl UdpCodec for ServerCodec {
+ type In = (SocketAddr, Vec<u8>);
+ type Out = (SocketAddr, Vec<u8>);
+
+ fn decode(&mut self, addr: &SocketAddr, buf: &[u8]) -> StdResult<Self::In, io::Error> {
+ Ok((*addr, buf.to_vec()))
+ }
+
+ fn encode(&mut self, (addr, buf): Self::Out, into: &mut Vec<u8>) -> SocketAddr {
+ into.extend(buf);
+ addr
+ }
+}
diff --git a/socket-relay/tests/forward_self.rs b/socket-relay/tests/forward_self.rs
new file mode 100644
index 0000000000..4714e52404
--- /dev/null
+++ b/socket-relay/tests/forward_self.rs
@@ -0,0 +1,55 @@
+extern crate env_logger;
+extern crate socket_relay;
+extern crate tokio_core;
+
+use std::net::{SocketAddr, UdpSocket};
+use std::sync::mpsc;
+use std::thread;
+use std::time::Duration;
+
+use socket_relay::udp::Relay;
+use tokio_core::reactor::Core;
+
+#[test]
+fn test() {
+ env_logger::init().unwrap();
+
+ let socket = UdpSocket::bind("127.0.0.1:0").unwrap();
+ socket
+ .set_read_timeout(Some(Duration::from_secs(1)))
+ .unwrap();
+ let mut buffer = [0; 100];
+
+ let relay_listen_addr = spawn_relay(socket.local_addr().unwrap());
+
+ let test_data = [9, 88, 5, 2];
+ socket.send_to(&test_data, relay_listen_addr).unwrap();
+ let (len1, relay_src1) = socket.recv_from(&mut buffer).unwrap();
+ assert_eq!(&buffer[..len1], &test_data);
+
+ let reply_test_data = [1, 2, 6, 100];
+ socket.send_to(&reply_test_data, relay_src1).unwrap();
+ let (len2, relay_src2) = socket.recv_from(&mut buffer).unwrap();
+ assert_eq!(relay_src2, relay_listen_addr);
+ assert_eq!(&buffer[..len2], &reply_test_data);
+}
+
+fn spawn_relay(destination_addr: SocketAddr) -> SocketAddr {
+ let (tx, rx) = mpsc::channel();
+ thread::spawn(move || {
+ let mut core = Core::new().unwrap();
+ let handle = core.handle();
+
+ let relay = Relay::new(
+ "127.0.0.1:0".parse().unwrap(),
+ "127.0.0.1".parse().unwrap(),
+ destination_addr,
+ handle,
+ ).unwrap();
+ println!("Relay listening on {}", relay.listen_addr());
+ tx.send(relay.listen_addr()).unwrap();
+ let _ = core.run(relay);
+ println!("Relay exiting")
+ });
+ rx.recv().unwrap()
+}