diff options
| author | Linus Färnstrand <linus@mullvad.net> | 2017-06-12 11:14:36 +0200 |
|---|---|---|
| committer | Linus Färnstrand <linus@mullvad.net> | 2017-06-21 03:29:44 +0200 |
| commit | 22a73df71fc6d028d84b54776d6f5e7c7066da27 (patch) | |
| tree | 281bea3479beff4b45c4668690ada0d8280da637 | |
| parent | 14e3220ec993e2be9c6f37a65710b887bae5f49e (diff) | |
| download | mullvadvpn-22a73df71fc6d028d84b54776d6f5e7c7066da27.tar.xz mullvadvpn-22a73df71fc6d028d84b54776d6f5e7c7066da27.zip | |
Introduce plexmpsc and use in management interface
| -rw-r--r-- | mullvad_daemon/src/main.rs | 8 | ||||
| -rw-r--r-- | mullvad_daemon/src/management_interface.rs | 23 | ||||
| -rw-r--r-- | talpid_core/src/lib.rs | 3 | ||||
| -rw-r--r-- | talpid_core/src/plexmpsc.rs | 76 |
4 files changed, 97 insertions, 13 deletions
diff --git a/mullvad_daemon/src/main.rs b/mullvad_daemon/src/main.rs index 1177e05bd7..23590e4a46 100644 --- a/mullvad_daemon/src/main.rs +++ b/mullvad_daemon/src/main.rs @@ -31,6 +31,7 @@ use std::sync::{Arc, Mutex, mpsc}; use std::thread; use talpid_core::net::RemoteAddr; +use talpid_core::plexmpsc; use talpid_core::tunnel::{self, TunnelEvent, TunnelMonitor}; error_chain!{ @@ -137,16 +138,17 @@ impl Daemon { // Returns a handle that allows notifying all subscribers on events. fn start_management_interface(event_tx: mpsc::Sender<DaemonEvent>) -> Result<management_interface::EventBroadcaster> { - let server = Self::start_management_interface_server(event_tx.clone())?; + let multiplex_event_tx = plexmpsc::Sender::from(event_tx.clone()); + let server = Self::start_management_interface_server(multiplex_event_tx)?; let event_broadcaster = server.event_broadcaster(); Self::spawn_management_interface_wait_thread(server, event_tx); Ok(event_broadcaster) } - fn start_management_interface_server(event_tx: mpsc::Sender<DaemonEvent>) + fn start_management_interface_server(event_tx: plexmpsc::Sender<TunnelCommand, DaemonEvent>) -> Result<ManagementInterfaceServer> { let server = - ManagementInterfaceServer::start(event_tx.clone()) + ManagementInterfaceServer::start(event_tx) .chain_err(|| ErrorKind::ManagementInterfaceError("Failed to start server"),)?; info!( "Mullvad management interface listening on {}", diff --git a/mullvad_daemon/src/management_interface.rs b/mullvad_daemon/src/management_interface.rs index 7e66205862..6f5067c9cc 100644 --- a/mullvad_daemon/src/management_interface.rs +++ b/mullvad_daemon/src/management_interface.rs @@ -9,8 +9,9 @@ use states::{SecurityState, TargetState}; use std::collections::HashMap; use std::collections::hash_map::Entry; use std::net::{IpAddr, Ipv4Addr}; -use std::sync::{Arc, Mutex, RwLock, mpsc}; +use std::sync::{Arc, Mutex, RwLock}; +use talpid_core::plexmpsc; use talpid_ipc; use uuid; @@ -109,7 +110,9 @@ pub struct ManagementInterfaceServer { } impl ManagementInterfaceServer { - pub fn start(tunnel_tx: mpsc::Sender<::DaemonEvent>) -> talpid_ipc::Result<Self> { + pub fn start<T>(tunnel_tx: plexmpsc::Sender<TunnelCommand, T>) -> talpid_ipc::Result<Self> + where T: From<TunnelCommand> + 'static + Send + { let rpc = ManagementInterface::new(tunnel_tx); let active_subscriptions = rpc.active_subscriptions.clone(); @@ -155,13 +158,13 @@ impl EventBroadcaster { } } -struct ManagementInterface { +struct ManagementInterface<T: From<TunnelCommand> + 'static + Send> { active_subscriptions: ActiveSubscriptions, - tx: Mutex<mpsc::Sender<::DaemonEvent>>, + tx: Mutex<plexmpsc::Sender<TunnelCommand, T>>, } -impl ManagementInterface { - pub fn new(tx: mpsc::Sender<::DaemonEvent>) -> Self { +impl<T: From<TunnelCommand> + 'static + Send> ManagementInterface<T> { + pub fn new(tx: plexmpsc::Sender<TunnelCommand, T>) -> Self { ManagementInterface { active_subscriptions: Default::default(), tx: Mutex::new(tx), @@ -169,7 +172,7 @@ impl ManagementInterface { } } -impl ManagementInterfaceApi for ManagementInterface { +impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for ManagementInterface<T> { type Metadata = Meta; fn get_account_data(&self, _account_token: AccountToken) -> Result<AccountData, Error> { @@ -202,7 +205,7 @@ impl ManagementInterfaceApi for ManagementInterface { self.tx .lock() .unwrap() - .send(TunnelCommand::SetTargetState(TargetState::Secured).into()) + .send(TunnelCommand::SetTargetState(TargetState::Secured)) .map_err(|_| Error::internal_error()) } @@ -211,14 +214,14 @@ impl ManagementInterfaceApi for ManagementInterface { self.tx .lock() .unwrap() - .send(TunnelCommand::SetTargetState(TargetState::Unsecured).into()) + .send(TunnelCommand::SetTargetState(TargetState::Unsecured)) .map_err(|_| Error::internal_error()) } fn get_state(&self) -> BoxFuture<SecurityState, Error> { trace!("get_state"); let (state_tx, state_rx) = sync::oneshot::channel(); - match self.tx.lock().unwrap().send(TunnelCommand::GetState(state_tx).into()) { + match self.tx.lock().unwrap().send(TunnelCommand::GetState(state_tx)) { Ok(()) => state_rx.map_err(|_| Error::internal_error()).boxed(), Err(_) => future::err(Error::internal_error()).boxed(), } diff --git a/talpid_core/src/lib.rs b/talpid_core/src/lib.rs index 5496d038ce..3ebb24d1ec 100644 --- a/talpid_core/src/lib.rs +++ b/talpid_core/src/lib.rs @@ -30,3 +30,6 @@ pub mod net; /// Abstracts over different VPN tunnel technologies pub mod tunnel; + +/// Multiplexing abstractions over `std::mpsc` +pub mod plexmpsc; diff --git a/talpid_core/src/plexmpsc.rs b/talpid_core/src/plexmpsc.rs new file mode 100644 index 0000000000..be1904bdb2 --- /dev/null +++ b/talpid_core/src/plexmpsc.rs @@ -0,0 +1,76 @@ +use std::marker::PhantomData; +use std::sync::mpsc; + +/// Abstraction over an `mpsc::Sender` that first converts the value to another type before sending. +#[derive(Debug, Clone)] +pub struct Sender<T, U> { + sender: mpsc::Sender<U>, + _marker: PhantomData<T>, +} + +impl<T, U> Sender<T, U> + where T: Into<U> +{ + /// Converts the `T` into a `U` and sends it on the channel. + pub fn send(&self, t: T) -> Result<(), mpsc::SendError<U>> { + self.sender.send(t.into()) + } +} + +impl<T, U> From<mpsc::Sender<U>> for Sender<T, U> + where T: Into<U> +{ + fn from(sender: mpsc::Sender<U>) -> Self { + Sender { + sender: sender, + _marker: PhantomData, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::mpsc; + use std::thread; + + #[derive(Debug, Eq, PartialEq)] + enum Inner { + One, + Two, + } + + #[derive(Debug, Eq, PartialEq)] + enum Outer { + Inner(Inner), + Other, + } + + impl From<Inner> for Outer { + fn from(o: Inner) -> Self { + Outer::Inner(o) + } + } + + #[test] + fn sender() { + let (tx, rx) = mpsc::channel::<Outer>(); + let inner_tx: Sender<Inner, Outer> = tx.clone().into(); + + tx.send(Outer::Other).unwrap(); + inner_tx.send(Inner::Two).unwrap(); + + assert_eq!(Outer::Other, rx.recv().unwrap()); + assert_eq!(Outer::Inner(Inner::Two), rx.recv().unwrap()); + } + + #[test] + fn send_between_thread() { + let (tx, rx) = mpsc::channel::<Outer>(); + let inner_tx: Sender<Inner, Outer> = tx.clone().into(); + + thread::spawn(move || { inner_tx.send(Inner::One).unwrap(); }); + + assert_eq!(Outer::Inner(Inner::One), rx.recv().unwrap()); + } +} |
