diff options
| author | Linus Färnstrand <linus@mullvad.net> | 2017-09-12 10:13:57 +0200 |
|---|---|---|
| committer | Linus Färnstrand <linus@mullvad.net> | 2017-09-19 23:40:21 +0200 |
| commit | 0eb4a545cafb1ba73706bed99068e415c4a630b7 (patch) | |
| tree | 5f242775b9729bd1fae3e11197d13d411e6a94ce | |
| parent | c7199bd140f4769dde66dc6044ab0921d1330378 (diff) | |
| download | mullvadvpn-0eb4a545cafb1ba73706bed99068e415c4a630b7.tar.xz mullvadvpn-0eb4a545cafb1ba73706bed99068e415c4a630b7.zip | |
Add socket-relay crate with udp support
| -rw-r--r-- | socket-relay/Cargo.toml | 15 | ||||
| -rw-r--r-- | socket-relay/src/lib.rs | 7 | ||||
| -rw-r--r-- | socket-relay/src/main.rs | 48 | ||||
| -rw-r--r-- | socket-relay/src/udp.rs | 221 |
4 files changed, 291 insertions, 0 deletions
diff --git a/socket-relay/Cargo.toml b/socket-relay/Cargo.toml new file mode 100644 index 0000000000..b7141bec8f --- /dev/null +++ b/socket-relay/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "socket-relay" +version = "0.1.0" +authors = ["Mullvad VPN <admin@mullvad.net>", "Linus Färnstrand <linus@mullvad.net>"] +description = "A relay for bidirectional sockets. Listens to one address and relays all traffic to another." +license = "MIT/Apache-2.0" +keywords = ["socket", "network", "forward", "relay", "proxy"] +categories = ["network-programming", "asynchronous"] + +[dependencies] +env_logger = "0.4.3" +error-chain = "0.11" +futures = "0.1.15" +log = "0.3" +tokio-core = "0.1" diff --git a/socket-relay/src/lib.rs b/socket-relay/src/lib.rs new file mode 100644 index 0000000000..a342cce2dc --- /dev/null +++ b/socket-relay/src/lib.rs @@ -0,0 +1,7 @@ +extern crate futures; +#[macro_use] +extern crate log; +extern crate tokio_core; + +/// UDP based relaying. +pub mod udp; diff --git a/socket-relay/src/main.rs b/socket-relay/src/main.rs new file mode 100644 index 0000000000..7eb6fd50db --- /dev/null +++ b/socket-relay/src/main.rs @@ -0,0 +1,48 @@ +extern crate env_logger; +#[macro_use] +extern crate error_chain; +extern crate tokio_core; + +extern crate socket_relay; + +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::thread; +use std::time::Duration; + +use tokio_core::reactor::Core; + +error_chain!{} + +quick_main!(run); +fn run() -> Result<()> { + env_logger::init().chain_err(|| "Failed to init logging")?; + + let listen_ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); + let listen_port = 53; + let listen_addr = SocketAddr::new(listen_ip, listen_port); + + let forward_bind_ip = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)); + + let forward_ip = IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)); + let forward_port = 53; + let forward_addr = SocketAddr::new(forward_ip, forward_port); + + + let mut core = Core::new().chain_err(|| "Unable to create Tokio core")?; + let handle = core.handle(); + + let relay = socket_relay::udp::Relay::new(listen_addr, forward_bind_ip, forward_addr, handle) + .chain_err(|| "Unable to init forwarder")?; + println!("Forwarder listening on {}", relay.listen_addr()); + + let close_handle = relay.close_handle(); + thread::spawn(move || { + thread::sleep(Duration::from_secs(20)); + println!("Closing relay"); + close_handle.close(); + }); + + let result = core.run(relay); + println!("result: {:?}", result); + Ok(()) +} diff --git a/socket-relay/src/udp.rs b/socket-relay/src/udp.rs new file mode 100644 index 0000000000..cc9a9139a1 --- /dev/null +++ b/socket-relay/src/udp.rs @@ -0,0 +1,221 @@ +use futures; +use futures::future::{self, Future}; +use futures::sink::Sink; +use futures::stream::Stream; +use futures::sync::mpsc::{channel as sync_channel, Sender as SyncSender}; +use futures::unsync::mpsc::{channel as unsync_channel, Sender}; + +use std::io; +use std::net::{IpAddr, SocketAddr}; +use std::result::Result as StdResult; +use std::time::Duration; + +use tokio_core::net::{UdpCodec, UdpSocket}; +use tokio_core::reactor::{Handle, Timeout}; + +/// The amount of time the forwarding socket is open for replies. +pub static FORWARD_TIMEOUT_MS: u64 = 60000; + +/// Number of slots in internal channel transfering responses back to clients. +pub static CLIENT_SINK_CHANNEL_SIZE: usize = 10; + + +pub struct Relay { + listen_addr: SocketAddr, + forwarding_future: Box<Future<Item = (), Error = io::Error>>, + close_handle: SyncSender<()>, +} + +impl Relay { + /// Sets up relaying from `listen_addr` to `destination_addr`. + /// + /// `forward_bind_ip` is the local IP the socket that sends to the destination binds to. + pub fn new( + listen_addr: SocketAddr, + forward_bind_ip: IpAddr, + destination_addr: SocketAddr, + handle: Handle, + ) -> Result<Relay, io::Error> { + let listen_socket = UdpSocket::bind(&listen_addr, &handle)?; + let listen_addr = listen_socket.local_addr()?; + debug!("Bound relay listening socket to {}", listen_addr); + + // Split the listening socket into a stream of incoming and a sink for outgoing datagrams. + let (client_sink, client_stream) = listen_socket.framed(ServerCodec).split(); + let (closable_client_stream, close_handle) = closable_stream(client_stream); + let client_sink_channel = create_client_sink_channel(client_sink, &handle); + + let forwarding_future = closable_client_stream.for_each(move |(client_addr, data)| { + let response_sink = client_sink_channel + .clone() + .sink_map_err(|_| ()) + .with(move |data| future::ok((client_addr, data))); + + if let Err(e) = Self::forward( + forward_bind_ip, + destination_addr, + data, + response_sink, + &handle, + ) { + error!("Unable to perform forwarding for {}: {}", client_addr, e); + }; + future::ok(()) + }); + + Ok(Relay { + listen_addr, + forwarding_future: Box::new(forwarding_future), + close_handle, + }) + } + + /// Forwards `data` to `destination` and streams all replies into `response_sink`. + fn forward<S>( + bind_ip: IpAddr, + destination: SocketAddr, + data: Vec<u8>, + response_sink: S, + handle: &Handle, + ) -> io::Result<()> + where + S: Sink<SinkItem = Vec<u8>, SinkError = ()> + 'static, + { + let bind_addr = SocketAddr::new(bind_ip, 0); + let socket = UdpSocket::bind(&bind_addr, &handle)?; + trace!( + "Datagram with {} bytes forwarded from {} to {}", + data.len(), + socket.local_addr().unwrap_or(bind_addr), + destination + ); + + let (forward_sink, forward_stream) = socket.framed(ServerCodec).split(); + + let send_future = forward_sink.send((destination, data)).map_err(|e| { + error!("Error while forwarding to destination addr: {}", e); + }); + + let recv_future = forward_stream + .filter_map(move |(addr, data)| if addr == destination { + Some(data) + } else { + None + }) + .map_err(|e| { + error!("Error reading datagrams from forward socket: {}", e) + }) + .forward(response_sink) + .map(|_| ()); + + let network_future = send_future.and_then(|_| recv_future); + + let timeout = + Timeout::new(Duration::from_millis(FORWARD_TIMEOUT_MS), &handle)?.map_err(|_| ()); + + handle.spawn(network_future.select(timeout).map(|_| ()).map_err(|_| ())); + Ok(()) + } + + pub fn listen_addr(&self) -> SocketAddr { + self.listen_addr + } + + pub fn close_handle(&self) -> RelayCloseHandle { + RelayCloseHandle(self.close_handle.clone()) + } +} + +impl Future for Relay { + type Item = (); + type Error = io::Error; + + fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> { + self.forwarding_future.poll() + } +} + +impl Drop for Relay { + fn drop(&mut self) { + self.close_handle().close(); + } +} + +pub struct RelayCloseHandle(SyncSender<()>); + +impl RelayCloseHandle { + pub fn close(self) { + if self.0.send(()).wait().is_err() { + warn!("Relay already closed"); + } + } +} + + +fn closable_stream<S: Stream + 'static>( + stream: S, +) -> ( + Box<Stream<Item = S::Item, Error = S::Error>>, + SyncSender<()>, +) { + let (close_handle_sink, close_signal_stream) = sync_channel(0); + let close_signal_stream = close_signal_stream.map(|_| None).map_err(|_| None); + let mapped_stream = stream.map(|t| Some(t)).map_err(|e| Some(e)); + + let output_stream = mapped_stream + .select(close_signal_stream) + // Map close_signal_stream error as Ok(None) and stream error back to S::Error + .then(|element| match element { + Err(None) => Ok(None), + Err(Some(e)) => Err(e), + Ok(item) => Ok(item), + }) + // Make the stream end when signaled by close_signal_stream. + .take_while(|item| Ok(item.is_some())) + // Map Option<S::Item> to S::Item, we know it is a Some from the take_while above. + .map(|item| item.unwrap()); + (Box::new(output_stream), close_handle_sink) +} + + +/// Create a channel accepting tuples of `SocketAddr` and binary data and forward anything coming +/// on this channel to the `client_sink`. Returns the sender half of the channel. +fn create_client_sink_channel<S>(client_sink: S, handle: &Handle) -> Sender<(SocketAddr, Vec<u8>)> +where + S: Sink<SinkItem = (SocketAddr, Vec<u8>), SinkError = io::Error> + 'static, +{ + let (channel_sink, channel_stream) = unsync_channel(CLIENT_SINK_CHANNEL_SIZE); + + let forward_future = channel_stream + .map_err(|_| None) + .forward(client_sink.sink_map_err(|e| Some(e))) + .and_then(|_| Ok(())) + .map_err(|error: Option<io::Error>| match error { + Some(sink_error) => { + error!("Error sending response back to client: {}", sink_error); + } + None => debug!("Closing relay socket sink"), + }); + handle.spawn(forward_future); + + channel_sink +} + + +/// Internal struct implementing `Codec`. Just so it becomes possible to split a `UdpSocket` into +/// a sink and a stream. +struct ServerCodec; + +impl UdpCodec for ServerCodec { + type In = (SocketAddr, Vec<u8>); + type Out = (SocketAddr, Vec<u8>); + + fn decode(&mut self, addr: &SocketAddr, buf: &[u8]) -> StdResult<Self::In, io::Error> { + Ok((*addr, buf.to_vec())) + } + + fn encode(&mut self, (addr, buf): Self::Out, into: &mut Vec<u8>) -> SocketAddr { + into.extend(buf); + addr + } +} |
