diff options
| author | Linus Färnstrand <linus@mullvad.net> | 2017-07-17 10:36:52 +0200 |
|---|---|---|
| committer | Linus Färnstrand <linus@mullvad.net> | 2017-07-17 10:48:09 +0200 |
| commit | f07f1a0262c908ff83490850b67a167f963efc2d (patch) | |
| tree | 5a5209baefc0b10ba528f254e7ce2456c23f6ac9 /mullvad_daemon/src | |
| parent | 6a4202fc8c55752f0fef86c04b628a3f9ada5279 (diff) | |
| download | mullvadvpn-f07f1a0262c908ff83490850b67a167f963efc2d.tar.xz mullvadvpn-f07f1a0262c908ff83490850b67a167f963efc2d.zip | |
Rename all crates from snake_case to kebab-case
Diffstat (limited to 'mullvad_daemon/src')
| -rw-r--r-- | mullvad_daemon/src/cli.rs | 45 | ||||
| -rw-r--r-- | mullvad_daemon/src/main.rs | 498 | ||||
| -rw-r--r-- | mullvad_daemon/src/management_interface.rs | 379 | ||||
| -rw-r--r-- | mullvad_daemon/src/mock_ipc.rs | 217 | ||||
| -rw-r--r-- | mullvad_daemon/src/rpc_info.rs | 47 | ||||
| -rw-r--r-- | mullvad_daemon/src/shutdown.rs | 42 |
6 files changed, 0 insertions, 1228 deletions
diff --git a/mullvad_daemon/src/cli.rs b/mullvad_daemon/src/cli.rs deleted file mode 100644 index 670a2bf28b..0000000000 --- a/mullvad_daemon/src/cli.rs +++ /dev/null @@ -1,45 +0,0 @@ -use clap::{App, Arg}; -use log; - -use std::path::PathBuf; - -pub struct Config { - pub log_level: log::LogLevelFilter, - pub log_file: Option<PathBuf>, -} - -pub fn get_config() -> Config { - let app = create_app(); - let matches = app.get_matches(); - - let log_level = match matches.occurrences_of("v") { - 0 => log::LogLevelFilter::Info, - 1 => log::LogLevelFilter::Debug, - _ => log::LogLevelFilter::Trace, - }; - let log_file = matches.value_of_os("log_file").map(PathBuf::from); - - Config { - log_level, - log_file, - } -} - -fn create_app() -> App<'static, 'static> { - App::new(::CRATE_NAME) - .version(crate_version!()) - .author(crate_authors!()) - .about(crate_description!()) - .arg( - Arg::with_name("v") - .short("v") - .multiple(true) - .help("Sets the level of verbosity."), - ) - .arg( - Arg::with_name("log_file") - .long("log") - .takes_value(true) - .help("Activates file logging to the given path"), - ) -} diff --git a/mullvad_daemon/src/main.rs b/mullvad_daemon/src/main.rs deleted file mode 100644 index b26ca315a8..0000000000 --- a/mullvad_daemon/src/main.rs +++ /dev/null @@ -1,498 +0,0 @@ -#[macro_use] -extern crate clap; -extern crate chrono; -#[macro_use] -extern crate log; -#[macro_use] -extern crate error_chain; -extern crate fern; - -extern crate serde; -#[macro_use] -extern crate serde_derive; - -extern crate jsonrpc_core; -extern crate jsonrpc_pubsub; -#[macro_use] -extern crate jsonrpc_macros; -extern crate jsonrpc_ws_server; -extern crate uuid; -#[macro_use] -extern crate lazy_static; - -extern crate mullvad_types; -extern crate talpid_core; -extern crate talpid_ipc; - -mod cli; -mod management_interface; -mod rpc_info; -mod shutdown; - -use error_chain::ChainedError; -use management_interface::{ManagementInterfaceServer, TunnelCommand}; -use mullvad_types::states::{DaemonState, SecurityState, TargetState}; -use std::io; - -use std::path::PathBuf; -use std::sync::{Arc, Mutex, mpsc}; -use std::thread; - -use talpid_core::mpsc::IntoSender; -use talpid_core::net::{Endpoint, TransportProtocol}; -use talpid_core::tunnel::{self, TunnelEvent, TunnelMonitor}; - -error_chain!{ - errors { - /// The client is in the wrong state for the requested operation. Optimally the code should - /// be written in such a way so such states can't exist. - InvalidState { - description("Client is in an invalid state for the requested operation") - } - TunnelError(msg: &'static str) { - description("Error in the tunnel monitor") - display("Tunnel monitor error: {}", msg) - } - ManagementInterfaceError(msg: &'static str) { - description("Error in the management interface") - display("Management interface error: {}", msg) - } - InvalidSettings(msg: &'static str) { - description("Invalid settings") - display("Invalid Settings: {}", msg) - } - } -} - -lazy_static! { - // Temporary store of hardcoded remotes. - static ref REMOTES: [Endpoint; 3] = [ - Endpoint::new("se5.mullvad.net", 1300, TransportProtocol::Udp), - Endpoint::new("se6.mullvad.net", 1300, TransportProtocol::Udp), - Endpoint::new("se7.mullvad.net", 1300, TransportProtocol::Udp), - ]; -} - -static CRATE_NAME: &str = "mullvadd"; - - -/// All events that can happen in the daemon. Sent from various threads and exposed interfaces. -pub enum DaemonEvent { - /// An event coming from the tunnel software to indicate a change in state. - TunnelEvent(TunnelEvent), - /// Triggered by the thread waiting for the tunnel process. Means the tunnel process exited. - TunnelExited(tunnel::Result<()>), - /// Triggered by the thread waiting for a tunnel close operation to complete. Contains the - /// result of trying to kill the tunnel. - TunnelKillResult(io::Result<()>), - /// An event coming from the JSONRPC-2.0 management interface. - ManagementInterfaceEvent(TunnelCommand), - /// Triggered if the server hosting the JSONRPC-2.0 management interface dies unexpectedly. - ManagementInterfaceExited(talpid_ipc::Result<()>), - /// Daemon shutdown triggered by a signal, ctrl-c or similar. - TriggerShutdown, -} - -impl From<TunnelEvent> for DaemonEvent { - fn from(tunnel_event: TunnelEvent) -> Self { - DaemonEvent::TunnelEvent(tunnel_event) - } -} - -impl From<TunnelCommand> for DaemonEvent { - fn from(tunnel_command: TunnelCommand) -> Self { - DaemonEvent::ManagementInterfaceEvent(tunnel_command) - } -} - -/// Represents the internal state of the actual tunnel. -// TODO(linus): Put the tunnel::CloseHandle into this state, so it can't exist when not running. -#[derive(Debug, Copy, Clone, Eq, PartialEq)] -pub enum TunnelState { - /// No tunnel is running. - NotRunning, - /// The tunnel has been started, but it is not established/functional. - Connecting, - /// The tunnel is up and working. - Connected, - /// This state is active from when we manually trigger a tunnel kill until the tunnel wait - /// operation (TunnelExit) returned. - Exiting, -} - -impl TunnelState { - pub fn as_security_state(&self) -> SecurityState { - match *self { - TunnelState::Connected => SecurityState::Secured, - _ => SecurityState::Unsecured, - } - } -} - - -struct Daemon { - state: TunnelState, - // The tunnel_close_handle must only exist in the Connecting and Connected states! - tunnel_close_handle: Option<tunnel::CloseHandle>, - last_broadcasted_state: DaemonState, - target_state: TargetState, - shutdown: bool, - rx: mpsc::Receiver<DaemonEvent>, - tx: mpsc::Sender<DaemonEvent>, - management_interface_broadcaster: management_interface::EventBroadcaster, - - // Just for testing. A cyclic iterator iterating over the hardcoded remotes, - // picking a new one for each retry. - remote_iter: std::iter::Cycle<std::iter::Cloned<std::slice::Iter<'static, Endpoint>>>, - // The current account token for now. Should be moved into the settings later. - account_token: Option<String>, -} - -impl Daemon { - pub fn new() -> Result<Self> { - let (tx, rx) = mpsc::channel(); - let management_interface_broadcaster = Self::start_management_interface(tx.clone())?; - let state = TunnelState::NotRunning; - let target_state = TargetState::Unsecured; - Ok( - Daemon { - state, - tunnel_close_handle: None, - target_state, - last_broadcasted_state: DaemonState { - state: state.as_security_state(), - target_state, - }, - shutdown: false, - rx, - tx, - management_interface_broadcaster, - remote_iter: REMOTES.iter().cloned().cycle(), - account_token: None, - }, - ) - } - - // Starts the management interface and spawns a thread that will process it. - // Returns a handle that allows notifying all subscribers on events. - fn start_management_interface(event_tx: mpsc::Sender<DaemonEvent>) - -> Result<management_interface::EventBroadcaster> { - let multiplex_event_tx = IntoSender::from(event_tx.clone()); - let server = Self::start_management_interface_server(multiplex_event_tx)?; - let event_broadcaster = server.event_broadcaster(); - Self::spawn_management_interface_wait_thread(server, event_tx); - Ok(event_broadcaster) - } - - fn start_management_interface_server(event_tx: IntoSender<TunnelCommand, DaemonEvent>) - -> Result<ManagementInterfaceServer> { - let server = - ManagementInterfaceServer::start(event_tx) - .chain_err(|| ErrorKind::ManagementInterfaceError("Failed to start server"),)?; - info!( - "Mullvad management interface listening on {}", - server.address() - ); - rpc_info::write(server.address()).chain_err(|| ErrorKind::ManagementInterfaceError( - "Failed to write RPC address to file"))?; - Ok(server) - } - - fn spawn_management_interface_wait_thread(server: ManagementInterfaceServer, - exit_tx: mpsc::Sender<DaemonEvent>) { - thread::spawn( - move || { - let result = server.wait(); - debug!("Mullvad management interface shut down"); - let _ = exit_tx.send(DaemonEvent::ManagementInterfaceExited(result)); - }, - ); - } - - /// Consume the `Daemon` and run the main event loop. Blocks until an error happens or a - /// shutdown event is received. - pub fn run(mut self) -> Result<()> { - while let Ok(event) = self.rx.recv() { - self.handle_event(event)?; - if self.shutdown && self.state == TunnelState::NotRunning { - break; - } - } - Ok(()) - } - - fn handle_event(&mut self, event: DaemonEvent) -> Result<()> { - use DaemonEvent::*; - match event { - TunnelEvent(event) => self.handle_tunnel_event(event), - TunnelExited(result) => self.handle_tunnel_exited(result), - TunnelKillResult(result) => self.handle_tunnel_kill_result(result), - ManagementInterfaceEvent(event) => self.handle_management_interface_event(event), - ManagementInterfaceExited(result) => self.handle_management_interface_exited(result), - TriggerShutdown => self.handle_trigger_shutdown_event(), - } - } - - fn handle_tunnel_event(&mut self, tunnel_event: TunnelEvent) -> Result<()> { - info!("Tunnel event: {:?}", tunnel_event); - if self.state == TunnelState::Connecting && tunnel_event == TunnelEvent::Up { - self.set_state(TunnelState::Connected) - } else if self.state == TunnelState::Connected && tunnel_event == TunnelEvent::Down { - self.kill_tunnel() - } else { - Ok(()) - } - } - - fn handle_tunnel_exited(&mut self, result: tunnel::Result<()>) -> Result<()> { - if let Err(e) = result.chain_err(|| "Tunnel exited in an unexpected way") { - error!("{}", e.display()); - } - self.tunnel_close_handle = None; - self.set_state(TunnelState::NotRunning) - } - - fn handle_tunnel_kill_result(&mut self, result: io::Result<()>) -> Result<()> { - result.chain_err(|| "Error while trying to close tunnel") - } - - fn handle_management_interface_event(&mut self, event: TunnelCommand) -> Result<()> { - use TunnelCommand::*; - match event { - SetTargetState(state) => { - if !self.shutdown { - self.set_target_state(state)?; - } else { - warn!("Ignoring target state change request due to shutdown"); - } - } - GetState(tx) => { - if let Err(_) = tx.send(self.last_broadcasted_state) { - warn!("Unable to send current state to management interface client",); - } - } - SetAccount(account_token) => self.account_token = account_token, - GetAccount(tx) => { - if let Err(_) = tx.send(self.account_token.clone()) { - warn!("Unable to send current account to management interface client"); - } - } - } - Ok(()) - } - - fn handle_management_interface_exited(&self, result: talpid_ipc::Result<()>) -> Result<()> { - let error = ErrorKind::ManagementInterfaceError("Server exited unexpectedly"); - match result { - Ok(()) => Err(error.into()), - Err(e) => Err(e).chain_err(|| error), - } - } - - fn handle_trigger_shutdown_event(&mut self) -> Result<()> { - self.shutdown = true; - self.set_target_state(TargetState::Unsecured) - } - - /// Update the state of the client. If it changed, notify the subscribers and trigger - /// appropriate actions. - fn set_state(&mut self, new_state: TunnelState) -> Result<()> { - if new_state != self.state { - debug!("State {:?} => {:?}", self.state, new_state); - self.state = new_state; - self.broadcast_state(); - self.verify_state_consistency()?; - self.apply_target_state() - } else { - // Calling set_state with the same state we already have is an error. Should try to - // mitigate this possibility completely with a better state machine later. - Err(ErrorKind::InvalidState.into()) - } - } - - fn broadcast_state(&mut self) { - let new_daemon_state = DaemonState { - state: self.state.as_security_state(), - target_state: self.target_state, - }; - if self.last_broadcasted_state != new_daemon_state { - self.last_broadcasted_state = new_daemon_state; - self.management_interface_broadcaster.notify_new_state(new_daemon_state); - } - } - - // Check that the current state is valid and consistent. - fn verify_state_consistency(&self) -> Result<()> { - use TunnelState::*; - ensure!( - match self.state { - NotRunning => self.tunnel_close_handle.is_none(), - Connecting => self.tunnel_close_handle.is_some(), - Connected => self.tunnel_close_handle.is_some(), - Exiting => self.tunnel_close_handle.is_none(), - }, - ErrorKind::InvalidState - ); - Ok(()) - } - - /// Set the target state of the client. If it changed trigger the operations needed to progress - /// towards that state. - fn set_target_state(&mut self, new_state: TargetState) -> Result<()> { - if new_state != self.target_state { - debug!("Target state {:?} => {:?}", self.target_state, new_state); - self.target_state = new_state; - self.broadcast_state(); - self.apply_target_state() - } else { - Ok(()) - } - } - - fn apply_target_state(&mut self) -> Result<()> { - match (self.target_state, self.state) { - (TargetState::Secured, TunnelState::NotRunning) => { - debug!("Triggering tunnel start"); - if let Err(e) = self.start_tunnel().chain_err(|| "Failed to start tunnel") { - error!("{}", e.display()); - self.management_interface_broadcaster.notify_error(&e); - self.set_target_state(TargetState::Unsecured)?; - } - Ok(()) - } - (TargetState::Unsecured, TunnelState::Connecting) | - (TargetState::Unsecured, TunnelState::Connected) => self.kill_tunnel(), - (..) => Ok(()), - } - } - - fn start_tunnel(&mut self) -> Result<()> { - ensure!( - self.state == TunnelState::NotRunning, - ErrorKind::InvalidState - ); - let remote = self.remote_iter.next().unwrap(); - let account_token = self.account_token - .as_ref() - .ok_or(ErrorKind::InvalidSettings("No account token"))? - .clone(); - let tunnel_monitor = self.spawn_tunnel_monitor(remote, &account_token)?; - self.tunnel_close_handle = Some(tunnel_monitor.close_handle()); - self.spawn_tunnel_monitor_wait_thread(tunnel_monitor); - self.set_state(TunnelState::Connecting) - } - - fn spawn_tunnel_monitor(&self, remote: Endpoint, account_token: &str) -> Result<TunnelMonitor> { - // Must wrap the channel in a Mutex because TunnelMonitor forces the closure to be Sync - let event_tx = Arc::new(Mutex::new(self.tx.clone())); - let on_tunnel_event = move |event| { - let _ = event_tx.lock().unwrap().send(DaemonEvent::TunnelEvent(event)); - }; - TunnelMonitor::new(remote, account_token, on_tunnel_event) - .chain_err(|| ErrorKind::TunnelError("Unable to start tunnel monitor")) - } - - fn spawn_tunnel_monitor_wait_thread(&self, tunnel_monitor: TunnelMonitor) { - let error_tx = self.tx.clone(); - thread::spawn( - move || { - let result = tunnel_monitor.wait(); - let _ = error_tx.send(DaemonEvent::TunnelExited(result)); - trace!("Tunnel monitor thread exit"); - }, - ); - } - - fn kill_tunnel(&mut self) -> Result<()> { - ensure!( - self.state == TunnelState::Connecting || self.state == TunnelState::Connected, - ErrorKind::InvalidState - ); - let close_handle = self.tunnel_close_handle.take().unwrap(); - self.set_state(TunnelState::Exiting)?; - let result_tx = self.tx.clone(); - thread::spawn( - move || { - let result = close_handle.close(); - let _ = result_tx.send(DaemonEvent::TunnelKillResult(result)); - trace!("Tunnel kill thread exit"); - }, - ); - Ok(()) - } - - pub fn shutdown_handle(&self) -> DaemonShutdownHandle { - DaemonShutdownHandle { tx: self.tx.clone() } - } -} - -struct DaemonShutdownHandle { - tx: mpsc::Sender<DaemonEvent>, -} - -impl DaemonShutdownHandle { - pub fn shutdown(&self) { - let _ = self.tx.send(DaemonEvent::TriggerShutdown); - } -} - -impl Drop for Daemon { - fn drop(self: &mut Daemon) { - if let Err(e) = rpc_info::remove().chain_err(|| "Unable to clean up rpc address file") { - error!("{}", e.display()); - } - } -} - - -quick_main!(run); - -fn run() -> Result<()> { - let config = cli::get_config(); - init_logger(config.log_level, config.log_file.as_ref())?; - - let daemon = Daemon::new().chain_err(|| "Unable to initialize daemon")?; - - let shutdown_handle = daemon.shutdown_handle(); - shutdown::set_shutdown_signal_handler(move || shutdown_handle.shutdown()) - .chain_err(|| "Unable to attach shutdown signal handler")?; - - daemon.run()?; - - debug!("Mullvad daemon is quitting"); - Ok(()) -} - -fn init_logger(log_level: log::LogLevelFilter, log_file: Option<&PathBuf>) -> Result<()> { - let silenced_crates = [ - "jsonrpc_core", - "tokio_core", - "jsonrpc_ws_server", - "ws", - "mio", - ]; - let mut config = fern::Dispatch::new() - .format( - |out, message, record| { - out.finish( - format_args!( - "{}[{}][{}] {}", - chrono::Local::now().format("[%Y-%m-%d %H:%M:%S]"), - record.target(), - record.level(), - message - ), - ) - }, - ) - .level(log_level) - .chain(std::io::stdout()); - for silenced_crate in &silenced_crates { - config = config.level_for(*silenced_crate, log::LogLevelFilter::Warn); - } - if let Some(ref log_file) = log_file { - let f = fern::log_file(log_file).chain_err(|| "Failed to open log file for writing")?; - config = config.chain(f); - } - config.apply().chain_err(|| "Failed to bootstrap logging system") -} diff --git a/mullvad_daemon/src/management_interface.rs b/mullvad_daemon/src/management_interface.rs deleted file mode 100644 index ffba3dca0f..0000000000 --- a/mullvad_daemon/src/management_interface.rs +++ /dev/null @@ -1,379 +0,0 @@ -use error_chain; - -use jsonrpc_core::{Error, ErrorCode, Metadata}; -use jsonrpc_core::futures::{BoxFuture, Future, future, sync}; -use jsonrpc_macros::pubsub; -use jsonrpc_pubsub::{PubSubHandler, PubSubMetadata, Session, SubscriptionId}; -use jsonrpc_ws_server; -use mullvad_types::states::{DaemonState, TargetState}; - -use serde; - -use std::collections::HashMap; -use std::collections::hash_map::Entry; -use std::net::{IpAddr, Ipv4Addr}; -use std::sync::{Arc, Mutex, RwLock}; - -use talpid_core::mpsc::IntoSender; -use talpid_ipc; -use uuid; - - -pub type AccountToken = String; -pub type CountryCode = String; - -#[derive(Serialize)] -pub struct AccountData { - pub paid_until: String, -} - -#[derive(Serialize)] -pub struct Location { - pub latlong: [f64; 2], - pub country: String, - pub city: String, -} - - -build_rpc_trait! { - pub trait ManagementInterfaceApi { - type Metadata; - - /// Fetches and returns metadata about an account. Returns an error on non-existing - /// accounts. - #[rpc(name = "get_account_data")] - fn get_account_data(&self, AccountToken) -> Result<AccountData, Error>; - - /// Returns available countries. - #[rpc(name = "get_countries")] - fn get_countries(&self) -> Result<HashMap<CountryCode, String>, Error>; - - /// Set which account to connect with. - #[rpc(name = "set_account")] - fn set_account(&self, Option<AccountToken>) -> Result<(), Error>; - - /// Get which account is configured. - #[rpc(async, name = "get_account")] - fn get_account(&self) -> BoxFuture<Option<AccountToken>, Error>; - - /// Set which country to connect to - #[rpc(name = "set_country")] - fn set_country(&self, CountryCode) -> Result<(), Error>; - - /// Set if the client should automatically establish a tunnel on start or not. - #[rpc(name = "set_autoconnect")] - fn set_autoconnect(&self, bool) -> Result<(), Error>; - - /// Try to connect if disconnected, or do nothing if already connecting/connected. - #[rpc(name = "connect")] - fn connect(&self) -> Result<(), Error>; - - /// Disconnect the VPN tunnel if it is connecting/connected. Does nothing if already - /// disconnected. - #[rpc(name = "disconnect")] - fn disconnect(&self) -> Result<(), Error>; - - /// Returns the current state of the Mullvad client. Changes to this state will - /// be announced to subscribers of `new_state`. - #[rpc(async, name = "get_state")] - fn get_state(&self) -> BoxFuture<DaemonState, Error>; - - /// Returns the current public IP of this computer. - #[rpc(name = "get_ip")] - fn get_ip(&self) -> Result<IpAddr, Error>; - - /// Performs a geoIP lookup and returns the current location as perceived by the public - /// internet. - #[rpc(name = "get_location")] - fn get_location(&self) -> Result<Location, Error>; - - #[pubsub(name = "new_state")] { - /// Subscribes to the `new_state` event notifications. - #[rpc(name = "new_state_subscribe")] - fn new_state_subscribe(&self, Self::Metadata, pubsub::Subscriber<DaemonState>); - - /// Unsubscribes from the `new_state` event notifications. - #[rpc(name = "new_state_unsubscribe")] - fn new_state_unsubscribe(&self, SubscriptionId) -> BoxFuture<(), Error>; - } - - #[pubsub(name = "error")] { - /// Subscribes to the `error` event notifications. - #[rpc(name = "error_subscribe")] - fn error_subscribe(&self, Self::Metadata, pubsub::Subscriber<Vec<String>>); - - /// Unsubscribes from the `error` event notifications. - #[rpc(name = "error_unsubscribe")] - fn error_unsubscribe(&self, SubscriptionId) -> BoxFuture<(), Error>; - } - } -} - - -/// Enum representing commands coming in on the management interface. -#[derive(Debug)] -pub enum TunnelCommand { - /// Change target state. - SetTargetState(TargetState), - /// Request the current state. - GetState(sync::oneshot::Sender<DaemonState>), - /// Set which account token to use for subsequent connection attempts. - SetAccount(Option<AccountToken>), - /// Request the current account token being used. - GetAccount(sync::oneshot::Sender<Option<AccountToken>>), -} - -#[derive(Default)] -struct ActiveSubscriptions { - new_state_subscriptions: RwLock<HashMap<SubscriptionId, pubsub::Sink<DaemonState>>>, - error_subscriptions: RwLock<HashMap<SubscriptionId, pubsub::Sink<Vec<String>>>>, -} - -pub struct ManagementInterfaceServer { - server: talpid_ipc::IpcServer, - subscriptions: Arc<ActiveSubscriptions>, -} - -impl ManagementInterfaceServer { - pub fn start<T>(tunnel_tx: IntoSender<TunnelCommand, T>) -> talpid_ipc::Result<Self> - where T: From<TunnelCommand> + 'static + Send - { - let rpc = ManagementInterface::new(tunnel_tx); - let subscriptions = rpc.subscriptions.clone(); - - let mut io = PubSubHandler::default(); - io.extend_with(rpc.to_delegate()); - let server = talpid_ipc::IpcServer::start_with_metadata(io.into(), meta_extractor)?; - Ok( - ManagementInterfaceServer { - server, - subscriptions, - }, - ) - } - - pub fn address(&self) -> &str { - self.server.address() - } - - pub fn event_broadcaster(&self) -> EventBroadcaster { - EventBroadcaster { subscriptions: self.subscriptions.clone() } - } - - /// Consumes the server and waits for it to finish. Returns an error if the server exited - /// due to an error. - pub fn wait(self) -> talpid_ipc::Result<()> { - self.server.wait() - } -} - - -/// A handle that allows broadcasting messages to all subscribers of the management interface. -pub struct EventBroadcaster { - subscriptions: Arc<ActiveSubscriptions>, -} - -impl EventBroadcaster { - /// Sends a new state update to all `new_state` subscribers of the management interface. - pub fn notify_new_state(&self, new_state: DaemonState) { - self.notify(&self.subscriptions.new_state_subscriptions, new_state); - } - - /// Sends an error to all `error` subscribers of the management interface. - pub fn notify_error<E>(&self, error: &E) - where E: error_chain::ChainedError - { - let error_strings = error.iter().map(|e| e.to_string()).collect(); - self.notify(&self.subscriptions.error_subscriptions, error_strings); - } - - fn notify<T>(&self, - subscriptions_lock: &RwLock<HashMap<SubscriptionId, pubsub::Sink<T>>>, - value: T) - where T: serde::Serialize + Clone - { - let subscriptions = subscriptions_lock.read().unwrap(); - for sink in subscriptions.values() { - let _ = sink.notify(Ok(value.clone())).wait(); - } - } -} - -struct ManagementInterface<T: From<TunnelCommand> + 'static + Send> { - subscriptions: Arc<ActiveSubscriptions>, - tx: Mutex<IntoSender<TunnelCommand, T>>, -} - -impl<T: From<TunnelCommand> + 'static + Send> ManagementInterface<T> { - pub fn new(tx: IntoSender<TunnelCommand, T>) -> Self { - ManagementInterface { - subscriptions: Default::default(), - tx: Mutex::new(tx), - } - } - - fn subscribe<V>(subscriber: pubsub::Subscriber<V>, - subscriptions_lock: &RwLock<HashMap<SubscriptionId, pubsub::Sink<V>>>) { - let mut subscriptions = subscriptions_lock.write().unwrap(); - loop { - let id = SubscriptionId::String(uuid::Uuid::new_v4().to_string()); - if let Entry::Vacant(entry) = subscriptions.entry(id.clone()) { - if let Ok(sink) = subscriber.assign_id(id.clone()) { - debug!("Accepting new subscription with id {:?}", id); - entry.insert(sink); - } - break; - } - } - } - - fn unsubscribe<V>(id: SubscriptionId, - subscriptions_lock: &RwLock<HashMap<SubscriptionId, pubsub::Sink<V>>>) - -> BoxFuture<(), Error> { - let was_removed = subscriptions_lock.write().unwrap().remove(&id).is_some(); - let result = if was_removed { - debug!("Unsubscribing id {:?}", id); - future::ok(()) - } else { - future::err( - Error { - code: ErrorCode::InvalidParams, - message: "Invalid subscription".to_owned(), - data: None, - }, - ) - }; - result.boxed() - } -} - -impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for ManagementInterface<T> { - type Metadata = Meta; - - fn get_account_data(&self, _account_token: AccountToken) -> Result<AccountData, Error> { - trace!("get_account_data"); - Ok(AccountData { paid_until: "2018-12-31T16:00:00.000Z".to_owned() },) - } - - fn get_countries(&self) -> Result<HashMap<CountryCode, String>, Error> { - trace!("get_countries"); - Ok(HashMap::new()) - } - - fn set_account(&self, account_token: Option<AccountToken>) -> Result<(), Error> { - trace!("set_account"); - self.tx - .lock() - .unwrap() - .send(TunnelCommand::SetAccount(account_token)) - .map_err(|_| Error::internal_error()) - } - - fn get_account(&self) -> BoxFuture<Option<AccountToken>, Error> { - trace!("get_account"); - let (tx, rx) = sync::oneshot::channel(); - match self.tx.lock().unwrap().send(TunnelCommand::GetAccount(tx)) { - Ok(()) => rx.map_err(|_| Error::internal_error()).boxed(), - Err(_) => future::err(Error::internal_error()).boxed(), - } - } - - fn set_country(&self, _country_code: CountryCode) -> Result<(), Error> { - trace!("set_country"); - Ok(()) - } - - fn set_autoconnect(&self, _autoconnect: bool) -> Result<(), Error> { - trace!("set_autoconnect"); - Ok(()) - } - - fn connect(&self) -> Result<(), Error> { - trace!("connect"); - self.tx - .lock() - .unwrap() - .send(TunnelCommand::SetTargetState(TargetState::Secured)) - .map_err(|_| Error::internal_error()) - } - - fn disconnect(&self) -> Result<(), Error> { - trace!("disconnect"); - self.tx - .lock() - .unwrap() - .send(TunnelCommand::SetTargetState(TargetState::Unsecured)) - .map_err(|_| Error::internal_error()) - } - - fn get_state(&self) -> BoxFuture<DaemonState, Error> { - trace!("get_state"); - let (state_tx, state_rx) = sync::oneshot::channel(); - match self.tx.lock().unwrap().send(TunnelCommand::GetState(state_tx)) { - Ok(()) => state_rx.map_err(|_| Error::internal_error()).boxed(), - Err(_) => future::err(Error::internal_error()).boxed(), - } - } - - fn get_ip(&self) -> Result<IpAddr, Error> { - trace!("get_ip"); - Ok(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4))) - } - - fn get_location(&self) -> Result<Location, Error> { - trace!("get_location"); - Ok( - Location { - latlong: [1.0, 2.0], - country: "narnia".to_owned(), - city: "Le city".to_owned(), - }, - ) - } - - fn new_state_subscribe(&self, - _meta: Self::Metadata, - subscriber: pubsub::Subscriber<DaemonState>) { - trace!("new_state_subscribe"); - Self::subscribe(subscriber, &self.subscriptions.new_state_subscriptions); - } - - fn new_state_unsubscribe(&self, id: SubscriptionId) -> BoxFuture<(), Error> { - trace!("new_state_unsubscribe"); - Self::unsubscribe(id, &self.subscriptions.new_state_subscriptions) - } - - fn error_subscribe(&self, _meta: Self::Metadata, subscriber: pubsub::Subscriber<Vec<String>>) { - trace!("error_subscribe"); - Self::subscribe(subscriber, &self.subscriptions.error_subscriptions); - } - - fn error_unsubscribe(&self, id: SubscriptionId) -> BoxFuture<(), Error> { - trace!("error_unsubscribe"); - Self::unsubscribe(id, &self.subscriptions.error_subscriptions) - } -} - - -/// The metadata type. There is one instance associated with each connection. In this pubsub -/// scenario they are created by `meta_extractor` by the server on each new incoming -/// connection. -#[derive(Clone, Debug, Default)] -pub struct Meta { - session: Option<Arc<Session>>, -} - -/// Make the `Meta` type possible to use as jsonrpc metadata type. -impl Metadata for Meta {} - -/// Make the `Meta` type possible to use as a pubsub metadata type. -impl PubSubMetadata for Meta { - fn session(&self) -> Option<Arc<Session>> { - self.session.clone() - } -} - -/// Metadata extractor function for `Meta`. -fn meta_extractor(context: &jsonrpc_ws_server::RequestContext) -> Meta { - Meta { session: Some(Arc::new(Session::new(context.sender()))) } -} diff --git a/mullvad_daemon/src/mock_ipc.rs b/mullvad_daemon/src/mock_ipc.rs deleted file mode 100644 index 90859401c2..0000000000 --- a/mullvad_daemon/src/mock_ipc.rs +++ /dev/null @@ -1,217 +0,0 @@ -use ipc_api::*; - -use jsonrpc_core::{self, Error, ErrorCode, Metadata}; -use jsonrpc_core::futures::{BoxFuture, Future, future}; -use jsonrpc_macros::pubsub; -use jsonrpc_pubsub::{PubSubHandler, PubSubMetadata, Session, SubscriptionId}; -use jsonrpc_ws_server; - -use std::collections::HashMap; -use std::net::{IpAddr, Ipv4Addr}; -use std::sync::{Arc, RwLock, atomic}; - -use talpid_ipc; - -type ActiveSubscriptions = Arc<RwLock<HashMap<SubscriptionId, pubsub::Sink<String>>>>; - -pub struct IpcServer { - server: talpid_ipc::IpcServer, -} - -impl IpcServer { - pub fn start() -> talpid_ipc::Result<Self> { - let active_subscriptions = ActiveSubscriptions::default(); - let rpc = MockIpcApi::new(active_subscriptions.clone()); - let mut io = PubSubHandler::default(); - io.extend_with(rpc.to_delegate()); - let server = talpid_ipc::IpcServer::start_with_metadata(io.into(), meta_extractor)?; - - Self::spawn_broadcast_thread(active_subscriptions); - Ok(IpcServer { server }) - } - - pub fn address(&self) -> &str { - self.server.address() - } - - pub fn wait(self) -> talpid_ipc::Result<()> { - self.server.wait() - } - - // TODO(linus): This thread will never die. But this is just mock anyway so not important. - fn spawn_broadcast_thread(active_subscriptions: ActiveSubscriptions) { - ::std::thread::spawn( - move || loop { - { - let subscribers = active_subscriptions.read().unwrap(); - for sink in subscribers.values() { - let _ = sink.notify(Ok("Hello World!".into())).wait(); - } - } - ::std::thread::sleep(::std::time::Duration::from_secs(1)); - }, - ); - } -} - - - -/// The metadata type. There is one instance associated with each connection. In this pubsub -/// scenario they are created by `From<Sender<String>>::from` by the server on each new incoming -/// connection. -#[derive(Clone, Debug, Default)] -pub struct Meta { - session: Option<Arc<Session>>, -} - -/// Make the `Meta` type possible to use as jsonrpc metadata type. -impl Metadata for Meta {} - -/// Make the `Meta` type possible to use as a pubsub metadata type. -impl PubSubMetadata for Meta { - fn session(&self) -> Option<Arc<Session>> { - self.session.clone() - } -} - -/// Metadata extractor function for `Meta`. -fn meta_extractor(context: &jsonrpc_ws_server::RequestContext) -> Meta { - Meta { session: Some(Arc::new(Session::new(context.sender()))) } -} - -/// A mock implementation of the Mullvad frontend API. A very simplified explanation is that for -/// the real implementation `tunnel_is_up` should be replaced with some kind of handle (or proxy to -/// a handle) to the jsonrpc client talking with `talpid_core`. -pub struct MockIpcApi { - next_subscription_id: atomic::AtomicUsize, - active: ActiveSubscriptions, - country: RwLock<CountryCode>, - tunnel_is_up: atomic::AtomicBool, -} - -impl MockIpcApi { - pub fn new(active: ActiveSubscriptions) -> Self { - MockIpcApi { - next_subscription_id: atomic::AtomicUsize::new(0), - active: active, - country: RwLock::new("se".to_owned()), - tunnel_is_up: atomic::AtomicBool::new(false), - } - } -} - -impl IpcApi for MockIpcApi { - type Metadata = Meta; - - fn get_account_data(&self, account_token: AccountToken) -> Result<AccountData, Error> { - debug!("Login for {}", account_token); - - let paid_until = if account_token.starts_with("1111") { - // accounts starting with 1111 expire in one month - Ok("2018-12-31T16:00:00.000Z".to_owned()) - } else if account_token.starts_with("2222") { - Ok("2012-12-31T16:00:00.000Z".to_owned()) - } else if account_token.starts_with("3333") { - Ok("2037-12-31T16:00:00.000Z".to_owned()) - } else { - Err(jsonrpc_core::Error::invalid_params("You are not welcome")) - }?; - Ok(AccountData { paid_until: paid_until }) - } - - fn get_countries(&self) -> Result<HashMap<CountryCode, String>, Error> { - let mut countries = HashMap::new(); - countries.insert("se".to_owned(), "Sweden".to_owned()); - countries.insert("de".to_owned(), "Denmark".to_owned()); - countries.insert("na".to_owned(), "Narnia".to_owned()); - Ok(countries) - } - - fn set_account(&self, _account_token: AccountToken) -> Result<(), Error> { - Ok(()) - } - - fn set_country(&self, country_code: CountryCode) -> Result<(), Error> { - *self.country.write().unwrap() = country_code; - Ok(()) - } - - fn set_autoconnect(&self, _autoconnect: bool) -> Result<(), Error> { - Ok(()) - } - - fn connect(&self) -> Result<(), Error> { - if self.country.read().unwrap().starts_with("se") { - Err(jsonrpc_core::Error::invalid_params("Invalid server")) - } else { - self.tunnel_is_up.store(true, atomic::Ordering::SeqCst); - Ok(()) - } - } - - fn disconnect(&self) -> Result<(), Error> { - self.tunnel_is_up.store(false, atomic::Ordering::SeqCst); - Ok(()) - } - - fn get_state(&self) -> Result<SecurityState, Error> { - if self.tunnel_is_up.load(atomic::Ordering::SeqCst) { - Ok(SecurityState::Secured) - } else { - Ok(SecurityState::Unsecured) - } - } - - fn get_ip(&self) -> Result<IpAddr, Error> { - let ip = if self.tunnel_is_up.load(atomic::Ordering::SeqCst) { - IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)) - } else { - IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)) - } - .to_owned(); - Ok(ip) - } - - fn get_location(&self) -> Result<Location, Error> { - Ok( - if self.tunnel_is_up.load(atomic::Ordering::SeqCst) { - Location { - latlong: [1.0, 2.0], - country: "narnia".to_owned(), - city: "Le city".to_owned(), - } - } else { - Location { - latlong: [60.0, 61.0], - country: "sweden".to_owned(), - city: "bollebygd".to_owned(), - } - }, - ) - } - - fn subscribe(&self, _meta: Self::Metadata, subscriber: pubsub::Subscriber<String>) { - let id = self.next_subscription_id.fetch_add(1, atomic::Ordering::SeqCst); - let sub_id = SubscriptionId::Number(id as u64); - if let Ok(sink) = subscriber.assign_id(sub_id.clone()) { - debug!("Accepting new subscription with id {}", id); - self.active.write().unwrap().insert(sub_id, sink); - } - } - - fn unsubscribe(&self, id: SubscriptionId) -> BoxFuture<bool, Error> { - debug!("Unsubscribing id {:?}", id); - if self.active.write().unwrap().remove(&id).is_some() { - future::ok(true).boxed() - } else { - future::err( - Error { - code: ErrorCode::InvalidParams, - message: "Invalid subscription.".into(), - data: None, - }, - ) - .boxed() - } - } -} diff --git a/mullvad_daemon/src/rpc_info.rs b/mullvad_daemon/src/rpc_info.rs deleted file mode 100644 index 25078ff899..0000000000 --- a/mullvad_daemon/src/rpc_info.rs +++ /dev/null @@ -1,47 +0,0 @@ -use std::fs::{self, File, OpenOptions}; -use std::io::{self, Write}; -use std::path::{Path, PathBuf}; - -error_chain! { - errors { - WriteFailed(path: PathBuf) { - description("Failed to write RCP address to file") - display("Failed to write RPC address to {}", path.to_string_lossy()) - } - RemoveFailed(path: PathBuf) { - description("Failed to remove file") - display("Failed to remove {}", path.to_string_lossy()) - } - } -} - -lazy_static! { - /// The path to the file where we write the RPC address - static ref RPC_ADDRESS_FILE_PATH: PathBuf = ::std::env::temp_dir().join(".mullvad_rpc_address"); -} - -/// Writes down the RPC address to some API to a file. -pub fn write(rpc_address: &str) -> Result<()> { - open_file(RPC_ADDRESS_FILE_PATH.as_path()) - .and_then(|mut file| file.write_all(rpc_address.as_bytes())) - .chain_err(|| ErrorKind::WriteFailed(RPC_ADDRESS_FILE_PATH.to_owned()))?; - - debug!( - "Wrote RPC address to {}", - RPC_ADDRESS_FILE_PATH.to_string_lossy() - ); - Ok(()) -} - -pub fn remove() -> Result<()> { - fs::remove_file(RPC_ADDRESS_FILE_PATH.as_path()) - .chain_err(|| ErrorKind::RemoveFailed(RPC_ADDRESS_FILE_PATH.to_owned())) -} - -fn open_file(path: &Path) -> io::Result<File> { - OpenOptions::new() - .write(true) - .truncate(true) - .create(true) - .open(path) -} diff --git a/mullvad_daemon/src/shutdown.rs b/mullvad_daemon/src/shutdown.rs deleted file mode 100644 index 4da73c4d05..0000000000 --- a/mullvad_daemon/src/shutdown.rs +++ /dev/null @@ -1,42 +0,0 @@ -error_chain!{} - -#[cfg(unix)] -mod platform { - extern crate simple_signal; - - use self::simple_signal::Signal; - use super::Result; - - pub fn set_shutdown_signal_handler<F>(f: F) -> Result<()> - where F: Fn() + 'static + Send - { - simple_signal::set_handler( - &[Signal::Term, Signal::Int], move |s| { - debug!("Process received signal: {:?}", s); - f(); - } - ); - Ok(()) - } -} - -#[cfg(windows)] -mod platform { - extern crate ctrlc; - - use super::{Result, ResultExt}; - - pub fn set_shutdown_signal_handler<F>(f: F) -> Result<()> - where F: Fn() + 'static + Send - { - ctrlc::set_handler( - move || { - debug!("Process received Ctrl-c"); - f(); - }, - ) - .chain_err(|| "Unable to attach ctrl-c handler") - } -} - -pub use self::platform::*; |
