diff options
| author | Linus Färnstrand <linus@mullvad.net> | 2019-03-28 18:19:38 +0100 |
|---|---|---|
| committer | Linus Färnstrand <linus@mullvad.net> | 2019-03-28 18:19:38 +0100 |
| commit | f6d349dd30dfea61d58b47c0fe0f7dc3e40ca929 (patch) | |
| tree | 71ccb73f47d6f50219c9974835a2718a8e4349e8 | |
| parent | 34e0808f0c8446ece987f05a5b8aa8926f68b962 (diff) | |
| parent | c399cbe64ee2744d5917074e0b203cd353942403 (diff) | |
| download | mullvadvpn-f6d349dd30dfea61d58b47c0fe0f7dc3e40ca929.tar.xz mullvadvpn-f6d349dd30dfea61d58b47c0fe0f7dc3e40ca929.zip | |
Merge branch 'notify-about-relay-list-updates'
| -rw-r--r-- | gui/src/main/daemon-rpc.ts | 3 | ||||
| -rw-r--r-- | gui/src/main/index.ts | 29 | ||||
| -rw-r--r-- | gui/src/shared/daemon-rpc-types.ts | 5 | ||||
| -rw-r--r-- | mullvad-cli/src/cmds/status.rs | 37 | ||||
| -rw-r--r-- | mullvad-daemon/src/lib.rs | 88 | ||||
| -rw-r--r-- | mullvad-daemon/src/management_interface.rs | 11 | ||||
| -rw-r--r-- | mullvad-daemon/src/relays.rs | 27 | ||||
| -rw-r--r-- | mullvad-types/src/lib.rs | 3 |
8 files changed, 119 insertions, 84 deletions
diff --git a/gui/src/main/daemon-rpc.ts b/gui/src/main/daemon-rpc.ts index 4464db6688..7897aba7c9 100644 --- a/gui/src/main/daemon-rpc.ts +++ b/gui/src/main/daemon-rpc.ts @@ -303,6 +303,9 @@ const daemonEventSchema = oneOf( object({ settings: settingsSchema, }), + object({ + relay_list: relayListSchema, + }), ); export class ResponseParseError extends Error { diff --git a/gui/src/main/index.ts b/gui/src/main/index.ts index d8b558ddcb..6080c80af2 100644 --- a/gui/src/main/index.ts +++ b/gui/src/main/index.ts @@ -28,7 +28,6 @@ import ReconnectionBackoff from './reconnection-backoff'; import TrayIconController, { TrayIconType } from './tray-icon-controller'; import WindowController from './window-controller'; -const RELAY_LIST_UPDATE_INTERVAL = 60 * 60 * 1000; const VERSION_UPDATE_INTERVAL = 24 * 60 * 60 * 1000; const DAEMON_RPC_PATH = @@ -96,7 +95,6 @@ class ApplicationMain { private lastDisconnectedLocation?: ILocation; private relays: IRelayList = { countries: [] }; - private relaysInterval?: NodeJS.Timeout; private currentVersion: ICurrentAppVersionInfo = { daemon: '', @@ -417,7 +415,6 @@ class ApplicationMain { this.fetchLatestVersion(); // start periodic updates - this.startRelaysPeriodicUpdates(); this.startLatestVersionPeriodicUpdates(); // notify user about inconsistent version @@ -447,7 +444,6 @@ class ApplicationMain { this.connectedToDaemon = false; // stop periodic updates - this.stopRelaysPeriodicUpdates(); this.stopLatestVersionPeriodicUpdates(); // notify renderer process @@ -498,6 +494,8 @@ class ApplicationMain { this.setTunnelState(daemonEvent.stateTransition); } else if ('settings' in daemonEvent) { this.setSettings(daemonEvent.settings); + } else if ('relayList' in daemonEvent) { + this.setRelays(daemonEvent.relayList, this.settings.relaySettings); } }, (error: Error) => { @@ -605,29 +603,6 @@ class ApplicationMain { }; } - private startRelaysPeriodicUpdates() { - log.debug('Start relays periodic updates'); - - const handler = async () => { - try { - this.setRelays(await this.daemonRpc.getRelayLocations(), this.settings.relaySettings); - } catch (error) { - log.error(`Failed to fetch relay locations: ${error.message}`); - } - }; - - this.relaysInterval = global.setInterval(handler, RELAY_LIST_UPDATE_INTERVAL); - } - - private stopRelaysPeriodicUpdates() { - if (this.relaysInterval) { - clearInterval(this.relaysInterval); - this.relaysInterval = undefined; - - log.debug('Stop relays periodic updates'); - } - } - private setDaemonVersion(daemonVersion: string) { const guiVersion = app.getVersion().replace('.0', ''); const versionInfo = { diff --git a/gui/src/shared/daemon-rpc-types.ts b/gui/src/shared/daemon-rpc-types.ts index a3080b7590..6b29ddd764 100644 --- a/gui/src/shared/daemon-rpc-types.ts +++ b/gui/src/shared/daemon-rpc-types.ts @@ -41,7 +41,10 @@ export interface ITunnelEndpoint { tunnel: TunnelType; } -export type DaemonEvent = { stateTransition: TunnelStateTransition } | { settings: ISettings }; +export type DaemonEvent = + | { stateTransition: TunnelStateTransition } + | { settings: ISettings } + | { relayList: IRelayList }; export type TunnelStateTransition = | { state: 'disconnected' } diff --git a/mullvad-cli/src/cmds/status.rs b/mullvad-cli/src/cmds/status.rs index fc6df014fc..862514a9f4 100644 --- a/mullvad-cli/src/cmds/status.rs +++ b/mullvad-cli/src/cmds/status.rs @@ -15,7 +15,13 @@ impl Command for Status { clap::SubCommand::with_name(self.name()) .about("View the state of the VPN tunnel") .subcommand( - clap::SubCommand::with_name("listen").about("Listen for VPN tunnel state changes"), + clap::SubCommand::with_name("listen") + .about("Listen for VPN tunnel state changes") + .arg( + clap::Arg::with_name("verbose") + .short("v") + .help("Enables verbose output"), + ), ) } @@ -25,20 +31,31 @@ impl Command for Status { print_state(&state); print_location(&mut rpc)?; - if matches.subcommand_matches("listen").is_some() { + if let Some(listen_matches) = matches.subcommand_matches("listen") { + let verbose = listen_matches.is_present("verbose"); let subscription = rpc .daemon_event_subscribe() .wait() .map_err(|_err| Error::from(ErrorKind::CantSubscribe))?; for event in subscription.wait() { - if let DaemonEvent::StateTransition(new_state) = - event.chain_err(|| "Subscription failed")? - { - print_state(&new_state); - use self::TunnelStateTransition::*; - match new_state { - Connected(_) | Disconnected => print_location(&mut rpc)?, - _ => {} + match event.chain_err(|| "Subscription failed")? { + DaemonEvent::StateTransition(new_state) => { + print_state(&new_state); + use self::TunnelStateTransition::*; + match new_state { + Connected(_) | Disconnected => print_location(&mut rpc)?, + _ => {} + } + } + DaemonEvent::Settings(settings) => { + if verbose { + println!("New settings: {:#?}", settings); + } + } + DaemonEvent::RelayList(relay_list) => { + if verbose { + println!("New relay list: {:#?}", relay_list); + } } } } diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs index 7a4e4b9943..55bfce8f62 100644 --- a/mullvad-daemon/src/lib.rs +++ b/mullvad-daemon/src/lib.rs @@ -79,7 +79,7 @@ error_chain! { type SyncUnboundedSender<T> = ::futures::sink::Wait<UnboundedSender<T>>; /// All events that can happen in the daemon. Sent from various threads and exposed interfaces. -pub enum DaemonEvent { +enum InternalDaemonEvent { /// Tunnel has changed state. TunnelStateTransition(TunnelStateTransition), /// Request from the `MullvadTunnelParametersGenerator` to obtain a new relay. @@ -92,15 +92,15 @@ pub enum DaemonEvent { TriggerShutdown, } -impl From<TunnelStateTransition> for DaemonEvent { +impl From<TunnelStateTransition> for InternalDaemonEvent { fn from(tunnel_state_transition: TunnelStateTransition) -> Self { - DaemonEvent::TunnelStateTransition(tunnel_state_transition) + InternalDaemonEvent::TunnelStateTransition(tunnel_state_transition) } } -impl From<ManagementCommand> for DaemonEvent { +impl From<ManagementCommand> for InternalDaemonEvent { fn from(command: ManagementCommand) -> Self { - DaemonEvent::ManagementInterfaceEvent(command) + InternalDaemonEvent::ManagementInterfaceEvent(command) } } @@ -153,8 +153,8 @@ pub struct Daemon { tunnel_state: TunnelStateTransition, target_state: TargetState, state: DaemonExecutionState, - rx: mpsc::Receiver<DaemonEvent>, - tx: mpsc::Sender<DaemonEvent>, + rx: mpsc::Receiver<InternalDaemonEvent>, + tx: mpsc::Sender<InternalDaemonEvent>, reconnection_loop_tx: Option<mpsc::Sender<()>>, management_interface_broadcaster: management_interface::EventBroadcaster, #[cfg(unix)] @@ -198,15 +198,28 @@ impl Daemon { let rpc_handle = rpc_handle.chain_err(|| "Unable to create RPC client")?; let https_handle = https_handle.chain_err(|| "Unable to create am.i.mullvad client")?; - let relay_selector = - relays::RelaySelector::new(rpc_handle.clone(), &resource_dir, &cache_dir); + let (internal_event_tx, internal_event_rx) = mpsc::channel(); + + let management_interface_result = + Self::start_management_interface(internal_event_tx.clone())?; + + let management_interface_broadcaster = management_interface_result.0.clone(); + let on_relay_list_update = move |relay_list: &RelayList| { + management_interface_broadcaster.notify_relay_list(relay_list.clone()); + }; + let relay_selector = relays::RelaySelector::new( + rpc_handle.clone(), + on_relay_list_update, + &resource_dir, + &cache_dir, + ); let settings = Settings::load().chain_err(|| "Unable to read settings")?; let account_history = account_history::AccountHistory::new(&cache_dir) .chain_err(|| "Unable to read wireguard key cache")?; - - let (tx, rx) = mpsc::channel(); - let tunnel_parameters_generator = MullvadTunnelParametersGenerator { tx: tx.clone() }; + let tunnel_parameters_generator = MullvadTunnelParametersGenerator { + tx: internal_event_tx.clone(), + }; let tunnel_command_tx = tunnel_state_machine::spawn( settings.get_allow_lan(), settings.get_block_when_disconnected(), @@ -214,22 +227,19 @@ impl Daemon { log_dir, resource_dir, cache_dir.clone(), - IntoSender::from(tx.clone()), + IntoSender::from(internal_event_tx.clone()), )?; - let target_state = TargetState::Unsecured; - let management_interface_result = Self::start_management_interface(tx.clone())?; - // Attempt to download a fresh relay list relay_selector.update(); Ok(Daemon { tunnel_command_tx: Sink::wait(tunnel_command_tx), tunnel_state: TunnelStateTransition::Disconnected, - target_state, + target_state: TargetState::Unsecured, state: DaemonExecutionState::Running, - rx, - tx, + rx: internal_event_rx, + tx: internal_event_tx, reconnection_loop_tx: None, management_interface_broadcaster: management_interface_result.0, #[cfg(unix)] @@ -250,7 +260,7 @@ impl Daemon { // Starts the management interface and spawns a thread that will process it. // Returns a handle that allows notifying all subscribers on events. fn start_management_interface( - event_tx: mpsc::Sender<DaemonEvent>, + event_tx: mpsc::Sender<InternalDaemonEvent>, ) -> Result<(management_interface::EventBroadcaster, String)> { let multiplex_event_tx = IntoSender::from(event_tx.clone()); let server = Self::start_management_interface_server(multiplex_event_tx)?; @@ -261,7 +271,7 @@ impl Daemon { } fn start_management_interface_server( - event_tx: IntoSender<ManagementCommand, DaemonEvent>, + event_tx: IntoSender<ManagementCommand, InternalDaemonEvent>, ) -> Result<ManagementInterfaceServer> { let server = ManagementInterfaceServer::start(event_tx) .chain_err(|| ErrorKind::ManagementInterfaceError("Failed to start server"))?; @@ -275,12 +285,12 @@ impl Daemon { fn spawn_management_interface_wait_thread( server: ManagementInterfaceServer, - exit_tx: mpsc::Sender<DaemonEvent>, + exit_tx: mpsc::Sender<InternalDaemonEvent>, ) { thread::spawn(move || { server.wait(); error!("Mullvad management interface shut down"); - let _ = exit_tx.send(DaemonEvent::ManagementInterfaceExited); + let _ = exit_tx.send(InternalDaemonEvent::ManagementInterfaceExited); }); } @@ -300,8 +310,8 @@ impl Daemon { Ok(()) } - fn handle_event(&mut self, event: DaemonEvent) -> Result<()> { - use self::DaemonEvent::*; + fn handle_event(&mut self, event: InternalDaemonEvent) -> Result<()> { + use self::InternalDaemonEvent::*; match event { TunnelStateTransition(transition) => self.handle_tunnel_state_transition(transition), GenerateTunnelParameters(tunnel_parameters_tx, retry_attempt) => { @@ -434,7 +444,7 @@ impl Daemon { if let Err(mpsc::RecvTimeoutError::Timeout) = rx.recv_timeout(delay) { debug!("Attempting to reconnect"); - let _ = tunnel_command_tx.send(DaemonEvent::ManagementInterfaceEvent( + let _ = tunnel_command_tx.send(InternalDaemonEvent::ManagementInterfaceEvent( ManagementCommand::SetTargetState(result_tx, TargetState::Secured), )); } @@ -585,7 +595,7 @@ impl Daemon { Self::oneshot_send(tx, (), "set_account response"); if account_changed { self.management_interface_broadcaster - .notify_settings(&self.settings); + .notify_settings(self.settings.clone()); match account_token { Some(token) => { if let Err(e) = self.account_history.bump_history(&token) { @@ -651,7 +661,7 @@ impl Daemon { Self::oneshot_send(tx, (), "update_relay_settings response"); if settings_changed { self.management_interface_broadcaster - .notify_settings(&self.settings); + .notify_settings(self.settings.clone()); info!("Initiating tunnel restart because the relay settings changed"); self.reconnect_tunnel(); } @@ -667,7 +677,7 @@ impl Daemon { Self::oneshot_send(tx, (), "set_allow_lan response"); if settings_changed { self.management_interface_broadcaster - .notify_settings(&self.settings); + .notify_settings(self.settings.clone()); self.send_tunnel_command(TunnelCommand::AllowLan(allow_lan)); } } @@ -688,7 +698,7 @@ impl Daemon { Self::oneshot_send(tx, (), "set_block_when_disconnected response"); if settings_changed { self.management_interface_broadcaster - .notify_settings(&self.settings); + .notify_settings(self.settings.clone()); self.send_tunnel_command(TunnelCommand::BlockWhenDisconnected( block_when_disconnected, )); @@ -705,7 +715,7 @@ impl Daemon { Self::oneshot_send(tx, (), "set auto-connect response"); if settings_changed { self.management_interface_broadcaster - .notify_settings(&self.settings); + .notify_settings(self.settings.clone()); } } Err(e) => error!("{}", e.display_chain()), @@ -719,7 +729,7 @@ impl Daemon { Self::oneshot_send(tx, (), "set_openvpn_mssfix response"); if settings_changed { self.management_interface_broadcaster - .notify_settings(&self.settings); + .notify_settings(self.settings.clone()); info!("Initiating tunnel restart because the OpenVPN mssfix setting changed"); self.reconnect_tunnel(); } @@ -744,7 +754,7 @@ impl Daemon { Self::oneshot_send(tx, Ok(()), "set_openvpn_proxy response"); if proxy_changed || constraints_changed { self.management_interface_broadcaster - .notify_settings(&self.settings); + .notify_settings(self.settings.clone()); info!("Initiating tunnel restart because the OpenVPN proxy setting changed"); self.reconnect_tunnel(); } @@ -786,7 +796,7 @@ impl Daemon { Self::oneshot_send(tx, (), "set_enable_ipv6 response"); if settings_changed { self.management_interface_broadcaster - .notify_settings(&self.settings); + .notify_settings(self.settings.clone()); info!("Initiating tunnel restart because the enable IPv6 setting changed"); self.reconnect_tunnel(); } @@ -802,7 +812,7 @@ impl Daemon { Self::oneshot_send(tx, (), "set_wireguard_mtu response"); if settings_changed { self.management_interface_broadcaster - .notify_settings(&self.settings); + .notify_settings(self.settings.clone()); info!("Initiating tunnel restart because the WireGuard MTU setting changed"); self.reconnect_tunnel(); } @@ -979,12 +989,12 @@ impl Daemon { } pub struct DaemonShutdownHandle { - tx: mpsc::Sender<DaemonEvent>, + tx: mpsc::Sender<InternalDaemonEvent>, } impl DaemonShutdownHandle { pub fn shutdown(&self) { - let _ = self.tx.send(DaemonEvent::TriggerShutdown); + let _ = self.tx.send(InternalDaemonEvent::TriggerShutdown); } } @@ -1005,14 +1015,14 @@ impl Drop for Daemon { struct MullvadTunnelParametersGenerator { - tx: mpsc::Sender<DaemonEvent>, + tx: mpsc::Sender<InternalDaemonEvent>, } impl TunnelParametersGenerator for MullvadTunnelParametersGenerator { fn generate(&mut self, retry_attempt: u32) -> Option<TunnelParameters> { let (response_tx, response_rx) = mpsc::channel(); self.tx - .send(DaemonEvent::GenerateTunnelParameters( + .send(InternalDaemonEvent::GenerateTunnelParameters( response_tx, retry_attempt, )) diff --git a/mullvad-daemon/src/management_interface.rs b/mullvad-daemon/src/management_interface.rs index f77cd1b8e4..a6bc12f8b8 100644 --- a/mullvad-daemon/src/management_interface.rs +++ b/mullvad-daemon/src/management_interface.rs @@ -271,6 +271,7 @@ impl ManagementInterfaceServer { } /// A handle that allows broadcasting messages to all subscribers of the management interface. +#[derive(Clone)] pub struct EventBroadcaster { subscriptions: Arc<RwLock<HashMap<SubscriptionId, pubsub::Sink<DaemonEvent>>>>, } @@ -283,9 +284,15 @@ impl EventBroadcaster { } /// Sends settings to all `settings` subscribers of the management interface. - pub fn notify_settings(&self, settings: &Settings) { + pub fn notify_settings(&self, settings: Settings) { log::debug!("Broadcasting new settings"); - self.notify(DaemonEvent::Settings(settings.clone())); + self.notify(DaemonEvent::Settings(settings)); + } + + /// Sends settings to all `settings` subscribers of the management interface. + pub fn notify_relay_list(&self, relay_list: RelayList) { + log::debug!("Broadcasting new relay list"); + self.notify(DaemonEvent::RelayList(relay_list)); } fn notify(&self, value: DaemonEvent) { diff --git a/mullvad-daemon/src/relays.rs b/mullvad-daemon/src/relays.rs index a6ba4cb0af..7725477885 100644 --- a/mullvad-daemon/src/relays.rs +++ b/mullvad-daemon/src/relays.rs @@ -147,7 +147,12 @@ pub struct RelaySelector { impl RelaySelector { /// Returns a new `RelaySelector` backed by relays cached on disk. Use the `update` method /// to refresh the relay list from the internet. - pub fn new(rpc_handle: HttpHandle, resource_dir: &Path, cache_dir: &Path) -> Self { + pub fn new( + rpc_handle: HttpHandle, + on_update: impl Fn(&RelayList) + Send + 'static, + resource_dir: &Path, + cache_dir: &Path, + ) -> Self { let cache_path = cache_dir.join(RELAYS_FILENAME); let resource_path = resource_dir.join(RELAYS_FILENAME); let unsynchronized_parsed_relays = Self::read_relays_from_disk(&cache_path, &resource_path) @@ -163,7 +168,12 @@ impl RelaySelector { .format(DATE_TIME_FORMAT_STR) ); let parsed_relays = Arc::new(Mutex::new(unsynchronized_parsed_relays)); - let updater = RelayListUpdater::spawn(rpc_handle, cache_path, parsed_relays.clone()); + let updater = RelayListUpdater::spawn( + rpc_handle, + cache_path, + parsed_relays.clone(), + Box::new(on_update), + ); RelaySelector { parsed_relays, rng: rand::thread_rng(), @@ -498,6 +508,7 @@ struct RelayListUpdater { rpc_client: RelayListProxy<HttpHandle>, cache_path: PathBuf, parsed_relays: Arc<Mutex<ParsedRelays>>, + on_update: Box<dyn Fn(&RelayList)>, close_handle: mpsc::Receiver<()>, } @@ -506,10 +517,13 @@ impl RelayListUpdater { rpc_handle: HttpHandle, cache_path: PathBuf, parsed_relays: Arc<Mutex<ParsedRelays>>, + on_update: Box<dyn Fn(&RelayList) + Send + 'static>, ) -> RelayListUpdaterHandle { let (tx, rx) = mpsc::channel(); - thread::spawn(move || Self::new(rpc_handle, cache_path, parsed_relays, rx).run()); + thread::spawn(move || { + Self::new(rpc_handle, cache_path, parsed_relays, on_update, rx).run() + }); tx } @@ -518,6 +532,7 @@ impl RelayListUpdater { rpc_handle: HttpHandle, cache_path: PathBuf, parsed_relays: Arc<Mutex<ParsedRelays>>, + on_update: Box<dyn Fn(&RelayList)>, close_handle: mpsc::Receiver<()>, ) -> Self { let rpc_client = RelayListProxy::new(rpc_handle); @@ -526,6 +541,7 @@ impl RelayListUpdater { rpc_client, cache_path, parsed_relays, + on_update, close_handle, } } @@ -582,8 +598,9 @@ impl RelayListUpdater { new_parsed_relays.relays().len() ); - *self.lock_parsed_relays() = new_parsed_relays; - + let mut parsed_relays = self.lock_parsed_relays(); + *parsed_relays = new_parsed_relays; + (self.on_update)(parsed_relays.locations()); Ok(()) } diff --git a/mullvad-types/src/lib.rs b/mullvad-types/src/lib.rs index 2ac2295982..ad83e0276d 100644 --- a/mullvad-types/src/lib.rs +++ b/mullvad-types/src/lib.rs @@ -32,4 +32,7 @@ pub enum DaemonEvent { /// The daemon settings changed. Settings(settings::Settings), + + /// The daemon got an updated relay list. + RelayList(relay_list::RelayList), } |
