use jsonrpc_core::{ futures::{ future, sync::{self, oneshot::Sender as OneshotSender}, Future, }, Error, ErrorCode, MetaIoHandler, Metadata, }; use jsonrpc_ipc_server; use jsonrpc_macros::{build_rpc_trait, metadata, pubsub}; use jsonrpc_pubsub::{PubSubHandler, PubSubMetadata, Session, SubscriptionId}; use mullvad_paths; use mullvad_rpc; use mullvad_types::{ account::{AccountData, AccountToken}, location::GeoIpLocation, relay_constraints::RelaySettingsUpdate, relay_list::RelayList, settings::{self, Settings}, states::TargetState, version, DaemonEvent, }; use std::{ collections::{hash_map::Entry, HashMap}, sync::{Arc, Mutex, RwLock}, }; use talpid_core::mpsc::IntoSender; use talpid_ipc; use talpid_types::{ net::{openvpn, wireguard}, tunnel::TunnelStateTransition, ErrorExt, }; use uuid; /// FIXME(linus): This is here just because the futures crate has deprecated it and jsonrpc_core /// did not introduce their own yet (https://github.com/paritytech/jsonrpc/pull/196). /// Remove this and use the one in jsonrpc_core when that is released. pub type BoxFuture = Box + Send>; build_rpc_trait! { pub trait ManagementInterfaceApi { type Metadata; /// Fetches and returns metadata about an account. Returns an error on non-existing /// accounts. #[rpc(meta, name = "get_account_data")] fn get_account_data(&self, Self::Metadata, AccountToken) -> BoxFuture; /// Returns available countries. #[rpc(meta, name = "get_relay_locations")] fn get_relay_locations(&self, Self::Metadata) -> BoxFuture; /// Triggers a relay list update #[rpc(meta, name = "update_relay_locations")] fn update_relay_locations(&self, Self::Metadata) -> BoxFuture<(), Error>; /// Set which account to connect with. #[rpc(meta, name = "set_account")] fn set_account(&self, Self::Metadata, Option) -> BoxFuture<(), Error>; /// Update constraints put on the type of tunnel connection to use #[rpc(meta, name = "update_relay_settings")] fn update_relay_settings( &self, Self::Metadata, RelaySettingsUpdate ) -> BoxFuture<(), Error>; /// Set if the client should allow communication with the LAN while in secured state. #[rpc(meta, name = "set_allow_lan")] fn set_allow_lan(&self, Self::Metadata, bool) -> BoxFuture<(), Error>; /// Set if the client should allow network communication when in the disconnected state. #[rpc(meta, name = "set_block_when_disconnected")] fn set_block_when_disconnected(&self, Self::Metadata, bool) -> BoxFuture<(), Error>; /// Set if the daemon should automatically establish a tunnel on start or not. #[rpc(meta, name = "set_auto_connect")] fn set_auto_connect(&self, Self::Metadata, bool) -> BoxFuture<(), Error>; /// Try to connect if disconnected, or do nothing if already connecting/connected. #[rpc(meta, name = "connect")] fn connect(&self, Self::Metadata) -> BoxFuture<(), Error>; /// Disconnect the VPN tunnel if it is connecting/connected. Does nothing if already /// disconnected. #[rpc(meta, name = "disconnect")] fn disconnect(&self, Self::Metadata) -> BoxFuture<(), Error>; /// Returns the current state of the Mullvad client. Changes to this state will /// be announced to subscribers of `new_state`. #[rpc(meta, name = "get_state")] fn get_state(&self, Self::Metadata) -> BoxFuture; /// Performs a geoIP lookup and returns the current location as perceived by the public /// internet. #[rpc(meta, name = "get_current_location")] fn get_current_location(&self, Self::Metadata) -> BoxFuture, Error>; /// Makes the daemon exit its main loop and quit. #[rpc(meta, name = "shutdown")] fn shutdown(&self, Self::Metadata) -> BoxFuture<(), Error>; /// Get previously used account tokens from the account history #[rpc(meta, name = "get_account_history")] fn get_account_history(&self, Self::Metadata) -> BoxFuture, Error>; /// Remove given account token from the account history #[rpc(meta, name = "remove_account_from_history")] fn remove_account_from_history(&self, Self::Metadata, AccountToken) -> BoxFuture<(), Error>; /// Sets openvpn's mssfix parameter #[rpc(meta, name = "set_openvpn_mssfix")] fn set_openvpn_mssfix(&self, Self::Metadata, Option) -> BoxFuture<(), Error>; /// Sets proxy details for OpenVPN #[rpc(meta, name = "set_openvpn_proxy")] fn set_openvpn_proxy(&self, Self::Metadata, Option) -> BoxFuture<(), Error>; /// Set if IPv6 is enabled in the tunnel #[rpc(meta, name = "set_enable_ipv6")] fn set_enable_ipv6(&self, Self::Metadata, bool) -> BoxFuture<(), Error>; /// Set MTU for wireguard tunnels #[rpc(meta, name = "set_wireguard_mtu")] fn set_wireguard_mtu(&self, Self::Metadata, Option) -> BoxFuture<(), Error>; /// Returns the current daemon settings #[rpc(meta, name = "get_settings")] fn get_settings(&self, Self::Metadata) -> BoxFuture; /// Generates new wireguard key for current account #[rpc(meta, name = "generate_wireguard_key")] fn generate_wireguard_key(&self, Self::Metadata) -> BoxFuture<(), Error>; /// Retrieve a public key for current account if the account has one. #[rpc(meta, name = "get_wireguard_key")] fn get_wireguard_key(&self, Self::Metadata) -> BoxFuture, Error>; /// Verify if current wireguard key is still valid #[rpc(meta, name = "verify_wireguard_key")] fn verify_wireguard_key(&self, Self::Metadata) -> BoxFuture; /// Retreive version of the app #[rpc(meta, name = "get_current_version")] fn get_current_version(&self, Self::Metadata) -> BoxFuture; /// Retrieve information about the currently running and latest versions of the app #[rpc(meta, name = "get_version_info")] fn get_version_info(&self, Self::Metadata) -> BoxFuture; #[pubsub(name = "daemon_event")] { /// Subscribes to events from the daemon. #[rpc(name = "daemon_event_subscribe")] fn daemon_event_subscribe( &self, Self::Metadata, pubsub::Subscriber ); /// Unsubscribes from the `daemon_event` event notifications. #[rpc(name = "daemon_event_unsubscribe")] fn daemon_event_unsubscribe(&self, SubscriptionId) -> BoxFuture<(), Error>; } } } /// Enum representing commands coming in on the management interface. pub enum ManagementCommand { /// Change target state. SetTargetState(OneshotSender>, TargetState), /// Request the current state. GetState(OneshotSender), /// Get the current geographical location. GetCurrentLocation(OneshotSender>), /// Request the metadata for an account. GetAccountData( OneshotSender>, AccountToken, ), /// Request account history GetAccountHistory(OneshotSender>), /// Request account history RemoveAccountFromHistory(OneshotSender<()>, AccountToken), /// Get the list of countries and cities where there are relays. GetRelayLocations(OneshotSender), /// Trigger an asynchronous relay list update. This returns before the relay list is actually /// updated. UpdateRelayLocations, /// Set which account token to use for subsequent connection attempts. SetAccount(OneshotSender<()>, Option), /// Place constraints on the type of tunnel and relay UpdateRelaySettings(OneshotSender<()>, RelaySettingsUpdate), /// Set the allow LAN setting. SetAllowLan(OneshotSender<()>, bool), /// Set the block_when_disconnected setting. SetBlockWhenDisconnected(OneshotSender<()>, bool), /// Set the auto-connect setting. SetAutoConnect(OneshotSender<()>, bool), /// Set the mssfix argument for OpenVPN SetOpenVpnMssfix(OneshotSender<()>, Option), /// Set proxy details for OpenVPN SetOpenVpnProxy( OneshotSender>, Option, ), /// Set if IPv6 should be enabled in the tunnel SetEnableIpv6(OneshotSender<()>, bool), /// Set MTU for wireguard tunnels SetWireguardMtu(OneshotSender<()>, Option), /// Get the daemon settings GetSettings(OneshotSender), /// Generate new wireguard key GenerateWireguardKey(OneshotSender>), /// Return a public key of the currently set wireguard private key, if there is one GetWireguardKey(OneshotSender>), /// Verify if the currently set wireguard key is valid. VerifyWireguardKey(OneshotSender), /// Get information about the currently running and latest app versions GetVersionInfo(OneshotSender>), /// Get current version of the app GetCurrentVersion(OneshotSender), /// Makes the daemon exit the main loop and quit. Shutdown, } pub struct ManagementInterfaceServer { server: talpid_ipc::IpcServer, subscriptions: Arc>>>, } impl ManagementInterfaceServer { pub fn start(tunnel_tx: IntoSender) -> Result where T: From + 'static + Send, { let rpc = ManagementInterface::new(tunnel_tx); let subscriptions = rpc.subscriptions.clone(); let mut io = PubSubHandler::default(); io.extend_with(rpc.to_delegate()); let meta_io: MetaIoHandler = io.into(); let path = mullvad_paths::get_rpc_socket_path(); let server = talpid_ipc::IpcServer::start_with_metadata( meta_io, meta_extractor, &path.to_string_lossy(), )?; Ok(ManagementInterfaceServer { server, subscriptions, }) } pub fn socket_path(&self) -> &str { self.server.path() } pub fn event_broadcaster(&self) -> EventBroadcaster { EventBroadcaster { subscriptions: self.subscriptions.clone(), } } /// Consumes the server and waits for it to finish. Returns an error if the server exited /// due to an error. pub fn wait(self) { self.server.wait() } } /// A handle that allows broadcasting messages to all subscribers of the management interface. #[derive(Clone)] pub struct EventBroadcaster { subscriptions: Arc>>>, } impl EventBroadcaster { /// Sends a new state update to all `new_state` subscribers of the management interface. pub fn notify_new_state(&self, new_state: TunnelStateTransition) { log::debug!("Broadcasting new state: {:?}", new_state); self.notify(DaemonEvent::StateTransition(new_state)); } /// Sends settings to all `settings` subscribers of the management interface. pub fn notify_settings(&self, settings: Settings) { log::debug!("Broadcasting new settings"); self.notify(DaemonEvent::Settings(settings)); } /// Sends settings to all `settings` subscribers of the management interface. pub fn notify_relay_list(&self, relay_list: RelayList) { log::debug!("Broadcasting new relay list"); self.notify(DaemonEvent::RelayList(relay_list)); } fn notify(&self, value: DaemonEvent) { let subscriptions = self.subscriptions.read().unwrap(); for sink in subscriptions.values() { let _ = sink.notify(Ok(value.clone())).wait(); } } } struct ManagementInterface + 'static + Send> { subscriptions: Arc>>>, tx: Mutex>, } impl + 'static + Send> ManagementInterface { pub fn new(tx: IntoSender) -> Self { ManagementInterface { subscriptions: Default::default(), tx: Mutex::new(tx), } } /// Sends a command to the daemon and maps the error to an RPC error. fn send_command_to_daemon(&self, command: ManagementCommand) -> BoxFuture<(), Error> { Box::new( future::result(self.tx.lock().unwrap().send(command)) .map_err(|_| Error::internal_error()), ) } /// Converts the given error to an error that can be given to the caller of the API. /// Will let any actual RPC error through as is, any other error is changed to an internal /// error. fn map_rpc_error(error: &mullvad_rpc::Error) -> Error { match error.kind() { mullvad_rpc::ErrorKind::JsonRpcError(ref rpc_error) => { // We have to manually copy the error since we have different // versions of the jsonrpc_core library at the moment. Error { code: ErrorCode::from(rpc_error.code.code()), message: rpc_error.message.clone(), data: rpc_error.data.clone(), } } _ => Error::internal_error(), } } } impl + 'static + Send> ManagementInterfaceApi for ManagementInterface { type Metadata = Meta; fn get_account_data( &self, _: Self::Metadata, account_token: AccountToken, ) -> BoxFuture { log::debug!("get_account_data"); let (tx, rx) = sync::oneshot::channel(); let future = self .send_command_to_daemon(ManagementCommand::GetAccountData(tx, account_token)) .and_then(|_| rx.map_err(|_| Error::internal_error())) .and_then(|rpc_future| { rpc_future.map_err(|error: mullvad_rpc::Error| { log::error!( "Unable to get account data from API: {}", error.display_chain() ); Self::map_rpc_error(&error) }) }); Box::new(future) } fn get_relay_locations(&self, _: Self::Metadata) -> BoxFuture { log::debug!("get_relay_locations"); let (tx, rx) = sync::oneshot::channel(); let future = self .send_command_to_daemon(ManagementCommand::GetRelayLocations(tx)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) } fn update_relay_locations(&self, _: Self::Metadata) -> BoxFuture<(), Error> { log::debug!("update_relay_locations"); self.send_command_to_daemon(ManagementCommand::UpdateRelayLocations) } fn set_account( &self, _: Self::Metadata, account_token: Option, ) -> BoxFuture<(), Error> { log::debug!("set_account"); let (tx, rx) = sync::oneshot::channel(); let future = self .send_command_to_daemon(ManagementCommand::SetAccount(tx, account_token.clone())) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) } fn update_relay_settings( &self, _: Self::Metadata, constraints_update: RelaySettingsUpdate, ) -> BoxFuture<(), Error> { log::debug!("update_relay_settings"); let (tx, rx) = sync::oneshot::channel(); let message = ManagementCommand::UpdateRelaySettings(tx, constraints_update); let future = self .send_command_to_daemon(message) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) } fn set_allow_lan(&self, _: Self::Metadata, allow_lan: bool) -> BoxFuture<(), Error> { log::debug!("set_allow_lan({})", allow_lan); let (tx, rx) = sync::oneshot::channel(); let future = self .send_command_to_daemon(ManagementCommand::SetAllowLan(tx, allow_lan)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) } fn set_block_when_disconnected( &self, _: Self::Metadata, block_when_disconnected: bool, ) -> BoxFuture<(), Error> { log::debug!("set_block_when_disconnected({})", block_when_disconnected); let (tx, rx) = sync::oneshot::channel(); let future = self .send_command_to_daemon(ManagementCommand::SetBlockWhenDisconnected( tx, block_when_disconnected, )) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) } fn set_auto_connect(&self, _: Self::Metadata, auto_connect: bool) -> BoxFuture<(), Error> { log::debug!("set_auto_connect({})", auto_connect); let (tx, rx) = sync::oneshot::channel(); let future = self .send_command_to_daemon(ManagementCommand::SetAutoConnect(tx, auto_connect)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) } fn connect(&self, _: Self::Metadata) -> BoxFuture<(), Error> { log::debug!("connect"); let (tx, rx) = sync::oneshot::channel(); let future = self .send_command_to_daemon(ManagementCommand::SetTargetState(tx, TargetState::Secured)) .and_then(|_| rx.map_err(|_| Error::internal_error())) .and_then(|result| match result { Ok(()) => future::ok(()), Err(()) => future::err(Error { code: ErrorCode::ServerError(-900), message: "No account token configured".to_owned(), data: None, }), }); Box::new(future) } fn disconnect(&self, _: Self::Metadata) -> BoxFuture<(), Error> { log::debug!("disconnect"); let (tx, _) = sync::oneshot::channel(); let future = self .send_command_to_daemon(ManagementCommand::SetTargetState( tx, TargetState::Unsecured, )) .then(|_| future::ok(())); Box::new(future) } fn get_state(&self, _: Self::Metadata) -> BoxFuture { log::debug!("get_state"); let (state_tx, state_rx) = sync::oneshot::channel(); let future = self .send_command_to_daemon(ManagementCommand::GetState(state_tx)) .and_then(|_| state_rx.map_err(|_| Error::internal_error())); Box::new(future) } fn get_current_location(&self, _: Self::Metadata) -> BoxFuture, Error> { log::debug!("get_current_location"); let (tx, rx) = sync::oneshot::channel(); let future = self .send_command_to_daemon(ManagementCommand::GetCurrentLocation(tx)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) } fn shutdown(&self, _: Self::Metadata) -> BoxFuture<(), Error> { log::debug!("shutdown"); self.send_command_to_daemon(ManagementCommand::Shutdown) } fn get_account_history(&self, _: Self::Metadata) -> BoxFuture, Error> { log::debug!("get_account_history"); let (tx, rx) = sync::oneshot::channel(); let future = self .send_command_to_daemon(ManagementCommand::GetAccountHistory(tx)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) } fn remove_account_from_history( &self, _: Self::Metadata, account_token: AccountToken, ) -> BoxFuture<(), Error> { log::debug!("remove_account_from_history"); let (tx, rx) = sync::oneshot::channel(); let future = self .send_command_to_daemon(ManagementCommand::RemoveAccountFromHistory( tx, account_token, )) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) } fn set_openvpn_mssfix(&self, _: Self::Metadata, mssfix: Option) -> BoxFuture<(), Error> { log::debug!("set_openvpn_mssfix({:?})", mssfix); let (tx, rx) = sync::oneshot::channel(); let future = self .send_command_to_daemon(ManagementCommand::SetOpenVpnMssfix(tx, mssfix)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) } fn set_openvpn_proxy( &self, _: Self::Metadata, proxy: Option, ) -> BoxFuture<(), Error> { log::debug!("set_openvpn_proxy({:?})", proxy); let (tx, rx) = sync::oneshot::channel(); let future = self .send_command_to_daemon(ManagementCommand::SetOpenVpnProxy(tx, proxy)) .and_then(|_| rx.map_err(|_| Error::internal_error())) .and_then(|settings_result| { settings_result.map_err(|error| match error { settings::Error::InvalidProxyData(reason) => { Error::invalid_params(reason.to_owned()) } _ => Error::internal_error(), }) }); Box::new(future) } fn set_enable_ipv6(&self, _: Self::Metadata, enable_ipv6: bool) -> BoxFuture<(), Error> { log::debug!("set_enable_ipv6({})", enable_ipv6); let (tx, rx) = sync::oneshot::channel(); let future = self .send_command_to_daemon(ManagementCommand::SetEnableIpv6(tx, enable_ipv6)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) } /// Set MTU for wireguard tunnels fn set_wireguard_mtu(&self, _: Self::Metadata, mtu: Option) -> BoxFuture<(), Error> { log::debug!("set_wireguard_mtu({:?})", mtu); let (tx, rx) = sync::oneshot::channel(); let future = self .send_command_to_daemon(ManagementCommand::SetWireguardMtu(tx, mtu)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) } fn get_settings(&self, _: Self::Metadata) -> BoxFuture { log::debug!("get_settings"); let (tx, rx) = sync::oneshot::channel(); let future = self .send_command_to_daemon(ManagementCommand::GetSettings(tx)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) } fn generate_wireguard_key(&self, _: Self::Metadata) -> BoxFuture<(), Error> { log::debug!("generate_wireguard_key"); let (tx, rx) = sync::oneshot::channel(); let future = self .send_command_to_daemon(ManagementCommand::GenerateWireguardKey(tx)) .and_then(|_| { rx.map_err(|_| Error::internal_error()) .and_then(|res| future::result(res.map_err(|_| Error::internal_error()))) }); Box::new(future) } fn get_wireguard_key( &self, _: Self::Metadata, ) -> BoxFuture, Error> { log::debug!("get_wireguard_key"); let (tx, rx) = sync::oneshot::channel(); let future = self .send_command_to_daemon(ManagementCommand::GetWireguardKey(tx)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) } fn verify_wireguard_key(&self, _: Self::Metadata) -> BoxFuture { log::debug!("verify_wireguard_key"); let (tx, rx) = sync::oneshot::channel(); let future = self .send_command_to_daemon(ManagementCommand::VerifyWireguardKey(tx)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) } fn get_current_version(&self, _: Self::Metadata) -> BoxFuture { log::debug!("get_current_version"); let (tx, rx) = sync::oneshot::channel(); let future = self .send_command_to_daemon(ManagementCommand::GetCurrentVersion(tx)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) } fn get_version_info(&self, _: Self::Metadata) -> BoxFuture { log::debug!("get_version_info"); let (tx, rx) = sync::oneshot::channel(); let future = self .send_command_to_daemon(ManagementCommand::GetVersionInfo(tx)) .and_then(|_| rx.map_err(|_| Error::internal_error())) .and_then(|version_future| { version_future.map_err(|error| { log::error!( "Unable to get version data from API: {}", error.display_chain() ); Self::map_rpc_error(&error) }) }); Box::new(future) } fn daemon_event_subscribe( &self, _: Self::Metadata, subscriber: pubsub::Subscriber, ) { log::debug!("daemon_event_subscribe"); let mut subscriptions = self.subscriptions.write().unwrap(); loop { let id = SubscriptionId::String(uuid::Uuid::new_v4().to_string()); if let Entry::Vacant(entry) = subscriptions.entry(id.clone()) { if let Ok(sink) = subscriber.assign_id(id.clone()) { log::debug!("Accepting new subscription with id {:?}", id); entry.insert(sink); } break; } } } fn daemon_event_unsubscribe(&self, id: SubscriptionId) -> BoxFuture<(), Error> { log::debug!("daemon_event_unsubscribe"); let was_removed = self.subscriptions.write().unwrap().remove(&id).is_some(); let result = if was_removed { log::debug!("Unsubscribing id {:?}", id); future::ok(()) } else { future::err(Error { code: ErrorCode::InvalidParams, message: "Invalid subscription".to_owned(), data: None, }) }; Box::new(result) } } /// The metadata type. There is one instance associated with each connection. In this pubsub /// scenario they are created by `meta_extractor` by the server on each new incoming /// connection. #[derive(Clone, Debug, Default)] pub struct Meta { session: Option>, } /// Make the `Meta` type possible to use as jsonrpc metadata type. impl Metadata for Meta {} /// Make the `Meta` type possible to use as a pubsub metadata type. impl PubSubMetadata for Meta { fn session(&self) -> Option> { self.session.clone() } } /// Metadata extractor function for `Meta`. fn meta_extractor(context: &jsonrpc_ipc_server::RequestContext<'_>) -> Meta { Meta { session: Some(Arc::new(Session::new(context.sender.clone()))), } }