summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorDavid Lönnhager <david.l@mullvad.net>2022-01-25 11:55:02 +0100
committerDavid Lönnhager <david.l@mullvad.net>2022-01-26 16:28:08 +0100
commitb29b5ab4a6d3bd33d5d5242ca0593cec55f77045 (patch)
treec0e7a03cf36bd2d5e7a5b4c15b8287e15d671f59
parent43a89edb9feaed2d5595c92424f29de3cc10999e (diff)
downloadmullvadvpn-b29b5ab4a6d3bd33d5d5242ca0593cec55f77045.tar.xz
mullvadvpn-b29b5ab4a6d3bd33d5d5242ca0593cec55f77045.zip
Add unit tests for AbortableStream
-rw-r--r--mullvad-rpc/Cargo.toml2
-rw-r--r--mullvad-rpc/src/abortable_stream.rs74
2 files changed, 75 insertions, 1 deletions
diff --git a/mullvad-rpc/Cargo.toml b/mullvad-rpc/Cargo.toml
index a8ae423877..05c8933c0d 100644
--- a/mullvad-rpc/Cargo.toml
+++ b/mullvad-rpc/Cargo.toml
@@ -24,7 +24,7 @@ regex = "1"
serde = "1"
serde_json = "1.0"
hyper-rustls = "0.23"
-tokio = { version = "1.8", features = [ "macros", "time", "rt-multi-thread", "net", "io-std", "fs" ] }
+tokio = { version = "1.8", features = [ "macros", "time", "rt-multi-thread", "net", "io-std", "io-util", "fs" ] }
tokio-rustls = "0.23"
rustls-pemfile = "0.2"
urlencoding = "1"
diff --git a/mullvad-rpc/src/abortable_stream.rs b/mullvad-rpc/src/abortable_stream.rs
index 57f6556503..8754dc8cf1 100644
--- a/mullvad-rpc/src/abortable_stream.rs
+++ b/mullvad-rpc/src/abortable_stream.rs
@@ -141,3 +141,77 @@ where
self.stream.connected()
}
}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+ use std::time::Duration;
+ use tokio::io::AsyncReadExt;
+
+ /// Test whether the abort handle stops the stream.
+ #[test]
+ fn test_abort() {
+ let runtime = tokio::runtime::Runtime::new().expect("Failed to initialize runtime");
+
+ let (client, _server) = tokio::io::duplex(64);
+
+ runtime.block_on(async move {
+ let (mut stream, abort_handle) = AbortableStream::new(client, None);
+
+ let stream_task = tokio::spawn(async move {
+ let mut buf = vec![];
+ stream.read_to_end(&mut buf).await
+ });
+
+ abort_handle.close();
+ let result = tokio::time::timeout(Duration::from_secs(1), stream_task)
+ .await
+ .unwrap();
+ assert!(
+ matches!(result, Ok(Err(error)) if error.kind() == io::ErrorKind::ConnectionReset)
+ );
+ });
+ }
+
+ /// Test whether the shutdown signal is sent when the stream is explicitly closed.
+ #[test]
+ fn test_shutdown_signal() {
+ let runtime = tokio::runtime::Runtime::new().expect("Failed to initialize runtime");
+
+ let (client, _server) = tokio::io::duplex(64);
+ let (shutdown_tx, shutdown_rx) = oneshot::channel();
+
+ runtime.block_on(async move {
+ let (_stream, abort_handle) = AbortableStream::new(client, Some(shutdown_tx));
+ abort_handle.close();
+ assert!(tokio::time::timeout(Duration::from_secs(1), shutdown_rx)
+ .await
+ .unwrap()
+ .is_ok());
+ });
+ }
+
+ /// Test whether the shutdown signal is sent when the stream stops on its own.
+ #[test]
+ fn test_shutdown_signal_normal() {
+ let runtime = tokio::runtime::Runtime::new().expect("Failed to initialize runtime");
+
+ let (client, server) = tokio::io::duplex(64);
+ let (shutdown_tx, shutdown_rx) = oneshot::channel();
+
+ runtime.block_on(async move {
+ let (mut stream, _abort_handle) = AbortableStream::new(client, Some(shutdown_tx));
+
+ tokio::spawn(async move {
+ drop(server);
+ let mut buf = vec![];
+ stream.read_to_end(&mut buf).await
+ });
+
+ assert!(tokio::time::timeout(Duration::from_secs(1), shutdown_rx)
+ .await
+ .unwrap()
+ .is_ok());
+ });
+ }
+}