diff options
| author | Linus Färnstrand <linus@mullvad.net> | 2019-03-27 15:15:42 +0100 |
|---|---|---|
| committer | Linus Färnstrand <linus@mullvad.net> | 2019-03-27 15:45:22 +0100 |
| commit | aefbb16235edb96fa38745d1ded72a23ccddafd3 (patch) | |
| tree | b54cb4a388dc525b4812fbe89f06335b594b35d1 | |
| parent | d334f116441186ee57abab70dc0509d8e69b860f (diff) | |
| download | mullvadvpn-aefbb16235edb96fa38745d1ded72a23ccddafd3.tar.xz mullvadvpn-aefbb16235edb96fa38745d1ded72a23ccddafd3.zip | |
Change daemon to expose a single event subscription
| -rw-r--r-- | mullvad-cli/src/cmds/status.rs | 22 | ||||
| -rw-r--r-- | mullvad-daemon/src/management_interface.rs | 139 | ||||
| -rw-r--r-- | mullvad-ipc-client/src/lib.rs | 11 | ||||
| -rw-r--r-- | mullvad-types/src/lib.rs | 11 |
4 files changed, 75 insertions, 108 deletions
diff --git a/mullvad-cli/src/cmds/status.rs b/mullvad-cli/src/cmds/status.rs index f39d9cef68..fc6df014fc 100644 --- a/mullvad-cli/src/cmds/status.rs +++ b/mullvad-cli/src/cmds/status.rs @@ -1,7 +1,7 @@ use crate::{new_rpc_client, Command, Error, ErrorKind, Result, ResultExt}; use futures::{Future, Stream}; use mullvad_ipc_client::DaemonRpcClient; -use mullvad_types::auth_failed::AuthFailed; +use mullvad_types::{auth_failed::AuthFailed, DaemonEvent}; use talpid_types::tunnel::{BlockReason, TunnelStateTransition}; pub struct Status; @@ -27,17 +27,19 @@ impl Command for Status { print_location(&mut rpc)?; if matches.subcommand_matches("listen").is_some() { let subscription = rpc - .new_state_subscribe() + .daemon_event_subscribe() .wait() .map_err(|_err| Error::from(ErrorKind::CantSubscribe))?; - for new_state in subscription.wait() { - let new_state = new_state.chain_err(|| "Subscription failed")?; - print_state(&new_state); - - use self::TunnelStateTransition::*; - match new_state { - Connected(_) | Disconnected => print_location(&mut rpc)?, - _ => {} + for event in subscription.wait() { + if let DaemonEvent::StateTransition(new_state) = + event.chain_err(|| "Subscription failed")? + { + print_state(&new_state); + use self::TunnelStateTransition::*; + match new_state { + Connected(_) | Disconnected => print_location(&mut rpc)?, + _ => {} + } } } } diff --git a/mullvad-daemon/src/management_interface.rs b/mullvad-daemon/src/management_interface.rs index 591a418ebd..f77cd1b8e4 100644 --- a/mullvad-daemon/src/management_interface.rs +++ b/mullvad-daemon/src/management_interface.rs @@ -19,9 +19,8 @@ use mullvad_types::{ relay_list::RelayList, settings::{self, Settings}, states::TargetState, - version, + version, DaemonEvent, }; -use serde; use std::{ collections::{hash_map::Entry, HashMap}, sync::{Arc, Mutex, RwLock}, @@ -150,29 +149,18 @@ build_rpc_trait! { #[rpc(meta, name = "get_version_info")] fn get_version_info(&self, Self::Metadata) -> BoxFuture<version::AppVersionInfo, Error>; - #[pubsub(name = "new_state")] { - /// Subscribes to the `new_state` event notifications. - #[rpc(name = "new_state_subscribe")] - fn new_state_subscribe( + #[pubsub(name = "daemon_event")] { + /// Subscribes to events from the daemon. + #[rpc(name = "daemon_event_subscribe")] + fn daemon_event_subscribe( &self, Self::Metadata, - pubsub::Subscriber<TunnelStateTransition> + pubsub::Subscriber<DaemonEvent> ); - /// Unsubscribes from the `new_state` event notifications. - #[rpc(name = "new_state_unsubscribe")] - fn new_state_unsubscribe(&self, SubscriptionId) -> BoxFuture<(), Error>; - } - - #[pubsub(name = "settings")] { - /// Subscribes to the `settings` event notifications. Getting notified as soon as any - /// daemon settings change. - #[rpc(name = "settings_subscribe")] - fn settings_subscribe(&self, Self::Metadata, pubsub::Subscriber<Settings>); - - /// Unsubscribes from the `settings` event notifications. - #[rpc(name = "settings_unsubscribe")] - fn settings_unsubscribe(&self, SubscriptionId) -> BoxFuture<(), Error>; + /// Unsubscribes from the `daemon_event` event notifications. + #[rpc(name = "daemon_event_unsubscribe")] + fn daemon_event_unsubscribe(&self, SubscriptionId) -> BoxFuture<(), Error>; } } } @@ -237,15 +225,9 @@ pub enum ManagementCommand { Shutdown, } -#[derive(Default)] -struct ActiveSubscriptions { - new_state_subscriptions: RwLock<HashMap<SubscriptionId, pubsub::Sink<TunnelStateTransition>>>, - settings_subscriptions: RwLock<HashMap<SubscriptionId, pubsub::Sink<Settings>>>, -} - pub struct ManagementInterfaceServer { server: talpid_ipc::IpcServer, - subscriptions: Arc<ActiveSubscriptions>, + subscriptions: Arc<RwLock<HashMap<SubscriptionId, pubsub::Sink<DaemonEvent>>>>, } impl ManagementInterfaceServer { @@ -290,29 +272,24 @@ impl ManagementInterfaceServer { /// A handle that allows broadcasting messages to all subscribers of the management interface. pub struct EventBroadcaster { - subscriptions: Arc<ActiveSubscriptions>, + subscriptions: Arc<RwLock<HashMap<SubscriptionId, pubsub::Sink<DaemonEvent>>>>, } impl EventBroadcaster { /// Sends a new state update to all `new_state` subscribers of the management interface. pub fn notify_new_state(&self, new_state: TunnelStateTransition) { - log::debug!("Broadcasting new state to listeners: {:?}", new_state); - self.notify(&self.subscriptions.new_state_subscriptions, new_state); + log::debug!("Broadcasting new state: {:?}", new_state); + self.notify(DaemonEvent::StateTransition(new_state)); } /// Sends settings to all `settings` subscribers of the management interface. pub fn notify_settings(&self, settings: &Settings) { - self.notify(&self.subscriptions.settings_subscriptions, settings.clone()); + log::debug!("Broadcasting new settings"); + self.notify(DaemonEvent::Settings(settings.clone())); } - fn notify<T>( - &self, - subscriptions_lock: &RwLock<HashMap<SubscriptionId, pubsub::Sink<T>>>, - value: T, - ) where - T: serde::Serialize + Clone, - { - let subscriptions = subscriptions_lock.read().unwrap(); + fn notify(&self, value: DaemonEvent) { + let subscriptions = self.subscriptions.read().unwrap(); for sink in subscriptions.values() { let _ = sink.notify(Ok(value.clone())).wait(); } @@ -320,7 +297,7 @@ impl EventBroadcaster { } struct ManagementInterface<T: From<ManagementCommand> + 'static + Send> { - subscriptions: Arc<ActiveSubscriptions>, + subscriptions: Arc<RwLock<HashMap<SubscriptionId, pubsub::Sink<DaemonEvent>>>>, tx: Mutex<IntoSender<ManagementCommand, T>>, } @@ -332,41 +309,6 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterface<T> { } } - fn subscribe<V>( - subscriber: pubsub::Subscriber<V>, - subscriptions_lock: &RwLock<HashMap<SubscriptionId, pubsub::Sink<V>>>, - ) { - let mut subscriptions = subscriptions_lock.write().unwrap(); - loop { - let id = SubscriptionId::String(uuid::Uuid::new_v4().to_string()); - if let Entry::Vacant(entry) = subscriptions.entry(id.clone()) { - if let Ok(sink) = subscriber.assign_id(id.clone()) { - log::debug!("Accepting new subscription with id {:?}", id); - entry.insert(sink); - } - break; - } - } - } - - fn unsubscribe<V>( - id: &SubscriptionId, - subscriptions_lock: &RwLock<HashMap<SubscriptionId, pubsub::Sink<V>>>, - ) -> BoxFuture<(), Error> { - let was_removed = subscriptions_lock.write().unwrap().remove(&id).is_some(); - let result = if was_removed { - log::debug!("Unsubscribing id {:?}", id); - future::ok(()) - } else { - future::err(Error { - code: ErrorCode::InvalidParams, - message: "Invalid subscription".to_owned(), - data: None, - }) - }; - Box::new(result) - } - /// Sends a command to the daemon and maps the error to an RPC error. fn send_command_to_daemon(&self, command: ManagementCommand) -> BoxFuture<(), Error> { Box::new( @@ -697,28 +639,39 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi Box::new(future) } - fn new_state_subscribe( + fn daemon_event_subscribe( &self, _: Self::Metadata, - subscriber: pubsub::Subscriber<TunnelStateTransition>, + subscriber: pubsub::Subscriber<DaemonEvent>, ) { - log::debug!("new_state_subscribe"); - Self::subscribe(subscriber, &self.subscriptions.new_state_subscriptions); - } - - fn new_state_unsubscribe(&self, id: SubscriptionId) -> BoxFuture<(), Error> { - log::debug!("new_state_unsubscribe"); - Self::unsubscribe(&id, &self.subscriptions.new_state_subscriptions) - } - - fn settings_subscribe(&self, _: Self::Metadata, subscriber: pubsub::Subscriber<Settings>) { - log::debug!("settings_subscribe"); - Self::subscribe(subscriber, &self.subscriptions.settings_subscriptions); + log::debug!("daemon_event_subscribe"); + let mut subscriptions = self.subscriptions.write().unwrap(); + loop { + let id = SubscriptionId::String(uuid::Uuid::new_v4().to_string()); + if let Entry::Vacant(entry) = subscriptions.entry(id.clone()) { + if let Ok(sink) = subscriber.assign_id(id.clone()) { + log::debug!("Accepting new subscription with id {:?}", id); + entry.insert(sink); + } + break; + } + } } - fn settings_unsubscribe(&self, id: SubscriptionId) -> BoxFuture<(), Error> { - log::debug!("settings_unsubscribe"); - Self::unsubscribe(&id, &self.subscriptions.settings_subscriptions) + fn daemon_event_unsubscribe(&self, id: SubscriptionId) -> BoxFuture<(), Error> { + log::debug!("daemon_event_unsubscribe"); + let was_removed = self.subscriptions.write().unwrap().remove(&id).is_some(); + let result = if was_removed { + log::debug!("Unsubscribing id {:?}", id); + future::ok(()) + } else { + future::err(Error { + code: ErrorCode::InvalidParams, + message: "Invalid subscription".to_owned(), + data: None, + }) + }; + Box::new(result) } } diff --git a/mullvad-ipc-client/src/lib.rs b/mullvad-ipc-client/src/lib.rs index 21503d0155..df07c9123f 100644 --- a/mullvad-ipc-client/src/lib.rs +++ b/mullvad-ipc-client/src/lib.rs @@ -11,6 +11,7 @@ use mullvad_types::{ relay_list::RelayList, settings::{Settings, TunnelOptions}, version::AppVersionInfo, + DaemonEvent, }; use serde::{Deserialize, Serialize}; use std::{path::Path, thread}; @@ -248,16 +249,16 @@ impl DaemonRpcClient { .chain_err(|| ErrorKind::RpcCallError(method.to_owned())) } - pub fn new_state_subscribe( + pub fn daemon_event_subscribe( &mut self, ) -> impl Future< - Item = jsonrpc_client_pubsub::Subscription<TunnelStateTransition>, + Item = jsonrpc_client_pubsub::Subscription<DaemonEvent>, Error = jsonrpc_client_pubsub::Error, > { self.subscriber.subscribe( - "new_state_subscribe".to_string(), - "new_state_unsubscribe".to_string(), - "new_state".to_string(), + "daemon_event_subscribe".to_string(), + "daemon_event_unsubscribe".to_string(), + "daemon_event".to_string(), 0, &NO_ARGS, ) diff --git a/mullvad-types/src/lib.rs b/mullvad-types/src/lib.rs index b2659911e4..2ac2295982 100644 --- a/mullvad-types/src/lib.rs +++ b/mullvad-types/src/lib.rs @@ -22,3 +22,14 @@ pub mod wireguard; mod custom_tunnel; pub use crate::custom_tunnel::*; + +/// An event sent out from the daemon to frontends. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum DaemonEvent { + /// The daemon transitioned into a new state. + StateTransition(talpid_types::tunnel::TunnelStateTransition), + + /// The daemon settings changed. + Settings(settings::Settings), +} |
