diff options
| author | Linus Färnstrand <linus@mullvad.net> | 2017-06-21 03:30:42 +0200 |
|---|---|---|
| committer | Linus Färnstrand <linus@mullvad.net> | 2017-06-21 03:30:42 +0200 |
| commit | 856faacc8304b9fcb6e1ecb8846081fa6279c9fe (patch) | |
| tree | 181db8a0ceb87ded84d2b75c1fa464a37bb4ae00 | |
| parent | 14e3220ec993e2be9c6f37a65710b887bae5f49e (diff) | |
| parent | 16889f62a3c54a8540f7fd9b064c154f002e4233 (diff) | |
| download | mullvadvpn-856faacc8304b9fcb6e1ecb8846081fa6279c9fe.tar.xz mullvadvpn-856faacc8304b9fcb6e1ecb8846081fa6279c9fe.zip | |
Merge branch 'multiplex-channel'
| -rw-r--r-- | mullvad_daemon/src/main.rs | 8 | ||||
| -rw-r--r-- | mullvad_daemon/src/management_interface.rs | 23 | ||||
| -rw-r--r-- | talpid_core/src/lib.rs | 3 | ||||
| -rw-r--r-- | talpid_core/src/mpsc.rs | 76 |
4 files changed, 97 insertions, 13 deletions
diff --git a/mullvad_daemon/src/main.rs b/mullvad_daemon/src/main.rs index 1177e05bd7..bfbc3de531 100644 --- a/mullvad_daemon/src/main.rs +++ b/mullvad_daemon/src/main.rs @@ -30,6 +30,7 @@ use states::{SecurityState, TargetState}; use std::sync::{Arc, Mutex, mpsc}; use std::thread; +use talpid_core::mpsc::IntoSender; use talpid_core::net::RemoteAddr; use talpid_core::tunnel::{self, TunnelEvent, TunnelMonitor}; @@ -137,16 +138,17 @@ impl Daemon { // Returns a handle that allows notifying all subscribers on events. fn start_management_interface(event_tx: mpsc::Sender<DaemonEvent>) -> Result<management_interface::EventBroadcaster> { - let server = Self::start_management_interface_server(event_tx.clone())?; + let multiplex_event_tx = IntoSender::from(event_tx.clone()); + let server = Self::start_management_interface_server(multiplex_event_tx)?; let event_broadcaster = server.event_broadcaster(); Self::spawn_management_interface_wait_thread(server, event_tx); Ok(event_broadcaster) } - fn start_management_interface_server(event_tx: mpsc::Sender<DaemonEvent>) + fn start_management_interface_server(event_tx: IntoSender<TunnelCommand, DaemonEvent>) -> Result<ManagementInterfaceServer> { let server = - ManagementInterfaceServer::start(event_tx.clone()) + ManagementInterfaceServer::start(event_tx) .chain_err(|| ErrorKind::ManagementInterfaceError("Failed to start server"),)?; info!( "Mullvad management interface listening on {}", diff --git a/mullvad_daemon/src/management_interface.rs b/mullvad_daemon/src/management_interface.rs index 7e66205862..6c41c85793 100644 --- a/mullvad_daemon/src/management_interface.rs +++ b/mullvad_daemon/src/management_interface.rs @@ -9,8 +9,9 @@ use states::{SecurityState, TargetState}; use std::collections::HashMap; use std::collections::hash_map::Entry; use std::net::{IpAddr, Ipv4Addr}; -use std::sync::{Arc, Mutex, RwLock, mpsc}; +use std::sync::{Arc, Mutex, RwLock}; +use talpid_core::mpsc::IntoSender; use talpid_ipc; use uuid; @@ -109,7 +110,9 @@ pub struct ManagementInterfaceServer { } impl ManagementInterfaceServer { - pub fn start(tunnel_tx: mpsc::Sender<::DaemonEvent>) -> talpid_ipc::Result<Self> { + pub fn start<T>(tunnel_tx: IntoSender<TunnelCommand, T>) -> talpid_ipc::Result<Self> + where T: From<TunnelCommand> + 'static + Send + { let rpc = ManagementInterface::new(tunnel_tx); let active_subscriptions = rpc.active_subscriptions.clone(); @@ -155,13 +158,13 @@ impl EventBroadcaster { } } -struct ManagementInterface { +struct ManagementInterface<T: From<TunnelCommand> + 'static + Send> { active_subscriptions: ActiveSubscriptions, - tx: Mutex<mpsc::Sender<::DaemonEvent>>, + tx: Mutex<IntoSender<TunnelCommand, T>>, } -impl ManagementInterface { - pub fn new(tx: mpsc::Sender<::DaemonEvent>) -> Self { +impl<T: From<TunnelCommand> + 'static + Send> ManagementInterface<T> { + pub fn new(tx: IntoSender<TunnelCommand, T>) -> Self { ManagementInterface { active_subscriptions: Default::default(), tx: Mutex::new(tx), @@ -169,7 +172,7 @@ impl ManagementInterface { } } -impl ManagementInterfaceApi for ManagementInterface { +impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for ManagementInterface<T> { type Metadata = Meta; fn get_account_data(&self, _account_token: AccountToken) -> Result<AccountData, Error> { @@ -202,7 +205,7 @@ impl ManagementInterfaceApi for ManagementInterface { self.tx .lock() .unwrap() - .send(TunnelCommand::SetTargetState(TargetState::Secured).into()) + .send(TunnelCommand::SetTargetState(TargetState::Secured)) .map_err(|_| Error::internal_error()) } @@ -211,14 +214,14 @@ impl ManagementInterfaceApi for ManagementInterface { self.tx .lock() .unwrap() - .send(TunnelCommand::SetTargetState(TargetState::Unsecured).into()) + .send(TunnelCommand::SetTargetState(TargetState::Unsecured)) .map_err(|_| Error::internal_error()) } fn get_state(&self) -> BoxFuture<SecurityState, Error> { trace!("get_state"); let (state_tx, state_rx) = sync::oneshot::channel(); - match self.tx.lock().unwrap().send(TunnelCommand::GetState(state_tx).into()) { + match self.tx.lock().unwrap().send(TunnelCommand::GetState(state_tx)) { Ok(()) => state_rx.map_err(|_| Error::internal_error()).boxed(), Err(_) => future::err(Error::internal_error()).boxed(), } diff --git a/talpid_core/src/lib.rs b/talpid_core/src/lib.rs index 5496d038ce..52213ebee0 100644 --- a/talpid_core/src/lib.rs +++ b/talpid_core/src/lib.rs @@ -30,3 +30,6 @@ pub mod net; /// Abstracts over different VPN tunnel technologies pub mod tunnel; + +/// Abstractions and extra features on `std::mpsc` +pub mod mpsc; diff --git a/talpid_core/src/mpsc.rs b/talpid_core/src/mpsc.rs new file mode 100644 index 0000000000..d63956c452 --- /dev/null +++ b/talpid_core/src/mpsc.rs @@ -0,0 +1,76 @@ +use std::marker::PhantomData; +use std::sync::mpsc; + +/// Abstraction over an `mpsc::Sender` that first converts the value to another type before sending. +#[derive(Debug, Clone)] +pub struct IntoSender<T, U> { + sender: mpsc::Sender<U>, + _marker: PhantomData<T>, +} + +impl<T, U> IntoSender<T, U> + where T: Into<U> +{ + /// Converts the `T` into a `U` and sends it on the channel. + pub fn send(&self, t: T) -> Result<(), mpsc::SendError<U>> { + self.sender.send(t.into()) + } +} + +impl<T, U> From<mpsc::Sender<U>> for IntoSender<T, U> + where T: Into<U> +{ + fn from(sender: mpsc::Sender<U>) -> Self { + IntoSender { + sender: sender, + _marker: PhantomData, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::mpsc; + use std::thread; + + #[derive(Debug, Eq, PartialEq)] + enum Inner { + One, + Two, + } + + #[derive(Debug, Eq, PartialEq)] + enum Outer { + Inner(Inner), + Other, + } + + impl From<Inner> for Outer { + fn from(o: Inner) -> Self { + Outer::Inner(o) + } + } + + #[test] + fn sender() { + let (tx, rx) = mpsc::channel::<Outer>(); + let inner_tx: IntoSender<Inner, Outer> = tx.clone().into(); + + tx.send(Outer::Other).unwrap(); + inner_tx.send(Inner::Two).unwrap(); + + assert_eq!(Outer::Other, rx.recv().unwrap()); + assert_eq!(Outer::Inner(Inner::Two), rx.recv().unwrap()); + } + + #[test] + fn send_between_thread() { + let (tx, rx) = mpsc::channel::<Outer>(); + let inner_tx: IntoSender<Inner, Outer> = tx.clone().into(); + + thread::spawn(move || { inner_tx.send(Inner::One).unwrap(); }); + + assert_eq!(Outer::Inner(Inner::One), rx.recv().unwrap()); + } +} |
