diff options
| author | David Lönnhager <david.l@mullvad.net> | 2022-01-25 11:55:02 +0100 |
|---|---|---|
| committer | David Lönnhager <david.l@mullvad.net> | 2022-01-26 16:28:08 +0100 |
| commit | b29b5ab4a6d3bd33d5d5242ca0593cec55f77045 (patch) | |
| tree | c0e7a03cf36bd2d5e7a5b4c15b8287e15d671f59 | |
| parent | 43a89edb9feaed2d5595c92424f29de3cc10999e (diff) | |
| download | mullvadvpn-b29b5ab4a6d3bd33d5d5242ca0593cec55f77045.tar.xz mullvadvpn-b29b5ab4a6d3bd33d5d5242ca0593cec55f77045.zip | |
Add unit tests for AbortableStream
| -rw-r--r-- | mullvad-rpc/Cargo.toml | 2 | ||||
| -rw-r--r-- | mullvad-rpc/src/abortable_stream.rs | 74 |
2 files changed, 75 insertions, 1 deletions
diff --git a/mullvad-rpc/Cargo.toml b/mullvad-rpc/Cargo.toml index a8ae423877..05c8933c0d 100644 --- a/mullvad-rpc/Cargo.toml +++ b/mullvad-rpc/Cargo.toml @@ -24,7 +24,7 @@ regex = "1" serde = "1" serde_json = "1.0" hyper-rustls = "0.23" -tokio = { version = "1.8", features = [ "macros", "time", "rt-multi-thread", "net", "io-std", "fs" ] } +tokio = { version = "1.8", features = [ "macros", "time", "rt-multi-thread", "net", "io-std", "io-util", "fs" ] } tokio-rustls = "0.23" rustls-pemfile = "0.2" urlencoding = "1" diff --git a/mullvad-rpc/src/abortable_stream.rs b/mullvad-rpc/src/abortable_stream.rs index 57f6556503..8754dc8cf1 100644 --- a/mullvad-rpc/src/abortable_stream.rs +++ b/mullvad-rpc/src/abortable_stream.rs @@ -141,3 +141,77 @@ where self.stream.connected() } } + +#[cfg(test)] +mod test { + use super::*; + use std::time::Duration; + use tokio::io::AsyncReadExt; + + /// Test whether the abort handle stops the stream. + #[test] + fn test_abort() { + let runtime = tokio::runtime::Runtime::new().expect("Failed to initialize runtime"); + + let (client, _server) = tokio::io::duplex(64); + + runtime.block_on(async move { + let (mut stream, abort_handle) = AbortableStream::new(client, None); + + let stream_task = tokio::spawn(async move { + let mut buf = vec![]; + stream.read_to_end(&mut buf).await + }); + + abort_handle.close(); + let result = tokio::time::timeout(Duration::from_secs(1), stream_task) + .await + .unwrap(); + assert!( + matches!(result, Ok(Err(error)) if error.kind() == io::ErrorKind::ConnectionReset) + ); + }); + } + + /// Test whether the shutdown signal is sent when the stream is explicitly closed. + #[test] + fn test_shutdown_signal() { + let runtime = tokio::runtime::Runtime::new().expect("Failed to initialize runtime"); + + let (client, _server) = tokio::io::duplex(64); + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + + runtime.block_on(async move { + let (_stream, abort_handle) = AbortableStream::new(client, Some(shutdown_tx)); + abort_handle.close(); + assert!(tokio::time::timeout(Duration::from_secs(1), shutdown_rx) + .await + .unwrap() + .is_ok()); + }); + } + + /// Test whether the shutdown signal is sent when the stream stops on its own. + #[test] + fn test_shutdown_signal_normal() { + let runtime = tokio::runtime::Runtime::new().expect("Failed to initialize runtime"); + + let (client, server) = tokio::io::duplex(64); + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + + runtime.block_on(async move { + let (mut stream, _abort_handle) = AbortableStream::new(client, Some(shutdown_tx)); + + tokio::spawn(async move { + drop(server); + let mut buf = vec![]; + stream.read_to_end(&mut buf).await + }); + + assert!(tokio::time::timeout(Duration::from_secs(1), shutdown_rx) + .await + .unwrap() + .is_ok()); + }); + } +} |
