diff options
| author | Linus Färnstrand <linus@mullvad.net> | 2017-05-09 17:17:26 +0200 |
|---|---|---|
| committer | Linus Färnstrand <linus@mullvad.net> | 2017-05-09 17:56:12 +0200 |
| commit | 7cb5f4713766b1b0632dcef053d84b3bacf82fe5 (patch) | |
| tree | 74ac2ed794ed59862126a0f725970067780f1c5b | |
| parent | 699e00b8882761171ae454fcf999eef51ab5d118 (diff) | |
| download | mullvadvpn-7cb5f4713766b1b0632dcef053d84b3bacf82fe5.tar.xz mullvadvpn-7cb5f4713766b1b0632dcef053d84b3bacf82fe5.zip | |
Change mullvad_daemon to new ipc with pubsub
| -rw-r--r-- | mullvad_daemon/Cargo.toml | 4 | ||||
| -rw-r--r-- | mullvad_daemon/src/frontend_ipc_router.rs | 145 | ||||
| -rw-r--r-- | mullvad_daemon/src/ipc_api.rs | 90 | ||||
| -rw-r--r-- | mullvad_daemon/src/main.rs | 30 | ||||
| -rw-r--r-- | mullvad_daemon/src/mock_ipc.rs | 185 |
5 files changed, 299 insertions, 155 deletions
diff --git a/mullvad_daemon/Cargo.toml b/mullvad_daemon/Cargo.toml index 42a00a29f6..c3f4e22ae5 100644 --- a/mullvad_daemon/Cargo.toml +++ b/mullvad_daemon/Cargo.toml @@ -8,10 +8,12 @@ description = "The meat of Mullvad, the core if you wish" error-chain = "0.10" serde = "1.0" serde_derive = "1.0" -serde_json = "1.0" log = "0.3" env_logger = "0.4" jsonrpc-core = { git = "https://github.com/paritytech/jsonrpc" } +jsonrpc-macros = { git = "https://github.com/paritytech/jsonrpc" } +jsonrpc-pubsub = { git = "https://github.com/paritytech/jsonrpc" } +jsonrpc-ws-server = { git = "https://github.com/paritytech/jsonrpc" } [dependencies.talpid_ipc] path = "../talpid_ipc" diff --git a/mullvad_daemon/src/frontend_ipc_router.rs b/mullvad_daemon/src/frontend_ipc_router.rs deleted file mode 100644 index 1c1b74950c..0000000000 --- a/mullvad_daemon/src/frontend_ipc_router.rs +++ /dev/null @@ -1,145 +0,0 @@ -extern crate jsonrpc_core; -extern crate serde_json; - -use serde; - -static mut MOCK_IS_CONNECTED: bool = false; -type Result<T> = ::std::result::Result<T, String>; - -pub fn build_router() -> jsonrpc_core::IoHandler { - let mut router = jsonrpc_core::IoHandler::default(); - - add_route(&mut router, "login", mock_login); - add_route(&mut router, "logout", mock_logout); - add_route(&mut router, "connect", mock_connect); - add_route(&mut router, "disconnect", mock_disconnect); - add_route(&mut router, "get_connection", mock_get_connection_info); - add_route(&mut router, "get_location", mock_get_location); - - router -} - -fn add_route<T, U, F>(router: &mut jsonrpc_core::IoHandler, method: &str, handler: F) - where for<'de> T: serde::Deserialize<'de>, - U: serde::Serialize + 'static, - F: Fn(&T) -> Result<U> + Send + Sync + 'static -{ - let c = move |params: jsonrpc_core::params::Params| { - println!("Got rpc request with params {:?}", params); - let parsed_params: T = params.parse()?; - - let response: U = handler(&parsed_params) - .map_err( - |e| { - error!("Failed responding to RPC request: {}", e); - jsonrpc_core::Error::internal_error() - }, - )?; - - serde_json::to_value(response).map_err( - |e| { - error!("Unable to serialize response to RPC request: {}", e); - jsonrpc_core::Error::internal_error() - }, - ) - }; - router.add_method(method, c); -} - -#[derive(Deserialize)] -struct LoginRequest { - #[serde(rename = "accountNumber")] - account_number: String, -} -fn mock_login(request: &LoginRequest) -> Result<::std::collections::HashMap<String, String>> { - let ref account_number = request.account_number; - - let mut reply = ::std::collections::HashMap::new(); - - if account_number.starts_with("1111") { - // accounts starting with 1111 expire in one month - reply.insert( - "paidUntil".to_owned(), - "2018-12-31T16:00:00.000Z".to_owned(), - ); - // res.paidUntil = moment().startOf('day').add(15, 'days').toISOString(); - } else if account_number.starts_with("2222") { - // expired in 2013 - reply.insert( - "paidUntil".to_owned(), - "2012-12-31T16:00:00.000Z".to_owned(), - ); - } else if account_number.starts_with("3333") { - // expire in 2038 - reply.insert( - "paidUntil".to_owned(), - "2037-12-31T16:00:00.000Z".to_owned(), - ); - } else { - bail!("you are not welcome {}!", account_number) - } - - Ok(reply) -} - -fn mock_logout(_: &()) -> Result<()> { - Ok(()) -} - -#[derive(Deserialize)] -struct ConnectRequest { - address: String, -} -fn mock_connect(request: &ConnectRequest) -> Result<()> { - let ref server_address = request.address; - if server_address.starts_with("se") { - bail!("{} is unreachable", server_address) - } - - unsafe { MOCK_IS_CONNECTED = true }; - Ok(()) -} - -fn mock_disconnect(_: &()) -> Result<()> { - unsafe { MOCK_IS_CONNECTED = false }; - Ok(()) -} - -#[derive(Serialize)] -struct ConnectionInfo { - ip: String, -} -fn mock_get_connection_info(_: &()) -> Result<ConnectionInfo> { - let ip = if unsafe { MOCK_IS_CONNECTED } { - "1.2.3.4" - } else { - "192.168.1.2" - } - .to_owned(); - - Ok(ConnectionInfo { ip: ip }) -} - -#[derive(Serialize)] -struct Location { - latlong: [u32; 2], - country: String, - city: String, -} -fn mock_get_location(_: &()) -> Result<Location> { - let response = if unsafe { MOCK_IS_CONNECTED } { - Location { - latlong: [1, 2], - country: "narnia".to_owned(), - city: "Le city".to_owned(), - } - } else { - Location { - latlong: [60, 61], - country: "sweden".to_owned(), - city: "bollebygd".to_owned(), - } - }; - - Ok(response) -} diff --git a/mullvad_daemon/src/ipc_api.rs b/mullvad_daemon/src/ipc_api.rs new file mode 100644 index 0000000000..84932a0b8f --- /dev/null +++ b/mullvad_daemon/src/ipc_api.rs @@ -0,0 +1,90 @@ +use jsonrpc_core::Error; +use jsonrpc_core::futures::BoxFuture; +use jsonrpc_macros::pubsub; +use jsonrpc_pubsub::SubscriptionId; + +use std::collections::HashMap; +use std::net::IpAddr; + +pub type AccountToken = String; +pub type CountryCode = String; + +build_rpc_trait! { + pub trait IpcApi { + type Metadata; + + /// Fetches and returns metadata about an account. Returns an error on non-existing + /// accounts. + #[rpc(name = "get_account_data")] + fn get_account_data(&self, AccountToken) -> Result<AccountData, Error>; + + /// Returns available countries. + #[rpc(name = "get_countries")] + fn get_countries(&self) -> Result<HashMap<CountryCode, String>, Error>; + + /// Set which account to connect with + #[rpc(name = "set_account")] + fn set_account(&self, AccountToken) -> Result<(), Error>; + + /// Set which country to connect to + #[rpc(name = "set_country")] + fn set_country(&self, CountryCode) -> Result<(), Error>; + + /// Set if the backend should automatically establish a tunnel on start or not. + #[rpc(name = "set_autoconnect")] + fn set_autoconnect(&self, bool) -> Result<(), Error>; + + /// Try to connect if disconnected, or do nothing if already connecting/connected. + #[rpc(name = "connect")] + fn connect(&self) -> Result<(), Error>; + + /// Disconnect the VPN tunnel if it is connecting/connected. Does nothing if already + /// disconnected. + #[rpc(name = "disconnect")] + fn disconnect(&self) -> Result<(), Error>; + + /// Returns the current security state of the Mullvad client. Changes to this state will + /// be announced to subscribers of `event`. + #[rpc(name = "get_state")] + fn get_state(&self) -> Result<SecurityState, Error>; + + /// Returns the current public IP of this computer. + #[rpc(name = "get_ip")] + fn get_ip(&self) -> Result<IpAddr, Error>; + + /// Performs a geoIP lookup and returns the current location as perceived by the public + /// internet. + #[rpc(name = "get_location")] + fn get_location(&self) -> Result<Location, Error>; + + #[pubsub(name = "event")] { + /// Subscribes to the `event` notifications. + #[rpc(name = "event_subscribe")] + fn subscribe(&self, Self::Metadata, pubsub::Subscriber<String>); + + /// Unsubscribes from the `event` notifications. + #[rpc(name = "event_unsubscribe")] + fn unsubscribe(&self, SubscriptionId) -> BoxFuture<bool, Error>; + } + } +} + +#[derive(Serialize)] +pub struct AccountData { + pub paid_until: String, +} + +#[derive(Serialize)] +pub struct Location { + pub latlong: [f64; 2], + pub country: String, + pub city: String, +} + +#[derive(Serialize)] +pub enum SecurityState { + Unsecured, + Securing, + Secured, + Unsecuring, +} diff --git a/mullvad_daemon/src/main.rs b/mullvad_daemon/src/main.rs index ff4b62dcb3..e660609552 100644 --- a/mullvad_daemon/src/main.rs +++ b/mullvad_daemon/src/main.rs @@ -3,12 +3,21 @@ extern crate log; extern crate env_logger; #[macro_use] extern crate error_chain; + extern crate serde; #[macro_use] extern crate serde_derive; + extern crate talpid_ipc; -mod frontend_ipc_router; +extern crate jsonrpc_core; +extern crate jsonrpc_pubsub; +#[macro_use] +extern crate jsonrpc_macros; +extern crate jsonrpc_ws_server; + +pub mod ipc_api; +pub mod mock_ipc; error_chain!{} @@ -16,8 +25,9 @@ quick_main!(run); fn run() -> Result<()> { init_logger()?; - let _server = start_ipc()?; - main_loop() + + let server = start_ipc()?; + main_loop(server) } fn init_logger() -> Result<()> { @@ -25,12 +35,14 @@ fn init_logger() -> Result<()> { } fn start_ipc() -> Result<talpid_ipc::IpcServer> { - talpid_ipc::IpcServer::start(frontend_ipc_router::build_router().into(), 0) - .chain_err(|| "Failed to start IPC server") + talpid_ipc::IpcServer::start_with_metadata( + mock_ipc::build_router(), + mock_ipc::meta_extractor, + 0, + ) + .chain_err(|| "Failed to start IPC server") } -fn main_loop() -> Result<()> { - let (_tx, rx) = ::std::sync::mpsc::channel::<u8>(); - let _ = rx.recv(); - Ok(()) +fn main_loop(server: talpid_ipc::IpcServer) -> Result<()> { + server.wait().chain_err(|| "Error while waiting for server to process") } diff --git a/mullvad_daemon/src/mock_ipc.rs b/mullvad_daemon/src/mock_ipc.rs new file mode 100644 index 0000000000..a47e5b03bf --- /dev/null +++ b/mullvad_daemon/src/mock_ipc.rs @@ -0,0 +1,185 @@ +use ipc_api::*; + +use jsonrpc_core::{self, Error, ErrorCode, MetaIoHandler, Metadata}; +use jsonrpc_core::futures::{BoxFuture, Future, future}; +use jsonrpc_macros::pubsub; +use jsonrpc_pubsub::{PubSubHandler, PubSubMetadata, Session, SubscriptionId}; +use jsonrpc_ws_server; + +use std::collections::HashMap; +use std::net::{IpAddr, Ipv4Addr}; +use std::sync::{Arc, RwLock, atomic}; + +pub fn build_router() -> MetaIoHandler<Meta> { + let mut io = PubSubHandler::default(); + let rpc = MockIpcApi::default(); + let active_subscriptions = rpc.active.clone(); + + // Spawn a thread that never dies and broadcasts "Hello world!" to all subscribers. + // This is super ugly since it's not in any way connected with the events. But we have to sort + // that out when we know more of the chain between the tunnel monitors and the frontend. + ::std::thread::spawn( + move || loop { + { + let subscribers = active_subscriptions.read().unwrap(); + for sink in subscribers.values() { + let _ = sink.notify(Ok("Hello World!".into())).wait(); + } + } + ::std::thread::sleep(::std::time::Duration::from_secs(1)); + }, + ); + io.extend_with(rpc.to_delegate()); + io.into() +} + + +/// The metadata type. There is one instance associated with each connection. In this pubsub +/// scenario they are created by `From<Sender<String>>::from` by the server on each new incoming +/// connection. +#[derive(Clone, Debug, Default)] +pub struct Meta { + session: Option<Arc<Session>>, +} + +/// Make the `Meta` type possible to use as jsonrpc metadata type. +impl Metadata for Meta {} + +/// Make the `Meta` type possible to use as a pubsub metadata type. +impl PubSubMetadata for Meta { + fn session(&self) -> Option<Arc<Session>> { + self.session.clone() + } +} + +/// Metadata extractor function for `Meta`. +pub fn meta_extractor(context: &jsonrpc_ws_server::RequestContext) -> Meta { + Meta { session: Some(Arc::new(Session::new(context.sender()))) } +} + +/// A mock implementation of the Mullvad frontend API. A very simplified explanation is that for +/// the real implementation `tunnel_is_up` should be replaced with some kind of handle (or proxy to +/// a handle) to the jsonrpc client talking with talpid_core. +#[derive(Default)] +pub struct MockIpcApi { + uid: atomic::AtomicUsize, + active: Arc<RwLock<HashMap<SubscriptionId, pubsub::Sink<String>>>>, + country: RwLock<CountryCode>, + tunnel_is_up: atomic::AtomicBool, +} + +impl IpcApi for MockIpcApi { + type Metadata = Meta; + + fn get_account_data(&self, account_token: AccountToken) -> Result<AccountData, Error> { + debug!("Login for {}", account_token); + + let paid_until = if account_token.starts_with("1111") { + // accounts starting with 1111 expire in one month + Ok("2018-12-31T16:00:00.000Z".to_owned()) + } else if account_token.starts_with("2222") { + Ok("2012-12-31T16:00:00.000Z".to_owned()) + } else if account_token.starts_with("3333") { + Ok("2037-12-31T16:00:00.000Z".to_owned()) + } else { + Err(jsonrpc_core::Error::invalid_params("You are not welcome")) + }?; + Ok(AccountData { paid_until: paid_until }) + } + + fn get_countries(&self) -> Result<HashMap<CountryCode, String>, Error> { + let mut countries = HashMap::new(); + countries.insert("se".to_owned(), "Sweden".to_owned()); + countries.insert("de".to_owned(), "Denmark".to_owned()); + countries.insert("na".to_owned(), "Narnia".to_owned()); + Ok(countries) + } + + fn set_account(&self, _account_token: AccountToken) -> Result<(), Error> { + Ok(()) + } + + fn set_country(&self, country_code: CountryCode) -> Result<(), Error> { + *self.country.write().unwrap() = country_code; + Ok(()) + } + + fn set_autoconnect(&self, _autoconnect: bool) -> Result<(), Error> { + Ok(()) + } + + fn connect(&self) -> Result<(), Error> { + if self.country.read().unwrap().starts_with("se") { + Err(jsonrpc_core::Error::invalid_params("Invalid server")) + } else { + self.tunnel_is_up.store(true, atomic::Ordering::SeqCst); + Ok(()) + } + } + + fn disconnect(&self) -> Result<(), Error> { + self.tunnel_is_up.store(false, atomic::Ordering::SeqCst); + Ok(()) + } + + fn get_state(&self) -> Result<SecurityState, Error> { + if self.tunnel_is_up.load(atomic::Ordering::SeqCst) { + Ok(SecurityState::Secured) + } else { + Ok(SecurityState::Unsecured) + } + } + + fn get_ip(&self) -> Result<IpAddr, Error> { + let ip = if self.tunnel_is_up.load(atomic::Ordering::SeqCst) { + IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)) + } else { + IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)) + } + .to_owned(); + Ok(ip) + } + + fn get_location(&self) -> Result<Location, Error> { + Ok( + if self.tunnel_is_up.load(atomic::Ordering::SeqCst) { + Location { + latlong: [1.0, 2.0], + country: "narnia".to_owned(), + city: "Le city".to_owned(), + } + } else { + Location { + latlong: [60.0, 61.0], + country: "sweden".to_owned(), + city: "bollebygd".to_owned(), + } + }, + ) + } + + fn subscribe(&self, _meta: Self::Metadata, subscriber: pubsub::Subscriber<String>) { + let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst); + let sub_id = SubscriptionId::Number(id as u64); + if let Ok(sink) = subscriber.assign_id(sub_id.clone()) { + debug!("Accepting new subscription with id {}", id); + self.active.write().unwrap().insert(sub_id, sink); + } + } + + fn unsubscribe(&self, id: SubscriptionId) -> BoxFuture<bool, Error> { + debug!("Unsubscribing id {:?}", id); + if self.active.write().unwrap().remove(&id).is_some() { + future::ok(true).boxed() + } else { + future::err( + Error { + code: ErrorCode::InvalidParams, + message: "Invalid subscription.".into(), + data: None, + }, + ) + .boxed() + } + } +} |
