diff options
| -rw-r--r-- | socket-relay/src/udp.rs | 19 |
1 files changed, 13 insertions, 6 deletions
diff --git a/socket-relay/src/udp.rs b/socket-relay/src/udp.rs index 00c5a95417..be36982db9 100644 --- a/socket-relay/src/udp.rs +++ b/socket-relay/src/udp.rs @@ -49,10 +49,10 @@ impl Relay { let forwarding_future = closable_client_stream.for_each(move |(client_addr, data)| { let response_sink = client_sink_channel .clone() - .sink_map_err(|_| ()) - .with(move |data| future::ok((client_addr, data))); + .sink_map_err(|_| ()); if let Err(e) = Self::forward( + client_addr, forward_bind_ip, destination_addr, data, @@ -73,6 +73,7 @@ impl Relay { /// Forwards `data` to `destination` and streams all replies into `response_sink`. fn forward<S>( + client_addr: SocketAddr, bind_ip: IpAddr, destination: SocketAddr, data: Vec<u8>, @@ -80,14 +81,14 @@ impl Relay { handle: &Handle, ) -> io::Result<()> where - S: Sink<SinkItem = Vec<u8>, SinkError = ()> + 'static, + S: Sink<SinkItem = (SocketAddr, Vec<u8>), SinkError = ()> + 'static, { let bind_addr = SocketAddr::new(bind_ip, 0); let socket = UdpSocket::bind(&bind_addr, &handle)?; trace!( - "Datagram with {} bytes forwarded from {} to {}", + "Relaying {} byte datagram from {} to {}", data.len(), - socket.local_addr().unwrap_or(bind_addr), + client_addr, destination ); @@ -101,7 +102,13 @@ impl Relay { .timeout_stream( forward_stream .filter_map(move |(addr, data)| if addr == destination { - Some(data) + trace!( + "Returning {} byte response from {} to {}", + data.len(), + addr, + client_addr + ); + Some((client_addr, data)) } else { None }) |
