summaryrefslogtreecommitdiffhomepage
path: root/mullvad_daemon/src
diff options
context:
space:
mode:
authorLinus Färnstrand <linus@mullvad.net>2017-06-12 11:14:36 +0200
committerLinus Färnstrand <linus@mullvad.net>2017-06-21 03:29:44 +0200
commit22a73df71fc6d028d84b54776d6f5e7c7066da27 (patch)
tree281bea3479beff4b45c4668690ada0d8280da637 /mullvad_daemon/src
parent14e3220ec993e2be9c6f37a65710b887bae5f49e (diff)
downloadmullvadvpn-22a73df71fc6d028d84b54776d6f5e7c7066da27.tar.xz
mullvadvpn-22a73df71fc6d028d84b54776d6f5e7c7066da27.zip
Introduce plexmpsc and use in management interface
Diffstat (limited to 'mullvad_daemon/src')
-rw-r--r--mullvad_daemon/src/main.rs8
-rw-r--r--mullvad_daemon/src/management_interface.rs23
2 files changed, 18 insertions, 13 deletions
diff --git a/mullvad_daemon/src/main.rs b/mullvad_daemon/src/main.rs
index 1177e05bd7..23590e4a46 100644
--- a/mullvad_daemon/src/main.rs
+++ b/mullvad_daemon/src/main.rs
@@ -31,6 +31,7 @@ use std::sync::{Arc, Mutex, mpsc};
use std::thread;
use talpid_core::net::RemoteAddr;
+use talpid_core::plexmpsc;
use talpid_core::tunnel::{self, TunnelEvent, TunnelMonitor};
error_chain!{
@@ -137,16 +138,17 @@ impl Daemon {
// Returns a handle that allows notifying all subscribers on events.
fn start_management_interface(event_tx: mpsc::Sender<DaemonEvent>)
-> Result<management_interface::EventBroadcaster> {
- let server = Self::start_management_interface_server(event_tx.clone())?;
+ let multiplex_event_tx = plexmpsc::Sender::from(event_tx.clone());
+ let server = Self::start_management_interface_server(multiplex_event_tx)?;
let event_broadcaster = server.event_broadcaster();
Self::spawn_management_interface_wait_thread(server, event_tx);
Ok(event_broadcaster)
}
- fn start_management_interface_server(event_tx: mpsc::Sender<DaemonEvent>)
+ fn start_management_interface_server(event_tx: plexmpsc::Sender<TunnelCommand, DaemonEvent>)
-> Result<ManagementInterfaceServer> {
let server =
- ManagementInterfaceServer::start(event_tx.clone())
+ ManagementInterfaceServer::start(event_tx)
.chain_err(|| ErrorKind::ManagementInterfaceError("Failed to start server"),)?;
info!(
"Mullvad management interface listening on {}",
diff --git a/mullvad_daemon/src/management_interface.rs b/mullvad_daemon/src/management_interface.rs
index 7e66205862..6f5067c9cc 100644
--- a/mullvad_daemon/src/management_interface.rs
+++ b/mullvad_daemon/src/management_interface.rs
@@ -9,8 +9,9 @@ use states::{SecurityState, TargetState};
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::net::{IpAddr, Ipv4Addr};
-use std::sync::{Arc, Mutex, RwLock, mpsc};
+use std::sync::{Arc, Mutex, RwLock};
+use talpid_core::plexmpsc;
use talpid_ipc;
use uuid;
@@ -109,7 +110,9 @@ pub struct ManagementInterfaceServer {
}
impl ManagementInterfaceServer {
- pub fn start(tunnel_tx: mpsc::Sender<::DaemonEvent>) -> talpid_ipc::Result<Self> {
+ pub fn start<T>(tunnel_tx: plexmpsc::Sender<TunnelCommand, T>) -> talpid_ipc::Result<Self>
+ where T: From<TunnelCommand> + 'static + Send
+ {
let rpc = ManagementInterface::new(tunnel_tx);
let active_subscriptions = rpc.active_subscriptions.clone();
@@ -155,13 +158,13 @@ impl EventBroadcaster {
}
}
-struct ManagementInterface {
+struct ManagementInterface<T: From<TunnelCommand> + 'static + Send> {
active_subscriptions: ActiveSubscriptions,
- tx: Mutex<mpsc::Sender<::DaemonEvent>>,
+ tx: Mutex<plexmpsc::Sender<TunnelCommand, T>>,
}
-impl ManagementInterface {
- pub fn new(tx: mpsc::Sender<::DaemonEvent>) -> Self {
+impl<T: From<TunnelCommand> + 'static + Send> ManagementInterface<T> {
+ pub fn new(tx: plexmpsc::Sender<TunnelCommand, T>) -> Self {
ManagementInterface {
active_subscriptions: Default::default(),
tx: Mutex::new(tx),
@@ -169,7 +172,7 @@ impl ManagementInterface {
}
}
-impl ManagementInterfaceApi for ManagementInterface {
+impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for ManagementInterface<T> {
type Metadata = Meta;
fn get_account_data(&self, _account_token: AccountToken) -> Result<AccountData, Error> {
@@ -202,7 +205,7 @@ impl ManagementInterfaceApi for ManagementInterface {
self.tx
.lock()
.unwrap()
- .send(TunnelCommand::SetTargetState(TargetState::Secured).into())
+ .send(TunnelCommand::SetTargetState(TargetState::Secured))
.map_err(|_| Error::internal_error())
}
@@ -211,14 +214,14 @@ impl ManagementInterfaceApi for ManagementInterface {
self.tx
.lock()
.unwrap()
- .send(TunnelCommand::SetTargetState(TargetState::Unsecured).into())
+ .send(TunnelCommand::SetTargetState(TargetState::Unsecured))
.map_err(|_| Error::internal_error())
}
fn get_state(&self) -> BoxFuture<SecurityState, Error> {
trace!("get_state");
let (state_tx, state_rx) = sync::oneshot::channel();
- match self.tx.lock().unwrap().send(TunnelCommand::GetState(state_tx).into()) {
+ match self.tx.lock().unwrap().send(TunnelCommand::GetState(state_tx)) {
Ok(()) => state_rx.map_err(|_| Error::internal_error()).boxed(),
Err(_) => future::err(Error::internal_error()).boxed(),
}