summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorJanito Vaqueiro Ferreira Filho <janito@mullvad.net>2019-05-20 14:58:38 +0000
committerJanito Vaqueiro Ferreira Filho <janito@mullvad.net>2019-05-21 13:09:41 +0000
commitc0d08d8d4dd00d06c0ae0be3cc25ca26b557fb94 (patch)
tree137d8c58aeb07d45fc7caf4d8df184a4bf0d6db8
parent1c0fd7fa2809322547d127c13f177c717bebbac4 (diff)
downloadmullvadvpn-c0d08d8d4dd00d06c0ae0be3cc25ca26b557fb94.tar.xz
mullvadvpn-c0d08d8d4dd00d06c0ae0be3cc25ca26b557fb94.zip
Allow daemon to use generic `EventListener`s
-rw-r--r--mullvad-daemon/src/lib.rs179
-rw-r--r--mullvad-daemon/src/management_interface.rs34
2 files changed, 133 insertions, 80 deletions
diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs
index bc8bfef9ae..327252ada8 100644
--- a/mullvad-daemon/src/lib.rs
+++ b/mullvad-daemon/src/lib.rs
@@ -21,7 +21,9 @@ mod rpc_uniqueness_check;
pub mod version;
pub use crate::management_interface::ManagementCommand;
-use crate::management_interface::{BoxFuture, ManagementInterfaceServer};
+use crate::management_interface::{
+ BoxFuture, ManagementInterfaceEventBroadcaster, ManagementInterfaceServer,
+};
use futures::{
future,
sync::{mpsc::UnboundedSender, oneshot},
@@ -178,7 +180,19 @@ impl DaemonCommandSender {
}
}
-pub struct Daemon {
+/// Trait representing something that can broadcast daemon events.
+pub trait EventListener {
+ /// Notify that the tunnel state changed.
+ fn notify_new_state(&self, new_state: TunnelStateTransition);
+
+ /// Notify that the settings changed.
+ fn notify_settings(&self, settings: Settings);
+
+ /// Notify that the relay list changed.
+ fn notify_relay_list(&self, relay_list: RelayList);
+}
+
+pub struct Daemon<L: EventListener = ManagementInterfaceEventBroadcaster> {
tunnel_command_tx: SyncUnboundedSender<TunnelCommand>,
tunnel_state: TunnelStateTransition,
target_state: TargetState,
@@ -186,7 +200,7 @@ pub struct Daemon {
rx: mpsc::Receiver<InternalDaemonEvent>,
tx: mpsc::Sender<InternalDaemonEvent>,
reconnection_loop_tx: Option<mpsc::Sender<()>>,
- management_interface_broadcaster: management_interface::EventBroadcaster,
+ event_listener: L,
settings: Settings,
account_history: account_history::AccountHistory,
wg_key_proxy: WireguardKeyProxy<HttpHandle>,
@@ -199,7 +213,7 @@ pub struct Daemon {
version: String,
}
-impl Daemon {
+impl Daemon<ManagementInterfaceEventBroadcaster> {
pub fn start(
log_dir: Option<PathBuf>,
resource_dir: PathBuf,
@@ -209,6 +223,87 @@ impl Daemon {
if rpc_uniqueness_check::is_another_instance_running() {
return Err(Error::DaemonIsAlreadyRunning);
}
+ let (tx, rx) = mpsc::channel();
+ let management_interface_broadcaster = Self::start_management_interface(tx.clone())?;
+
+ Self::start_internal(
+ tx,
+ rx,
+ management_interface_broadcaster,
+ log_dir,
+ resource_dir,
+ cache_dir,
+ version,
+ )
+ }
+
+ // Starts the management interface and spawns a thread that will process it.
+ // Returns a handle that allows notifying all subscribers on events.
+ fn start_management_interface(
+ event_tx: mpsc::Sender<InternalDaemonEvent>,
+ ) -> Result<ManagementInterfaceEventBroadcaster> {
+ let multiplex_event_tx = IntoSender::from(event_tx.clone());
+ let server = Self::start_management_interface_server(multiplex_event_tx)?;
+ let event_broadcaster = server.event_broadcaster();
+ Self::spawn_management_interface_wait_thread(server, event_tx);
+ Ok(event_broadcaster)
+ }
+
+ fn start_management_interface_server(
+ event_tx: IntoSender<ManagementCommand, InternalDaemonEvent>,
+ ) -> Result<ManagementInterfaceServer> {
+ let server =
+ ManagementInterfaceServer::start(event_tx).map_err(Error::StartManagementInterface)?;
+ info!("Management interface listening on {}", server.socket_path());
+
+ Ok(server)
+ }
+
+ fn spawn_management_interface_wait_thread(
+ server: ManagementInterfaceServer,
+ exit_tx: mpsc::Sender<InternalDaemonEvent>,
+ ) {
+ thread::spawn(move || {
+ server.wait();
+ info!("Management interface shut down");
+ let _ = exit_tx.send(InternalDaemonEvent::ManagementInterfaceExited);
+ });
+ }
+}
+
+impl<L> Daemon<L>
+where
+ L: EventListener + Clone + Send + 'static,
+{
+ pub fn start_with_event_listener(
+ event_listener: L,
+ log_dir: Option<PathBuf>,
+ resource_dir: PathBuf,
+ cache_dir: PathBuf,
+ version: String,
+ ) -> Result<Self> {
+ let (tx, rx) = mpsc::channel();
+
+ Self::start_internal(
+ tx,
+ rx,
+ event_listener,
+ log_dir,
+ resource_dir,
+ cache_dir,
+ version,
+ )
+ }
+
+ fn start_internal(
+ internal_event_tx: mpsc::Sender<InternalDaemonEvent>,
+ internal_event_rx: mpsc::Receiver<InternalDaemonEvent>,
+ event_listener: L,
+ log_dir: Option<PathBuf>,
+ resource_dir: PathBuf,
+ cache_dir: PathBuf,
+ version: String,
+ ) -> Result<Self> {
let ca_path = resource_dir.join(mullvad_paths::resources::API_CA_FILENAME);
let mut rpc_manager = mullvad_rpc::MullvadRpcFactory::with_cache_dir(&cache_dir, &ca_path);
@@ -225,15 +320,11 @@ impl Daemon {
let rpc_handle = rpc_handle.map_err(Error::InitRpcClient)?;
let https_handle = https_handle.map_err(Error::InitHttpsClient)?;
- let (internal_event_tx, internal_event_rx) = mpsc::channel();
-
- let management_interface_broadcaster =
- Self::start_management_interface(internal_event_tx.clone())?;
- let relay_list_broadcaster = management_interface_broadcaster.clone();
-
+ let relay_list_listener = event_listener.clone();
let on_relay_list_update = move |relay_list: &RelayList| {
- relay_list_broadcaster.notify_relay_list(relay_list.clone());
+ relay_list_listener.notify_relay_list(relay_list.clone());
};
+
let relay_selector = relays::RelaySelector::new(
rpc_handle.clone(),
on_relay_list_update,
@@ -269,7 +360,7 @@ impl Daemon {
rx: internal_event_rx,
tx: internal_event_tx,
reconnection_loop_tx: None,
- management_interface_broadcaster,
+ event_listener,
settings,
account_history,
wg_key_proxy: WireguardKeyProxy::new(rpc_handle.clone()),
@@ -283,39 +374,6 @@ impl Daemon {
})
}
- // Starts the management interface and spawns a thread that will process it.
- // Returns a handle that allows notifying all subscribers on events.
- fn start_management_interface(
- event_tx: mpsc::Sender<InternalDaemonEvent>,
- ) -> Result<management_interface::EventBroadcaster> {
- let multiplex_event_tx = IntoSender::from(event_tx.clone());
- let server = Self::start_management_interface_server(multiplex_event_tx)?;
- let event_broadcaster = server.event_broadcaster();
- Self::spawn_management_interface_wait_thread(server, event_tx);
- Ok(event_broadcaster)
- }
-
- fn start_management_interface_server(
- event_tx: IntoSender<ManagementCommand, InternalDaemonEvent>,
- ) -> Result<ManagementInterfaceServer> {
- let server =
- ManagementInterfaceServer::start(event_tx).map_err(Error::StartManagementInterface)?;
- info!("Management interface listening on {}", server.socket_path());
-
- Ok(server)
- }
-
- fn spawn_management_interface_wait_thread(
- server: ManagementInterfaceServer,
- exit_tx: mpsc::Sender<InternalDaemonEvent>,
- ) {
- thread::spawn(move || {
- server.wait();
- info!("Management interface shut down");
- let _ = exit_tx.send(InternalDaemonEvent::ManagementInterfaceExited);
- });
- }
-
/// Retrieve a channel for sending daemon commands.
pub fn command_sender(&self) -> DaemonCommandSender {
DaemonCommandSender::new(self.tx.clone())
@@ -334,7 +392,6 @@ impl Daemon {
break;
}
}
- self.management_interface_broadcaster.close();
Ok(())
}
@@ -373,8 +430,7 @@ impl Daemon {
}
self.tunnel_state = tunnel_state.clone();
- self.management_interface_broadcaster
- .notify_new_state(tunnel_state);
+ self.event_listener.notify_new_state(tunnel_state);
}
fn handle_generate_tunnel_parameters(
@@ -625,8 +681,7 @@ impl Daemon {
Ok(account_changed) => {
Self::oneshot_send(tx, (), "set_account response");
if account_changed {
- self.management_interface_broadcaster
- .notify_settings(self.settings.clone());
+ self.event_listener.notify_settings(self.settings.clone());
match account_token {
Some(token) => {
if let Err(e) = self.account_history.bump_history(&token) {
@@ -693,8 +748,7 @@ impl Daemon {
Ok(settings_changed) => {
Self::oneshot_send(tx, (), "update_relay_settings response");
if settings_changed {
- self.management_interface_broadcaster
- .notify_settings(self.settings.clone());
+ self.event_listener.notify_settings(self.settings.clone());
info!("Initiating tunnel restart because the relay settings changed");
self.reconnect_tunnel();
}
@@ -709,8 +763,7 @@ impl Daemon {
Ok(settings_changed) => {
Self::oneshot_send(tx, (), "set_allow_lan response");
if settings_changed {
- self.management_interface_broadcaster
- .notify_settings(self.settings.clone());
+ self.event_listener.notify_settings(self.settings.clone());
self.send_tunnel_command(TunnelCommand::AllowLan(allow_lan));
}
}
@@ -730,8 +783,7 @@ impl Daemon {
Ok(settings_changed) => {
Self::oneshot_send(tx, (), "set_block_when_disconnected response");
if settings_changed {
- self.management_interface_broadcaster
- .notify_settings(self.settings.clone());
+ self.event_listener.notify_settings(self.settings.clone());
self.send_tunnel_command(TunnelCommand::BlockWhenDisconnected(
block_when_disconnected,
));
@@ -747,8 +799,7 @@ impl Daemon {
Ok(settings_changed) => {
Self::oneshot_send(tx, (), "set auto-connect response");
if settings_changed {
- self.management_interface_broadcaster
- .notify_settings(self.settings.clone());
+ self.event_listener.notify_settings(self.settings.clone());
}
}
Err(e) => error!("{}", e.display_chain_with_msg("Unable to save settings")),
@@ -761,8 +812,7 @@ impl Daemon {
Ok(settings_changed) => {
Self::oneshot_send(tx, (), "set_openvpn_mssfix response");
if settings_changed {
- self.management_interface_broadcaster
- .notify_settings(self.settings.clone());
+ self.event_listener.notify_settings(self.settings.clone());
info!("Initiating tunnel restart because the OpenVPN mssfix setting changed");
self.reconnect_tunnel();
}
@@ -786,8 +836,7 @@ impl Daemon {
(Ok(proxy_changed), Ok(constraints_changed)) => {
Self::oneshot_send(tx, Ok(()), "set_openvpn_proxy response");
if proxy_changed || constraints_changed {
- self.management_interface_broadcaster
- .notify_settings(self.settings.clone());
+ self.event_listener.notify_settings(self.settings.clone());
info!("Initiating tunnel restart because the OpenVPN proxy setting changed");
self.reconnect_tunnel();
}
@@ -828,8 +877,7 @@ impl Daemon {
Ok(settings_changed) => {
Self::oneshot_send(tx, (), "set_enable_ipv6 response");
if settings_changed {
- self.management_interface_broadcaster
- .notify_settings(self.settings.clone());
+ self.event_listener.notify_settings(self.settings.clone());
info!("Initiating tunnel restart because the enable IPv6 setting changed");
self.reconnect_tunnel();
}
@@ -844,8 +892,7 @@ impl Daemon {
Ok(settings_changed) => {
Self::oneshot_send(tx, (), "set_wireguard_mtu response");
if settings_changed {
- self.management_interface_broadcaster
- .notify_settings(self.settings.clone());
+ self.event_listener.notify_settings(self.settings.clone());
info!("Initiating tunnel restart because the WireGuard MTU setting changed");
self.reconnect_tunnel();
}
diff --git a/mullvad-daemon/src/management_interface.rs b/mullvad-daemon/src/management_interface.rs
index c2ebf4ceb2..9d212abef8 100644
--- a/mullvad-daemon/src/management_interface.rs
+++ b/mullvad-daemon/src/management_interface.rs
@@ -1,3 +1,4 @@
+use crate::EventListener;
use jsonrpc_core::{
futures::{
future,
@@ -257,10 +258,10 @@ impl ManagementInterfaceServer {
self.server.path()
}
- pub fn event_broadcaster(&self) -> EventBroadcaster {
- EventBroadcaster {
+ pub fn event_broadcaster(&self) -> ManagementInterfaceEventBroadcaster {
+ ManagementInterfaceEventBroadcaster {
subscriptions: self.subscriptions.clone(),
- close_handle: self.server.close_handle(),
+ close_handle: Some(self.server.close_handle()),
}
}
@@ -273,35 +274,32 @@ impl ManagementInterfaceServer {
/// A handle that allows broadcasting messages to all subscribers of the management interface.
#[derive(Clone)]
-pub struct EventBroadcaster {
+pub struct ManagementInterfaceEventBroadcaster {
subscriptions: Arc<RwLock<HashMap<SubscriptionId, pubsub::Sink<DaemonEvent>>>>,
- close_handle: talpid_ipc::CloseHandle,
+ close_handle: Option<talpid_ipc::CloseHandle>,
}
-impl EventBroadcaster {
- /// Notifies that the management interface should be closed
- pub fn close(self) {
- self.close_handle.close();
- }
-
+impl EventListener for ManagementInterfaceEventBroadcaster {
/// Sends a new state update to all `new_state` subscribers of the management interface.
- pub fn notify_new_state(&self, new_state: TunnelStateTransition) {
+ fn notify_new_state(&self, new_state: TunnelStateTransition) {
log::debug!("Broadcasting new state: {:?}", new_state);
self.notify(DaemonEvent::StateTransition(new_state));
}
/// Sends settings to all `settings` subscribers of the management interface.
- pub fn notify_settings(&self, settings: Settings) {
+ fn notify_settings(&self, settings: Settings) {
log::debug!("Broadcasting new settings");
self.notify(DaemonEvent::Settings(settings));
}
/// Sends settings to all `settings` subscribers of the management interface.
- pub fn notify_relay_list(&self, relay_list: RelayList) {
+ fn notify_relay_list(&self, relay_list: RelayList) {
log::debug!("Broadcasting new relay list");
self.notify(DaemonEvent::RelayList(relay_list));
}
+}
+impl ManagementInterfaceEventBroadcaster {
fn notify(&self, value: DaemonEvent) {
let subscriptions = self.subscriptions.read().unwrap();
for sink in subscriptions.values() {
@@ -310,6 +308,14 @@ impl EventBroadcaster {
}
}
+impl Drop for ManagementInterfaceEventBroadcaster {
+ fn drop(&mut self) {
+ if let Some(close_handle) = self.close_handle.take() {
+ close_handle.close();
+ }
+ }
+}
+
struct ManagementInterface<T: From<ManagementCommand> + 'static + Send> {
subscriptions: Arc<RwLock<HashMap<SubscriptionId, pubsub::Sink<DaemonEvent>>>>,
tx: Mutex<IntoSender<ManagementCommand, T>>,