summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorLinus Färnstrand <linus@mullvad.net>2017-06-21 03:30:42 +0200
committerLinus Färnstrand <linus@mullvad.net>2017-06-21 03:30:42 +0200
commit856faacc8304b9fcb6e1ecb8846081fa6279c9fe (patch)
tree181db8a0ceb87ded84d2b75c1fa464a37bb4ae00
parent14e3220ec993e2be9c6f37a65710b887bae5f49e (diff)
parent16889f62a3c54a8540f7fd9b064c154f002e4233 (diff)
downloadmullvadvpn-856faacc8304b9fcb6e1ecb8846081fa6279c9fe.tar.xz
mullvadvpn-856faacc8304b9fcb6e1ecb8846081fa6279c9fe.zip
Merge branch 'multiplex-channel'
-rw-r--r--mullvad_daemon/src/main.rs8
-rw-r--r--mullvad_daemon/src/management_interface.rs23
-rw-r--r--talpid_core/src/lib.rs3
-rw-r--r--talpid_core/src/mpsc.rs76
4 files changed, 97 insertions, 13 deletions
diff --git a/mullvad_daemon/src/main.rs b/mullvad_daemon/src/main.rs
index 1177e05bd7..bfbc3de531 100644
--- a/mullvad_daemon/src/main.rs
+++ b/mullvad_daemon/src/main.rs
@@ -30,6 +30,7 @@ use states::{SecurityState, TargetState};
use std::sync::{Arc, Mutex, mpsc};
use std::thread;
+use talpid_core::mpsc::IntoSender;
use talpid_core::net::RemoteAddr;
use talpid_core::tunnel::{self, TunnelEvent, TunnelMonitor};
@@ -137,16 +138,17 @@ impl Daemon {
// Returns a handle that allows notifying all subscribers on events.
fn start_management_interface(event_tx: mpsc::Sender<DaemonEvent>)
-> Result<management_interface::EventBroadcaster> {
- let server = Self::start_management_interface_server(event_tx.clone())?;
+ let multiplex_event_tx = IntoSender::from(event_tx.clone());
+ let server = Self::start_management_interface_server(multiplex_event_tx)?;
let event_broadcaster = server.event_broadcaster();
Self::spawn_management_interface_wait_thread(server, event_tx);
Ok(event_broadcaster)
}
- fn start_management_interface_server(event_tx: mpsc::Sender<DaemonEvent>)
+ fn start_management_interface_server(event_tx: IntoSender<TunnelCommand, DaemonEvent>)
-> Result<ManagementInterfaceServer> {
let server =
- ManagementInterfaceServer::start(event_tx.clone())
+ ManagementInterfaceServer::start(event_tx)
.chain_err(|| ErrorKind::ManagementInterfaceError("Failed to start server"),)?;
info!(
"Mullvad management interface listening on {}",
diff --git a/mullvad_daemon/src/management_interface.rs b/mullvad_daemon/src/management_interface.rs
index 7e66205862..6c41c85793 100644
--- a/mullvad_daemon/src/management_interface.rs
+++ b/mullvad_daemon/src/management_interface.rs
@@ -9,8 +9,9 @@ use states::{SecurityState, TargetState};
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::net::{IpAddr, Ipv4Addr};
-use std::sync::{Arc, Mutex, RwLock, mpsc};
+use std::sync::{Arc, Mutex, RwLock};
+use talpid_core::mpsc::IntoSender;
use talpid_ipc;
use uuid;
@@ -109,7 +110,9 @@ pub struct ManagementInterfaceServer {
}
impl ManagementInterfaceServer {
- pub fn start(tunnel_tx: mpsc::Sender<::DaemonEvent>) -> talpid_ipc::Result<Self> {
+ pub fn start<T>(tunnel_tx: IntoSender<TunnelCommand, T>) -> talpid_ipc::Result<Self>
+ where T: From<TunnelCommand> + 'static + Send
+ {
let rpc = ManagementInterface::new(tunnel_tx);
let active_subscriptions = rpc.active_subscriptions.clone();
@@ -155,13 +158,13 @@ impl EventBroadcaster {
}
}
-struct ManagementInterface {
+struct ManagementInterface<T: From<TunnelCommand> + 'static + Send> {
active_subscriptions: ActiveSubscriptions,
- tx: Mutex<mpsc::Sender<::DaemonEvent>>,
+ tx: Mutex<IntoSender<TunnelCommand, T>>,
}
-impl ManagementInterface {
- pub fn new(tx: mpsc::Sender<::DaemonEvent>) -> Self {
+impl<T: From<TunnelCommand> + 'static + Send> ManagementInterface<T> {
+ pub fn new(tx: IntoSender<TunnelCommand, T>) -> Self {
ManagementInterface {
active_subscriptions: Default::default(),
tx: Mutex::new(tx),
@@ -169,7 +172,7 @@ impl ManagementInterface {
}
}
-impl ManagementInterfaceApi for ManagementInterface {
+impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for ManagementInterface<T> {
type Metadata = Meta;
fn get_account_data(&self, _account_token: AccountToken) -> Result<AccountData, Error> {
@@ -202,7 +205,7 @@ impl ManagementInterfaceApi for ManagementInterface {
self.tx
.lock()
.unwrap()
- .send(TunnelCommand::SetTargetState(TargetState::Secured).into())
+ .send(TunnelCommand::SetTargetState(TargetState::Secured))
.map_err(|_| Error::internal_error())
}
@@ -211,14 +214,14 @@ impl ManagementInterfaceApi for ManagementInterface {
self.tx
.lock()
.unwrap()
- .send(TunnelCommand::SetTargetState(TargetState::Unsecured).into())
+ .send(TunnelCommand::SetTargetState(TargetState::Unsecured))
.map_err(|_| Error::internal_error())
}
fn get_state(&self) -> BoxFuture<SecurityState, Error> {
trace!("get_state");
let (state_tx, state_rx) = sync::oneshot::channel();
- match self.tx.lock().unwrap().send(TunnelCommand::GetState(state_tx).into()) {
+ match self.tx.lock().unwrap().send(TunnelCommand::GetState(state_tx)) {
Ok(()) => state_rx.map_err(|_| Error::internal_error()).boxed(),
Err(_) => future::err(Error::internal_error()).boxed(),
}
diff --git a/talpid_core/src/lib.rs b/talpid_core/src/lib.rs
index 5496d038ce..52213ebee0 100644
--- a/talpid_core/src/lib.rs
+++ b/talpid_core/src/lib.rs
@@ -30,3 +30,6 @@ pub mod net;
/// Abstracts over different VPN tunnel technologies
pub mod tunnel;
+
+/// Abstractions and extra features on `std::mpsc`
+pub mod mpsc;
diff --git a/talpid_core/src/mpsc.rs b/talpid_core/src/mpsc.rs
new file mode 100644
index 0000000000..d63956c452
--- /dev/null
+++ b/talpid_core/src/mpsc.rs
@@ -0,0 +1,76 @@
+use std::marker::PhantomData;
+use std::sync::mpsc;
+
+/// Abstraction over an `mpsc::Sender` that first converts the value to another type before sending.
+#[derive(Debug, Clone)]
+pub struct IntoSender<T, U> {
+ sender: mpsc::Sender<U>,
+ _marker: PhantomData<T>,
+}
+
+impl<T, U> IntoSender<T, U>
+ where T: Into<U>
+{
+ /// Converts the `T` into a `U` and sends it on the channel.
+ pub fn send(&self, t: T) -> Result<(), mpsc::SendError<U>> {
+ self.sender.send(t.into())
+ }
+}
+
+impl<T, U> From<mpsc::Sender<U>> for IntoSender<T, U>
+ where T: Into<U>
+{
+ fn from(sender: mpsc::Sender<U>) -> Self {
+ IntoSender {
+ sender: sender,
+ _marker: PhantomData,
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::sync::mpsc;
+ use std::thread;
+
+ #[derive(Debug, Eq, PartialEq)]
+ enum Inner {
+ One,
+ Two,
+ }
+
+ #[derive(Debug, Eq, PartialEq)]
+ enum Outer {
+ Inner(Inner),
+ Other,
+ }
+
+ impl From<Inner> for Outer {
+ fn from(o: Inner) -> Self {
+ Outer::Inner(o)
+ }
+ }
+
+ #[test]
+ fn sender() {
+ let (tx, rx) = mpsc::channel::<Outer>();
+ let inner_tx: IntoSender<Inner, Outer> = tx.clone().into();
+
+ tx.send(Outer::Other).unwrap();
+ inner_tx.send(Inner::Two).unwrap();
+
+ assert_eq!(Outer::Other, rx.recv().unwrap());
+ assert_eq!(Outer::Inner(Inner::Two), rx.recv().unwrap());
+ }
+
+ #[test]
+ fn send_between_thread() {
+ let (tx, rx) = mpsc::channel::<Outer>();
+ let inner_tx: IntoSender<Inner, Outer> = tx.clone().into();
+
+ thread::spawn(move || { inner_tx.send(Inner::One).unwrap(); });
+
+ assert_eq!(Outer::Inner(Inner::One), rx.recv().unwrap());
+ }
+}