summaryrefslogtreecommitdiffhomepage
path: root/socket-relay
diff options
context:
space:
mode:
authorLinus Färnstrand <linus@mullvad.net>2018-06-25 13:00:33 +0200
committerLinus Färnstrand <linus@mullvad.net>2018-06-26 15:07:47 +0200
commit90a9859a69621df4f120205ed341cc047689dc71 (patch)
tree24185da431c7338e9c41cf2be52dd5dedad25e6b /socket-relay
parent1d2d4c2605f537d929c5677c8aee5a3d80ae9c90 (diff)
downloadmullvadvpn-90a9859a69621df4f120205ed341cc047689dc71.tar.xz
mullvadvpn-90a9859a69621df4f120205ed341cc047689dc71.zip
Remove unused socket-relay crate
Diffstat (limited to 'socket-relay')
-rw-r--r--socket-relay/Cargo.toml16
-rw-r--r--socket-relay/src/lib.rs8
-rw-r--r--socket-relay/src/main.rs53
-rw-r--r--socket-relay/src/udp.rs230
-rw-r--r--socket-relay/tests/forward_self.rs55
5 files changed, 0 insertions, 362 deletions
diff --git a/socket-relay/Cargo.toml b/socket-relay/Cargo.toml
deleted file mode 100644
index b40c72f6f2..0000000000
--- a/socket-relay/Cargo.toml
+++ /dev/null
@@ -1,16 +0,0 @@
-[package]
-name = "socket-relay"
-version = "0.1.0"
-authors = ["Mullvad VPN <admin@mullvad.net>", "Linus Färnstrand <linus@mullvad.net>"]
-description = "A relay for bidirectional sockets. Listens to one address and relays all traffic to another."
-license = "MIT/Apache-2.0"
-keywords = ["socket", "network", "forward", "relay", "proxy"]
-categories = ["network-programming", "asynchronous"]
-
-[dependencies]
-env_logger = "0.4.3"
-error-chain = "0.11"
-futures = "0.1.15"
-log = "0.3"
-tokio-core = "0.1"
-tokio-timer = "0.1"
diff --git a/socket-relay/src/lib.rs b/socket-relay/src/lib.rs
deleted file mode 100644
index c0b930e2dc..0000000000
--- a/socket-relay/src/lib.rs
+++ /dev/null
@@ -1,8 +0,0 @@
-extern crate futures;
-#[macro_use]
-extern crate log;
-extern crate tokio_core;
-extern crate tokio_timer;
-
-/// UDP based relaying.
-pub mod udp;
diff --git a/socket-relay/src/main.rs b/socket-relay/src/main.rs
deleted file mode 100644
index 851904b499..0000000000
--- a/socket-relay/src/main.rs
+++ /dev/null
@@ -1,53 +0,0 @@
-extern crate env_logger;
-#[macro_use]
-extern crate error_chain;
-extern crate tokio_core;
-
-extern crate socket_relay;
-
-use std::env;
-use std::thread;
-use std::time::Duration;
-
-use tokio_core::reactor::Core;
-
-error_chain!{}
-
-quick_main!(run);
-fn run() -> Result<()> {
- env_logger::init().chain_err(|| "Failed to init logging")?;
-
- let listen_addr = env::args()
- .nth(1)
- .expect("Listen address as first argument")
- .parse()
- .expect("Invalid listen address format");
- let destination = env::args()
- .nth(2)
- .expect("Relay destination address as second argument")
- .parse()
- .expect("Invalid destination address format");
- let forward_bind_ip = env::args()
- .nth(3)
- .unwrap_or(String::from("0.0.0.0"))
- .parse()
- .unwrap();
-
- let mut core = Core::new().chain_err(|| "Unable to create Tokio core")?;
- let handle = core.handle();
-
- let relay = socket_relay::udp::Relay::new(listen_addr, forward_bind_ip, destination, handle)
- .chain_err(|| "Unable to init forwarder")?;
- println!("Forwarder listening on {}", relay.listen_addr());
-
- let close_handle = relay.close_handle();
- thread::spawn(move || {
- thread::sleep(Duration::from_secs(20));
- println!("Closing relay");
- close_handle.close();
- });
-
- let result = core.run(relay);
- println!("result: {:?}", result);
- Ok(())
-}
diff --git a/socket-relay/src/udp.rs b/socket-relay/src/udp.rs
deleted file mode 100644
index f4417d15f0..0000000000
--- a/socket-relay/src/udp.rs
+++ /dev/null
@@ -1,230 +0,0 @@
-use futures;
-use futures::future::{self, Future};
-use futures::sink::Sink;
-use futures::stream::Stream;
-use futures::sync::mpsc::{channel as sync_channel, Sender as SyncSender};
-use futures::unsync::mpsc::{channel as unsync_channel, Sender};
-
-use std::io;
-use std::net::{IpAddr, SocketAddr};
-use std::result::Result as StdResult;
-use std::time::Duration;
-
-use tokio_core::net::{UdpCodec, UdpSocket};
-use tokio_core::reactor::Handle;
-use tokio_timer::Timer;
-
-/// The amount of idle (no replies) time needed for the forwarding socket to close.
-pub static FORWARD_TIMEOUT_MS: u64 = 8000;
-
-/// Number of slots in internal channel transfering responses back to clients.
-pub static CLIENT_SINK_CHANNEL_SIZE: usize = 10;
-
-
-pub struct Relay {
- listen_addr: SocketAddr,
- forwarding_future: Box<Future<Item = (), Error = io::Error>>,
- close_handle: SyncSender<()>,
-}
-
-impl Relay {
- /// Sets up relaying from `listen_addr` to `destination_addr`.
- ///
- /// `forward_bind_ip` is the local IP the socket that sends to the destination binds to.
- pub fn new(
- listen_addr: SocketAddr,
- forward_bind_ip: IpAddr,
- destination_addr: SocketAddr,
- handle: Handle,
- ) -> Result<Relay, io::Error> {
- let listen_socket = UdpSocket::bind(&listen_addr, &handle)?;
- let listen_addr = listen_socket.local_addr()?;
- debug!("Bound relay listening socket to {}", listen_addr);
-
- // Split the listening socket into a stream of incoming and a sink for outgoing datagrams.
- let (client_sink, client_stream) = listen_socket.framed(ServerCodec).split();
- let (closable_client_stream, close_handle) = closable_stream(client_stream);
- let client_sink_channel = create_client_sink_channel(client_sink, &handle);
-
- let forwarding_future = closable_client_stream.for_each(move |(client_addr, data)| {
- let response_sink = client_sink_channel.clone().sink_map_err(|_| ());
-
- if let Err(e) = Self::forward(
- client_addr,
- forward_bind_ip,
- destination_addr,
- data,
- response_sink,
- &handle,
- ) {
- error!("Unable to perform forwarding for {}: {}", client_addr, e);
- };
- future::ok(())
- });
-
- Ok(Relay {
- listen_addr,
- forwarding_future: Box::new(forwarding_future),
- close_handle,
- })
- }
-
- /// Forwards `data` to `destination` and streams all replies into `response_sink`.
- fn forward<S>(
- client_addr: SocketAddr,
- bind_ip: IpAddr,
- destination: SocketAddr,
- data: Vec<u8>,
- response_sink: S,
- handle: &Handle,
- ) -> io::Result<()>
- where
- S: Sink<SinkItem = (SocketAddr, Vec<u8>), SinkError = ()> + 'static,
- {
- let bind_addr = SocketAddr::new(bind_ip, 0);
- let socket = UdpSocket::bind(&bind_addr, &handle)?;
- trace!(
- "Relaying {} byte datagram from {} to {}",
- data.len(),
- client_addr,
- destination
- );
-
- let (forward_sink, forward_stream) = socket.framed(ServerCodec).split();
-
- let send_future = forward_sink.send((destination, data)).map_err(|e| {
- error!("Error while forwarding to destination addr: {}", e);
- });
-
- let recv_stream = forward_stream
- .filter_map(move |(addr, data)| {
- if addr == destination {
- trace!(
- "Returning {} byte response from {} to {}",
- data.len(),
- addr,
- client_addr
- );
- Some((client_addr, data))
- } else {
- trace!(
- "Discarding data from {}, expecting data from {}",
- addr,
- destination
- );
- None
- }
- })
- .map_err(|e| error!("Error reading datagrams from forward socket: {}", e));
-
- let timeout_recv_future = Timer::default()
- .timeout_stream(recv_stream, Duration::from_millis(FORWARD_TIMEOUT_MS))
- .forward(response_sink)
- .map(|_| ());
-
- handle.spawn(send_future.and_then(|_| timeout_recv_future));
- Ok(())
- }
-
- pub fn listen_addr(&self) -> SocketAddr {
- self.listen_addr
- }
-
- pub fn close_handle(&self) -> RelayCloseHandle {
- RelayCloseHandle(self.close_handle.clone())
- }
-}
-
-impl Future for Relay {
- type Item = ();
- type Error = io::Error;
-
- fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
- self.forwarding_future.poll()
- }
-}
-
-impl Drop for Relay {
- fn drop(&mut self) {
- self.close_handle().close();
- }
-}
-
-pub struct RelayCloseHandle(SyncSender<()>);
-
-impl RelayCloseHandle {
- pub fn close(self) {
- if self.0.send(()).wait().is_err() {
- warn!("Relay already closed");
- }
- }
-}
-
-
-fn closable_stream<S: Stream + 'static>(
- stream: S,
-) -> (
- Box<Stream<Item = S::Item, Error = S::Error>>,
- SyncSender<()>,
-) {
- let (close_handle_sink, close_signal_stream) = sync_channel(0);
- let close_signal_stream = close_signal_stream.map(|_| None).map_err(|_| None);
- let mapped_stream = stream.map(|t| Some(t)).map_err(|e| Some(e));
-
- let output_stream = mapped_stream
- .select(close_signal_stream)
- // Map close_signal_stream error as Ok(None) and stream error back to S::Error
- .then(|element| match element {
- Err(None) => Ok(None),
- Err(Some(e)) => Err(e),
- Ok(item) => Ok(item),
- })
- // Make the stream end when signaled by close_signal_stream.
- .take_while(|item| Ok(item.is_some()))
- // Map Option<S::Item> to S::Item, we know it is a Some from the take_while above.
- .map(|item| item.unwrap());
- (Box::new(output_stream), close_handle_sink)
-}
-
-
-/// Create a channel accepting tuples of `SocketAddr` and binary data and forward anything coming
-/// on this channel to the `client_sink`. Returns the sender half of the channel.
-fn create_client_sink_channel<S>(client_sink: S, handle: &Handle) -> Sender<(SocketAddr, Vec<u8>)>
-where
- S: Sink<SinkItem = (SocketAddr, Vec<u8>), SinkError = io::Error> + 'static,
-{
- let (channel_sink, channel_stream) = unsync_channel(CLIENT_SINK_CHANNEL_SIZE);
-
- let forward_future = channel_stream
- .map_err(|_| None)
- .forward(client_sink.sink_map_err(|e| Some(e)))
- .and_then(|_| Ok(()))
- .map_err(|error: Option<io::Error>| match error {
- Some(sink_error) => {
- error!("Error sending response back to client: {}", sink_error);
- }
- None => debug!("Closing relay socket sink"),
- });
- handle.spawn(forward_future);
-
- channel_sink
-}
-
-
-/// Internal struct implementing `Codec`. Just so it becomes possible to split a `UdpSocket` into
-/// a sink and a stream.
-struct ServerCodec;
-
-impl UdpCodec for ServerCodec {
- type In = (SocketAddr, Vec<u8>);
- type Out = (SocketAddr, Vec<u8>);
-
- fn decode(&mut self, addr: &SocketAddr, buf: &[u8]) -> StdResult<Self::In, io::Error> {
- Ok((*addr, buf.to_vec()))
- }
-
- fn encode(&mut self, (addr, buf): Self::Out, into: &mut Vec<u8>) -> SocketAddr {
- into.extend(buf);
- addr
- }
-}
diff --git a/socket-relay/tests/forward_self.rs b/socket-relay/tests/forward_self.rs
deleted file mode 100644
index 4714e52404..0000000000
--- a/socket-relay/tests/forward_self.rs
+++ /dev/null
@@ -1,55 +0,0 @@
-extern crate env_logger;
-extern crate socket_relay;
-extern crate tokio_core;
-
-use std::net::{SocketAddr, UdpSocket};
-use std::sync::mpsc;
-use std::thread;
-use std::time::Duration;
-
-use socket_relay::udp::Relay;
-use tokio_core::reactor::Core;
-
-#[test]
-fn test() {
- env_logger::init().unwrap();
-
- let socket = UdpSocket::bind("127.0.0.1:0").unwrap();
- socket
- .set_read_timeout(Some(Duration::from_secs(1)))
- .unwrap();
- let mut buffer = [0; 100];
-
- let relay_listen_addr = spawn_relay(socket.local_addr().unwrap());
-
- let test_data = [9, 88, 5, 2];
- socket.send_to(&test_data, relay_listen_addr).unwrap();
- let (len1, relay_src1) = socket.recv_from(&mut buffer).unwrap();
- assert_eq!(&buffer[..len1], &test_data);
-
- let reply_test_data = [1, 2, 6, 100];
- socket.send_to(&reply_test_data, relay_src1).unwrap();
- let (len2, relay_src2) = socket.recv_from(&mut buffer).unwrap();
- assert_eq!(relay_src2, relay_listen_addr);
- assert_eq!(&buffer[..len2], &reply_test_data);
-}
-
-fn spawn_relay(destination_addr: SocketAddr) -> SocketAddr {
- let (tx, rx) = mpsc::channel();
- thread::spawn(move || {
- let mut core = Core::new().unwrap();
- let handle = core.handle();
-
- let relay = Relay::new(
- "127.0.0.1:0".parse().unwrap(),
- "127.0.0.1".parse().unwrap(),
- destination_addr,
- handle,
- ).unwrap();
- println!("Relay listening on {}", relay.listen_addr());
- tx.send(relay.listen_addr()).unwrap();
- let _ = core.run(relay);
- println!("Relay exiting")
- });
- rx.recv().unwrap()
-}