diff options
| author | Linus Färnstrand <linus@mullvad.net> | 2017-07-17 10:36:52 +0200 |
|---|---|---|
| committer | Linus Färnstrand <linus@mullvad.net> | 2017-07-17 10:48:09 +0200 |
| commit | f07f1a0262c908ff83490850b67a167f963efc2d (patch) | |
| tree | 5a5209baefc0b10ba528f254e7ce2456c23f6ac9 /mullvad-daemon/src | |
| parent | 6a4202fc8c55752f0fef86c04b628a3f9ada5279 (diff) | |
| download | mullvadvpn-f07f1a0262c908ff83490850b67a167f963efc2d.tar.xz mullvadvpn-f07f1a0262c908ff83490850b67a167f963efc2d.zip | |
Rename all crates from snake_case to kebab-case
Diffstat (limited to 'mullvad-daemon/src')
| -rw-r--r-- | mullvad-daemon/src/cli.rs | 45 | ||||
| -rw-r--r-- | mullvad-daemon/src/main.rs | 498 | ||||
| -rw-r--r-- | mullvad-daemon/src/management_interface.rs | 379 | ||||
| -rw-r--r-- | mullvad-daemon/src/mock_ipc.rs | 217 | ||||
| -rw-r--r-- | mullvad-daemon/src/rpc_info.rs | 47 | ||||
| -rw-r--r-- | mullvad-daemon/src/shutdown.rs | 42 |
6 files changed, 1228 insertions, 0 deletions
diff --git a/mullvad-daemon/src/cli.rs b/mullvad-daemon/src/cli.rs new file mode 100644 index 0000000000..670a2bf28b --- /dev/null +++ b/mullvad-daemon/src/cli.rs @@ -0,0 +1,45 @@ +use clap::{App, Arg}; +use log; + +use std::path::PathBuf; + +pub struct Config { + pub log_level: log::LogLevelFilter, + pub log_file: Option<PathBuf>, +} + +pub fn get_config() -> Config { + let app = create_app(); + let matches = app.get_matches(); + + let log_level = match matches.occurrences_of("v") { + 0 => log::LogLevelFilter::Info, + 1 => log::LogLevelFilter::Debug, + _ => log::LogLevelFilter::Trace, + }; + let log_file = matches.value_of_os("log_file").map(PathBuf::from); + + Config { + log_level, + log_file, + } +} + +fn create_app() -> App<'static, 'static> { + App::new(::CRATE_NAME) + .version(crate_version!()) + .author(crate_authors!()) + .about(crate_description!()) + .arg( + Arg::with_name("v") + .short("v") + .multiple(true) + .help("Sets the level of verbosity."), + ) + .arg( + Arg::with_name("log_file") + .long("log") + .takes_value(true) + .help("Activates file logging to the given path"), + ) +} diff --git a/mullvad-daemon/src/main.rs b/mullvad-daemon/src/main.rs new file mode 100644 index 0000000000..b26ca315a8 --- /dev/null +++ b/mullvad-daemon/src/main.rs @@ -0,0 +1,498 @@ +#[macro_use] +extern crate clap; +extern crate chrono; +#[macro_use] +extern crate log; +#[macro_use] +extern crate error_chain; +extern crate fern; + +extern crate serde; +#[macro_use] +extern crate serde_derive; + +extern crate jsonrpc_core; +extern crate jsonrpc_pubsub; +#[macro_use] +extern crate jsonrpc_macros; +extern crate jsonrpc_ws_server; +extern crate uuid; +#[macro_use] +extern crate lazy_static; + +extern crate mullvad_types; +extern crate talpid_core; +extern crate talpid_ipc; + +mod cli; +mod management_interface; +mod rpc_info; +mod shutdown; + +use error_chain::ChainedError; +use management_interface::{ManagementInterfaceServer, TunnelCommand}; +use mullvad_types::states::{DaemonState, SecurityState, TargetState}; +use std::io; + +use std::path::PathBuf; +use std::sync::{Arc, Mutex, mpsc}; +use std::thread; + +use talpid_core::mpsc::IntoSender; +use talpid_core::net::{Endpoint, TransportProtocol}; +use talpid_core::tunnel::{self, TunnelEvent, TunnelMonitor}; + +error_chain!{ + errors { + /// The client is in the wrong state for the requested operation. Optimally the code should + /// be written in such a way so such states can't exist. + InvalidState { + description("Client is in an invalid state for the requested operation") + } + TunnelError(msg: &'static str) { + description("Error in the tunnel monitor") + display("Tunnel monitor error: {}", msg) + } + ManagementInterfaceError(msg: &'static str) { + description("Error in the management interface") + display("Management interface error: {}", msg) + } + InvalidSettings(msg: &'static str) { + description("Invalid settings") + display("Invalid Settings: {}", msg) + } + } +} + +lazy_static! { + // Temporary store of hardcoded remotes. + static ref REMOTES: [Endpoint; 3] = [ + Endpoint::new("se5.mullvad.net", 1300, TransportProtocol::Udp), + Endpoint::new("se6.mullvad.net", 1300, TransportProtocol::Udp), + Endpoint::new("se7.mullvad.net", 1300, TransportProtocol::Udp), + ]; +} + +static CRATE_NAME: &str = "mullvadd"; + + +/// All events that can happen in the daemon. Sent from various threads and exposed interfaces. +pub enum DaemonEvent { + /// An event coming from the tunnel software to indicate a change in state. + TunnelEvent(TunnelEvent), + /// Triggered by the thread waiting for the tunnel process. Means the tunnel process exited. + TunnelExited(tunnel::Result<()>), + /// Triggered by the thread waiting for a tunnel close operation to complete. Contains the + /// result of trying to kill the tunnel. + TunnelKillResult(io::Result<()>), + /// An event coming from the JSONRPC-2.0 management interface. + ManagementInterfaceEvent(TunnelCommand), + /// Triggered if the server hosting the JSONRPC-2.0 management interface dies unexpectedly. + ManagementInterfaceExited(talpid_ipc::Result<()>), + /// Daemon shutdown triggered by a signal, ctrl-c or similar. + TriggerShutdown, +} + +impl From<TunnelEvent> for DaemonEvent { + fn from(tunnel_event: TunnelEvent) -> Self { + DaemonEvent::TunnelEvent(tunnel_event) + } +} + +impl From<TunnelCommand> for DaemonEvent { + fn from(tunnel_command: TunnelCommand) -> Self { + DaemonEvent::ManagementInterfaceEvent(tunnel_command) + } +} + +/// Represents the internal state of the actual tunnel. +// TODO(linus): Put the tunnel::CloseHandle into this state, so it can't exist when not running. +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub enum TunnelState { + /// No tunnel is running. + NotRunning, + /// The tunnel has been started, but it is not established/functional. + Connecting, + /// The tunnel is up and working. + Connected, + /// This state is active from when we manually trigger a tunnel kill until the tunnel wait + /// operation (TunnelExit) returned. + Exiting, +} + +impl TunnelState { + pub fn as_security_state(&self) -> SecurityState { + match *self { + TunnelState::Connected => SecurityState::Secured, + _ => SecurityState::Unsecured, + } + } +} + + +struct Daemon { + state: TunnelState, + // The tunnel_close_handle must only exist in the Connecting and Connected states! + tunnel_close_handle: Option<tunnel::CloseHandle>, + last_broadcasted_state: DaemonState, + target_state: TargetState, + shutdown: bool, + rx: mpsc::Receiver<DaemonEvent>, + tx: mpsc::Sender<DaemonEvent>, + management_interface_broadcaster: management_interface::EventBroadcaster, + + // Just for testing. A cyclic iterator iterating over the hardcoded remotes, + // picking a new one for each retry. + remote_iter: std::iter::Cycle<std::iter::Cloned<std::slice::Iter<'static, Endpoint>>>, + // The current account token for now. Should be moved into the settings later. + account_token: Option<String>, +} + +impl Daemon { + pub fn new() -> Result<Self> { + let (tx, rx) = mpsc::channel(); + let management_interface_broadcaster = Self::start_management_interface(tx.clone())?; + let state = TunnelState::NotRunning; + let target_state = TargetState::Unsecured; + Ok( + Daemon { + state, + tunnel_close_handle: None, + target_state, + last_broadcasted_state: DaemonState { + state: state.as_security_state(), + target_state, + }, + shutdown: false, + rx, + tx, + management_interface_broadcaster, + remote_iter: REMOTES.iter().cloned().cycle(), + account_token: None, + }, + ) + } + + // Starts the management interface and spawns a thread that will process it. + // Returns a handle that allows notifying all subscribers on events. + fn start_management_interface(event_tx: mpsc::Sender<DaemonEvent>) + -> Result<management_interface::EventBroadcaster> { + let multiplex_event_tx = IntoSender::from(event_tx.clone()); + let server = Self::start_management_interface_server(multiplex_event_tx)?; + let event_broadcaster = server.event_broadcaster(); + Self::spawn_management_interface_wait_thread(server, event_tx); + Ok(event_broadcaster) + } + + fn start_management_interface_server(event_tx: IntoSender<TunnelCommand, DaemonEvent>) + -> Result<ManagementInterfaceServer> { + let server = + ManagementInterfaceServer::start(event_tx) + .chain_err(|| ErrorKind::ManagementInterfaceError("Failed to start server"),)?; + info!( + "Mullvad management interface listening on {}", + server.address() + ); + rpc_info::write(server.address()).chain_err(|| ErrorKind::ManagementInterfaceError( + "Failed to write RPC address to file"))?; + Ok(server) + } + + fn spawn_management_interface_wait_thread(server: ManagementInterfaceServer, + exit_tx: mpsc::Sender<DaemonEvent>) { + thread::spawn( + move || { + let result = server.wait(); + debug!("Mullvad management interface shut down"); + let _ = exit_tx.send(DaemonEvent::ManagementInterfaceExited(result)); + }, + ); + } + + /// Consume the `Daemon` and run the main event loop. Blocks until an error happens or a + /// shutdown event is received. + pub fn run(mut self) -> Result<()> { + while let Ok(event) = self.rx.recv() { + self.handle_event(event)?; + if self.shutdown && self.state == TunnelState::NotRunning { + break; + } + } + Ok(()) + } + + fn handle_event(&mut self, event: DaemonEvent) -> Result<()> { + use DaemonEvent::*; + match event { + TunnelEvent(event) => self.handle_tunnel_event(event), + TunnelExited(result) => self.handle_tunnel_exited(result), + TunnelKillResult(result) => self.handle_tunnel_kill_result(result), + ManagementInterfaceEvent(event) => self.handle_management_interface_event(event), + ManagementInterfaceExited(result) => self.handle_management_interface_exited(result), + TriggerShutdown => self.handle_trigger_shutdown_event(), + } + } + + fn handle_tunnel_event(&mut self, tunnel_event: TunnelEvent) -> Result<()> { + info!("Tunnel event: {:?}", tunnel_event); + if self.state == TunnelState::Connecting && tunnel_event == TunnelEvent::Up { + self.set_state(TunnelState::Connected) + } else if self.state == TunnelState::Connected && tunnel_event == TunnelEvent::Down { + self.kill_tunnel() + } else { + Ok(()) + } + } + + fn handle_tunnel_exited(&mut self, result: tunnel::Result<()>) -> Result<()> { + if let Err(e) = result.chain_err(|| "Tunnel exited in an unexpected way") { + error!("{}", e.display()); + } + self.tunnel_close_handle = None; + self.set_state(TunnelState::NotRunning) + } + + fn handle_tunnel_kill_result(&mut self, result: io::Result<()>) -> Result<()> { + result.chain_err(|| "Error while trying to close tunnel") + } + + fn handle_management_interface_event(&mut self, event: TunnelCommand) -> Result<()> { + use TunnelCommand::*; + match event { + SetTargetState(state) => { + if !self.shutdown { + self.set_target_state(state)?; + } else { + warn!("Ignoring target state change request due to shutdown"); + } + } + GetState(tx) => { + if let Err(_) = tx.send(self.last_broadcasted_state) { + warn!("Unable to send current state to management interface client",); + } + } + SetAccount(account_token) => self.account_token = account_token, + GetAccount(tx) => { + if let Err(_) = tx.send(self.account_token.clone()) { + warn!("Unable to send current account to management interface client"); + } + } + } + Ok(()) + } + + fn handle_management_interface_exited(&self, result: talpid_ipc::Result<()>) -> Result<()> { + let error = ErrorKind::ManagementInterfaceError("Server exited unexpectedly"); + match result { + Ok(()) => Err(error.into()), + Err(e) => Err(e).chain_err(|| error), + } + } + + fn handle_trigger_shutdown_event(&mut self) -> Result<()> { + self.shutdown = true; + self.set_target_state(TargetState::Unsecured) + } + + /// Update the state of the client. If it changed, notify the subscribers and trigger + /// appropriate actions. + fn set_state(&mut self, new_state: TunnelState) -> Result<()> { + if new_state != self.state { + debug!("State {:?} => {:?}", self.state, new_state); + self.state = new_state; + self.broadcast_state(); + self.verify_state_consistency()?; + self.apply_target_state() + } else { + // Calling set_state with the same state we already have is an error. Should try to + // mitigate this possibility completely with a better state machine later. + Err(ErrorKind::InvalidState.into()) + } + } + + fn broadcast_state(&mut self) { + let new_daemon_state = DaemonState { + state: self.state.as_security_state(), + target_state: self.target_state, + }; + if self.last_broadcasted_state != new_daemon_state { + self.last_broadcasted_state = new_daemon_state; + self.management_interface_broadcaster.notify_new_state(new_daemon_state); + } + } + + // Check that the current state is valid and consistent. + fn verify_state_consistency(&self) -> Result<()> { + use TunnelState::*; + ensure!( + match self.state { + NotRunning => self.tunnel_close_handle.is_none(), + Connecting => self.tunnel_close_handle.is_some(), + Connected => self.tunnel_close_handle.is_some(), + Exiting => self.tunnel_close_handle.is_none(), + }, + ErrorKind::InvalidState + ); + Ok(()) + } + + /// Set the target state of the client. If it changed trigger the operations needed to progress + /// towards that state. + fn set_target_state(&mut self, new_state: TargetState) -> Result<()> { + if new_state != self.target_state { + debug!("Target state {:?} => {:?}", self.target_state, new_state); + self.target_state = new_state; + self.broadcast_state(); + self.apply_target_state() + } else { + Ok(()) + } + } + + fn apply_target_state(&mut self) -> Result<()> { + match (self.target_state, self.state) { + (TargetState::Secured, TunnelState::NotRunning) => { + debug!("Triggering tunnel start"); + if let Err(e) = self.start_tunnel().chain_err(|| "Failed to start tunnel") { + error!("{}", e.display()); + self.management_interface_broadcaster.notify_error(&e); + self.set_target_state(TargetState::Unsecured)?; + } + Ok(()) + } + (TargetState::Unsecured, TunnelState::Connecting) | + (TargetState::Unsecured, TunnelState::Connected) => self.kill_tunnel(), + (..) => Ok(()), + } + } + + fn start_tunnel(&mut self) -> Result<()> { + ensure!( + self.state == TunnelState::NotRunning, + ErrorKind::InvalidState + ); + let remote = self.remote_iter.next().unwrap(); + let account_token = self.account_token + .as_ref() + .ok_or(ErrorKind::InvalidSettings("No account token"))? + .clone(); + let tunnel_monitor = self.spawn_tunnel_monitor(remote, &account_token)?; + self.tunnel_close_handle = Some(tunnel_monitor.close_handle()); + self.spawn_tunnel_monitor_wait_thread(tunnel_monitor); + self.set_state(TunnelState::Connecting) + } + + fn spawn_tunnel_monitor(&self, remote: Endpoint, account_token: &str) -> Result<TunnelMonitor> { + // Must wrap the channel in a Mutex because TunnelMonitor forces the closure to be Sync + let event_tx = Arc::new(Mutex::new(self.tx.clone())); + let on_tunnel_event = move |event| { + let _ = event_tx.lock().unwrap().send(DaemonEvent::TunnelEvent(event)); + }; + TunnelMonitor::new(remote, account_token, on_tunnel_event) + .chain_err(|| ErrorKind::TunnelError("Unable to start tunnel monitor")) + } + + fn spawn_tunnel_monitor_wait_thread(&self, tunnel_monitor: TunnelMonitor) { + let error_tx = self.tx.clone(); + thread::spawn( + move || { + let result = tunnel_monitor.wait(); + let _ = error_tx.send(DaemonEvent::TunnelExited(result)); + trace!("Tunnel monitor thread exit"); + }, + ); + } + + fn kill_tunnel(&mut self) -> Result<()> { + ensure!( + self.state == TunnelState::Connecting || self.state == TunnelState::Connected, + ErrorKind::InvalidState + ); + let close_handle = self.tunnel_close_handle.take().unwrap(); + self.set_state(TunnelState::Exiting)?; + let result_tx = self.tx.clone(); + thread::spawn( + move || { + let result = close_handle.close(); + let _ = result_tx.send(DaemonEvent::TunnelKillResult(result)); + trace!("Tunnel kill thread exit"); + }, + ); + Ok(()) + } + + pub fn shutdown_handle(&self) -> DaemonShutdownHandle { + DaemonShutdownHandle { tx: self.tx.clone() } + } +} + +struct DaemonShutdownHandle { + tx: mpsc::Sender<DaemonEvent>, +} + +impl DaemonShutdownHandle { + pub fn shutdown(&self) { + let _ = self.tx.send(DaemonEvent::TriggerShutdown); + } +} + +impl Drop for Daemon { + fn drop(self: &mut Daemon) { + if let Err(e) = rpc_info::remove().chain_err(|| "Unable to clean up rpc address file") { + error!("{}", e.display()); + } + } +} + + +quick_main!(run); + +fn run() -> Result<()> { + let config = cli::get_config(); + init_logger(config.log_level, config.log_file.as_ref())?; + + let daemon = Daemon::new().chain_err(|| "Unable to initialize daemon")?; + + let shutdown_handle = daemon.shutdown_handle(); + shutdown::set_shutdown_signal_handler(move || shutdown_handle.shutdown()) + .chain_err(|| "Unable to attach shutdown signal handler")?; + + daemon.run()?; + + debug!("Mullvad daemon is quitting"); + Ok(()) +} + +fn init_logger(log_level: log::LogLevelFilter, log_file: Option<&PathBuf>) -> Result<()> { + let silenced_crates = [ + "jsonrpc_core", + "tokio_core", + "jsonrpc_ws_server", + "ws", + "mio", + ]; + let mut config = fern::Dispatch::new() + .format( + |out, message, record| { + out.finish( + format_args!( + "{}[{}][{}] {}", + chrono::Local::now().format("[%Y-%m-%d %H:%M:%S]"), + record.target(), + record.level(), + message + ), + ) + }, + ) + .level(log_level) + .chain(std::io::stdout()); + for silenced_crate in &silenced_crates { + config = config.level_for(*silenced_crate, log::LogLevelFilter::Warn); + } + if let Some(ref log_file) = log_file { + let f = fern::log_file(log_file).chain_err(|| "Failed to open log file for writing")?; + config = config.chain(f); + } + config.apply().chain_err(|| "Failed to bootstrap logging system") +} diff --git a/mullvad-daemon/src/management_interface.rs b/mullvad-daemon/src/management_interface.rs new file mode 100644 index 0000000000..ffba3dca0f --- /dev/null +++ b/mullvad-daemon/src/management_interface.rs @@ -0,0 +1,379 @@ +use error_chain; + +use jsonrpc_core::{Error, ErrorCode, Metadata}; +use jsonrpc_core::futures::{BoxFuture, Future, future, sync}; +use jsonrpc_macros::pubsub; +use jsonrpc_pubsub::{PubSubHandler, PubSubMetadata, Session, SubscriptionId}; +use jsonrpc_ws_server; +use mullvad_types::states::{DaemonState, TargetState}; + +use serde; + +use std::collections::HashMap; +use std::collections::hash_map::Entry; +use std::net::{IpAddr, Ipv4Addr}; +use std::sync::{Arc, Mutex, RwLock}; + +use talpid_core::mpsc::IntoSender; +use talpid_ipc; +use uuid; + + +pub type AccountToken = String; +pub type CountryCode = String; + +#[derive(Serialize)] +pub struct AccountData { + pub paid_until: String, +} + +#[derive(Serialize)] +pub struct Location { + pub latlong: [f64; 2], + pub country: String, + pub city: String, +} + + +build_rpc_trait! { + pub trait ManagementInterfaceApi { + type Metadata; + + /// Fetches and returns metadata about an account. Returns an error on non-existing + /// accounts. + #[rpc(name = "get_account_data")] + fn get_account_data(&self, AccountToken) -> Result<AccountData, Error>; + + /// Returns available countries. + #[rpc(name = "get_countries")] + fn get_countries(&self) -> Result<HashMap<CountryCode, String>, Error>; + + /// Set which account to connect with. + #[rpc(name = "set_account")] + fn set_account(&self, Option<AccountToken>) -> Result<(), Error>; + + /// Get which account is configured. + #[rpc(async, name = "get_account")] + fn get_account(&self) -> BoxFuture<Option<AccountToken>, Error>; + + /// Set which country to connect to + #[rpc(name = "set_country")] + fn set_country(&self, CountryCode) -> Result<(), Error>; + + /// Set if the client should automatically establish a tunnel on start or not. + #[rpc(name = "set_autoconnect")] + fn set_autoconnect(&self, bool) -> Result<(), Error>; + + /// Try to connect if disconnected, or do nothing if already connecting/connected. + #[rpc(name = "connect")] + fn connect(&self) -> Result<(), Error>; + + /// Disconnect the VPN tunnel if it is connecting/connected. Does nothing if already + /// disconnected. + #[rpc(name = "disconnect")] + fn disconnect(&self) -> Result<(), Error>; + + /// Returns the current state of the Mullvad client. Changes to this state will + /// be announced to subscribers of `new_state`. + #[rpc(async, name = "get_state")] + fn get_state(&self) -> BoxFuture<DaemonState, Error>; + + /// Returns the current public IP of this computer. + #[rpc(name = "get_ip")] + fn get_ip(&self) -> Result<IpAddr, Error>; + + /// Performs a geoIP lookup and returns the current location as perceived by the public + /// internet. + #[rpc(name = "get_location")] + fn get_location(&self) -> Result<Location, Error>; + + #[pubsub(name = "new_state")] { + /// Subscribes to the `new_state` event notifications. + #[rpc(name = "new_state_subscribe")] + fn new_state_subscribe(&self, Self::Metadata, pubsub::Subscriber<DaemonState>); + + /// Unsubscribes from the `new_state` event notifications. + #[rpc(name = "new_state_unsubscribe")] + fn new_state_unsubscribe(&self, SubscriptionId) -> BoxFuture<(), Error>; + } + + #[pubsub(name = "error")] { + /// Subscribes to the `error` event notifications. + #[rpc(name = "error_subscribe")] + fn error_subscribe(&self, Self::Metadata, pubsub::Subscriber<Vec<String>>); + + /// Unsubscribes from the `error` event notifications. + #[rpc(name = "error_unsubscribe")] + fn error_unsubscribe(&self, SubscriptionId) -> BoxFuture<(), Error>; + } + } +} + + +/// Enum representing commands coming in on the management interface. +#[derive(Debug)] +pub enum TunnelCommand { + /// Change target state. + SetTargetState(TargetState), + /// Request the current state. + GetState(sync::oneshot::Sender<DaemonState>), + /// Set which account token to use for subsequent connection attempts. + SetAccount(Option<AccountToken>), + /// Request the current account token being used. + GetAccount(sync::oneshot::Sender<Option<AccountToken>>), +} + +#[derive(Default)] +struct ActiveSubscriptions { + new_state_subscriptions: RwLock<HashMap<SubscriptionId, pubsub::Sink<DaemonState>>>, + error_subscriptions: RwLock<HashMap<SubscriptionId, pubsub::Sink<Vec<String>>>>, +} + +pub struct ManagementInterfaceServer { + server: talpid_ipc::IpcServer, + subscriptions: Arc<ActiveSubscriptions>, +} + +impl ManagementInterfaceServer { + pub fn start<T>(tunnel_tx: IntoSender<TunnelCommand, T>) -> talpid_ipc::Result<Self> + where T: From<TunnelCommand> + 'static + Send + { + let rpc = ManagementInterface::new(tunnel_tx); + let subscriptions = rpc.subscriptions.clone(); + + let mut io = PubSubHandler::default(); + io.extend_with(rpc.to_delegate()); + let server = talpid_ipc::IpcServer::start_with_metadata(io.into(), meta_extractor)?; + Ok( + ManagementInterfaceServer { + server, + subscriptions, + }, + ) + } + + pub fn address(&self) -> &str { + self.server.address() + } + + pub fn event_broadcaster(&self) -> EventBroadcaster { + EventBroadcaster { subscriptions: self.subscriptions.clone() } + } + + /// Consumes the server and waits for it to finish. Returns an error if the server exited + /// due to an error. + pub fn wait(self) -> talpid_ipc::Result<()> { + self.server.wait() + } +} + + +/// A handle that allows broadcasting messages to all subscribers of the management interface. +pub struct EventBroadcaster { + subscriptions: Arc<ActiveSubscriptions>, +} + +impl EventBroadcaster { + /// Sends a new state update to all `new_state` subscribers of the management interface. + pub fn notify_new_state(&self, new_state: DaemonState) { + self.notify(&self.subscriptions.new_state_subscriptions, new_state); + } + + /// Sends an error to all `error` subscribers of the management interface. + pub fn notify_error<E>(&self, error: &E) + where E: error_chain::ChainedError + { + let error_strings = error.iter().map(|e| e.to_string()).collect(); + self.notify(&self.subscriptions.error_subscriptions, error_strings); + } + + fn notify<T>(&self, + subscriptions_lock: &RwLock<HashMap<SubscriptionId, pubsub::Sink<T>>>, + value: T) + where T: serde::Serialize + Clone + { + let subscriptions = subscriptions_lock.read().unwrap(); + for sink in subscriptions.values() { + let _ = sink.notify(Ok(value.clone())).wait(); + } + } +} + +struct ManagementInterface<T: From<TunnelCommand> + 'static + Send> { + subscriptions: Arc<ActiveSubscriptions>, + tx: Mutex<IntoSender<TunnelCommand, T>>, +} + +impl<T: From<TunnelCommand> + 'static + Send> ManagementInterface<T> { + pub fn new(tx: IntoSender<TunnelCommand, T>) -> Self { + ManagementInterface { + subscriptions: Default::default(), + tx: Mutex::new(tx), + } + } + + fn subscribe<V>(subscriber: pubsub::Subscriber<V>, + subscriptions_lock: &RwLock<HashMap<SubscriptionId, pubsub::Sink<V>>>) { + let mut subscriptions = subscriptions_lock.write().unwrap(); + loop { + let id = SubscriptionId::String(uuid::Uuid::new_v4().to_string()); + if let Entry::Vacant(entry) = subscriptions.entry(id.clone()) { + if let Ok(sink) = subscriber.assign_id(id.clone()) { + debug!("Accepting new subscription with id {:?}", id); + entry.insert(sink); + } + break; + } + } + } + + fn unsubscribe<V>(id: SubscriptionId, + subscriptions_lock: &RwLock<HashMap<SubscriptionId, pubsub::Sink<V>>>) + -> BoxFuture<(), Error> { + let was_removed = subscriptions_lock.write().unwrap().remove(&id).is_some(); + let result = if was_removed { + debug!("Unsubscribing id {:?}", id); + future::ok(()) + } else { + future::err( + Error { + code: ErrorCode::InvalidParams, + message: "Invalid subscription".to_owned(), + data: None, + }, + ) + }; + result.boxed() + } +} + +impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for ManagementInterface<T> { + type Metadata = Meta; + + fn get_account_data(&self, _account_token: AccountToken) -> Result<AccountData, Error> { + trace!("get_account_data"); + Ok(AccountData { paid_until: "2018-12-31T16:00:00.000Z".to_owned() },) + } + + fn get_countries(&self) -> Result<HashMap<CountryCode, String>, Error> { + trace!("get_countries"); + Ok(HashMap::new()) + } + + fn set_account(&self, account_token: Option<AccountToken>) -> Result<(), Error> { + trace!("set_account"); + self.tx + .lock() + .unwrap() + .send(TunnelCommand::SetAccount(account_token)) + .map_err(|_| Error::internal_error()) + } + + fn get_account(&self) -> BoxFuture<Option<AccountToken>, Error> { + trace!("get_account"); + let (tx, rx) = sync::oneshot::channel(); + match self.tx.lock().unwrap().send(TunnelCommand::GetAccount(tx)) { + Ok(()) => rx.map_err(|_| Error::internal_error()).boxed(), + Err(_) => future::err(Error::internal_error()).boxed(), + } + } + + fn set_country(&self, _country_code: CountryCode) -> Result<(), Error> { + trace!("set_country"); + Ok(()) + } + + fn set_autoconnect(&self, _autoconnect: bool) -> Result<(), Error> { + trace!("set_autoconnect"); + Ok(()) + } + + fn connect(&self) -> Result<(), Error> { + trace!("connect"); + self.tx + .lock() + .unwrap() + .send(TunnelCommand::SetTargetState(TargetState::Secured)) + .map_err(|_| Error::internal_error()) + } + + fn disconnect(&self) -> Result<(), Error> { + trace!("disconnect"); + self.tx + .lock() + .unwrap() + .send(TunnelCommand::SetTargetState(TargetState::Unsecured)) + .map_err(|_| Error::internal_error()) + } + + fn get_state(&self) -> BoxFuture<DaemonState, Error> { + trace!("get_state"); + let (state_tx, state_rx) = sync::oneshot::channel(); + match self.tx.lock().unwrap().send(TunnelCommand::GetState(state_tx)) { + Ok(()) => state_rx.map_err(|_| Error::internal_error()).boxed(), + Err(_) => future::err(Error::internal_error()).boxed(), + } + } + + fn get_ip(&self) -> Result<IpAddr, Error> { + trace!("get_ip"); + Ok(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4))) + } + + fn get_location(&self) -> Result<Location, Error> { + trace!("get_location"); + Ok( + Location { + latlong: [1.0, 2.0], + country: "narnia".to_owned(), + city: "Le city".to_owned(), + }, + ) + } + + fn new_state_subscribe(&self, + _meta: Self::Metadata, + subscriber: pubsub::Subscriber<DaemonState>) { + trace!("new_state_subscribe"); + Self::subscribe(subscriber, &self.subscriptions.new_state_subscriptions); + } + + fn new_state_unsubscribe(&self, id: SubscriptionId) -> BoxFuture<(), Error> { + trace!("new_state_unsubscribe"); + Self::unsubscribe(id, &self.subscriptions.new_state_subscriptions) + } + + fn error_subscribe(&self, _meta: Self::Metadata, subscriber: pubsub::Subscriber<Vec<String>>) { + trace!("error_subscribe"); + Self::subscribe(subscriber, &self.subscriptions.error_subscriptions); + } + + fn error_unsubscribe(&self, id: SubscriptionId) -> BoxFuture<(), Error> { + trace!("error_unsubscribe"); + Self::unsubscribe(id, &self.subscriptions.error_subscriptions) + } +} + + +/// The metadata type. There is one instance associated with each connection. In this pubsub +/// scenario they are created by `meta_extractor` by the server on each new incoming +/// connection. +#[derive(Clone, Debug, Default)] +pub struct Meta { + session: Option<Arc<Session>>, +} + +/// Make the `Meta` type possible to use as jsonrpc metadata type. +impl Metadata for Meta {} + +/// Make the `Meta` type possible to use as a pubsub metadata type. +impl PubSubMetadata for Meta { + fn session(&self) -> Option<Arc<Session>> { + self.session.clone() + } +} + +/// Metadata extractor function for `Meta`. +fn meta_extractor(context: &jsonrpc_ws_server::RequestContext) -> Meta { + Meta { session: Some(Arc::new(Session::new(context.sender()))) } +} diff --git a/mullvad-daemon/src/mock_ipc.rs b/mullvad-daemon/src/mock_ipc.rs new file mode 100644 index 0000000000..90859401c2 --- /dev/null +++ b/mullvad-daemon/src/mock_ipc.rs @@ -0,0 +1,217 @@ +use ipc_api::*; + +use jsonrpc_core::{self, Error, ErrorCode, Metadata}; +use jsonrpc_core::futures::{BoxFuture, Future, future}; +use jsonrpc_macros::pubsub; +use jsonrpc_pubsub::{PubSubHandler, PubSubMetadata, Session, SubscriptionId}; +use jsonrpc_ws_server; + +use std::collections::HashMap; +use std::net::{IpAddr, Ipv4Addr}; +use std::sync::{Arc, RwLock, atomic}; + +use talpid_ipc; + +type ActiveSubscriptions = Arc<RwLock<HashMap<SubscriptionId, pubsub::Sink<String>>>>; + +pub struct IpcServer { + server: talpid_ipc::IpcServer, +} + +impl IpcServer { + pub fn start() -> talpid_ipc::Result<Self> { + let active_subscriptions = ActiveSubscriptions::default(); + let rpc = MockIpcApi::new(active_subscriptions.clone()); + let mut io = PubSubHandler::default(); + io.extend_with(rpc.to_delegate()); + let server = talpid_ipc::IpcServer::start_with_metadata(io.into(), meta_extractor)?; + + Self::spawn_broadcast_thread(active_subscriptions); + Ok(IpcServer { server }) + } + + pub fn address(&self) -> &str { + self.server.address() + } + + pub fn wait(self) -> talpid_ipc::Result<()> { + self.server.wait() + } + + // TODO(linus): This thread will never die. But this is just mock anyway so not important. + fn spawn_broadcast_thread(active_subscriptions: ActiveSubscriptions) { + ::std::thread::spawn( + move || loop { + { + let subscribers = active_subscriptions.read().unwrap(); + for sink in subscribers.values() { + let _ = sink.notify(Ok("Hello World!".into())).wait(); + } + } + ::std::thread::sleep(::std::time::Duration::from_secs(1)); + }, + ); + } +} + + + +/// The metadata type. There is one instance associated with each connection. In this pubsub +/// scenario they are created by `From<Sender<String>>::from` by the server on each new incoming +/// connection. +#[derive(Clone, Debug, Default)] +pub struct Meta { + session: Option<Arc<Session>>, +} + +/// Make the `Meta` type possible to use as jsonrpc metadata type. +impl Metadata for Meta {} + +/// Make the `Meta` type possible to use as a pubsub metadata type. +impl PubSubMetadata for Meta { + fn session(&self) -> Option<Arc<Session>> { + self.session.clone() + } +} + +/// Metadata extractor function for `Meta`. +fn meta_extractor(context: &jsonrpc_ws_server::RequestContext) -> Meta { + Meta { session: Some(Arc::new(Session::new(context.sender()))) } +} + +/// A mock implementation of the Mullvad frontend API. A very simplified explanation is that for +/// the real implementation `tunnel_is_up` should be replaced with some kind of handle (or proxy to +/// a handle) to the jsonrpc client talking with `talpid_core`. +pub struct MockIpcApi { + next_subscription_id: atomic::AtomicUsize, + active: ActiveSubscriptions, + country: RwLock<CountryCode>, + tunnel_is_up: atomic::AtomicBool, +} + +impl MockIpcApi { + pub fn new(active: ActiveSubscriptions) -> Self { + MockIpcApi { + next_subscription_id: atomic::AtomicUsize::new(0), + active: active, + country: RwLock::new("se".to_owned()), + tunnel_is_up: atomic::AtomicBool::new(false), + } + } +} + +impl IpcApi for MockIpcApi { + type Metadata = Meta; + + fn get_account_data(&self, account_token: AccountToken) -> Result<AccountData, Error> { + debug!("Login for {}", account_token); + + let paid_until = if account_token.starts_with("1111") { + // accounts starting with 1111 expire in one month + Ok("2018-12-31T16:00:00.000Z".to_owned()) + } else if account_token.starts_with("2222") { + Ok("2012-12-31T16:00:00.000Z".to_owned()) + } else if account_token.starts_with("3333") { + Ok("2037-12-31T16:00:00.000Z".to_owned()) + } else { + Err(jsonrpc_core::Error::invalid_params("You are not welcome")) + }?; + Ok(AccountData { paid_until: paid_until }) + } + + fn get_countries(&self) -> Result<HashMap<CountryCode, String>, Error> { + let mut countries = HashMap::new(); + countries.insert("se".to_owned(), "Sweden".to_owned()); + countries.insert("de".to_owned(), "Denmark".to_owned()); + countries.insert("na".to_owned(), "Narnia".to_owned()); + Ok(countries) + } + + fn set_account(&self, _account_token: AccountToken) -> Result<(), Error> { + Ok(()) + } + + fn set_country(&self, country_code: CountryCode) -> Result<(), Error> { + *self.country.write().unwrap() = country_code; + Ok(()) + } + + fn set_autoconnect(&self, _autoconnect: bool) -> Result<(), Error> { + Ok(()) + } + + fn connect(&self) -> Result<(), Error> { + if self.country.read().unwrap().starts_with("se") { + Err(jsonrpc_core::Error::invalid_params("Invalid server")) + } else { + self.tunnel_is_up.store(true, atomic::Ordering::SeqCst); + Ok(()) + } + } + + fn disconnect(&self) -> Result<(), Error> { + self.tunnel_is_up.store(false, atomic::Ordering::SeqCst); + Ok(()) + } + + fn get_state(&self) -> Result<SecurityState, Error> { + if self.tunnel_is_up.load(atomic::Ordering::SeqCst) { + Ok(SecurityState::Secured) + } else { + Ok(SecurityState::Unsecured) + } + } + + fn get_ip(&self) -> Result<IpAddr, Error> { + let ip = if self.tunnel_is_up.load(atomic::Ordering::SeqCst) { + IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)) + } else { + IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)) + } + .to_owned(); + Ok(ip) + } + + fn get_location(&self) -> Result<Location, Error> { + Ok( + if self.tunnel_is_up.load(atomic::Ordering::SeqCst) { + Location { + latlong: [1.0, 2.0], + country: "narnia".to_owned(), + city: "Le city".to_owned(), + } + } else { + Location { + latlong: [60.0, 61.0], + country: "sweden".to_owned(), + city: "bollebygd".to_owned(), + } + }, + ) + } + + fn subscribe(&self, _meta: Self::Metadata, subscriber: pubsub::Subscriber<String>) { + let id = self.next_subscription_id.fetch_add(1, atomic::Ordering::SeqCst); + let sub_id = SubscriptionId::Number(id as u64); + if let Ok(sink) = subscriber.assign_id(sub_id.clone()) { + debug!("Accepting new subscription with id {}", id); + self.active.write().unwrap().insert(sub_id, sink); + } + } + + fn unsubscribe(&self, id: SubscriptionId) -> BoxFuture<bool, Error> { + debug!("Unsubscribing id {:?}", id); + if self.active.write().unwrap().remove(&id).is_some() { + future::ok(true).boxed() + } else { + future::err( + Error { + code: ErrorCode::InvalidParams, + message: "Invalid subscription.".into(), + data: None, + }, + ) + .boxed() + } + } +} diff --git a/mullvad-daemon/src/rpc_info.rs b/mullvad-daemon/src/rpc_info.rs new file mode 100644 index 0000000000..25078ff899 --- /dev/null +++ b/mullvad-daemon/src/rpc_info.rs @@ -0,0 +1,47 @@ +use std::fs::{self, File, OpenOptions}; +use std::io::{self, Write}; +use std::path::{Path, PathBuf}; + +error_chain! { + errors { + WriteFailed(path: PathBuf) { + description("Failed to write RCP address to file") + display("Failed to write RPC address to {}", path.to_string_lossy()) + } + RemoveFailed(path: PathBuf) { + description("Failed to remove file") + display("Failed to remove {}", path.to_string_lossy()) + } + } +} + +lazy_static! { + /// The path to the file where we write the RPC address + static ref RPC_ADDRESS_FILE_PATH: PathBuf = ::std::env::temp_dir().join(".mullvad_rpc_address"); +} + +/// Writes down the RPC address to some API to a file. +pub fn write(rpc_address: &str) -> Result<()> { + open_file(RPC_ADDRESS_FILE_PATH.as_path()) + .and_then(|mut file| file.write_all(rpc_address.as_bytes())) + .chain_err(|| ErrorKind::WriteFailed(RPC_ADDRESS_FILE_PATH.to_owned()))?; + + debug!( + "Wrote RPC address to {}", + RPC_ADDRESS_FILE_PATH.to_string_lossy() + ); + Ok(()) +} + +pub fn remove() -> Result<()> { + fs::remove_file(RPC_ADDRESS_FILE_PATH.as_path()) + .chain_err(|| ErrorKind::RemoveFailed(RPC_ADDRESS_FILE_PATH.to_owned())) +} + +fn open_file(path: &Path) -> io::Result<File> { + OpenOptions::new() + .write(true) + .truncate(true) + .create(true) + .open(path) +} diff --git a/mullvad-daemon/src/shutdown.rs b/mullvad-daemon/src/shutdown.rs new file mode 100644 index 0000000000..4da73c4d05 --- /dev/null +++ b/mullvad-daemon/src/shutdown.rs @@ -0,0 +1,42 @@ +error_chain!{} + +#[cfg(unix)] +mod platform { + extern crate simple_signal; + + use self::simple_signal::Signal; + use super::Result; + + pub fn set_shutdown_signal_handler<F>(f: F) -> Result<()> + where F: Fn() + 'static + Send + { + simple_signal::set_handler( + &[Signal::Term, Signal::Int], move |s| { + debug!("Process received signal: {:?}", s); + f(); + } + ); + Ok(()) + } +} + +#[cfg(windows)] +mod platform { + extern crate ctrlc; + + use super::{Result, ResultExt}; + + pub fn set_shutdown_signal_handler<F>(f: F) -> Result<()> + where F: Fn() + 'static + Send + { + ctrlc::set_handler( + move || { + debug!("Process received Ctrl-c"); + f(); + }, + ) + .chain_err(|| "Unable to attach ctrl-c handler") + } +} + +pub use self::platform::*; |
