summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorLinus Färnstrand <linus@mullvad.net>2019-03-27 18:03:16 +0100
committerLinus Färnstrand <linus@mullvad.net>2019-03-27 18:03:16 +0100
commitd73e8b7e00bb1b1d8a164298ca66e2c98feac1f4 (patch)
tree37094fc6ce64372f34d880b5d430d06ad19052bf
parentd334f116441186ee57abab70dc0509d8e69b860f (diff)
parentae59b0aa3f8798950f788310fe403fdc2f8f8f6f (diff)
downloadmullvadvpn-d73e8b7e00bb1b1d8a164298ca66e2c98feac1f4.tar.xz
mullvadvpn-d73e8b7e00bb1b1d8a164298ca66e2c98feac1f4.zip
Merge branch 'unify-daemon-subscriptions'
-rw-r--r--Cargo.lock1
-rw-r--r--gui/src/main/daemon-rpc.ts37
-rw-r--r--gui/src/main/index.ts27
-rw-r--r--gui/src/shared/daemon-rpc-types.ts2
-rw-r--r--mullvad-cli/src/cmds/status.rs22
-rw-r--r--mullvad-daemon/src/management_interface.rs139
-rw-r--r--mullvad-ipc-client/src/lib.rs11
-rw-r--r--mullvad-tests/Cargo.toml1
-rw-r--r--mullvad-tests/tests/connection.rs36
-rw-r--r--mullvad-types/src/lib.rs11
10 files changed, 127 insertions, 160 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 9478ab7cea..2d5c19e329 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1218,6 +1218,7 @@ dependencies = [
"libc 0.2.48 (registry+https://github.com/rust-lang/crates.io-index)",
"mullvad-ipc-client 0.1.0",
"mullvad-paths 0.1.0",
+ "mullvad-types 0.1.0",
"notify 4.0.9 (registry+https://github.com/rust-lang/crates.io-index)",
"openvpn-plugin 0.3.0 (git+https://github.com/mullvad/openvpn-plugin-rs?branch=auth-failed-event)",
"talpid-ipc 0.1.0",
diff --git a/gui/src/main/daemon-rpc.ts b/gui/src/main/daemon-rpc.ts
index 94cbeeb984..4464db6688 100644
--- a/gui/src/main/daemon-rpc.ts
+++ b/gui/src/main/daemon-rpc.ts
@@ -1,5 +1,6 @@
import {
AccountToken,
+ DaemonEvent,
IAccountData,
IAppVersionInfo,
ILocation,
@@ -295,6 +296,15 @@ const settingsSchema = partialObject({
tunnel_options: tunnelOptionsSchema,
});
+const daemonEventSchema = oneOf(
+ object({
+ state_transition: tunnelStateTransitionSchema,
+ }),
+ object({
+ settings: settingsSchema,
+ }),
+);
+
export class ResponseParseError extends Error {
constructor(message: string, private validationErrorValue?: Error) {
super(message);
@@ -432,28 +442,15 @@ export class DaemonRpc {
}
}
- public subscribeStateListener(
- listener: SubscriptionListener<TunnelStateTransition>,
- ): Promise<void> {
- return this.transport.subscribe('new_state', (payload) => {
- try {
- const newState = camelCaseObjectKeys(
- validate(tunnelStateTransitionSchema, payload),
- ) as TunnelStateTransition;
- listener.onEvent(newState);
- } catch (error) {
- listener.onError(new ResponseParseError('Invalid payload from new_state', error));
- }
- });
- }
-
- public subscribeSettingsListener(listener: SubscriptionListener<ISettings>): Promise<void> {
- return this.transport.subscribe('settings', (payload) => {
+ public subscribeDaemonEventListener(listener: SubscriptionListener<DaemonEvent>): Promise<void> {
+ return this.transport.subscribe('daemon_event', (payload) => {
try {
- const newSettings = camelCaseObjectKeys(validate(settingsSchema, payload)) as ISettings;
- listener.onEvent(newSettings);
+ const daemonEvent = camelCaseObjectKeys(
+ validate(daemonEventSchema, payload),
+ ) as DaemonEvent;
+ listener.onEvent(daemonEvent);
} catch (error) {
- listener.onError(new ResponseParseError('Invalid payload from settings', error));
+ listener.onError(new ResponseParseError('Invalid payload from daemon_event', error));
}
});
}
diff --git a/gui/src/main/index.ts b/gui/src/main/index.ts
index b5e466912a..d8b558ddcb 100644
--- a/gui/src/main/index.ts
+++ b/gui/src/main/index.ts
@@ -7,6 +7,7 @@ import * as path from 'path';
import * as uuid from 'uuid';
import {
AccountToken,
+ DaemonEvent,
IAppVersionInfo,
ILocation,
IRelayList,
@@ -491,28 +492,20 @@ class ApplicationMain {
}
private async subscribeEvents(): Promise<void> {
- const stateListener = new SubscriptionListener(
- (newState: TunnelStateTransition) => {
- this.setTunnelState(newState);
- },
- (error: Error) => {
- log.error(`Cannot deserialize the new state: ${error.message}`);
- },
- );
-
- const settingsListener = new SubscriptionListener(
- (newSettings: ISettings) => {
- this.setSettings(newSettings);
+ const daemonEventListener = new SubscriptionListener(
+ (daemonEvent: DaemonEvent) => {
+ if ('stateTransition' in daemonEvent) {
+ this.setTunnelState(daemonEvent.stateTransition);
+ } else if ('settings' in daemonEvent) {
+ this.setSettings(daemonEvent.settings);
+ }
},
(error: Error) => {
- log.error(`Cannot deserialize the new settings: ${error.message}`);
+ log.error(`Cannot deserialize the daemon event: ${error.message}`);
},
);
- await Promise.all([
- this.daemonRpc.subscribeStateListener(stateListener),
- this.daemonRpc.subscribeSettingsListener(settingsListener),
- ]);
+ return this.daemonRpc.subscribeDaemonEventListener(daemonEventListener);
}
private setAccountHistory(accountHistory: AccountToken[]) {
diff --git a/gui/src/shared/daemon-rpc-types.ts b/gui/src/shared/daemon-rpc-types.ts
index 7908c4f67e..a3080b7590 100644
--- a/gui/src/shared/daemon-rpc-types.ts
+++ b/gui/src/shared/daemon-rpc-types.ts
@@ -41,6 +41,8 @@ export interface ITunnelEndpoint {
tunnel: TunnelType;
}
+export type DaemonEvent = { stateTransition: TunnelStateTransition } | { settings: ISettings };
+
export type TunnelStateTransition =
| { state: 'disconnected' }
| { state: 'connecting'; details?: ITunnelEndpoint }
diff --git a/mullvad-cli/src/cmds/status.rs b/mullvad-cli/src/cmds/status.rs
index f39d9cef68..fc6df014fc 100644
--- a/mullvad-cli/src/cmds/status.rs
+++ b/mullvad-cli/src/cmds/status.rs
@@ -1,7 +1,7 @@
use crate::{new_rpc_client, Command, Error, ErrorKind, Result, ResultExt};
use futures::{Future, Stream};
use mullvad_ipc_client::DaemonRpcClient;
-use mullvad_types::auth_failed::AuthFailed;
+use mullvad_types::{auth_failed::AuthFailed, DaemonEvent};
use talpid_types::tunnel::{BlockReason, TunnelStateTransition};
pub struct Status;
@@ -27,17 +27,19 @@ impl Command for Status {
print_location(&mut rpc)?;
if matches.subcommand_matches("listen").is_some() {
let subscription = rpc
- .new_state_subscribe()
+ .daemon_event_subscribe()
.wait()
.map_err(|_err| Error::from(ErrorKind::CantSubscribe))?;
- for new_state in subscription.wait() {
- let new_state = new_state.chain_err(|| "Subscription failed")?;
- print_state(&new_state);
-
- use self::TunnelStateTransition::*;
- match new_state {
- Connected(_) | Disconnected => print_location(&mut rpc)?,
- _ => {}
+ for event in subscription.wait() {
+ if let DaemonEvent::StateTransition(new_state) =
+ event.chain_err(|| "Subscription failed")?
+ {
+ print_state(&new_state);
+ use self::TunnelStateTransition::*;
+ match new_state {
+ Connected(_) | Disconnected => print_location(&mut rpc)?,
+ _ => {}
+ }
}
}
}
diff --git a/mullvad-daemon/src/management_interface.rs b/mullvad-daemon/src/management_interface.rs
index 591a418ebd..f77cd1b8e4 100644
--- a/mullvad-daemon/src/management_interface.rs
+++ b/mullvad-daemon/src/management_interface.rs
@@ -19,9 +19,8 @@ use mullvad_types::{
relay_list::RelayList,
settings::{self, Settings},
states::TargetState,
- version,
+ version, DaemonEvent,
};
-use serde;
use std::{
collections::{hash_map::Entry, HashMap},
sync::{Arc, Mutex, RwLock},
@@ -150,29 +149,18 @@ build_rpc_trait! {
#[rpc(meta, name = "get_version_info")]
fn get_version_info(&self, Self::Metadata) -> BoxFuture<version::AppVersionInfo, Error>;
- #[pubsub(name = "new_state")] {
- /// Subscribes to the `new_state` event notifications.
- #[rpc(name = "new_state_subscribe")]
- fn new_state_subscribe(
+ #[pubsub(name = "daemon_event")] {
+ /// Subscribes to events from the daemon.
+ #[rpc(name = "daemon_event_subscribe")]
+ fn daemon_event_subscribe(
&self,
Self::Metadata,
- pubsub::Subscriber<TunnelStateTransition>
+ pubsub::Subscriber<DaemonEvent>
);
- /// Unsubscribes from the `new_state` event notifications.
- #[rpc(name = "new_state_unsubscribe")]
- fn new_state_unsubscribe(&self, SubscriptionId) -> BoxFuture<(), Error>;
- }
-
- #[pubsub(name = "settings")] {
- /// Subscribes to the `settings` event notifications. Getting notified as soon as any
- /// daemon settings change.
- #[rpc(name = "settings_subscribe")]
- fn settings_subscribe(&self, Self::Metadata, pubsub::Subscriber<Settings>);
-
- /// Unsubscribes from the `settings` event notifications.
- #[rpc(name = "settings_unsubscribe")]
- fn settings_unsubscribe(&self, SubscriptionId) -> BoxFuture<(), Error>;
+ /// Unsubscribes from the `daemon_event` event notifications.
+ #[rpc(name = "daemon_event_unsubscribe")]
+ fn daemon_event_unsubscribe(&self, SubscriptionId) -> BoxFuture<(), Error>;
}
}
}
@@ -237,15 +225,9 @@ pub enum ManagementCommand {
Shutdown,
}
-#[derive(Default)]
-struct ActiveSubscriptions {
- new_state_subscriptions: RwLock<HashMap<SubscriptionId, pubsub::Sink<TunnelStateTransition>>>,
- settings_subscriptions: RwLock<HashMap<SubscriptionId, pubsub::Sink<Settings>>>,
-}
-
pub struct ManagementInterfaceServer {
server: talpid_ipc::IpcServer,
- subscriptions: Arc<ActiveSubscriptions>,
+ subscriptions: Arc<RwLock<HashMap<SubscriptionId, pubsub::Sink<DaemonEvent>>>>,
}
impl ManagementInterfaceServer {
@@ -290,29 +272,24 @@ impl ManagementInterfaceServer {
/// A handle that allows broadcasting messages to all subscribers of the management interface.
pub struct EventBroadcaster {
- subscriptions: Arc<ActiveSubscriptions>,
+ subscriptions: Arc<RwLock<HashMap<SubscriptionId, pubsub::Sink<DaemonEvent>>>>,
}
impl EventBroadcaster {
/// Sends a new state update to all `new_state` subscribers of the management interface.
pub fn notify_new_state(&self, new_state: TunnelStateTransition) {
- log::debug!("Broadcasting new state to listeners: {:?}", new_state);
- self.notify(&self.subscriptions.new_state_subscriptions, new_state);
+ log::debug!("Broadcasting new state: {:?}", new_state);
+ self.notify(DaemonEvent::StateTransition(new_state));
}
/// Sends settings to all `settings` subscribers of the management interface.
pub fn notify_settings(&self, settings: &Settings) {
- self.notify(&self.subscriptions.settings_subscriptions, settings.clone());
+ log::debug!("Broadcasting new settings");
+ self.notify(DaemonEvent::Settings(settings.clone()));
}
- fn notify<T>(
- &self,
- subscriptions_lock: &RwLock<HashMap<SubscriptionId, pubsub::Sink<T>>>,
- value: T,
- ) where
- T: serde::Serialize + Clone,
- {
- let subscriptions = subscriptions_lock.read().unwrap();
+ fn notify(&self, value: DaemonEvent) {
+ let subscriptions = self.subscriptions.read().unwrap();
for sink in subscriptions.values() {
let _ = sink.notify(Ok(value.clone())).wait();
}
@@ -320,7 +297,7 @@ impl EventBroadcaster {
}
struct ManagementInterface<T: From<ManagementCommand> + 'static + Send> {
- subscriptions: Arc<ActiveSubscriptions>,
+ subscriptions: Arc<RwLock<HashMap<SubscriptionId, pubsub::Sink<DaemonEvent>>>>,
tx: Mutex<IntoSender<ManagementCommand, T>>,
}
@@ -332,41 +309,6 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterface<T> {
}
}
- fn subscribe<V>(
- subscriber: pubsub::Subscriber<V>,
- subscriptions_lock: &RwLock<HashMap<SubscriptionId, pubsub::Sink<V>>>,
- ) {
- let mut subscriptions = subscriptions_lock.write().unwrap();
- loop {
- let id = SubscriptionId::String(uuid::Uuid::new_v4().to_string());
- if let Entry::Vacant(entry) = subscriptions.entry(id.clone()) {
- if let Ok(sink) = subscriber.assign_id(id.clone()) {
- log::debug!("Accepting new subscription with id {:?}", id);
- entry.insert(sink);
- }
- break;
- }
- }
- }
-
- fn unsubscribe<V>(
- id: &SubscriptionId,
- subscriptions_lock: &RwLock<HashMap<SubscriptionId, pubsub::Sink<V>>>,
- ) -> BoxFuture<(), Error> {
- let was_removed = subscriptions_lock.write().unwrap().remove(&id).is_some();
- let result = if was_removed {
- log::debug!("Unsubscribing id {:?}", id);
- future::ok(())
- } else {
- future::err(Error {
- code: ErrorCode::InvalidParams,
- message: "Invalid subscription".to_owned(),
- data: None,
- })
- };
- Box::new(result)
- }
-
/// Sends a command to the daemon and maps the error to an RPC error.
fn send_command_to_daemon(&self, command: ManagementCommand) -> BoxFuture<(), Error> {
Box::new(
@@ -697,28 +639,39 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi
Box::new(future)
}
- fn new_state_subscribe(
+ fn daemon_event_subscribe(
&self,
_: Self::Metadata,
- subscriber: pubsub::Subscriber<TunnelStateTransition>,
+ subscriber: pubsub::Subscriber<DaemonEvent>,
) {
- log::debug!("new_state_subscribe");
- Self::subscribe(subscriber, &self.subscriptions.new_state_subscriptions);
- }
-
- fn new_state_unsubscribe(&self, id: SubscriptionId) -> BoxFuture<(), Error> {
- log::debug!("new_state_unsubscribe");
- Self::unsubscribe(&id, &self.subscriptions.new_state_subscriptions)
- }
-
- fn settings_subscribe(&self, _: Self::Metadata, subscriber: pubsub::Subscriber<Settings>) {
- log::debug!("settings_subscribe");
- Self::subscribe(subscriber, &self.subscriptions.settings_subscriptions);
+ log::debug!("daemon_event_subscribe");
+ let mut subscriptions = self.subscriptions.write().unwrap();
+ loop {
+ let id = SubscriptionId::String(uuid::Uuid::new_v4().to_string());
+ if let Entry::Vacant(entry) = subscriptions.entry(id.clone()) {
+ if let Ok(sink) = subscriber.assign_id(id.clone()) {
+ log::debug!("Accepting new subscription with id {:?}", id);
+ entry.insert(sink);
+ }
+ break;
+ }
+ }
}
- fn settings_unsubscribe(&self, id: SubscriptionId) -> BoxFuture<(), Error> {
- log::debug!("settings_unsubscribe");
- Self::unsubscribe(&id, &self.subscriptions.settings_subscriptions)
+ fn daemon_event_unsubscribe(&self, id: SubscriptionId) -> BoxFuture<(), Error> {
+ log::debug!("daemon_event_unsubscribe");
+ let was_removed = self.subscriptions.write().unwrap().remove(&id).is_some();
+ let result = if was_removed {
+ log::debug!("Unsubscribing id {:?}", id);
+ future::ok(())
+ } else {
+ future::err(Error {
+ code: ErrorCode::InvalidParams,
+ message: "Invalid subscription".to_owned(),
+ data: None,
+ })
+ };
+ Box::new(result)
}
}
diff --git a/mullvad-ipc-client/src/lib.rs b/mullvad-ipc-client/src/lib.rs
index 21503d0155..df07c9123f 100644
--- a/mullvad-ipc-client/src/lib.rs
+++ b/mullvad-ipc-client/src/lib.rs
@@ -11,6 +11,7 @@ use mullvad_types::{
relay_list::RelayList,
settings::{Settings, TunnelOptions},
version::AppVersionInfo,
+ DaemonEvent,
};
use serde::{Deserialize, Serialize};
use std::{path::Path, thread};
@@ -248,16 +249,16 @@ impl DaemonRpcClient {
.chain_err(|| ErrorKind::RpcCallError(method.to_owned()))
}
- pub fn new_state_subscribe(
+ pub fn daemon_event_subscribe(
&mut self,
) -> impl Future<
- Item = jsonrpc_client_pubsub::Subscription<TunnelStateTransition>,
+ Item = jsonrpc_client_pubsub::Subscription<DaemonEvent>,
Error = jsonrpc_client_pubsub::Error,
> {
self.subscriber.subscribe(
- "new_state_subscribe".to_string(),
- "new_state_unsubscribe".to_string(),
- "new_state".to_string(),
+ "daemon_event_subscribe".to_string(),
+ "daemon_event_unsubscribe".to_string(),
+ "daemon_event".to_string(),
0,
&NO_ARGS,
)
diff --git a/mullvad-tests/Cargo.toml b/mullvad-tests/Cargo.toml
index 549db35244..0e3554549f 100644
--- a/mullvad-tests/Cargo.toml
+++ b/mullvad-tests/Cargo.toml
@@ -13,6 +13,7 @@ integration-tests = []
duct = "0.12"
mullvad-ipc-client = { path = "../mullvad-ipc-client" }
mullvad-paths = { path = "../mullvad-paths" }
+mullvad-types = { path = "../mullvad-types" }
notify = "4.0"
openvpn-plugin = { git = "https://github.com/mullvad/openvpn-plugin-rs", branch = "auth-failed-event", features = ["serde"] }
talpid-ipc = { path = "../talpid-ipc" }
diff --git a/mullvad-tests/tests/connection.rs b/mullvad-tests/tests/connection.rs
index f15db8e5e5..509ffb5bf8 100644
--- a/mullvad-tests/tests/connection.rs
+++ b/mullvad-tests/tests/connection.rs
@@ -5,6 +5,7 @@ use mullvad_tests::{
mock_openvpn::search_openvpn_args, watch_event, DaemonRunner, MockOpenVpnPluginRpcClient,
PathWatcher,
};
+use mullvad_types::DaemonEvent;
use std::{fs, path::Path, time::Duration};
use talpid_types::{
net::{Endpoint, TransportProtocol, TunnelEndpoint, TunnelType},
@@ -59,7 +60,7 @@ fn respawns_openvpn_if_it_crashes() {
fn changes_to_connecting_state() {
let mut daemon = DaemonRunner::spawn();
let mut rpc_client = daemon.rpc_client().unwrap();
- let state_events = rpc_client.new_state_subscribe().wait().unwrap();
+ let state_events = rpc_client.daemon_event_subscribe().wait().unwrap();
rpc_client.set_account(Some("123456".to_owned())).unwrap();
rpc_client.connect().unwrap();
@@ -80,7 +81,7 @@ fn changes_to_connected_state() {
let mut rpc_client = daemon.rpc_client().unwrap();
let openvpn_args_file = daemon.mock_openvpn_args_file();
let mut openvpn_args_file_events = PathWatcher::watch(&openvpn_args_file).unwrap();
- let state_events = rpc_client.new_state_subscribe().wait().unwrap();
+ let state_events = rpc_client.daemon_event_subscribe().wait().unwrap();
rpc_client.set_account(Some("123456".to_owned())).unwrap();
rpc_client.connect().unwrap();
@@ -111,7 +112,7 @@ fn returns_to_connecting_state() {
let mut rpc_client = daemon.rpc_client().unwrap();
let openvpn_args_file = daemon.mock_openvpn_args_file();
let mut openvpn_args_file_events = PathWatcher::watch(&openvpn_args_file).unwrap();
- let state_events = rpc_client.new_state_subscribe().wait().unwrap();
+ let state_events = rpc_client.daemon_event_subscribe().wait().unwrap();
rpc_client.set_account(Some("123456".to_owned())).unwrap();
rpc_client.connect().unwrap();
@@ -149,7 +150,7 @@ fn disconnects() {
let mut rpc_client = daemon.rpc_client().unwrap();
let openvpn_args_file = daemon.mock_openvpn_args_file();
let mut openvpn_args_file_events = PathWatcher::watch(&openvpn_args_file).unwrap();
- let state_events = rpc_client.new_state_subscribe().wait().unwrap();
+ let state_events = rpc_client.daemon_event_subscribe().wait().unwrap();
rpc_client.set_account(Some("123456".to_owned())).unwrap();
rpc_client.connect().unwrap();
@@ -189,23 +190,28 @@ fn get_default_endpoint() -> TunnelEndpoint {
}
fn assert_state_event<
- S: Stream<Item = TunnelStateTransition, Error = jsonrpc_client_core::Error> + std::fmt::Debug,
+ S: Stream<Item = DaemonEvent, Error = jsonrpc_client_core::Error> + std::fmt::Debug,
>(
- receiver: S,
+ mut receiver: S,
expected_state: TunnelStateTransition,
) -> S {
use futures::future::Either;
- let timer = tokio_timer::Timer::default();
- let timeout = timer.sleep(Duration::from_secs(3));
-
- let (received_state, receiver) = match receiver.into_future().select2(timeout).wait() {
- Ok(Either::A((stream_result, _timer))) => stream_result,
- _ => panic!("Timed out waiting for tunnel state transition"),
- };
-
+ let mut transition = None;
+ while transition.is_none() {
+ let timer = tokio_timer::Timer::default();
+ let timeout = timer.sleep(Duration::from_secs(3));
+ let (event, receiver2) = match receiver.into_future().select2(timeout).wait() {
+ Ok(Either::A((stream_result, _timer))) => stream_result,
+ _ => panic!("Timed out waiting for tunnel state transition"),
+ };
+ receiver = receiver2;
+ if let DaemonEvent::StateTransition(new_state) = event.unwrap() {
+ transition = Some(new_state);
+ }
+ }
- assert_eq!(received_state.unwrap(), expected_state);
+ assert_eq!(transition.unwrap(), expected_state);
receiver
}
diff --git a/mullvad-types/src/lib.rs b/mullvad-types/src/lib.rs
index b2659911e4..2ac2295982 100644
--- a/mullvad-types/src/lib.rs
+++ b/mullvad-types/src/lib.rs
@@ -22,3 +22,14 @@ pub mod wireguard;
mod custom_tunnel;
pub use crate::custom_tunnel::*;
+
+/// An event sent out from the daemon to frontends.
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+#[serde(rename_all = "snake_case")]
+pub enum DaemonEvent {
+ /// The daemon transitioned into a new state.
+ StateTransition(talpid_types::tunnel::TunnelStateTransition),
+
+ /// The daemon settings changed.
+ Settings(settings::Settings),
+}