diff options
| author | Linus Färnstrand <linus@mullvad.net> | 2019-03-27 18:03:16 +0100 |
|---|---|---|
| committer | Linus Färnstrand <linus@mullvad.net> | 2019-03-27 18:03:16 +0100 |
| commit | d73e8b7e00bb1b1d8a164298ca66e2c98feac1f4 (patch) | |
| tree | 37094fc6ce64372f34d880b5d430d06ad19052bf | |
| parent | d334f116441186ee57abab70dc0509d8e69b860f (diff) | |
| parent | ae59b0aa3f8798950f788310fe403fdc2f8f8f6f (diff) | |
| download | mullvadvpn-d73e8b7e00bb1b1d8a164298ca66e2c98feac1f4.tar.xz mullvadvpn-d73e8b7e00bb1b1d8a164298ca66e2c98feac1f4.zip | |
Merge branch 'unify-daemon-subscriptions'
| -rw-r--r-- | Cargo.lock | 1 | ||||
| -rw-r--r-- | gui/src/main/daemon-rpc.ts | 37 | ||||
| -rw-r--r-- | gui/src/main/index.ts | 27 | ||||
| -rw-r--r-- | gui/src/shared/daemon-rpc-types.ts | 2 | ||||
| -rw-r--r-- | mullvad-cli/src/cmds/status.rs | 22 | ||||
| -rw-r--r-- | mullvad-daemon/src/management_interface.rs | 139 | ||||
| -rw-r--r-- | mullvad-ipc-client/src/lib.rs | 11 | ||||
| -rw-r--r-- | mullvad-tests/Cargo.toml | 1 | ||||
| -rw-r--r-- | mullvad-tests/tests/connection.rs | 36 | ||||
| -rw-r--r-- | mullvad-types/src/lib.rs | 11 |
10 files changed, 127 insertions, 160 deletions
diff --git a/Cargo.lock b/Cargo.lock index 9478ab7cea..2d5c19e329 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1218,6 +1218,7 @@ dependencies = [ "libc 0.2.48 (registry+https://github.com/rust-lang/crates.io-index)", "mullvad-ipc-client 0.1.0", "mullvad-paths 0.1.0", + "mullvad-types 0.1.0", "notify 4.0.9 (registry+https://github.com/rust-lang/crates.io-index)", "openvpn-plugin 0.3.0 (git+https://github.com/mullvad/openvpn-plugin-rs?branch=auth-failed-event)", "talpid-ipc 0.1.0", diff --git a/gui/src/main/daemon-rpc.ts b/gui/src/main/daemon-rpc.ts index 94cbeeb984..4464db6688 100644 --- a/gui/src/main/daemon-rpc.ts +++ b/gui/src/main/daemon-rpc.ts @@ -1,5 +1,6 @@ import { AccountToken, + DaemonEvent, IAccountData, IAppVersionInfo, ILocation, @@ -295,6 +296,15 @@ const settingsSchema = partialObject({ tunnel_options: tunnelOptionsSchema, }); +const daemonEventSchema = oneOf( + object({ + state_transition: tunnelStateTransitionSchema, + }), + object({ + settings: settingsSchema, + }), +); + export class ResponseParseError extends Error { constructor(message: string, private validationErrorValue?: Error) { super(message); @@ -432,28 +442,15 @@ export class DaemonRpc { } } - public subscribeStateListener( - listener: SubscriptionListener<TunnelStateTransition>, - ): Promise<void> { - return this.transport.subscribe('new_state', (payload) => { - try { - const newState = camelCaseObjectKeys( - validate(tunnelStateTransitionSchema, payload), - ) as TunnelStateTransition; - listener.onEvent(newState); - } catch (error) { - listener.onError(new ResponseParseError('Invalid payload from new_state', error)); - } - }); - } - - public subscribeSettingsListener(listener: SubscriptionListener<ISettings>): Promise<void> { - return this.transport.subscribe('settings', (payload) => { + public subscribeDaemonEventListener(listener: SubscriptionListener<DaemonEvent>): Promise<void> { + return this.transport.subscribe('daemon_event', (payload) => { try { - const newSettings = camelCaseObjectKeys(validate(settingsSchema, payload)) as ISettings; - listener.onEvent(newSettings); + const daemonEvent = camelCaseObjectKeys( + validate(daemonEventSchema, payload), + ) as DaemonEvent; + listener.onEvent(daemonEvent); } catch (error) { - listener.onError(new ResponseParseError('Invalid payload from settings', error)); + listener.onError(new ResponseParseError('Invalid payload from daemon_event', error)); } }); } diff --git a/gui/src/main/index.ts b/gui/src/main/index.ts index b5e466912a..d8b558ddcb 100644 --- a/gui/src/main/index.ts +++ b/gui/src/main/index.ts @@ -7,6 +7,7 @@ import * as path from 'path'; import * as uuid from 'uuid'; import { AccountToken, + DaemonEvent, IAppVersionInfo, ILocation, IRelayList, @@ -491,28 +492,20 @@ class ApplicationMain { } private async subscribeEvents(): Promise<void> { - const stateListener = new SubscriptionListener( - (newState: TunnelStateTransition) => { - this.setTunnelState(newState); - }, - (error: Error) => { - log.error(`Cannot deserialize the new state: ${error.message}`); - }, - ); - - const settingsListener = new SubscriptionListener( - (newSettings: ISettings) => { - this.setSettings(newSettings); + const daemonEventListener = new SubscriptionListener( + (daemonEvent: DaemonEvent) => { + if ('stateTransition' in daemonEvent) { + this.setTunnelState(daemonEvent.stateTransition); + } else if ('settings' in daemonEvent) { + this.setSettings(daemonEvent.settings); + } }, (error: Error) => { - log.error(`Cannot deserialize the new settings: ${error.message}`); + log.error(`Cannot deserialize the daemon event: ${error.message}`); }, ); - await Promise.all([ - this.daemonRpc.subscribeStateListener(stateListener), - this.daemonRpc.subscribeSettingsListener(settingsListener), - ]); + return this.daemonRpc.subscribeDaemonEventListener(daemonEventListener); } private setAccountHistory(accountHistory: AccountToken[]) { diff --git a/gui/src/shared/daemon-rpc-types.ts b/gui/src/shared/daemon-rpc-types.ts index 7908c4f67e..a3080b7590 100644 --- a/gui/src/shared/daemon-rpc-types.ts +++ b/gui/src/shared/daemon-rpc-types.ts @@ -41,6 +41,8 @@ export interface ITunnelEndpoint { tunnel: TunnelType; } +export type DaemonEvent = { stateTransition: TunnelStateTransition } | { settings: ISettings }; + export type TunnelStateTransition = | { state: 'disconnected' } | { state: 'connecting'; details?: ITunnelEndpoint } diff --git a/mullvad-cli/src/cmds/status.rs b/mullvad-cli/src/cmds/status.rs index f39d9cef68..fc6df014fc 100644 --- a/mullvad-cli/src/cmds/status.rs +++ b/mullvad-cli/src/cmds/status.rs @@ -1,7 +1,7 @@ use crate::{new_rpc_client, Command, Error, ErrorKind, Result, ResultExt}; use futures::{Future, Stream}; use mullvad_ipc_client::DaemonRpcClient; -use mullvad_types::auth_failed::AuthFailed; +use mullvad_types::{auth_failed::AuthFailed, DaemonEvent}; use talpid_types::tunnel::{BlockReason, TunnelStateTransition}; pub struct Status; @@ -27,17 +27,19 @@ impl Command for Status { print_location(&mut rpc)?; if matches.subcommand_matches("listen").is_some() { let subscription = rpc - .new_state_subscribe() + .daemon_event_subscribe() .wait() .map_err(|_err| Error::from(ErrorKind::CantSubscribe))?; - for new_state in subscription.wait() { - let new_state = new_state.chain_err(|| "Subscription failed")?; - print_state(&new_state); - - use self::TunnelStateTransition::*; - match new_state { - Connected(_) | Disconnected => print_location(&mut rpc)?, - _ => {} + for event in subscription.wait() { + if let DaemonEvent::StateTransition(new_state) = + event.chain_err(|| "Subscription failed")? + { + print_state(&new_state); + use self::TunnelStateTransition::*; + match new_state { + Connected(_) | Disconnected => print_location(&mut rpc)?, + _ => {} + } } } } diff --git a/mullvad-daemon/src/management_interface.rs b/mullvad-daemon/src/management_interface.rs index 591a418ebd..f77cd1b8e4 100644 --- a/mullvad-daemon/src/management_interface.rs +++ b/mullvad-daemon/src/management_interface.rs @@ -19,9 +19,8 @@ use mullvad_types::{ relay_list::RelayList, settings::{self, Settings}, states::TargetState, - version, + version, DaemonEvent, }; -use serde; use std::{ collections::{hash_map::Entry, HashMap}, sync::{Arc, Mutex, RwLock}, @@ -150,29 +149,18 @@ build_rpc_trait! { #[rpc(meta, name = "get_version_info")] fn get_version_info(&self, Self::Metadata) -> BoxFuture<version::AppVersionInfo, Error>; - #[pubsub(name = "new_state")] { - /// Subscribes to the `new_state` event notifications. - #[rpc(name = "new_state_subscribe")] - fn new_state_subscribe( + #[pubsub(name = "daemon_event")] { + /// Subscribes to events from the daemon. + #[rpc(name = "daemon_event_subscribe")] + fn daemon_event_subscribe( &self, Self::Metadata, - pubsub::Subscriber<TunnelStateTransition> + pubsub::Subscriber<DaemonEvent> ); - /// Unsubscribes from the `new_state` event notifications. - #[rpc(name = "new_state_unsubscribe")] - fn new_state_unsubscribe(&self, SubscriptionId) -> BoxFuture<(), Error>; - } - - #[pubsub(name = "settings")] { - /// Subscribes to the `settings` event notifications. Getting notified as soon as any - /// daemon settings change. - #[rpc(name = "settings_subscribe")] - fn settings_subscribe(&self, Self::Metadata, pubsub::Subscriber<Settings>); - - /// Unsubscribes from the `settings` event notifications. - #[rpc(name = "settings_unsubscribe")] - fn settings_unsubscribe(&self, SubscriptionId) -> BoxFuture<(), Error>; + /// Unsubscribes from the `daemon_event` event notifications. + #[rpc(name = "daemon_event_unsubscribe")] + fn daemon_event_unsubscribe(&self, SubscriptionId) -> BoxFuture<(), Error>; } } } @@ -237,15 +225,9 @@ pub enum ManagementCommand { Shutdown, } -#[derive(Default)] -struct ActiveSubscriptions { - new_state_subscriptions: RwLock<HashMap<SubscriptionId, pubsub::Sink<TunnelStateTransition>>>, - settings_subscriptions: RwLock<HashMap<SubscriptionId, pubsub::Sink<Settings>>>, -} - pub struct ManagementInterfaceServer { server: talpid_ipc::IpcServer, - subscriptions: Arc<ActiveSubscriptions>, + subscriptions: Arc<RwLock<HashMap<SubscriptionId, pubsub::Sink<DaemonEvent>>>>, } impl ManagementInterfaceServer { @@ -290,29 +272,24 @@ impl ManagementInterfaceServer { /// A handle that allows broadcasting messages to all subscribers of the management interface. pub struct EventBroadcaster { - subscriptions: Arc<ActiveSubscriptions>, + subscriptions: Arc<RwLock<HashMap<SubscriptionId, pubsub::Sink<DaemonEvent>>>>, } impl EventBroadcaster { /// Sends a new state update to all `new_state` subscribers of the management interface. pub fn notify_new_state(&self, new_state: TunnelStateTransition) { - log::debug!("Broadcasting new state to listeners: {:?}", new_state); - self.notify(&self.subscriptions.new_state_subscriptions, new_state); + log::debug!("Broadcasting new state: {:?}", new_state); + self.notify(DaemonEvent::StateTransition(new_state)); } /// Sends settings to all `settings` subscribers of the management interface. pub fn notify_settings(&self, settings: &Settings) { - self.notify(&self.subscriptions.settings_subscriptions, settings.clone()); + log::debug!("Broadcasting new settings"); + self.notify(DaemonEvent::Settings(settings.clone())); } - fn notify<T>( - &self, - subscriptions_lock: &RwLock<HashMap<SubscriptionId, pubsub::Sink<T>>>, - value: T, - ) where - T: serde::Serialize + Clone, - { - let subscriptions = subscriptions_lock.read().unwrap(); + fn notify(&self, value: DaemonEvent) { + let subscriptions = self.subscriptions.read().unwrap(); for sink in subscriptions.values() { let _ = sink.notify(Ok(value.clone())).wait(); } @@ -320,7 +297,7 @@ impl EventBroadcaster { } struct ManagementInterface<T: From<ManagementCommand> + 'static + Send> { - subscriptions: Arc<ActiveSubscriptions>, + subscriptions: Arc<RwLock<HashMap<SubscriptionId, pubsub::Sink<DaemonEvent>>>>, tx: Mutex<IntoSender<ManagementCommand, T>>, } @@ -332,41 +309,6 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterface<T> { } } - fn subscribe<V>( - subscriber: pubsub::Subscriber<V>, - subscriptions_lock: &RwLock<HashMap<SubscriptionId, pubsub::Sink<V>>>, - ) { - let mut subscriptions = subscriptions_lock.write().unwrap(); - loop { - let id = SubscriptionId::String(uuid::Uuid::new_v4().to_string()); - if let Entry::Vacant(entry) = subscriptions.entry(id.clone()) { - if let Ok(sink) = subscriber.assign_id(id.clone()) { - log::debug!("Accepting new subscription with id {:?}", id); - entry.insert(sink); - } - break; - } - } - } - - fn unsubscribe<V>( - id: &SubscriptionId, - subscriptions_lock: &RwLock<HashMap<SubscriptionId, pubsub::Sink<V>>>, - ) -> BoxFuture<(), Error> { - let was_removed = subscriptions_lock.write().unwrap().remove(&id).is_some(); - let result = if was_removed { - log::debug!("Unsubscribing id {:?}", id); - future::ok(()) - } else { - future::err(Error { - code: ErrorCode::InvalidParams, - message: "Invalid subscription".to_owned(), - data: None, - }) - }; - Box::new(result) - } - /// Sends a command to the daemon and maps the error to an RPC error. fn send_command_to_daemon(&self, command: ManagementCommand) -> BoxFuture<(), Error> { Box::new( @@ -697,28 +639,39 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi Box::new(future) } - fn new_state_subscribe( + fn daemon_event_subscribe( &self, _: Self::Metadata, - subscriber: pubsub::Subscriber<TunnelStateTransition>, + subscriber: pubsub::Subscriber<DaemonEvent>, ) { - log::debug!("new_state_subscribe"); - Self::subscribe(subscriber, &self.subscriptions.new_state_subscriptions); - } - - fn new_state_unsubscribe(&self, id: SubscriptionId) -> BoxFuture<(), Error> { - log::debug!("new_state_unsubscribe"); - Self::unsubscribe(&id, &self.subscriptions.new_state_subscriptions) - } - - fn settings_subscribe(&self, _: Self::Metadata, subscriber: pubsub::Subscriber<Settings>) { - log::debug!("settings_subscribe"); - Self::subscribe(subscriber, &self.subscriptions.settings_subscriptions); + log::debug!("daemon_event_subscribe"); + let mut subscriptions = self.subscriptions.write().unwrap(); + loop { + let id = SubscriptionId::String(uuid::Uuid::new_v4().to_string()); + if let Entry::Vacant(entry) = subscriptions.entry(id.clone()) { + if let Ok(sink) = subscriber.assign_id(id.clone()) { + log::debug!("Accepting new subscription with id {:?}", id); + entry.insert(sink); + } + break; + } + } } - fn settings_unsubscribe(&self, id: SubscriptionId) -> BoxFuture<(), Error> { - log::debug!("settings_unsubscribe"); - Self::unsubscribe(&id, &self.subscriptions.settings_subscriptions) + fn daemon_event_unsubscribe(&self, id: SubscriptionId) -> BoxFuture<(), Error> { + log::debug!("daemon_event_unsubscribe"); + let was_removed = self.subscriptions.write().unwrap().remove(&id).is_some(); + let result = if was_removed { + log::debug!("Unsubscribing id {:?}", id); + future::ok(()) + } else { + future::err(Error { + code: ErrorCode::InvalidParams, + message: "Invalid subscription".to_owned(), + data: None, + }) + }; + Box::new(result) } } diff --git a/mullvad-ipc-client/src/lib.rs b/mullvad-ipc-client/src/lib.rs index 21503d0155..df07c9123f 100644 --- a/mullvad-ipc-client/src/lib.rs +++ b/mullvad-ipc-client/src/lib.rs @@ -11,6 +11,7 @@ use mullvad_types::{ relay_list::RelayList, settings::{Settings, TunnelOptions}, version::AppVersionInfo, + DaemonEvent, }; use serde::{Deserialize, Serialize}; use std::{path::Path, thread}; @@ -248,16 +249,16 @@ impl DaemonRpcClient { .chain_err(|| ErrorKind::RpcCallError(method.to_owned())) } - pub fn new_state_subscribe( + pub fn daemon_event_subscribe( &mut self, ) -> impl Future< - Item = jsonrpc_client_pubsub::Subscription<TunnelStateTransition>, + Item = jsonrpc_client_pubsub::Subscription<DaemonEvent>, Error = jsonrpc_client_pubsub::Error, > { self.subscriber.subscribe( - "new_state_subscribe".to_string(), - "new_state_unsubscribe".to_string(), - "new_state".to_string(), + "daemon_event_subscribe".to_string(), + "daemon_event_unsubscribe".to_string(), + "daemon_event".to_string(), 0, &NO_ARGS, ) diff --git a/mullvad-tests/Cargo.toml b/mullvad-tests/Cargo.toml index 549db35244..0e3554549f 100644 --- a/mullvad-tests/Cargo.toml +++ b/mullvad-tests/Cargo.toml @@ -13,6 +13,7 @@ integration-tests = [] duct = "0.12" mullvad-ipc-client = { path = "../mullvad-ipc-client" } mullvad-paths = { path = "../mullvad-paths" } +mullvad-types = { path = "../mullvad-types" } notify = "4.0" openvpn-plugin = { git = "https://github.com/mullvad/openvpn-plugin-rs", branch = "auth-failed-event", features = ["serde"] } talpid-ipc = { path = "../talpid-ipc" } diff --git a/mullvad-tests/tests/connection.rs b/mullvad-tests/tests/connection.rs index f15db8e5e5..509ffb5bf8 100644 --- a/mullvad-tests/tests/connection.rs +++ b/mullvad-tests/tests/connection.rs @@ -5,6 +5,7 @@ use mullvad_tests::{ mock_openvpn::search_openvpn_args, watch_event, DaemonRunner, MockOpenVpnPluginRpcClient, PathWatcher, }; +use mullvad_types::DaemonEvent; use std::{fs, path::Path, time::Duration}; use talpid_types::{ net::{Endpoint, TransportProtocol, TunnelEndpoint, TunnelType}, @@ -59,7 +60,7 @@ fn respawns_openvpn_if_it_crashes() { fn changes_to_connecting_state() { let mut daemon = DaemonRunner::spawn(); let mut rpc_client = daemon.rpc_client().unwrap(); - let state_events = rpc_client.new_state_subscribe().wait().unwrap(); + let state_events = rpc_client.daemon_event_subscribe().wait().unwrap(); rpc_client.set_account(Some("123456".to_owned())).unwrap(); rpc_client.connect().unwrap(); @@ -80,7 +81,7 @@ fn changes_to_connected_state() { let mut rpc_client = daemon.rpc_client().unwrap(); let openvpn_args_file = daemon.mock_openvpn_args_file(); let mut openvpn_args_file_events = PathWatcher::watch(&openvpn_args_file).unwrap(); - let state_events = rpc_client.new_state_subscribe().wait().unwrap(); + let state_events = rpc_client.daemon_event_subscribe().wait().unwrap(); rpc_client.set_account(Some("123456".to_owned())).unwrap(); rpc_client.connect().unwrap(); @@ -111,7 +112,7 @@ fn returns_to_connecting_state() { let mut rpc_client = daemon.rpc_client().unwrap(); let openvpn_args_file = daemon.mock_openvpn_args_file(); let mut openvpn_args_file_events = PathWatcher::watch(&openvpn_args_file).unwrap(); - let state_events = rpc_client.new_state_subscribe().wait().unwrap(); + let state_events = rpc_client.daemon_event_subscribe().wait().unwrap(); rpc_client.set_account(Some("123456".to_owned())).unwrap(); rpc_client.connect().unwrap(); @@ -149,7 +150,7 @@ fn disconnects() { let mut rpc_client = daemon.rpc_client().unwrap(); let openvpn_args_file = daemon.mock_openvpn_args_file(); let mut openvpn_args_file_events = PathWatcher::watch(&openvpn_args_file).unwrap(); - let state_events = rpc_client.new_state_subscribe().wait().unwrap(); + let state_events = rpc_client.daemon_event_subscribe().wait().unwrap(); rpc_client.set_account(Some("123456".to_owned())).unwrap(); rpc_client.connect().unwrap(); @@ -189,23 +190,28 @@ fn get_default_endpoint() -> TunnelEndpoint { } fn assert_state_event< - S: Stream<Item = TunnelStateTransition, Error = jsonrpc_client_core::Error> + std::fmt::Debug, + S: Stream<Item = DaemonEvent, Error = jsonrpc_client_core::Error> + std::fmt::Debug, >( - receiver: S, + mut receiver: S, expected_state: TunnelStateTransition, ) -> S { use futures::future::Either; - let timer = tokio_timer::Timer::default(); - let timeout = timer.sleep(Duration::from_secs(3)); - - let (received_state, receiver) = match receiver.into_future().select2(timeout).wait() { - Ok(Either::A((stream_result, _timer))) => stream_result, - _ => panic!("Timed out waiting for tunnel state transition"), - }; - + let mut transition = None; + while transition.is_none() { + let timer = tokio_timer::Timer::default(); + let timeout = timer.sleep(Duration::from_secs(3)); + let (event, receiver2) = match receiver.into_future().select2(timeout).wait() { + Ok(Either::A((stream_result, _timer))) => stream_result, + _ => panic!("Timed out waiting for tunnel state transition"), + }; + receiver = receiver2; + if let DaemonEvent::StateTransition(new_state) = event.unwrap() { + transition = Some(new_state); + } + } - assert_eq!(received_state.unwrap(), expected_state); + assert_eq!(transition.unwrap(), expected_state); receiver } diff --git a/mullvad-types/src/lib.rs b/mullvad-types/src/lib.rs index b2659911e4..2ac2295982 100644 --- a/mullvad-types/src/lib.rs +++ b/mullvad-types/src/lib.rs @@ -22,3 +22,14 @@ pub mod wireguard; mod custom_tunnel; pub use crate::custom_tunnel::*; + +/// An event sent out from the daemon to frontends. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum DaemonEvent { + /// The daemon transitioned into a new state. + StateTransition(talpid_types::tunnel::TunnelStateTransition), + + /// The daemon settings changed. + Settings(settings::Settings), +} |
