summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorLinus Färnstrand <linus@mullvad.net>2017-06-30 10:48:07 +0200
committerLinus Färnstrand <linus@mullvad.net>2017-06-30 10:48:07 +0200
commite3ac9aed3678c57ac179a953e7fdbf718f042690 (patch)
tree325b780c2e907d78d2e2f0cbcdd02008199c4dd5
parente60090c21550f8cadac4168332abe5b92106235b (diff)
parentcbe98db9cfd84d90e86ae14925cda80a9236edfe (diff)
downloadmullvadvpn-e3ac9aed3678c57ac179a953e7fdbf718f042690.tar.xz
mullvadvpn-e3ac9aed3678c57ac179a953e7fdbf718f042690.zip
Merge branch 'error-subscription'
-rw-r--r--mullvad_daemon/src/main.rs1
-rw-r--r--mullvad_daemon/src/management_interface.rs130
2 files changed, 93 insertions, 38 deletions
diff --git a/mullvad_daemon/src/main.rs b/mullvad_daemon/src/main.rs
index 53a6705f3d..4848ef956e 100644
--- a/mullvad_daemon/src/main.rs
+++ b/mullvad_daemon/src/main.rs
@@ -272,6 +272,7 @@ impl Daemon {
debug!("Triggering tunnel start");
if let Err(e) = self.start_tunnel().chain_err(|| "Failed to start tunnel") {
log_error(&e);
+ self.management_interface_broadcaster.notify_error(&e);
self.target_state = TargetState::Unsecured;
}
Ok(())
diff --git a/mullvad_daemon/src/management_interface.rs b/mullvad_daemon/src/management_interface.rs
index 6c41c85793..fccdae2414 100644
--- a/mullvad_daemon/src/management_interface.rs
+++ b/mullvad_daemon/src/management_interface.rs
@@ -1,9 +1,12 @@
+use error_chain;
+
use jsonrpc_core::{Error, ErrorCode, Metadata};
use jsonrpc_core::futures::{BoxFuture, Future, future, sync};
use jsonrpc_macros::pubsub;
use jsonrpc_pubsub::{PubSubHandler, PubSubMetadata, Session, SubscriptionId};
use jsonrpc_ws_server;
+use serde;
use states::{SecurityState, TargetState};
use std::collections::HashMap;
@@ -89,6 +92,16 @@ build_rpc_trait! {
#[rpc(name = "new_state_unsubscribe")]
fn new_state_unsubscribe(&self, SubscriptionId) -> BoxFuture<(), Error>;
}
+
+ #[pubsub(name = "error")] {
+ /// Subscribes to the `error` event notifications.
+ #[rpc(name = "error_subscribe")]
+ fn error_subscribe(&self, Self::Metadata, pubsub::Subscriber<Vec<String>>);
+
+ /// Unsubscribes from the `error` event notifications.
+ #[rpc(name = "error_unsubscribe")]
+ fn error_unsubscribe(&self, SubscriptionId) -> BoxFuture<(), Error>;
+ }
}
}
@@ -102,11 +115,15 @@ pub enum TunnelCommand {
GetState(sync::oneshot::Sender<SecurityState>),
}
-type ActiveSubscriptions = Arc<RwLock<HashMap<SubscriptionId, pubsub::Sink<SecurityState>>>>;
+#[derive(Default)]
+struct ActiveSubscriptions {
+ new_state_subscriptions: RwLock<HashMap<SubscriptionId, pubsub::Sink<SecurityState>>>,
+ error_subscriptions: RwLock<HashMap<SubscriptionId, pubsub::Sink<Vec<String>>>>,
+}
pub struct ManagementInterfaceServer {
server: talpid_ipc::IpcServer,
- active_subscriptions: ActiveSubscriptions,
+ subscriptions: Arc<ActiveSubscriptions>,
}
impl ManagementInterfaceServer {
@@ -114,7 +131,7 @@ impl ManagementInterfaceServer {
where T: From<TunnelCommand> + 'static + Send
{
let rpc = ManagementInterface::new(tunnel_tx);
- let active_subscriptions = rpc.active_subscriptions.clone();
+ let subscriptions = rpc.subscriptions.clone();
let mut io = PubSubHandler::default();
io.extend_with(rpc.to_delegate());
@@ -122,7 +139,7 @@ impl ManagementInterfaceServer {
Ok(
ManagementInterfaceServer {
server,
- active_subscriptions,
+ subscriptions,
},
)
}
@@ -132,7 +149,7 @@ impl ManagementInterfaceServer {
}
pub fn event_broadcaster(&self) -> EventBroadcaster {
- EventBroadcaster { active_subscriptions: self.active_subscriptions.clone() }
+ EventBroadcaster { subscriptions: self.subscriptions.clone() }
}
/// Consumes the server and waits for it to finish. Returns an error if the server exited
@@ -145,31 +162,81 @@ impl ManagementInterfaceServer {
/// A handle that allows broadcasting messages to all subscribers of the management interface.
pub struct EventBroadcaster {
- active_subscriptions: ActiveSubscriptions,
+ subscriptions: Arc<ActiveSubscriptions>,
}
impl EventBroadcaster {
- /// Sends an event to all subscribers of the management interface.
- pub fn notify_new_state(&self, event: SecurityState) {
- let active_subscriptions = self.active_subscriptions.read().unwrap();
- for sink in active_subscriptions.values() {
- let _ = sink.notify(Ok(event)).wait();
+ /// Sends a new state update to all `new_state` subscribers of the management interface.
+ pub fn notify_new_state(&self, new_state: SecurityState) {
+ self.notify(&self.subscriptions.new_state_subscriptions, new_state);
+ }
+
+ /// Sends an error to all `error` subscribers of the management interface.
+ pub fn notify_error<E>(&self, error: &E)
+ where E: error_chain::ChainedError
+ {
+ let error_strings = error.iter().map(|e| e.to_string()).collect();
+ self.notify(&self.subscriptions.error_subscriptions, error_strings);
+ }
+
+ fn notify<T>(&self,
+ subscriptions_lock: &RwLock<HashMap<SubscriptionId, pubsub::Sink<T>>>,
+ value: T)
+ where T: serde::Serialize + Clone
+ {
+ let subscriptions = subscriptions_lock.read().unwrap();
+ for sink in subscriptions.values() {
+ let _ = sink.notify(Ok(value.clone())).wait();
}
}
}
struct ManagementInterface<T: From<TunnelCommand> + 'static + Send> {
- active_subscriptions: ActiveSubscriptions,
+ subscriptions: Arc<ActiveSubscriptions>,
tx: Mutex<IntoSender<TunnelCommand, T>>,
}
impl<T: From<TunnelCommand> + 'static + Send> ManagementInterface<T> {
pub fn new(tx: IntoSender<TunnelCommand, T>) -> Self {
ManagementInterface {
- active_subscriptions: Default::default(),
+ subscriptions: Default::default(),
tx: Mutex::new(tx),
}
}
+
+ fn subscribe<V>(subscriber: pubsub::Subscriber<V>,
+ subscriptions_lock: &RwLock<HashMap<SubscriptionId, pubsub::Sink<V>>>) {
+ let mut subscriptions = subscriptions_lock.write().unwrap();
+ loop {
+ let id = SubscriptionId::String(uuid::Uuid::new_v4().to_string());
+ if let Entry::Vacant(entry) = subscriptions.entry(id.clone()) {
+ if let Ok(sink) = subscriber.assign_id(id.clone()) {
+ debug!("Accepting new subscription with id {:?}", id);
+ entry.insert(sink);
+ }
+ break;
+ }
+ }
+ }
+
+ fn unsubscribe<V>(id: SubscriptionId,
+ subscriptions_lock: &RwLock<HashMap<SubscriptionId, pubsub::Sink<V>>>)
+ -> BoxFuture<(), Error> {
+ let was_removed = subscriptions_lock.write().unwrap().remove(&id).is_some();
+ let result = if was_removed {
+ debug!("Unsubscribing id {:?}", id);
+ future::ok(())
+ } else {
+ future::err(
+ Error {
+ code: ErrorCode::InvalidParams,
+ message: "Invalid subscription".to_owned(),
+ data: None,
+ },
+ )
+ };
+ result.boxed()
+ }
}
impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for ManagementInterface<T> {
@@ -247,35 +314,22 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem
_meta: Self::Metadata,
subscriber: pubsub::Subscriber<SecurityState>) {
trace!("new_state_subscribe");
- let mut active_subscriptions = self.active_subscriptions.write().unwrap();
- loop {
- let id = SubscriptionId::String(uuid::Uuid::new_v4().to_string());
- if let Entry::Vacant(entry) = active_subscriptions.entry(id.clone()) {
- if let Ok(sink) = subscriber.assign_id(id.clone()) {
- debug!("Accepting new subscription with id {:?}", id);
- entry.insert(sink);
- }
- break;
- }
- }
+ Self::subscribe(subscriber, &self.subscriptions.new_state_subscriptions);
}
fn new_state_unsubscribe(&self, id: SubscriptionId) -> BoxFuture<(), Error> {
trace!("new_state_unsubscribe");
- let was_removed = self.active_subscriptions.write().unwrap().remove(&id).is_some();
- let result = if was_removed {
- debug!("Unsubscribing id {:?}", id);
- future::ok(())
- } else {
- future::err(
- Error {
- code: ErrorCode::InvalidParams,
- message: "Invalid subscription".to_owned(),
- data: None,
- },
- )
- };
- result.boxed()
+ Self::unsubscribe(id, &self.subscriptions.new_state_subscriptions)
+ }
+
+ fn error_subscribe(&self, _meta: Self::Metadata, subscriber: pubsub::Subscriber<Vec<String>>) {
+ trace!("error_subscribe");
+ Self::subscribe(subscriber, &self.subscriptions.error_subscriptions);
+ }
+
+ fn error_unsubscribe(&self, id: SubscriptionId) -> BoxFuture<(), Error> {
+ trace!("error_unsubscribe");
+ Self::unsubscribe(id, &self.subscriptions.error_subscriptions)
}
}