diff options
| author | Linus Färnstrand <linus@mullvad.net> | 2018-06-25 13:00:33 +0200 |
|---|---|---|
| committer | Linus Färnstrand <linus@mullvad.net> | 2018-06-26 15:07:47 +0200 |
| commit | 90a9859a69621df4f120205ed341cc047689dc71 (patch) | |
| tree | 24185da431c7338e9c41cf2be52dd5dedad25e6b /socket-relay | |
| parent | 1d2d4c2605f537d929c5677c8aee5a3d80ae9c90 (diff) | |
| download | mullvadvpn-90a9859a69621df4f120205ed341cc047689dc71.tar.xz mullvadvpn-90a9859a69621df4f120205ed341cc047689dc71.zip | |
Remove unused socket-relay crate
Diffstat (limited to 'socket-relay')
| -rw-r--r-- | socket-relay/Cargo.toml | 16 | ||||
| -rw-r--r-- | socket-relay/src/lib.rs | 8 | ||||
| -rw-r--r-- | socket-relay/src/main.rs | 53 | ||||
| -rw-r--r-- | socket-relay/src/udp.rs | 230 | ||||
| -rw-r--r-- | socket-relay/tests/forward_self.rs | 55 |
5 files changed, 0 insertions, 362 deletions
diff --git a/socket-relay/Cargo.toml b/socket-relay/Cargo.toml deleted file mode 100644 index b40c72f6f2..0000000000 --- a/socket-relay/Cargo.toml +++ /dev/null @@ -1,16 +0,0 @@ -[package] -name = "socket-relay" -version = "0.1.0" -authors = ["Mullvad VPN <admin@mullvad.net>", "Linus Färnstrand <linus@mullvad.net>"] -description = "A relay for bidirectional sockets. Listens to one address and relays all traffic to another." -license = "MIT/Apache-2.0" -keywords = ["socket", "network", "forward", "relay", "proxy"] -categories = ["network-programming", "asynchronous"] - -[dependencies] -env_logger = "0.4.3" -error-chain = "0.11" -futures = "0.1.15" -log = "0.3" -tokio-core = "0.1" -tokio-timer = "0.1" diff --git a/socket-relay/src/lib.rs b/socket-relay/src/lib.rs deleted file mode 100644 index c0b930e2dc..0000000000 --- a/socket-relay/src/lib.rs +++ /dev/null @@ -1,8 +0,0 @@ -extern crate futures; -#[macro_use] -extern crate log; -extern crate tokio_core; -extern crate tokio_timer; - -/// UDP based relaying. -pub mod udp; diff --git a/socket-relay/src/main.rs b/socket-relay/src/main.rs deleted file mode 100644 index 851904b499..0000000000 --- a/socket-relay/src/main.rs +++ /dev/null @@ -1,53 +0,0 @@ -extern crate env_logger; -#[macro_use] -extern crate error_chain; -extern crate tokio_core; - -extern crate socket_relay; - -use std::env; -use std::thread; -use std::time::Duration; - -use tokio_core::reactor::Core; - -error_chain!{} - -quick_main!(run); -fn run() -> Result<()> { - env_logger::init().chain_err(|| "Failed to init logging")?; - - let listen_addr = env::args() - .nth(1) - .expect("Listen address as first argument") - .parse() - .expect("Invalid listen address format"); - let destination = env::args() - .nth(2) - .expect("Relay destination address as second argument") - .parse() - .expect("Invalid destination address format"); - let forward_bind_ip = env::args() - .nth(3) - .unwrap_or(String::from("0.0.0.0")) - .parse() - .unwrap(); - - let mut core = Core::new().chain_err(|| "Unable to create Tokio core")?; - let handle = core.handle(); - - let relay = socket_relay::udp::Relay::new(listen_addr, forward_bind_ip, destination, handle) - .chain_err(|| "Unable to init forwarder")?; - println!("Forwarder listening on {}", relay.listen_addr()); - - let close_handle = relay.close_handle(); - thread::spawn(move || { - thread::sleep(Duration::from_secs(20)); - println!("Closing relay"); - close_handle.close(); - }); - - let result = core.run(relay); - println!("result: {:?}", result); - Ok(()) -} diff --git a/socket-relay/src/udp.rs b/socket-relay/src/udp.rs deleted file mode 100644 index f4417d15f0..0000000000 --- a/socket-relay/src/udp.rs +++ /dev/null @@ -1,230 +0,0 @@ -use futures; -use futures::future::{self, Future}; -use futures::sink::Sink; -use futures::stream::Stream; -use futures::sync::mpsc::{channel as sync_channel, Sender as SyncSender}; -use futures::unsync::mpsc::{channel as unsync_channel, Sender}; - -use std::io; -use std::net::{IpAddr, SocketAddr}; -use std::result::Result as StdResult; -use std::time::Duration; - -use tokio_core::net::{UdpCodec, UdpSocket}; -use tokio_core::reactor::Handle; -use tokio_timer::Timer; - -/// The amount of idle (no replies) time needed for the forwarding socket to close. -pub static FORWARD_TIMEOUT_MS: u64 = 8000; - -/// Number of slots in internal channel transfering responses back to clients. -pub static CLIENT_SINK_CHANNEL_SIZE: usize = 10; - - -pub struct Relay { - listen_addr: SocketAddr, - forwarding_future: Box<Future<Item = (), Error = io::Error>>, - close_handle: SyncSender<()>, -} - -impl Relay { - /// Sets up relaying from `listen_addr` to `destination_addr`. - /// - /// `forward_bind_ip` is the local IP the socket that sends to the destination binds to. - pub fn new( - listen_addr: SocketAddr, - forward_bind_ip: IpAddr, - destination_addr: SocketAddr, - handle: Handle, - ) -> Result<Relay, io::Error> { - let listen_socket = UdpSocket::bind(&listen_addr, &handle)?; - let listen_addr = listen_socket.local_addr()?; - debug!("Bound relay listening socket to {}", listen_addr); - - // Split the listening socket into a stream of incoming and a sink for outgoing datagrams. - let (client_sink, client_stream) = listen_socket.framed(ServerCodec).split(); - let (closable_client_stream, close_handle) = closable_stream(client_stream); - let client_sink_channel = create_client_sink_channel(client_sink, &handle); - - let forwarding_future = closable_client_stream.for_each(move |(client_addr, data)| { - let response_sink = client_sink_channel.clone().sink_map_err(|_| ()); - - if let Err(e) = Self::forward( - client_addr, - forward_bind_ip, - destination_addr, - data, - response_sink, - &handle, - ) { - error!("Unable to perform forwarding for {}: {}", client_addr, e); - }; - future::ok(()) - }); - - Ok(Relay { - listen_addr, - forwarding_future: Box::new(forwarding_future), - close_handle, - }) - } - - /// Forwards `data` to `destination` and streams all replies into `response_sink`. - fn forward<S>( - client_addr: SocketAddr, - bind_ip: IpAddr, - destination: SocketAddr, - data: Vec<u8>, - response_sink: S, - handle: &Handle, - ) -> io::Result<()> - where - S: Sink<SinkItem = (SocketAddr, Vec<u8>), SinkError = ()> + 'static, - { - let bind_addr = SocketAddr::new(bind_ip, 0); - let socket = UdpSocket::bind(&bind_addr, &handle)?; - trace!( - "Relaying {} byte datagram from {} to {}", - data.len(), - client_addr, - destination - ); - - let (forward_sink, forward_stream) = socket.framed(ServerCodec).split(); - - let send_future = forward_sink.send((destination, data)).map_err(|e| { - error!("Error while forwarding to destination addr: {}", e); - }); - - let recv_stream = forward_stream - .filter_map(move |(addr, data)| { - if addr == destination { - trace!( - "Returning {} byte response from {} to {}", - data.len(), - addr, - client_addr - ); - Some((client_addr, data)) - } else { - trace!( - "Discarding data from {}, expecting data from {}", - addr, - destination - ); - None - } - }) - .map_err(|e| error!("Error reading datagrams from forward socket: {}", e)); - - let timeout_recv_future = Timer::default() - .timeout_stream(recv_stream, Duration::from_millis(FORWARD_TIMEOUT_MS)) - .forward(response_sink) - .map(|_| ()); - - handle.spawn(send_future.and_then(|_| timeout_recv_future)); - Ok(()) - } - - pub fn listen_addr(&self) -> SocketAddr { - self.listen_addr - } - - pub fn close_handle(&self) -> RelayCloseHandle { - RelayCloseHandle(self.close_handle.clone()) - } -} - -impl Future for Relay { - type Item = (); - type Error = io::Error; - - fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> { - self.forwarding_future.poll() - } -} - -impl Drop for Relay { - fn drop(&mut self) { - self.close_handle().close(); - } -} - -pub struct RelayCloseHandle(SyncSender<()>); - -impl RelayCloseHandle { - pub fn close(self) { - if self.0.send(()).wait().is_err() { - warn!("Relay already closed"); - } - } -} - - -fn closable_stream<S: Stream + 'static>( - stream: S, -) -> ( - Box<Stream<Item = S::Item, Error = S::Error>>, - SyncSender<()>, -) { - let (close_handle_sink, close_signal_stream) = sync_channel(0); - let close_signal_stream = close_signal_stream.map(|_| None).map_err(|_| None); - let mapped_stream = stream.map(|t| Some(t)).map_err(|e| Some(e)); - - let output_stream = mapped_stream - .select(close_signal_stream) - // Map close_signal_stream error as Ok(None) and stream error back to S::Error - .then(|element| match element { - Err(None) => Ok(None), - Err(Some(e)) => Err(e), - Ok(item) => Ok(item), - }) - // Make the stream end when signaled by close_signal_stream. - .take_while(|item| Ok(item.is_some())) - // Map Option<S::Item> to S::Item, we know it is a Some from the take_while above. - .map(|item| item.unwrap()); - (Box::new(output_stream), close_handle_sink) -} - - -/// Create a channel accepting tuples of `SocketAddr` and binary data and forward anything coming -/// on this channel to the `client_sink`. Returns the sender half of the channel. -fn create_client_sink_channel<S>(client_sink: S, handle: &Handle) -> Sender<(SocketAddr, Vec<u8>)> -where - S: Sink<SinkItem = (SocketAddr, Vec<u8>), SinkError = io::Error> + 'static, -{ - let (channel_sink, channel_stream) = unsync_channel(CLIENT_SINK_CHANNEL_SIZE); - - let forward_future = channel_stream - .map_err(|_| None) - .forward(client_sink.sink_map_err(|e| Some(e))) - .and_then(|_| Ok(())) - .map_err(|error: Option<io::Error>| match error { - Some(sink_error) => { - error!("Error sending response back to client: {}", sink_error); - } - None => debug!("Closing relay socket sink"), - }); - handle.spawn(forward_future); - - channel_sink -} - - -/// Internal struct implementing `Codec`. Just so it becomes possible to split a `UdpSocket` into -/// a sink and a stream. -struct ServerCodec; - -impl UdpCodec for ServerCodec { - type In = (SocketAddr, Vec<u8>); - type Out = (SocketAddr, Vec<u8>); - - fn decode(&mut self, addr: &SocketAddr, buf: &[u8]) -> StdResult<Self::In, io::Error> { - Ok((*addr, buf.to_vec())) - } - - fn encode(&mut self, (addr, buf): Self::Out, into: &mut Vec<u8>) -> SocketAddr { - into.extend(buf); - addr - } -} diff --git a/socket-relay/tests/forward_self.rs b/socket-relay/tests/forward_self.rs deleted file mode 100644 index 4714e52404..0000000000 --- a/socket-relay/tests/forward_self.rs +++ /dev/null @@ -1,55 +0,0 @@ -extern crate env_logger; -extern crate socket_relay; -extern crate tokio_core; - -use std::net::{SocketAddr, UdpSocket}; -use std::sync::mpsc; -use std::thread; -use std::time::Duration; - -use socket_relay::udp::Relay; -use tokio_core::reactor::Core; - -#[test] -fn test() { - env_logger::init().unwrap(); - - let socket = UdpSocket::bind("127.0.0.1:0").unwrap(); - socket - .set_read_timeout(Some(Duration::from_secs(1))) - .unwrap(); - let mut buffer = [0; 100]; - - let relay_listen_addr = spawn_relay(socket.local_addr().unwrap()); - - let test_data = [9, 88, 5, 2]; - socket.send_to(&test_data, relay_listen_addr).unwrap(); - let (len1, relay_src1) = socket.recv_from(&mut buffer).unwrap(); - assert_eq!(&buffer[..len1], &test_data); - - let reply_test_data = [1, 2, 6, 100]; - socket.send_to(&reply_test_data, relay_src1).unwrap(); - let (len2, relay_src2) = socket.recv_from(&mut buffer).unwrap(); - assert_eq!(relay_src2, relay_listen_addr); - assert_eq!(&buffer[..len2], &reply_test_data); -} - -fn spawn_relay(destination_addr: SocketAddr) -> SocketAddr { - let (tx, rx) = mpsc::channel(); - thread::spawn(move || { - let mut core = Core::new().unwrap(); - let handle = core.handle(); - - let relay = Relay::new( - "127.0.0.1:0".parse().unwrap(), - "127.0.0.1".parse().unwrap(), - destination_addr, - handle, - ).unwrap(); - println!("Relay listening on {}", relay.listen_addr()); - tx.send(relay.listen_addr()).unwrap(); - let _ = core.run(relay); - println!("Relay exiting") - }); - rx.recv().unwrap() -} |
