summaryrefslogtreecommitdiffhomepage
path: root/mullvad-daemon/src
diff options
context:
space:
mode:
authorLinus Färnstrand <linus@mullvad.net>2017-07-17 10:36:52 +0200
committerLinus Färnstrand <linus@mullvad.net>2017-07-17 10:48:09 +0200
commitf07f1a0262c908ff83490850b67a167f963efc2d (patch)
tree5a5209baefc0b10ba528f254e7ce2456c23f6ac9 /mullvad-daemon/src
parent6a4202fc8c55752f0fef86c04b628a3f9ada5279 (diff)
downloadmullvadvpn-f07f1a0262c908ff83490850b67a167f963efc2d.tar.xz
mullvadvpn-f07f1a0262c908ff83490850b67a167f963efc2d.zip
Rename all crates from snake_case to kebab-case
Diffstat (limited to 'mullvad-daemon/src')
-rw-r--r--mullvad-daemon/src/cli.rs45
-rw-r--r--mullvad-daemon/src/main.rs498
-rw-r--r--mullvad-daemon/src/management_interface.rs379
-rw-r--r--mullvad-daemon/src/mock_ipc.rs217
-rw-r--r--mullvad-daemon/src/rpc_info.rs47
-rw-r--r--mullvad-daemon/src/shutdown.rs42
6 files changed, 1228 insertions, 0 deletions
diff --git a/mullvad-daemon/src/cli.rs b/mullvad-daemon/src/cli.rs
new file mode 100644
index 0000000000..670a2bf28b
--- /dev/null
+++ b/mullvad-daemon/src/cli.rs
@@ -0,0 +1,45 @@
+use clap::{App, Arg};
+use log;
+
+use std::path::PathBuf;
+
+pub struct Config {
+ pub log_level: log::LogLevelFilter,
+ pub log_file: Option<PathBuf>,
+}
+
+pub fn get_config() -> Config {
+ let app = create_app();
+ let matches = app.get_matches();
+
+ let log_level = match matches.occurrences_of("v") {
+ 0 => log::LogLevelFilter::Info,
+ 1 => log::LogLevelFilter::Debug,
+ _ => log::LogLevelFilter::Trace,
+ };
+ let log_file = matches.value_of_os("log_file").map(PathBuf::from);
+
+ Config {
+ log_level,
+ log_file,
+ }
+}
+
+fn create_app() -> App<'static, 'static> {
+ App::new(::CRATE_NAME)
+ .version(crate_version!())
+ .author(crate_authors!())
+ .about(crate_description!())
+ .arg(
+ Arg::with_name("v")
+ .short("v")
+ .multiple(true)
+ .help("Sets the level of verbosity."),
+ )
+ .arg(
+ Arg::with_name("log_file")
+ .long("log")
+ .takes_value(true)
+ .help("Activates file logging to the given path"),
+ )
+}
diff --git a/mullvad-daemon/src/main.rs b/mullvad-daemon/src/main.rs
new file mode 100644
index 0000000000..b26ca315a8
--- /dev/null
+++ b/mullvad-daemon/src/main.rs
@@ -0,0 +1,498 @@
+#[macro_use]
+extern crate clap;
+extern crate chrono;
+#[macro_use]
+extern crate log;
+#[macro_use]
+extern crate error_chain;
+extern crate fern;
+
+extern crate serde;
+#[macro_use]
+extern crate serde_derive;
+
+extern crate jsonrpc_core;
+extern crate jsonrpc_pubsub;
+#[macro_use]
+extern crate jsonrpc_macros;
+extern crate jsonrpc_ws_server;
+extern crate uuid;
+#[macro_use]
+extern crate lazy_static;
+
+extern crate mullvad_types;
+extern crate talpid_core;
+extern crate talpid_ipc;
+
+mod cli;
+mod management_interface;
+mod rpc_info;
+mod shutdown;
+
+use error_chain::ChainedError;
+use management_interface::{ManagementInterfaceServer, TunnelCommand};
+use mullvad_types::states::{DaemonState, SecurityState, TargetState};
+use std::io;
+
+use std::path::PathBuf;
+use std::sync::{Arc, Mutex, mpsc};
+use std::thread;
+
+use talpid_core::mpsc::IntoSender;
+use talpid_core::net::{Endpoint, TransportProtocol};
+use talpid_core::tunnel::{self, TunnelEvent, TunnelMonitor};
+
+error_chain!{
+ errors {
+ /// The client is in the wrong state for the requested operation. Optimally the code should
+ /// be written in such a way so such states can't exist.
+ InvalidState {
+ description("Client is in an invalid state for the requested operation")
+ }
+ TunnelError(msg: &'static str) {
+ description("Error in the tunnel monitor")
+ display("Tunnel monitor error: {}", msg)
+ }
+ ManagementInterfaceError(msg: &'static str) {
+ description("Error in the management interface")
+ display("Management interface error: {}", msg)
+ }
+ InvalidSettings(msg: &'static str) {
+ description("Invalid settings")
+ display("Invalid Settings: {}", msg)
+ }
+ }
+}
+
+lazy_static! {
+ // Temporary store of hardcoded remotes.
+ static ref REMOTES: [Endpoint; 3] = [
+ Endpoint::new("se5.mullvad.net", 1300, TransportProtocol::Udp),
+ Endpoint::new("se6.mullvad.net", 1300, TransportProtocol::Udp),
+ Endpoint::new("se7.mullvad.net", 1300, TransportProtocol::Udp),
+ ];
+}
+
+static CRATE_NAME: &str = "mullvadd";
+
+
+/// All events that can happen in the daemon. Sent from various threads and exposed interfaces.
+pub enum DaemonEvent {
+ /// An event coming from the tunnel software to indicate a change in state.
+ TunnelEvent(TunnelEvent),
+ /// Triggered by the thread waiting for the tunnel process. Means the tunnel process exited.
+ TunnelExited(tunnel::Result<()>),
+ /// Triggered by the thread waiting for a tunnel close operation to complete. Contains the
+ /// result of trying to kill the tunnel.
+ TunnelKillResult(io::Result<()>),
+ /// An event coming from the JSONRPC-2.0 management interface.
+ ManagementInterfaceEvent(TunnelCommand),
+ /// Triggered if the server hosting the JSONRPC-2.0 management interface dies unexpectedly.
+ ManagementInterfaceExited(talpid_ipc::Result<()>),
+ /// Daemon shutdown triggered by a signal, ctrl-c or similar.
+ TriggerShutdown,
+}
+
+impl From<TunnelEvent> for DaemonEvent {
+ fn from(tunnel_event: TunnelEvent) -> Self {
+ DaemonEvent::TunnelEvent(tunnel_event)
+ }
+}
+
+impl From<TunnelCommand> for DaemonEvent {
+ fn from(tunnel_command: TunnelCommand) -> Self {
+ DaemonEvent::ManagementInterfaceEvent(tunnel_command)
+ }
+}
+
+/// Represents the internal state of the actual tunnel.
+// TODO(linus): Put the tunnel::CloseHandle into this state, so it can't exist when not running.
+#[derive(Debug, Copy, Clone, Eq, PartialEq)]
+pub enum TunnelState {
+ /// No tunnel is running.
+ NotRunning,
+ /// The tunnel has been started, but it is not established/functional.
+ Connecting,
+ /// The tunnel is up and working.
+ Connected,
+ /// This state is active from when we manually trigger a tunnel kill until the tunnel wait
+ /// operation (TunnelExit) returned.
+ Exiting,
+}
+
+impl TunnelState {
+ pub fn as_security_state(&self) -> SecurityState {
+ match *self {
+ TunnelState::Connected => SecurityState::Secured,
+ _ => SecurityState::Unsecured,
+ }
+ }
+}
+
+
+struct Daemon {
+ state: TunnelState,
+ // The tunnel_close_handle must only exist in the Connecting and Connected states!
+ tunnel_close_handle: Option<tunnel::CloseHandle>,
+ last_broadcasted_state: DaemonState,
+ target_state: TargetState,
+ shutdown: bool,
+ rx: mpsc::Receiver<DaemonEvent>,
+ tx: mpsc::Sender<DaemonEvent>,
+ management_interface_broadcaster: management_interface::EventBroadcaster,
+
+ // Just for testing. A cyclic iterator iterating over the hardcoded remotes,
+ // picking a new one for each retry.
+ remote_iter: std::iter::Cycle<std::iter::Cloned<std::slice::Iter<'static, Endpoint>>>,
+ // The current account token for now. Should be moved into the settings later.
+ account_token: Option<String>,
+}
+
+impl Daemon {
+ pub fn new() -> Result<Self> {
+ let (tx, rx) = mpsc::channel();
+ let management_interface_broadcaster = Self::start_management_interface(tx.clone())?;
+ let state = TunnelState::NotRunning;
+ let target_state = TargetState::Unsecured;
+ Ok(
+ Daemon {
+ state,
+ tunnel_close_handle: None,
+ target_state,
+ last_broadcasted_state: DaemonState {
+ state: state.as_security_state(),
+ target_state,
+ },
+ shutdown: false,
+ rx,
+ tx,
+ management_interface_broadcaster,
+ remote_iter: REMOTES.iter().cloned().cycle(),
+ account_token: None,
+ },
+ )
+ }
+
+ // Starts the management interface and spawns a thread that will process it.
+ // Returns a handle that allows notifying all subscribers on events.
+ fn start_management_interface(event_tx: mpsc::Sender<DaemonEvent>)
+ -> Result<management_interface::EventBroadcaster> {
+ let multiplex_event_tx = IntoSender::from(event_tx.clone());
+ let server = Self::start_management_interface_server(multiplex_event_tx)?;
+ let event_broadcaster = server.event_broadcaster();
+ Self::spawn_management_interface_wait_thread(server, event_tx);
+ Ok(event_broadcaster)
+ }
+
+ fn start_management_interface_server(event_tx: IntoSender<TunnelCommand, DaemonEvent>)
+ -> Result<ManagementInterfaceServer> {
+ let server =
+ ManagementInterfaceServer::start(event_tx)
+ .chain_err(|| ErrorKind::ManagementInterfaceError("Failed to start server"),)?;
+ info!(
+ "Mullvad management interface listening on {}",
+ server.address()
+ );
+ rpc_info::write(server.address()).chain_err(|| ErrorKind::ManagementInterfaceError(
+ "Failed to write RPC address to file"))?;
+ Ok(server)
+ }
+
+ fn spawn_management_interface_wait_thread(server: ManagementInterfaceServer,
+ exit_tx: mpsc::Sender<DaemonEvent>) {
+ thread::spawn(
+ move || {
+ let result = server.wait();
+ debug!("Mullvad management interface shut down");
+ let _ = exit_tx.send(DaemonEvent::ManagementInterfaceExited(result));
+ },
+ );
+ }
+
+ /// Consume the `Daemon` and run the main event loop. Blocks until an error happens or a
+ /// shutdown event is received.
+ pub fn run(mut self) -> Result<()> {
+ while let Ok(event) = self.rx.recv() {
+ self.handle_event(event)?;
+ if self.shutdown && self.state == TunnelState::NotRunning {
+ break;
+ }
+ }
+ Ok(())
+ }
+
+ fn handle_event(&mut self, event: DaemonEvent) -> Result<()> {
+ use DaemonEvent::*;
+ match event {
+ TunnelEvent(event) => self.handle_tunnel_event(event),
+ TunnelExited(result) => self.handle_tunnel_exited(result),
+ TunnelKillResult(result) => self.handle_tunnel_kill_result(result),
+ ManagementInterfaceEvent(event) => self.handle_management_interface_event(event),
+ ManagementInterfaceExited(result) => self.handle_management_interface_exited(result),
+ TriggerShutdown => self.handle_trigger_shutdown_event(),
+ }
+ }
+
+ fn handle_tunnel_event(&mut self, tunnel_event: TunnelEvent) -> Result<()> {
+ info!("Tunnel event: {:?}", tunnel_event);
+ if self.state == TunnelState::Connecting && tunnel_event == TunnelEvent::Up {
+ self.set_state(TunnelState::Connected)
+ } else if self.state == TunnelState::Connected && tunnel_event == TunnelEvent::Down {
+ self.kill_tunnel()
+ } else {
+ Ok(())
+ }
+ }
+
+ fn handle_tunnel_exited(&mut self, result: tunnel::Result<()>) -> Result<()> {
+ if let Err(e) = result.chain_err(|| "Tunnel exited in an unexpected way") {
+ error!("{}", e.display());
+ }
+ self.tunnel_close_handle = None;
+ self.set_state(TunnelState::NotRunning)
+ }
+
+ fn handle_tunnel_kill_result(&mut self, result: io::Result<()>) -> Result<()> {
+ result.chain_err(|| "Error while trying to close tunnel")
+ }
+
+ fn handle_management_interface_event(&mut self, event: TunnelCommand) -> Result<()> {
+ use TunnelCommand::*;
+ match event {
+ SetTargetState(state) => {
+ if !self.shutdown {
+ self.set_target_state(state)?;
+ } else {
+ warn!("Ignoring target state change request due to shutdown");
+ }
+ }
+ GetState(tx) => {
+ if let Err(_) = tx.send(self.last_broadcasted_state) {
+ warn!("Unable to send current state to management interface client",);
+ }
+ }
+ SetAccount(account_token) => self.account_token = account_token,
+ GetAccount(tx) => {
+ if let Err(_) = tx.send(self.account_token.clone()) {
+ warn!("Unable to send current account to management interface client");
+ }
+ }
+ }
+ Ok(())
+ }
+
+ fn handle_management_interface_exited(&self, result: talpid_ipc::Result<()>) -> Result<()> {
+ let error = ErrorKind::ManagementInterfaceError("Server exited unexpectedly");
+ match result {
+ Ok(()) => Err(error.into()),
+ Err(e) => Err(e).chain_err(|| error),
+ }
+ }
+
+ fn handle_trigger_shutdown_event(&mut self) -> Result<()> {
+ self.shutdown = true;
+ self.set_target_state(TargetState::Unsecured)
+ }
+
+ /// Update the state of the client. If it changed, notify the subscribers and trigger
+ /// appropriate actions.
+ fn set_state(&mut self, new_state: TunnelState) -> Result<()> {
+ if new_state != self.state {
+ debug!("State {:?} => {:?}", self.state, new_state);
+ self.state = new_state;
+ self.broadcast_state();
+ self.verify_state_consistency()?;
+ self.apply_target_state()
+ } else {
+ // Calling set_state with the same state we already have is an error. Should try to
+ // mitigate this possibility completely with a better state machine later.
+ Err(ErrorKind::InvalidState.into())
+ }
+ }
+
+ fn broadcast_state(&mut self) {
+ let new_daemon_state = DaemonState {
+ state: self.state.as_security_state(),
+ target_state: self.target_state,
+ };
+ if self.last_broadcasted_state != new_daemon_state {
+ self.last_broadcasted_state = new_daemon_state;
+ self.management_interface_broadcaster.notify_new_state(new_daemon_state);
+ }
+ }
+
+ // Check that the current state is valid and consistent.
+ fn verify_state_consistency(&self) -> Result<()> {
+ use TunnelState::*;
+ ensure!(
+ match self.state {
+ NotRunning => self.tunnel_close_handle.is_none(),
+ Connecting => self.tunnel_close_handle.is_some(),
+ Connected => self.tunnel_close_handle.is_some(),
+ Exiting => self.tunnel_close_handle.is_none(),
+ },
+ ErrorKind::InvalidState
+ );
+ Ok(())
+ }
+
+ /// Set the target state of the client. If it changed trigger the operations needed to progress
+ /// towards that state.
+ fn set_target_state(&mut self, new_state: TargetState) -> Result<()> {
+ if new_state != self.target_state {
+ debug!("Target state {:?} => {:?}", self.target_state, new_state);
+ self.target_state = new_state;
+ self.broadcast_state();
+ self.apply_target_state()
+ } else {
+ Ok(())
+ }
+ }
+
+ fn apply_target_state(&mut self) -> Result<()> {
+ match (self.target_state, self.state) {
+ (TargetState::Secured, TunnelState::NotRunning) => {
+ debug!("Triggering tunnel start");
+ if let Err(e) = self.start_tunnel().chain_err(|| "Failed to start tunnel") {
+ error!("{}", e.display());
+ self.management_interface_broadcaster.notify_error(&e);
+ self.set_target_state(TargetState::Unsecured)?;
+ }
+ Ok(())
+ }
+ (TargetState::Unsecured, TunnelState::Connecting) |
+ (TargetState::Unsecured, TunnelState::Connected) => self.kill_tunnel(),
+ (..) => Ok(()),
+ }
+ }
+
+ fn start_tunnel(&mut self) -> Result<()> {
+ ensure!(
+ self.state == TunnelState::NotRunning,
+ ErrorKind::InvalidState
+ );
+ let remote = self.remote_iter.next().unwrap();
+ let account_token = self.account_token
+ .as_ref()
+ .ok_or(ErrorKind::InvalidSettings("No account token"))?
+ .clone();
+ let tunnel_monitor = self.spawn_tunnel_monitor(remote, &account_token)?;
+ self.tunnel_close_handle = Some(tunnel_monitor.close_handle());
+ self.spawn_tunnel_monitor_wait_thread(tunnel_monitor);
+ self.set_state(TunnelState::Connecting)
+ }
+
+ fn spawn_tunnel_monitor(&self, remote: Endpoint, account_token: &str) -> Result<TunnelMonitor> {
+ // Must wrap the channel in a Mutex because TunnelMonitor forces the closure to be Sync
+ let event_tx = Arc::new(Mutex::new(self.tx.clone()));
+ let on_tunnel_event = move |event| {
+ let _ = event_tx.lock().unwrap().send(DaemonEvent::TunnelEvent(event));
+ };
+ TunnelMonitor::new(remote, account_token, on_tunnel_event)
+ .chain_err(|| ErrorKind::TunnelError("Unable to start tunnel monitor"))
+ }
+
+ fn spawn_tunnel_monitor_wait_thread(&self, tunnel_monitor: TunnelMonitor) {
+ let error_tx = self.tx.clone();
+ thread::spawn(
+ move || {
+ let result = tunnel_monitor.wait();
+ let _ = error_tx.send(DaemonEvent::TunnelExited(result));
+ trace!("Tunnel monitor thread exit");
+ },
+ );
+ }
+
+ fn kill_tunnel(&mut self) -> Result<()> {
+ ensure!(
+ self.state == TunnelState::Connecting || self.state == TunnelState::Connected,
+ ErrorKind::InvalidState
+ );
+ let close_handle = self.tunnel_close_handle.take().unwrap();
+ self.set_state(TunnelState::Exiting)?;
+ let result_tx = self.tx.clone();
+ thread::spawn(
+ move || {
+ let result = close_handle.close();
+ let _ = result_tx.send(DaemonEvent::TunnelKillResult(result));
+ trace!("Tunnel kill thread exit");
+ },
+ );
+ Ok(())
+ }
+
+ pub fn shutdown_handle(&self) -> DaemonShutdownHandle {
+ DaemonShutdownHandle { tx: self.tx.clone() }
+ }
+}
+
+struct DaemonShutdownHandle {
+ tx: mpsc::Sender<DaemonEvent>,
+}
+
+impl DaemonShutdownHandle {
+ pub fn shutdown(&self) {
+ let _ = self.tx.send(DaemonEvent::TriggerShutdown);
+ }
+}
+
+impl Drop for Daemon {
+ fn drop(self: &mut Daemon) {
+ if let Err(e) = rpc_info::remove().chain_err(|| "Unable to clean up rpc address file") {
+ error!("{}", e.display());
+ }
+ }
+}
+
+
+quick_main!(run);
+
+fn run() -> Result<()> {
+ let config = cli::get_config();
+ init_logger(config.log_level, config.log_file.as_ref())?;
+
+ let daemon = Daemon::new().chain_err(|| "Unable to initialize daemon")?;
+
+ let shutdown_handle = daemon.shutdown_handle();
+ shutdown::set_shutdown_signal_handler(move || shutdown_handle.shutdown())
+ .chain_err(|| "Unable to attach shutdown signal handler")?;
+
+ daemon.run()?;
+
+ debug!("Mullvad daemon is quitting");
+ Ok(())
+}
+
+fn init_logger(log_level: log::LogLevelFilter, log_file: Option<&PathBuf>) -> Result<()> {
+ let silenced_crates = [
+ "jsonrpc_core",
+ "tokio_core",
+ "jsonrpc_ws_server",
+ "ws",
+ "mio",
+ ];
+ let mut config = fern::Dispatch::new()
+ .format(
+ |out, message, record| {
+ out.finish(
+ format_args!(
+ "{}[{}][{}] {}",
+ chrono::Local::now().format("[%Y-%m-%d %H:%M:%S]"),
+ record.target(),
+ record.level(),
+ message
+ ),
+ )
+ },
+ )
+ .level(log_level)
+ .chain(std::io::stdout());
+ for silenced_crate in &silenced_crates {
+ config = config.level_for(*silenced_crate, log::LogLevelFilter::Warn);
+ }
+ if let Some(ref log_file) = log_file {
+ let f = fern::log_file(log_file).chain_err(|| "Failed to open log file for writing")?;
+ config = config.chain(f);
+ }
+ config.apply().chain_err(|| "Failed to bootstrap logging system")
+}
diff --git a/mullvad-daemon/src/management_interface.rs b/mullvad-daemon/src/management_interface.rs
new file mode 100644
index 0000000000..ffba3dca0f
--- /dev/null
+++ b/mullvad-daemon/src/management_interface.rs
@@ -0,0 +1,379 @@
+use error_chain;
+
+use jsonrpc_core::{Error, ErrorCode, Metadata};
+use jsonrpc_core::futures::{BoxFuture, Future, future, sync};
+use jsonrpc_macros::pubsub;
+use jsonrpc_pubsub::{PubSubHandler, PubSubMetadata, Session, SubscriptionId};
+use jsonrpc_ws_server;
+use mullvad_types::states::{DaemonState, TargetState};
+
+use serde;
+
+use std::collections::HashMap;
+use std::collections::hash_map::Entry;
+use std::net::{IpAddr, Ipv4Addr};
+use std::sync::{Arc, Mutex, RwLock};
+
+use talpid_core::mpsc::IntoSender;
+use talpid_ipc;
+use uuid;
+
+
+pub type AccountToken = String;
+pub type CountryCode = String;
+
+#[derive(Serialize)]
+pub struct AccountData {
+ pub paid_until: String,
+}
+
+#[derive(Serialize)]
+pub struct Location {
+ pub latlong: [f64; 2],
+ pub country: String,
+ pub city: String,
+}
+
+
+build_rpc_trait! {
+ pub trait ManagementInterfaceApi {
+ type Metadata;
+
+ /// Fetches and returns metadata about an account. Returns an error on non-existing
+ /// accounts.
+ #[rpc(name = "get_account_data")]
+ fn get_account_data(&self, AccountToken) -> Result<AccountData, Error>;
+
+ /// Returns available countries.
+ #[rpc(name = "get_countries")]
+ fn get_countries(&self) -> Result<HashMap<CountryCode, String>, Error>;
+
+ /// Set which account to connect with.
+ #[rpc(name = "set_account")]
+ fn set_account(&self, Option<AccountToken>) -> Result<(), Error>;
+
+ /// Get which account is configured.
+ #[rpc(async, name = "get_account")]
+ fn get_account(&self) -> BoxFuture<Option<AccountToken>, Error>;
+
+ /// Set which country to connect to
+ #[rpc(name = "set_country")]
+ fn set_country(&self, CountryCode) -> Result<(), Error>;
+
+ /// Set if the client should automatically establish a tunnel on start or not.
+ #[rpc(name = "set_autoconnect")]
+ fn set_autoconnect(&self, bool) -> Result<(), Error>;
+
+ /// Try to connect if disconnected, or do nothing if already connecting/connected.
+ #[rpc(name = "connect")]
+ fn connect(&self) -> Result<(), Error>;
+
+ /// Disconnect the VPN tunnel if it is connecting/connected. Does nothing if already
+ /// disconnected.
+ #[rpc(name = "disconnect")]
+ fn disconnect(&self) -> Result<(), Error>;
+
+ /// Returns the current state of the Mullvad client. Changes to this state will
+ /// be announced to subscribers of `new_state`.
+ #[rpc(async, name = "get_state")]
+ fn get_state(&self) -> BoxFuture<DaemonState, Error>;
+
+ /// Returns the current public IP of this computer.
+ #[rpc(name = "get_ip")]
+ fn get_ip(&self) -> Result<IpAddr, Error>;
+
+ /// Performs a geoIP lookup and returns the current location as perceived by the public
+ /// internet.
+ #[rpc(name = "get_location")]
+ fn get_location(&self) -> Result<Location, Error>;
+
+ #[pubsub(name = "new_state")] {
+ /// Subscribes to the `new_state` event notifications.
+ #[rpc(name = "new_state_subscribe")]
+ fn new_state_subscribe(&self, Self::Metadata, pubsub::Subscriber<DaemonState>);
+
+ /// Unsubscribes from the `new_state` event notifications.
+ #[rpc(name = "new_state_unsubscribe")]
+ fn new_state_unsubscribe(&self, SubscriptionId) -> BoxFuture<(), Error>;
+ }
+
+ #[pubsub(name = "error")] {
+ /// Subscribes to the `error` event notifications.
+ #[rpc(name = "error_subscribe")]
+ fn error_subscribe(&self, Self::Metadata, pubsub::Subscriber<Vec<String>>);
+
+ /// Unsubscribes from the `error` event notifications.
+ #[rpc(name = "error_unsubscribe")]
+ fn error_unsubscribe(&self, SubscriptionId) -> BoxFuture<(), Error>;
+ }
+ }
+}
+
+
+/// Enum representing commands coming in on the management interface.
+#[derive(Debug)]
+pub enum TunnelCommand {
+ /// Change target state.
+ SetTargetState(TargetState),
+ /// Request the current state.
+ GetState(sync::oneshot::Sender<DaemonState>),
+ /// Set which account token to use for subsequent connection attempts.
+ SetAccount(Option<AccountToken>),
+ /// Request the current account token being used.
+ GetAccount(sync::oneshot::Sender<Option<AccountToken>>),
+}
+
+#[derive(Default)]
+struct ActiveSubscriptions {
+ new_state_subscriptions: RwLock<HashMap<SubscriptionId, pubsub::Sink<DaemonState>>>,
+ error_subscriptions: RwLock<HashMap<SubscriptionId, pubsub::Sink<Vec<String>>>>,
+}
+
+pub struct ManagementInterfaceServer {
+ server: talpid_ipc::IpcServer,
+ subscriptions: Arc<ActiveSubscriptions>,
+}
+
+impl ManagementInterfaceServer {
+ pub fn start<T>(tunnel_tx: IntoSender<TunnelCommand, T>) -> talpid_ipc::Result<Self>
+ where T: From<TunnelCommand> + 'static + Send
+ {
+ let rpc = ManagementInterface::new(tunnel_tx);
+ let subscriptions = rpc.subscriptions.clone();
+
+ let mut io = PubSubHandler::default();
+ io.extend_with(rpc.to_delegate());
+ let server = talpid_ipc::IpcServer::start_with_metadata(io.into(), meta_extractor)?;
+ Ok(
+ ManagementInterfaceServer {
+ server,
+ subscriptions,
+ },
+ )
+ }
+
+ pub fn address(&self) -> &str {
+ self.server.address()
+ }
+
+ pub fn event_broadcaster(&self) -> EventBroadcaster {
+ EventBroadcaster { subscriptions: self.subscriptions.clone() }
+ }
+
+ /// Consumes the server and waits for it to finish. Returns an error if the server exited
+ /// due to an error.
+ pub fn wait(self) -> talpid_ipc::Result<()> {
+ self.server.wait()
+ }
+}
+
+
+/// A handle that allows broadcasting messages to all subscribers of the management interface.
+pub struct EventBroadcaster {
+ subscriptions: Arc<ActiveSubscriptions>,
+}
+
+impl EventBroadcaster {
+ /// Sends a new state update to all `new_state` subscribers of the management interface.
+ pub fn notify_new_state(&self, new_state: DaemonState) {
+ self.notify(&self.subscriptions.new_state_subscriptions, new_state);
+ }
+
+ /// Sends an error to all `error` subscribers of the management interface.
+ pub fn notify_error<E>(&self, error: &E)
+ where E: error_chain::ChainedError
+ {
+ let error_strings = error.iter().map(|e| e.to_string()).collect();
+ self.notify(&self.subscriptions.error_subscriptions, error_strings);
+ }
+
+ fn notify<T>(&self,
+ subscriptions_lock: &RwLock<HashMap<SubscriptionId, pubsub::Sink<T>>>,
+ value: T)
+ where T: serde::Serialize + Clone
+ {
+ let subscriptions = subscriptions_lock.read().unwrap();
+ for sink in subscriptions.values() {
+ let _ = sink.notify(Ok(value.clone())).wait();
+ }
+ }
+}
+
+struct ManagementInterface<T: From<TunnelCommand> + 'static + Send> {
+ subscriptions: Arc<ActiveSubscriptions>,
+ tx: Mutex<IntoSender<TunnelCommand, T>>,
+}
+
+impl<T: From<TunnelCommand> + 'static + Send> ManagementInterface<T> {
+ pub fn new(tx: IntoSender<TunnelCommand, T>) -> Self {
+ ManagementInterface {
+ subscriptions: Default::default(),
+ tx: Mutex::new(tx),
+ }
+ }
+
+ fn subscribe<V>(subscriber: pubsub::Subscriber<V>,
+ subscriptions_lock: &RwLock<HashMap<SubscriptionId, pubsub::Sink<V>>>) {
+ let mut subscriptions = subscriptions_lock.write().unwrap();
+ loop {
+ let id = SubscriptionId::String(uuid::Uuid::new_v4().to_string());
+ if let Entry::Vacant(entry) = subscriptions.entry(id.clone()) {
+ if let Ok(sink) = subscriber.assign_id(id.clone()) {
+ debug!("Accepting new subscription with id {:?}", id);
+ entry.insert(sink);
+ }
+ break;
+ }
+ }
+ }
+
+ fn unsubscribe<V>(id: SubscriptionId,
+ subscriptions_lock: &RwLock<HashMap<SubscriptionId, pubsub::Sink<V>>>)
+ -> BoxFuture<(), Error> {
+ let was_removed = subscriptions_lock.write().unwrap().remove(&id).is_some();
+ let result = if was_removed {
+ debug!("Unsubscribing id {:?}", id);
+ future::ok(())
+ } else {
+ future::err(
+ Error {
+ code: ErrorCode::InvalidParams,
+ message: "Invalid subscription".to_owned(),
+ data: None,
+ },
+ )
+ };
+ result.boxed()
+ }
+}
+
+impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for ManagementInterface<T> {
+ type Metadata = Meta;
+
+ fn get_account_data(&self, _account_token: AccountToken) -> Result<AccountData, Error> {
+ trace!("get_account_data");
+ Ok(AccountData { paid_until: "2018-12-31T16:00:00.000Z".to_owned() },)
+ }
+
+ fn get_countries(&self) -> Result<HashMap<CountryCode, String>, Error> {
+ trace!("get_countries");
+ Ok(HashMap::new())
+ }
+
+ fn set_account(&self, account_token: Option<AccountToken>) -> Result<(), Error> {
+ trace!("set_account");
+ self.tx
+ .lock()
+ .unwrap()
+ .send(TunnelCommand::SetAccount(account_token))
+ .map_err(|_| Error::internal_error())
+ }
+
+ fn get_account(&self) -> BoxFuture<Option<AccountToken>, Error> {
+ trace!("get_account");
+ let (tx, rx) = sync::oneshot::channel();
+ match self.tx.lock().unwrap().send(TunnelCommand::GetAccount(tx)) {
+ Ok(()) => rx.map_err(|_| Error::internal_error()).boxed(),
+ Err(_) => future::err(Error::internal_error()).boxed(),
+ }
+ }
+
+ fn set_country(&self, _country_code: CountryCode) -> Result<(), Error> {
+ trace!("set_country");
+ Ok(())
+ }
+
+ fn set_autoconnect(&self, _autoconnect: bool) -> Result<(), Error> {
+ trace!("set_autoconnect");
+ Ok(())
+ }
+
+ fn connect(&self) -> Result<(), Error> {
+ trace!("connect");
+ self.tx
+ .lock()
+ .unwrap()
+ .send(TunnelCommand::SetTargetState(TargetState::Secured))
+ .map_err(|_| Error::internal_error())
+ }
+
+ fn disconnect(&self) -> Result<(), Error> {
+ trace!("disconnect");
+ self.tx
+ .lock()
+ .unwrap()
+ .send(TunnelCommand::SetTargetState(TargetState::Unsecured))
+ .map_err(|_| Error::internal_error())
+ }
+
+ fn get_state(&self) -> BoxFuture<DaemonState, Error> {
+ trace!("get_state");
+ let (state_tx, state_rx) = sync::oneshot::channel();
+ match self.tx.lock().unwrap().send(TunnelCommand::GetState(state_tx)) {
+ Ok(()) => state_rx.map_err(|_| Error::internal_error()).boxed(),
+ Err(_) => future::err(Error::internal_error()).boxed(),
+ }
+ }
+
+ fn get_ip(&self) -> Result<IpAddr, Error> {
+ trace!("get_ip");
+ Ok(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)))
+ }
+
+ fn get_location(&self) -> Result<Location, Error> {
+ trace!("get_location");
+ Ok(
+ Location {
+ latlong: [1.0, 2.0],
+ country: "narnia".to_owned(),
+ city: "Le city".to_owned(),
+ },
+ )
+ }
+
+ fn new_state_subscribe(&self,
+ _meta: Self::Metadata,
+ subscriber: pubsub::Subscriber<DaemonState>) {
+ trace!("new_state_subscribe");
+ Self::subscribe(subscriber, &self.subscriptions.new_state_subscriptions);
+ }
+
+ fn new_state_unsubscribe(&self, id: SubscriptionId) -> BoxFuture<(), Error> {
+ trace!("new_state_unsubscribe");
+ Self::unsubscribe(id, &self.subscriptions.new_state_subscriptions)
+ }
+
+ fn error_subscribe(&self, _meta: Self::Metadata, subscriber: pubsub::Subscriber<Vec<String>>) {
+ trace!("error_subscribe");
+ Self::subscribe(subscriber, &self.subscriptions.error_subscriptions);
+ }
+
+ fn error_unsubscribe(&self, id: SubscriptionId) -> BoxFuture<(), Error> {
+ trace!("error_unsubscribe");
+ Self::unsubscribe(id, &self.subscriptions.error_subscriptions)
+ }
+}
+
+
+/// The metadata type. There is one instance associated with each connection. In this pubsub
+/// scenario they are created by `meta_extractor` by the server on each new incoming
+/// connection.
+#[derive(Clone, Debug, Default)]
+pub struct Meta {
+ session: Option<Arc<Session>>,
+}
+
+/// Make the `Meta` type possible to use as jsonrpc metadata type.
+impl Metadata for Meta {}
+
+/// Make the `Meta` type possible to use as a pubsub metadata type.
+impl PubSubMetadata for Meta {
+ fn session(&self) -> Option<Arc<Session>> {
+ self.session.clone()
+ }
+}
+
+/// Metadata extractor function for `Meta`.
+fn meta_extractor(context: &jsonrpc_ws_server::RequestContext) -> Meta {
+ Meta { session: Some(Arc::new(Session::new(context.sender()))) }
+}
diff --git a/mullvad-daemon/src/mock_ipc.rs b/mullvad-daemon/src/mock_ipc.rs
new file mode 100644
index 0000000000..90859401c2
--- /dev/null
+++ b/mullvad-daemon/src/mock_ipc.rs
@@ -0,0 +1,217 @@
+use ipc_api::*;
+
+use jsonrpc_core::{self, Error, ErrorCode, Metadata};
+use jsonrpc_core::futures::{BoxFuture, Future, future};
+use jsonrpc_macros::pubsub;
+use jsonrpc_pubsub::{PubSubHandler, PubSubMetadata, Session, SubscriptionId};
+use jsonrpc_ws_server;
+
+use std::collections::HashMap;
+use std::net::{IpAddr, Ipv4Addr};
+use std::sync::{Arc, RwLock, atomic};
+
+use talpid_ipc;
+
+type ActiveSubscriptions = Arc<RwLock<HashMap<SubscriptionId, pubsub::Sink<String>>>>;
+
+pub struct IpcServer {
+ server: talpid_ipc::IpcServer,
+}
+
+impl IpcServer {
+ pub fn start() -> talpid_ipc::Result<Self> {
+ let active_subscriptions = ActiveSubscriptions::default();
+ let rpc = MockIpcApi::new(active_subscriptions.clone());
+ let mut io = PubSubHandler::default();
+ io.extend_with(rpc.to_delegate());
+ let server = talpid_ipc::IpcServer::start_with_metadata(io.into(), meta_extractor)?;
+
+ Self::spawn_broadcast_thread(active_subscriptions);
+ Ok(IpcServer { server })
+ }
+
+ pub fn address(&self) -> &str {
+ self.server.address()
+ }
+
+ pub fn wait(self) -> talpid_ipc::Result<()> {
+ self.server.wait()
+ }
+
+ // TODO(linus): This thread will never die. But this is just mock anyway so not important.
+ fn spawn_broadcast_thread(active_subscriptions: ActiveSubscriptions) {
+ ::std::thread::spawn(
+ move || loop {
+ {
+ let subscribers = active_subscriptions.read().unwrap();
+ for sink in subscribers.values() {
+ let _ = sink.notify(Ok("Hello World!".into())).wait();
+ }
+ }
+ ::std::thread::sleep(::std::time::Duration::from_secs(1));
+ },
+ );
+ }
+}
+
+
+
+/// The metadata type. There is one instance associated with each connection. In this pubsub
+/// scenario they are created by `From<Sender<String>>::from` by the server on each new incoming
+/// connection.
+#[derive(Clone, Debug, Default)]
+pub struct Meta {
+ session: Option<Arc<Session>>,
+}
+
+/// Make the `Meta` type possible to use as jsonrpc metadata type.
+impl Metadata for Meta {}
+
+/// Make the `Meta` type possible to use as a pubsub metadata type.
+impl PubSubMetadata for Meta {
+ fn session(&self) -> Option<Arc<Session>> {
+ self.session.clone()
+ }
+}
+
+/// Metadata extractor function for `Meta`.
+fn meta_extractor(context: &jsonrpc_ws_server::RequestContext) -> Meta {
+ Meta { session: Some(Arc::new(Session::new(context.sender()))) }
+}
+
+/// A mock implementation of the Mullvad frontend API. A very simplified explanation is that for
+/// the real implementation `tunnel_is_up` should be replaced with some kind of handle (or proxy to
+/// a handle) to the jsonrpc client talking with `talpid_core`.
+pub struct MockIpcApi {
+ next_subscription_id: atomic::AtomicUsize,
+ active: ActiveSubscriptions,
+ country: RwLock<CountryCode>,
+ tunnel_is_up: atomic::AtomicBool,
+}
+
+impl MockIpcApi {
+ pub fn new(active: ActiveSubscriptions) -> Self {
+ MockIpcApi {
+ next_subscription_id: atomic::AtomicUsize::new(0),
+ active: active,
+ country: RwLock::new("se".to_owned()),
+ tunnel_is_up: atomic::AtomicBool::new(false),
+ }
+ }
+}
+
+impl IpcApi for MockIpcApi {
+ type Metadata = Meta;
+
+ fn get_account_data(&self, account_token: AccountToken) -> Result<AccountData, Error> {
+ debug!("Login for {}", account_token);
+
+ let paid_until = if account_token.starts_with("1111") {
+ // accounts starting with 1111 expire in one month
+ Ok("2018-12-31T16:00:00.000Z".to_owned())
+ } else if account_token.starts_with("2222") {
+ Ok("2012-12-31T16:00:00.000Z".to_owned())
+ } else if account_token.starts_with("3333") {
+ Ok("2037-12-31T16:00:00.000Z".to_owned())
+ } else {
+ Err(jsonrpc_core::Error::invalid_params("You are not welcome"))
+ }?;
+ Ok(AccountData { paid_until: paid_until })
+ }
+
+ fn get_countries(&self) -> Result<HashMap<CountryCode, String>, Error> {
+ let mut countries = HashMap::new();
+ countries.insert("se".to_owned(), "Sweden".to_owned());
+ countries.insert("de".to_owned(), "Denmark".to_owned());
+ countries.insert("na".to_owned(), "Narnia".to_owned());
+ Ok(countries)
+ }
+
+ fn set_account(&self, _account_token: AccountToken) -> Result<(), Error> {
+ Ok(())
+ }
+
+ fn set_country(&self, country_code: CountryCode) -> Result<(), Error> {
+ *self.country.write().unwrap() = country_code;
+ Ok(())
+ }
+
+ fn set_autoconnect(&self, _autoconnect: bool) -> Result<(), Error> {
+ Ok(())
+ }
+
+ fn connect(&self) -> Result<(), Error> {
+ if self.country.read().unwrap().starts_with("se") {
+ Err(jsonrpc_core::Error::invalid_params("Invalid server"))
+ } else {
+ self.tunnel_is_up.store(true, atomic::Ordering::SeqCst);
+ Ok(())
+ }
+ }
+
+ fn disconnect(&self) -> Result<(), Error> {
+ self.tunnel_is_up.store(false, atomic::Ordering::SeqCst);
+ Ok(())
+ }
+
+ fn get_state(&self) -> Result<SecurityState, Error> {
+ if self.tunnel_is_up.load(atomic::Ordering::SeqCst) {
+ Ok(SecurityState::Secured)
+ } else {
+ Ok(SecurityState::Unsecured)
+ }
+ }
+
+ fn get_ip(&self) -> Result<IpAddr, Error> {
+ let ip = if self.tunnel_is_up.load(atomic::Ordering::SeqCst) {
+ IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4))
+ } else {
+ IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2))
+ }
+ .to_owned();
+ Ok(ip)
+ }
+
+ fn get_location(&self) -> Result<Location, Error> {
+ Ok(
+ if self.tunnel_is_up.load(atomic::Ordering::SeqCst) {
+ Location {
+ latlong: [1.0, 2.0],
+ country: "narnia".to_owned(),
+ city: "Le city".to_owned(),
+ }
+ } else {
+ Location {
+ latlong: [60.0, 61.0],
+ country: "sweden".to_owned(),
+ city: "bollebygd".to_owned(),
+ }
+ },
+ )
+ }
+
+ fn subscribe(&self, _meta: Self::Metadata, subscriber: pubsub::Subscriber<String>) {
+ let id = self.next_subscription_id.fetch_add(1, atomic::Ordering::SeqCst);
+ let sub_id = SubscriptionId::Number(id as u64);
+ if let Ok(sink) = subscriber.assign_id(sub_id.clone()) {
+ debug!("Accepting new subscription with id {}", id);
+ self.active.write().unwrap().insert(sub_id, sink);
+ }
+ }
+
+ fn unsubscribe(&self, id: SubscriptionId) -> BoxFuture<bool, Error> {
+ debug!("Unsubscribing id {:?}", id);
+ if self.active.write().unwrap().remove(&id).is_some() {
+ future::ok(true).boxed()
+ } else {
+ future::err(
+ Error {
+ code: ErrorCode::InvalidParams,
+ message: "Invalid subscription.".into(),
+ data: None,
+ },
+ )
+ .boxed()
+ }
+ }
+}
diff --git a/mullvad-daemon/src/rpc_info.rs b/mullvad-daemon/src/rpc_info.rs
new file mode 100644
index 0000000000..25078ff899
--- /dev/null
+++ b/mullvad-daemon/src/rpc_info.rs
@@ -0,0 +1,47 @@
+use std::fs::{self, File, OpenOptions};
+use std::io::{self, Write};
+use std::path::{Path, PathBuf};
+
+error_chain! {
+ errors {
+ WriteFailed(path: PathBuf) {
+ description("Failed to write RCP address to file")
+ display("Failed to write RPC address to {}", path.to_string_lossy())
+ }
+ RemoveFailed(path: PathBuf) {
+ description("Failed to remove file")
+ display("Failed to remove {}", path.to_string_lossy())
+ }
+ }
+}
+
+lazy_static! {
+ /// The path to the file where we write the RPC address
+ static ref RPC_ADDRESS_FILE_PATH: PathBuf = ::std::env::temp_dir().join(".mullvad_rpc_address");
+}
+
+/// Writes down the RPC address to some API to a file.
+pub fn write(rpc_address: &str) -> Result<()> {
+ open_file(RPC_ADDRESS_FILE_PATH.as_path())
+ .and_then(|mut file| file.write_all(rpc_address.as_bytes()))
+ .chain_err(|| ErrorKind::WriteFailed(RPC_ADDRESS_FILE_PATH.to_owned()))?;
+
+ debug!(
+ "Wrote RPC address to {}",
+ RPC_ADDRESS_FILE_PATH.to_string_lossy()
+ );
+ Ok(())
+}
+
+pub fn remove() -> Result<()> {
+ fs::remove_file(RPC_ADDRESS_FILE_PATH.as_path())
+ .chain_err(|| ErrorKind::RemoveFailed(RPC_ADDRESS_FILE_PATH.to_owned()))
+}
+
+fn open_file(path: &Path) -> io::Result<File> {
+ OpenOptions::new()
+ .write(true)
+ .truncate(true)
+ .create(true)
+ .open(path)
+}
diff --git a/mullvad-daemon/src/shutdown.rs b/mullvad-daemon/src/shutdown.rs
new file mode 100644
index 0000000000..4da73c4d05
--- /dev/null
+++ b/mullvad-daemon/src/shutdown.rs
@@ -0,0 +1,42 @@
+error_chain!{}
+
+#[cfg(unix)]
+mod platform {
+ extern crate simple_signal;
+
+ use self::simple_signal::Signal;
+ use super::Result;
+
+ pub fn set_shutdown_signal_handler<F>(f: F) -> Result<()>
+ where F: Fn() + 'static + Send
+ {
+ simple_signal::set_handler(
+ &[Signal::Term, Signal::Int], move |s| {
+ debug!("Process received signal: {:?}", s);
+ f();
+ }
+ );
+ Ok(())
+ }
+}
+
+#[cfg(windows)]
+mod platform {
+ extern crate ctrlc;
+
+ use super::{Result, ResultExt};
+
+ pub fn set_shutdown_signal_handler<F>(f: F) -> Result<()>
+ where F: Fn() + 'static + Send
+ {
+ ctrlc::set_handler(
+ move || {
+ debug!("Process received Ctrl-c");
+ f();
+ },
+ )
+ .chain_err(|| "Unable to attach ctrl-c handler")
+ }
+}
+
+pub use self::platform::*;