summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorLinus Färnstrand <linus@mullvad.net>2019-03-27 15:15:42 +0100
committerLinus Färnstrand <linus@mullvad.net>2019-03-27 15:45:22 +0100
commitaefbb16235edb96fa38745d1ded72a23ccddafd3 (patch)
treeb54cb4a388dc525b4812fbe89f06335b594b35d1
parentd334f116441186ee57abab70dc0509d8e69b860f (diff)
downloadmullvadvpn-aefbb16235edb96fa38745d1ded72a23ccddafd3.tar.xz
mullvadvpn-aefbb16235edb96fa38745d1ded72a23ccddafd3.zip
Change daemon to expose a single event subscription
-rw-r--r--mullvad-cli/src/cmds/status.rs22
-rw-r--r--mullvad-daemon/src/management_interface.rs139
-rw-r--r--mullvad-ipc-client/src/lib.rs11
-rw-r--r--mullvad-types/src/lib.rs11
4 files changed, 75 insertions, 108 deletions
diff --git a/mullvad-cli/src/cmds/status.rs b/mullvad-cli/src/cmds/status.rs
index f39d9cef68..fc6df014fc 100644
--- a/mullvad-cli/src/cmds/status.rs
+++ b/mullvad-cli/src/cmds/status.rs
@@ -1,7 +1,7 @@
use crate::{new_rpc_client, Command, Error, ErrorKind, Result, ResultExt};
use futures::{Future, Stream};
use mullvad_ipc_client::DaemonRpcClient;
-use mullvad_types::auth_failed::AuthFailed;
+use mullvad_types::{auth_failed::AuthFailed, DaemonEvent};
use talpid_types::tunnel::{BlockReason, TunnelStateTransition};
pub struct Status;
@@ -27,17 +27,19 @@ impl Command for Status {
print_location(&mut rpc)?;
if matches.subcommand_matches("listen").is_some() {
let subscription = rpc
- .new_state_subscribe()
+ .daemon_event_subscribe()
.wait()
.map_err(|_err| Error::from(ErrorKind::CantSubscribe))?;
- for new_state in subscription.wait() {
- let new_state = new_state.chain_err(|| "Subscription failed")?;
- print_state(&new_state);
-
- use self::TunnelStateTransition::*;
- match new_state {
- Connected(_) | Disconnected => print_location(&mut rpc)?,
- _ => {}
+ for event in subscription.wait() {
+ if let DaemonEvent::StateTransition(new_state) =
+ event.chain_err(|| "Subscription failed")?
+ {
+ print_state(&new_state);
+ use self::TunnelStateTransition::*;
+ match new_state {
+ Connected(_) | Disconnected => print_location(&mut rpc)?,
+ _ => {}
+ }
}
}
}
diff --git a/mullvad-daemon/src/management_interface.rs b/mullvad-daemon/src/management_interface.rs
index 591a418ebd..f77cd1b8e4 100644
--- a/mullvad-daemon/src/management_interface.rs
+++ b/mullvad-daemon/src/management_interface.rs
@@ -19,9 +19,8 @@ use mullvad_types::{
relay_list::RelayList,
settings::{self, Settings},
states::TargetState,
- version,
+ version, DaemonEvent,
};
-use serde;
use std::{
collections::{hash_map::Entry, HashMap},
sync::{Arc, Mutex, RwLock},
@@ -150,29 +149,18 @@ build_rpc_trait! {
#[rpc(meta, name = "get_version_info")]
fn get_version_info(&self, Self::Metadata) -> BoxFuture<version::AppVersionInfo, Error>;
- #[pubsub(name = "new_state")] {
- /// Subscribes to the `new_state` event notifications.
- #[rpc(name = "new_state_subscribe")]
- fn new_state_subscribe(
+ #[pubsub(name = "daemon_event")] {
+ /// Subscribes to events from the daemon.
+ #[rpc(name = "daemon_event_subscribe")]
+ fn daemon_event_subscribe(
&self,
Self::Metadata,
- pubsub::Subscriber<TunnelStateTransition>
+ pubsub::Subscriber<DaemonEvent>
);
- /// Unsubscribes from the `new_state` event notifications.
- #[rpc(name = "new_state_unsubscribe")]
- fn new_state_unsubscribe(&self, SubscriptionId) -> BoxFuture<(), Error>;
- }
-
- #[pubsub(name = "settings")] {
- /// Subscribes to the `settings` event notifications. Getting notified as soon as any
- /// daemon settings change.
- #[rpc(name = "settings_subscribe")]
- fn settings_subscribe(&self, Self::Metadata, pubsub::Subscriber<Settings>);
-
- /// Unsubscribes from the `settings` event notifications.
- #[rpc(name = "settings_unsubscribe")]
- fn settings_unsubscribe(&self, SubscriptionId) -> BoxFuture<(), Error>;
+ /// Unsubscribes from the `daemon_event` event notifications.
+ #[rpc(name = "daemon_event_unsubscribe")]
+ fn daemon_event_unsubscribe(&self, SubscriptionId) -> BoxFuture<(), Error>;
}
}
}
@@ -237,15 +225,9 @@ pub enum ManagementCommand {
Shutdown,
}
-#[derive(Default)]
-struct ActiveSubscriptions {
- new_state_subscriptions: RwLock<HashMap<SubscriptionId, pubsub::Sink<TunnelStateTransition>>>,
- settings_subscriptions: RwLock<HashMap<SubscriptionId, pubsub::Sink<Settings>>>,
-}
-
pub struct ManagementInterfaceServer {
server: talpid_ipc::IpcServer,
- subscriptions: Arc<ActiveSubscriptions>,
+ subscriptions: Arc<RwLock<HashMap<SubscriptionId, pubsub::Sink<DaemonEvent>>>>,
}
impl ManagementInterfaceServer {
@@ -290,29 +272,24 @@ impl ManagementInterfaceServer {
/// A handle that allows broadcasting messages to all subscribers of the management interface.
pub struct EventBroadcaster {
- subscriptions: Arc<ActiveSubscriptions>,
+ subscriptions: Arc<RwLock<HashMap<SubscriptionId, pubsub::Sink<DaemonEvent>>>>,
}
impl EventBroadcaster {
/// Sends a new state update to all `new_state` subscribers of the management interface.
pub fn notify_new_state(&self, new_state: TunnelStateTransition) {
- log::debug!("Broadcasting new state to listeners: {:?}", new_state);
- self.notify(&self.subscriptions.new_state_subscriptions, new_state);
+ log::debug!("Broadcasting new state: {:?}", new_state);
+ self.notify(DaemonEvent::StateTransition(new_state));
}
/// Sends settings to all `settings` subscribers of the management interface.
pub fn notify_settings(&self, settings: &Settings) {
- self.notify(&self.subscriptions.settings_subscriptions, settings.clone());
+ log::debug!("Broadcasting new settings");
+ self.notify(DaemonEvent::Settings(settings.clone()));
}
- fn notify<T>(
- &self,
- subscriptions_lock: &RwLock<HashMap<SubscriptionId, pubsub::Sink<T>>>,
- value: T,
- ) where
- T: serde::Serialize + Clone,
- {
- let subscriptions = subscriptions_lock.read().unwrap();
+ fn notify(&self, value: DaemonEvent) {
+ let subscriptions = self.subscriptions.read().unwrap();
for sink in subscriptions.values() {
let _ = sink.notify(Ok(value.clone())).wait();
}
@@ -320,7 +297,7 @@ impl EventBroadcaster {
}
struct ManagementInterface<T: From<ManagementCommand> + 'static + Send> {
- subscriptions: Arc<ActiveSubscriptions>,
+ subscriptions: Arc<RwLock<HashMap<SubscriptionId, pubsub::Sink<DaemonEvent>>>>,
tx: Mutex<IntoSender<ManagementCommand, T>>,
}
@@ -332,41 +309,6 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterface<T> {
}
}
- fn subscribe<V>(
- subscriber: pubsub::Subscriber<V>,
- subscriptions_lock: &RwLock<HashMap<SubscriptionId, pubsub::Sink<V>>>,
- ) {
- let mut subscriptions = subscriptions_lock.write().unwrap();
- loop {
- let id = SubscriptionId::String(uuid::Uuid::new_v4().to_string());
- if let Entry::Vacant(entry) = subscriptions.entry(id.clone()) {
- if let Ok(sink) = subscriber.assign_id(id.clone()) {
- log::debug!("Accepting new subscription with id {:?}", id);
- entry.insert(sink);
- }
- break;
- }
- }
- }
-
- fn unsubscribe<V>(
- id: &SubscriptionId,
- subscriptions_lock: &RwLock<HashMap<SubscriptionId, pubsub::Sink<V>>>,
- ) -> BoxFuture<(), Error> {
- let was_removed = subscriptions_lock.write().unwrap().remove(&id).is_some();
- let result = if was_removed {
- log::debug!("Unsubscribing id {:?}", id);
- future::ok(())
- } else {
- future::err(Error {
- code: ErrorCode::InvalidParams,
- message: "Invalid subscription".to_owned(),
- data: None,
- })
- };
- Box::new(result)
- }
-
/// Sends a command to the daemon and maps the error to an RPC error.
fn send_command_to_daemon(&self, command: ManagementCommand) -> BoxFuture<(), Error> {
Box::new(
@@ -697,28 +639,39 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi
Box::new(future)
}
- fn new_state_subscribe(
+ fn daemon_event_subscribe(
&self,
_: Self::Metadata,
- subscriber: pubsub::Subscriber<TunnelStateTransition>,
+ subscriber: pubsub::Subscriber<DaemonEvent>,
) {
- log::debug!("new_state_subscribe");
- Self::subscribe(subscriber, &self.subscriptions.new_state_subscriptions);
- }
-
- fn new_state_unsubscribe(&self, id: SubscriptionId) -> BoxFuture<(), Error> {
- log::debug!("new_state_unsubscribe");
- Self::unsubscribe(&id, &self.subscriptions.new_state_subscriptions)
- }
-
- fn settings_subscribe(&self, _: Self::Metadata, subscriber: pubsub::Subscriber<Settings>) {
- log::debug!("settings_subscribe");
- Self::subscribe(subscriber, &self.subscriptions.settings_subscriptions);
+ log::debug!("daemon_event_subscribe");
+ let mut subscriptions = self.subscriptions.write().unwrap();
+ loop {
+ let id = SubscriptionId::String(uuid::Uuid::new_v4().to_string());
+ if let Entry::Vacant(entry) = subscriptions.entry(id.clone()) {
+ if let Ok(sink) = subscriber.assign_id(id.clone()) {
+ log::debug!("Accepting new subscription with id {:?}", id);
+ entry.insert(sink);
+ }
+ break;
+ }
+ }
}
- fn settings_unsubscribe(&self, id: SubscriptionId) -> BoxFuture<(), Error> {
- log::debug!("settings_unsubscribe");
- Self::unsubscribe(&id, &self.subscriptions.settings_subscriptions)
+ fn daemon_event_unsubscribe(&self, id: SubscriptionId) -> BoxFuture<(), Error> {
+ log::debug!("daemon_event_unsubscribe");
+ let was_removed = self.subscriptions.write().unwrap().remove(&id).is_some();
+ let result = if was_removed {
+ log::debug!("Unsubscribing id {:?}", id);
+ future::ok(())
+ } else {
+ future::err(Error {
+ code: ErrorCode::InvalidParams,
+ message: "Invalid subscription".to_owned(),
+ data: None,
+ })
+ };
+ Box::new(result)
}
}
diff --git a/mullvad-ipc-client/src/lib.rs b/mullvad-ipc-client/src/lib.rs
index 21503d0155..df07c9123f 100644
--- a/mullvad-ipc-client/src/lib.rs
+++ b/mullvad-ipc-client/src/lib.rs
@@ -11,6 +11,7 @@ use mullvad_types::{
relay_list::RelayList,
settings::{Settings, TunnelOptions},
version::AppVersionInfo,
+ DaemonEvent,
};
use serde::{Deserialize, Serialize};
use std::{path::Path, thread};
@@ -248,16 +249,16 @@ impl DaemonRpcClient {
.chain_err(|| ErrorKind::RpcCallError(method.to_owned()))
}
- pub fn new_state_subscribe(
+ pub fn daemon_event_subscribe(
&mut self,
) -> impl Future<
- Item = jsonrpc_client_pubsub::Subscription<TunnelStateTransition>,
+ Item = jsonrpc_client_pubsub::Subscription<DaemonEvent>,
Error = jsonrpc_client_pubsub::Error,
> {
self.subscriber.subscribe(
- "new_state_subscribe".to_string(),
- "new_state_unsubscribe".to_string(),
- "new_state".to_string(),
+ "daemon_event_subscribe".to_string(),
+ "daemon_event_unsubscribe".to_string(),
+ "daemon_event".to_string(),
0,
&NO_ARGS,
)
diff --git a/mullvad-types/src/lib.rs b/mullvad-types/src/lib.rs
index b2659911e4..2ac2295982 100644
--- a/mullvad-types/src/lib.rs
+++ b/mullvad-types/src/lib.rs
@@ -22,3 +22,14 @@ pub mod wireguard;
mod custom_tunnel;
pub use crate::custom_tunnel::*;
+
+/// An event sent out from the daemon to frontends.
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+#[serde(rename_all = "snake_case")]
+pub enum DaemonEvent {
+ /// The daemon transitioned into a new state.
+ StateTransition(talpid_types::tunnel::TunnelStateTransition),
+
+ /// The daemon settings changed.
+ Settings(settings::Settings),
+}