diff options
| author | Linus Färnstrand <linus@mullvad.net> | 2017-06-30 10:48:07 +0200 |
|---|---|---|
| committer | Linus Färnstrand <linus@mullvad.net> | 2017-06-30 10:48:07 +0200 |
| commit | e3ac9aed3678c57ac179a953e7fdbf718f042690 (patch) | |
| tree | 325b780c2e907d78d2e2f0cbcdd02008199c4dd5 | |
| parent | e60090c21550f8cadac4168332abe5b92106235b (diff) | |
| parent | cbe98db9cfd84d90e86ae14925cda80a9236edfe (diff) | |
| download | mullvadvpn-e3ac9aed3678c57ac179a953e7fdbf718f042690.tar.xz mullvadvpn-e3ac9aed3678c57ac179a953e7fdbf718f042690.zip | |
Merge branch 'error-subscription'
| -rw-r--r-- | mullvad_daemon/src/main.rs | 1 | ||||
| -rw-r--r-- | mullvad_daemon/src/management_interface.rs | 130 |
2 files changed, 93 insertions, 38 deletions
diff --git a/mullvad_daemon/src/main.rs b/mullvad_daemon/src/main.rs index 53a6705f3d..4848ef956e 100644 --- a/mullvad_daemon/src/main.rs +++ b/mullvad_daemon/src/main.rs @@ -272,6 +272,7 @@ impl Daemon { debug!("Triggering tunnel start"); if let Err(e) = self.start_tunnel().chain_err(|| "Failed to start tunnel") { log_error(&e); + self.management_interface_broadcaster.notify_error(&e); self.target_state = TargetState::Unsecured; } Ok(()) diff --git a/mullvad_daemon/src/management_interface.rs b/mullvad_daemon/src/management_interface.rs index 6c41c85793..fccdae2414 100644 --- a/mullvad_daemon/src/management_interface.rs +++ b/mullvad_daemon/src/management_interface.rs @@ -1,9 +1,12 @@ +use error_chain; + use jsonrpc_core::{Error, ErrorCode, Metadata}; use jsonrpc_core::futures::{BoxFuture, Future, future, sync}; use jsonrpc_macros::pubsub; use jsonrpc_pubsub::{PubSubHandler, PubSubMetadata, Session, SubscriptionId}; use jsonrpc_ws_server; +use serde; use states::{SecurityState, TargetState}; use std::collections::HashMap; @@ -89,6 +92,16 @@ build_rpc_trait! { #[rpc(name = "new_state_unsubscribe")] fn new_state_unsubscribe(&self, SubscriptionId) -> BoxFuture<(), Error>; } + + #[pubsub(name = "error")] { + /// Subscribes to the `error` event notifications. + #[rpc(name = "error_subscribe")] + fn error_subscribe(&self, Self::Metadata, pubsub::Subscriber<Vec<String>>); + + /// Unsubscribes from the `error` event notifications. + #[rpc(name = "error_unsubscribe")] + fn error_unsubscribe(&self, SubscriptionId) -> BoxFuture<(), Error>; + } } } @@ -102,11 +115,15 @@ pub enum TunnelCommand { GetState(sync::oneshot::Sender<SecurityState>), } -type ActiveSubscriptions = Arc<RwLock<HashMap<SubscriptionId, pubsub::Sink<SecurityState>>>>; +#[derive(Default)] +struct ActiveSubscriptions { + new_state_subscriptions: RwLock<HashMap<SubscriptionId, pubsub::Sink<SecurityState>>>, + error_subscriptions: RwLock<HashMap<SubscriptionId, pubsub::Sink<Vec<String>>>>, +} pub struct ManagementInterfaceServer { server: talpid_ipc::IpcServer, - active_subscriptions: ActiveSubscriptions, + subscriptions: Arc<ActiveSubscriptions>, } impl ManagementInterfaceServer { @@ -114,7 +131,7 @@ impl ManagementInterfaceServer { where T: From<TunnelCommand> + 'static + Send { let rpc = ManagementInterface::new(tunnel_tx); - let active_subscriptions = rpc.active_subscriptions.clone(); + let subscriptions = rpc.subscriptions.clone(); let mut io = PubSubHandler::default(); io.extend_with(rpc.to_delegate()); @@ -122,7 +139,7 @@ impl ManagementInterfaceServer { Ok( ManagementInterfaceServer { server, - active_subscriptions, + subscriptions, }, ) } @@ -132,7 +149,7 @@ impl ManagementInterfaceServer { } pub fn event_broadcaster(&self) -> EventBroadcaster { - EventBroadcaster { active_subscriptions: self.active_subscriptions.clone() } + EventBroadcaster { subscriptions: self.subscriptions.clone() } } /// Consumes the server and waits for it to finish. Returns an error if the server exited @@ -145,31 +162,81 @@ impl ManagementInterfaceServer { /// A handle that allows broadcasting messages to all subscribers of the management interface. pub struct EventBroadcaster { - active_subscriptions: ActiveSubscriptions, + subscriptions: Arc<ActiveSubscriptions>, } impl EventBroadcaster { - /// Sends an event to all subscribers of the management interface. - pub fn notify_new_state(&self, event: SecurityState) { - let active_subscriptions = self.active_subscriptions.read().unwrap(); - for sink in active_subscriptions.values() { - let _ = sink.notify(Ok(event)).wait(); + /// Sends a new state update to all `new_state` subscribers of the management interface. + pub fn notify_new_state(&self, new_state: SecurityState) { + self.notify(&self.subscriptions.new_state_subscriptions, new_state); + } + + /// Sends an error to all `error` subscribers of the management interface. + pub fn notify_error<E>(&self, error: &E) + where E: error_chain::ChainedError + { + let error_strings = error.iter().map(|e| e.to_string()).collect(); + self.notify(&self.subscriptions.error_subscriptions, error_strings); + } + + fn notify<T>(&self, + subscriptions_lock: &RwLock<HashMap<SubscriptionId, pubsub::Sink<T>>>, + value: T) + where T: serde::Serialize + Clone + { + let subscriptions = subscriptions_lock.read().unwrap(); + for sink in subscriptions.values() { + let _ = sink.notify(Ok(value.clone())).wait(); } } } struct ManagementInterface<T: From<TunnelCommand> + 'static + Send> { - active_subscriptions: ActiveSubscriptions, + subscriptions: Arc<ActiveSubscriptions>, tx: Mutex<IntoSender<TunnelCommand, T>>, } impl<T: From<TunnelCommand> + 'static + Send> ManagementInterface<T> { pub fn new(tx: IntoSender<TunnelCommand, T>) -> Self { ManagementInterface { - active_subscriptions: Default::default(), + subscriptions: Default::default(), tx: Mutex::new(tx), } } + + fn subscribe<V>(subscriber: pubsub::Subscriber<V>, + subscriptions_lock: &RwLock<HashMap<SubscriptionId, pubsub::Sink<V>>>) { + let mut subscriptions = subscriptions_lock.write().unwrap(); + loop { + let id = SubscriptionId::String(uuid::Uuid::new_v4().to_string()); + if let Entry::Vacant(entry) = subscriptions.entry(id.clone()) { + if let Ok(sink) = subscriber.assign_id(id.clone()) { + debug!("Accepting new subscription with id {:?}", id); + entry.insert(sink); + } + break; + } + } + } + + fn unsubscribe<V>(id: SubscriptionId, + subscriptions_lock: &RwLock<HashMap<SubscriptionId, pubsub::Sink<V>>>) + -> BoxFuture<(), Error> { + let was_removed = subscriptions_lock.write().unwrap().remove(&id).is_some(); + let result = if was_removed { + debug!("Unsubscribing id {:?}", id); + future::ok(()) + } else { + future::err( + Error { + code: ErrorCode::InvalidParams, + message: "Invalid subscription".to_owned(), + data: None, + }, + ) + }; + result.boxed() + } } impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for ManagementInterface<T> { @@ -247,35 +314,22 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem _meta: Self::Metadata, subscriber: pubsub::Subscriber<SecurityState>) { trace!("new_state_subscribe"); - let mut active_subscriptions = self.active_subscriptions.write().unwrap(); - loop { - let id = SubscriptionId::String(uuid::Uuid::new_v4().to_string()); - if let Entry::Vacant(entry) = active_subscriptions.entry(id.clone()) { - if let Ok(sink) = subscriber.assign_id(id.clone()) { - debug!("Accepting new subscription with id {:?}", id); - entry.insert(sink); - } - break; - } - } + Self::subscribe(subscriber, &self.subscriptions.new_state_subscriptions); } fn new_state_unsubscribe(&self, id: SubscriptionId) -> BoxFuture<(), Error> { trace!("new_state_unsubscribe"); - let was_removed = self.active_subscriptions.write().unwrap().remove(&id).is_some(); - let result = if was_removed { - debug!("Unsubscribing id {:?}", id); - future::ok(()) - } else { - future::err( - Error { - code: ErrorCode::InvalidParams, - message: "Invalid subscription".to_owned(), - data: None, - }, - ) - }; - result.boxed() + Self::unsubscribe(id, &self.subscriptions.new_state_subscriptions) + } + + fn error_subscribe(&self, _meta: Self::Metadata, subscriber: pubsub::Subscriber<Vec<String>>) { + trace!("error_subscribe"); + Self::subscribe(subscriber, &self.subscriptions.error_subscriptions); + } + + fn error_unsubscribe(&self, id: SubscriptionId) -> BoxFuture<(), Error> { + trace!("error_unsubscribe"); + Self::unsubscribe(id, &self.subscriptions.error_subscriptions) } } |
