diff options
| -rw-r--r-- | socket-relay/Cargo.toml | 1 | ||||
| -rw-r--r-- | socket-relay/src/lib.rs | 1 | ||||
| -rw-r--r-- | socket-relay/src/udp.rs | 34 |
3 files changed, 19 insertions, 17 deletions
diff --git a/socket-relay/Cargo.toml b/socket-relay/Cargo.toml index b7141bec8f..b40c72f6f2 100644 --- a/socket-relay/Cargo.toml +++ b/socket-relay/Cargo.toml @@ -13,3 +13,4 @@ error-chain = "0.11" futures = "0.1.15" log = "0.3" tokio-core = "0.1" +tokio-timer = "0.1" diff --git a/socket-relay/src/lib.rs b/socket-relay/src/lib.rs index a342cce2dc..c0b930e2dc 100644 --- a/socket-relay/src/lib.rs +++ b/socket-relay/src/lib.rs @@ -2,6 +2,7 @@ extern crate futures; #[macro_use] extern crate log; extern crate tokio_core; +extern crate tokio_timer; /// UDP based relaying. pub mod udp; diff --git a/socket-relay/src/udp.rs b/socket-relay/src/udp.rs index cc9a9139a1..00c5a95417 100644 --- a/socket-relay/src/udp.rs +++ b/socket-relay/src/udp.rs @@ -11,9 +11,10 @@ use std::result::Result as StdResult; use std::time::Duration; use tokio_core::net::{UdpCodec, UdpSocket}; -use tokio_core::reactor::{Handle, Timeout}; +use tokio_core::reactor::Handle; +use tokio_timer::Timer; -/// The amount of time the forwarding socket is open for replies. +/// The amount of idle (no replies) time needed for the forwarding socket to close. pub static FORWARD_TIMEOUT_MS: u64 = 60000; /// Number of slots in internal channel transfering responses back to clients. @@ -96,24 +97,23 @@ impl Relay { error!("Error while forwarding to destination addr: {}", e); }); - let recv_future = forward_stream - .filter_map(move |(addr, data)| if addr == destination { - Some(data) - } else { - None - }) - .map_err(|e| { - error!("Error reading datagrams from forward socket: {}", e) - }) + let recv_future = Timer::default() + .timeout_stream( + forward_stream + .filter_map(move |(addr, data)| if addr == destination { + Some(data) + } else { + None + }) + .map_err(|e| { + error!("Error reading datagrams from forward socket: {}", e) + }), + Duration::from_millis(FORWARD_TIMEOUT_MS), + ) .forward(response_sink) .map(|_| ()); - let network_future = send_future.and_then(|_| recv_future); - - let timeout = - Timeout::new(Duration::from_millis(FORWARD_TIMEOUT_MS), &handle)?.map_err(|_| ()); - - handle.spawn(network_future.select(timeout).map(|_| ()).map_err(|_| ())); + handle.spawn(send_future.and_then(|_| recv_future)); Ok(()) } |
