diff options
| author | Linus Färnstrand <linus@mullvad.net> | 2019-03-27 15:15:42 +0100 |
|---|---|---|
| committer | Linus Färnstrand <linus@mullvad.net> | 2019-03-27 15:45:22 +0100 |
| commit | aefbb16235edb96fa38745d1ded72a23ccddafd3 (patch) | |
| tree | b54cb4a388dc525b4812fbe89f06335b594b35d1 /mullvad-daemon | |
| parent | d334f116441186ee57abab70dc0509d8e69b860f (diff) | |
| download | mullvadvpn-aefbb16235edb96fa38745d1ded72a23ccddafd3.tar.xz mullvadvpn-aefbb16235edb96fa38745d1ded72a23ccddafd3.zip | |
Change daemon to expose a single event subscription
Diffstat (limited to 'mullvad-daemon')
| -rw-r--r-- | mullvad-daemon/src/management_interface.rs | 139 |
1 files changed, 46 insertions, 93 deletions
diff --git a/mullvad-daemon/src/management_interface.rs b/mullvad-daemon/src/management_interface.rs index 591a418ebd..f77cd1b8e4 100644 --- a/mullvad-daemon/src/management_interface.rs +++ b/mullvad-daemon/src/management_interface.rs @@ -19,9 +19,8 @@ use mullvad_types::{ relay_list::RelayList, settings::{self, Settings}, states::TargetState, - version, + version, DaemonEvent, }; -use serde; use std::{ collections::{hash_map::Entry, HashMap}, sync::{Arc, Mutex, RwLock}, @@ -150,29 +149,18 @@ build_rpc_trait! { #[rpc(meta, name = "get_version_info")] fn get_version_info(&self, Self::Metadata) -> BoxFuture<version::AppVersionInfo, Error>; - #[pubsub(name = "new_state")] { - /// Subscribes to the `new_state` event notifications. - #[rpc(name = "new_state_subscribe")] - fn new_state_subscribe( + #[pubsub(name = "daemon_event")] { + /// Subscribes to events from the daemon. + #[rpc(name = "daemon_event_subscribe")] + fn daemon_event_subscribe( &self, Self::Metadata, - pubsub::Subscriber<TunnelStateTransition> + pubsub::Subscriber<DaemonEvent> ); - /// Unsubscribes from the `new_state` event notifications. - #[rpc(name = "new_state_unsubscribe")] - fn new_state_unsubscribe(&self, SubscriptionId) -> BoxFuture<(), Error>; - } - - #[pubsub(name = "settings")] { - /// Subscribes to the `settings` event notifications. Getting notified as soon as any - /// daemon settings change. - #[rpc(name = "settings_subscribe")] - fn settings_subscribe(&self, Self::Metadata, pubsub::Subscriber<Settings>); - - /// Unsubscribes from the `settings` event notifications. - #[rpc(name = "settings_unsubscribe")] - fn settings_unsubscribe(&self, SubscriptionId) -> BoxFuture<(), Error>; + /// Unsubscribes from the `daemon_event` event notifications. + #[rpc(name = "daemon_event_unsubscribe")] + fn daemon_event_unsubscribe(&self, SubscriptionId) -> BoxFuture<(), Error>; } } } @@ -237,15 +225,9 @@ pub enum ManagementCommand { Shutdown, } -#[derive(Default)] -struct ActiveSubscriptions { - new_state_subscriptions: RwLock<HashMap<SubscriptionId, pubsub::Sink<TunnelStateTransition>>>, - settings_subscriptions: RwLock<HashMap<SubscriptionId, pubsub::Sink<Settings>>>, -} - pub struct ManagementInterfaceServer { server: talpid_ipc::IpcServer, - subscriptions: Arc<ActiveSubscriptions>, + subscriptions: Arc<RwLock<HashMap<SubscriptionId, pubsub::Sink<DaemonEvent>>>>, } impl ManagementInterfaceServer { @@ -290,29 +272,24 @@ impl ManagementInterfaceServer { /// A handle that allows broadcasting messages to all subscribers of the management interface. pub struct EventBroadcaster { - subscriptions: Arc<ActiveSubscriptions>, + subscriptions: Arc<RwLock<HashMap<SubscriptionId, pubsub::Sink<DaemonEvent>>>>, } impl EventBroadcaster { /// Sends a new state update to all `new_state` subscribers of the management interface. pub fn notify_new_state(&self, new_state: TunnelStateTransition) { - log::debug!("Broadcasting new state to listeners: {:?}", new_state); - self.notify(&self.subscriptions.new_state_subscriptions, new_state); + log::debug!("Broadcasting new state: {:?}", new_state); + self.notify(DaemonEvent::StateTransition(new_state)); } /// Sends settings to all `settings` subscribers of the management interface. pub fn notify_settings(&self, settings: &Settings) { - self.notify(&self.subscriptions.settings_subscriptions, settings.clone()); + log::debug!("Broadcasting new settings"); + self.notify(DaemonEvent::Settings(settings.clone())); } - fn notify<T>( - &self, - subscriptions_lock: &RwLock<HashMap<SubscriptionId, pubsub::Sink<T>>>, - value: T, - ) where - T: serde::Serialize + Clone, - { - let subscriptions = subscriptions_lock.read().unwrap(); + fn notify(&self, value: DaemonEvent) { + let subscriptions = self.subscriptions.read().unwrap(); for sink in subscriptions.values() { let _ = sink.notify(Ok(value.clone())).wait(); } @@ -320,7 +297,7 @@ impl EventBroadcaster { } struct ManagementInterface<T: From<ManagementCommand> + 'static + Send> { - subscriptions: Arc<ActiveSubscriptions>, + subscriptions: Arc<RwLock<HashMap<SubscriptionId, pubsub::Sink<DaemonEvent>>>>, tx: Mutex<IntoSender<ManagementCommand, T>>, } @@ -332,41 +309,6 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterface<T> { } } - fn subscribe<V>( - subscriber: pubsub::Subscriber<V>, - subscriptions_lock: &RwLock<HashMap<SubscriptionId, pubsub::Sink<V>>>, - ) { - let mut subscriptions = subscriptions_lock.write().unwrap(); - loop { - let id = SubscriptionId::String(uuid::Uuid::new_v4().to_string()); - if let Entry::Vacant(entry) = subscriptions.entry(id.clone()) { - if let Ok(sink) = subscriber.assign_id(id.clone()) { - log::debug!("Accepting new subscription with id {:?}", id); - entry.insert(sink); - } - break; - } - } - } - - fn unsubscribe<V>( - id: &SubscriptionId, - subscriptions_lock: &RwLock<HashMap<SubscriptionId, pubsub::Sink<V>>>, - ) -> BoxFuture<(), Error> { - let was_removed = subscriptions_lock.write().unwrap().remove(&id).is_some(); - let result = if was_removed { - log::debug!("Unsubscribing id {:?}", id); - future::ok(()) - } else { - future::err(Error { - code: ErrorCode::InvalidParams, - message: "Invalid subscription".to_owned(), - data: None, - }) - }; - Box::new(result) - } - /// Sends a command to the daemon and maps the error to an RPC error. fn send_command_to_daemon(&self, command: ManagementCommand) -> BoxFuture<(), Error> { Box::new( @@ -697,28 +639,39 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi Box::new(future) } - fn new_state_subscribe( + fn daemon_event_subscribe( &self, _: Self::Metadata, - subscriber: pubsub::Subscriber<TunnelStateTransition>, + subscriber: pubsub::Subscriber<DaemonEvent>, ) { - log::debug!("new_state_subscribe"); - Self::subscribe(subscriber, &self.subscriptions.new_state_subscriptions); - } - - fn new_state_unsubscribe(&self, id: SubscriptionId) -> BoxFuture<(), Error> { - log::debug!("new_state_unsubscribe"); - Self::unsubscribe(&id, &self.subscriptions.new_state_subscriptions) - } - - fn settings_subscribe(&self, _: Self::Metadata, subscriber: pubsub::Subscriber<Settings>) { - log::debug!("settings_subscribe"); - Self::subscribe(subscriber, &self.subscriptions.settings_subscriptions); + log::debug!("daemon_event_subscribe"); + let mut subscriptions = self.subscriptions.write().unwrap(); + loop { + let id = SubscriptionId::String(uuid::Uuid::new_v4().to_string()); + if let Entry::Vacant(entry) = subscriptions.entry(id.clone()) { + if let Ok(sink) = subscriber.assign_id(id.clone()) { + log::debug!("Accepting new subscription with id {:?}", id); + entry.insert(sink); + } + break; + } + } } - fn settings_unsubscribe(&self, id: SubscriptionId) -> BoxFuture<(), Error> { - log::debug!("settings_unsubscribe"); - Self::unsubscribe(&id, &self.subscriptions.settings_subscriptions) + fn daemon_event_unsubscribe(&self, id: SubscriptionId) -> BoxFuture<(), Error> { + log::debug!("daemon_event_unsubscribe"); + let was_removed = self.subscriptions.write().unwrap().remove(&id).is_some(); + let result = if was_removed { + log::debug!("Unsubscribing id {:?}", id); + future::ok(()) + } else { + future::err(Error { + code: ErrorCode::InvalidParams, + message: "Invalid subscription".to_owned(), + data: None, + }) + }; + Box::new(result) } } |
