summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--mullvad_daemon/src/main.rs11
-rw-r--r--mullvad_daemon/src/mock_ipc.rs92
2 files changed, 71 insertions, 32 deletions
diff --git a/mullvad_daemon/src/main.rs b/mullvad_daemon/src/main.rs
index e660609552..5bd9563bfd 100644
--- a/mullvad_daemon/src/main.rs
+++ b/mullvad_daemon/src/main.rs
@@ -34,15 +34,10 @@ fn init_logger() -> Result<()> {
env_logger::init().chain_err(|| "Failed to bootstrap logging system")
}
-fn start_ipc() -> Result<talpid_ipc::IpcServer> {
- talpid_ipc::IpcServer::start_with_metadata(
- mock_ipc::build_router(),
- mock_ipc::meta_extractor,
- 0,
- )
- .chain_err(|| "Failed to start IPC server")
+fn start_ipc() -> Result<mock_ipc::IpcServer> {
+ mock_ipc::IpcServer::start().chain_err(|| "Failed to start IPC server")
}
-fn main_loop(server: talpid_ipc::IpcServer) -> Result<()> {
+fn main_loop(server: mock_ipc::IpcServer) -> Result<()> {
server.wait().chain_err(|| "Error while waiting for server to process")
}
diff --git a/mullvad_daemon/src/mock_ipc.rs b/mullvad_daemon/src/mock_ipc.rs
index a47e5b03bf..41b0d894a8 100644
--- a/mullvad_daemon/src/mock_ipc.rs
+++ b/mullvad_daemon/src/mock_ipc.rs
@@ -1,6 +1,6 @@
use ipc_api::*;
-use jsonrpc_core::{self, Error, ErrorCode, MetaIoHandler, Metadata};
+use jsonrpc_core::{self, Error, ErrorCode, Metadata};
use jsonrpc_core::futures::{BoxFuture, Future, future};
use jsonrpc_macros::pubsub;
use jsonrpc_pubsub::{PubSubHandler, PubSubMetadata, Session, SubscriptionId};
@@ -10,30 +10,64 @@ use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr};
use std::sync::{Arc, RwLock, atomic};
-pub fn build_router() -> MetaIoHandler<Meta> {
- let mut io = PubSubHandler::default();
- let rpc = MockIpcApi::default();
- let active_subscriptions = rpc.active.clone();
+use talpid_ipc;
- // Spawn a thread that never dies and broadcasts "Hello world!" to all subscribers.
- // This is super ugly since it's not in any way connected with the events. But we have to sort
- // that out when we know more of the chain between the tunnel monitors and the frontend.
- ::std::thread::spawn(
- move || loop {
- {
- let subscribers = active_subscriptions.read().unwrap();
- for sink in subscribers.values() {
- let _ = sink.notify(Ok("Hello World!".into())).wait();
+type ActiveSubscriptions = Arc<RwLock<HashMap<SubscriptionId, pubsub::Sink<String>>>>;
+
+pub struct IpcServer {
+ server: talpid_ipc::IpcServer,
+}
+
+impl IpcServer {
+ pub fn start() -> talpid_ipc::Result<Self> {
+ let active_subscriptions = ActiveSubscriptions::default();
+ let mut last_error = None;
+ for i in 0..10 {
+ match Self::try_start(active_subscriptions.clone(), i) {
+ Ok(server) => {
+ Self::spawn_broadcast_thread(active_subscriptions);
+ return Ok(IpcServer { server });
}
+ Err(e) => last_error = Some(e),
}
- ::std::thread::sleep(::std::time::Duration::from_secs(1));
- },
- );
- io.extend_with(rpc.to_delegate());
- io.into()
+ }
+ bail!(last_error.unwrap());
+ }
+
+ pub fn address(&self) -> &str {
+ &self.server.address()
+ }
+
+ pub fn wait(self) -> talpid_ipc::Result<()> {
+ self.server.wait()
+ }
+
+ fn try_start(active_subscriptions: ActiveSubscriptions,
+ port_offset: u8)
+ -> talpid_ipc::Result<talpid_ipc::IpcServer> {
+ let rpc = MockIpcApi::new(active_subscriptions);
+ let mut io = PubSubHandler::default();
+ io.extend_with(rpc.to_delegate());
+ talpid_ipc::IpcServer::start_with_metadata(io.into(), meta_extractor, port_offset)
+ }
+
+ fn spawn_broadcast_thread(active_subscriptions: ActiveSubscriptions) {
+ ::std::thread::spawn(
+ move || loop {
+ {
+ let subscribers = active_subscriptions.read().unwrap();
+ for sink in subscribers.values() {
+ let _ = sink.notify(Ok("Hello World!".into())).wait();
+ }
+ }
+ ::std::thread::sleep(::std::time::Duration::from_secs(1));
+ },
+ );
+ }
}
+
/// The metadata type. There is one instance associated with each connection. In this pubsub
/// scenario they are created by `From<Sender<String>>::from` by the server on each new incoming
/// connection.
@@ -53,21 +87,31 @@ impl PubSubMetadata for Meta {
}
/// Metadata extractor function for `Meta`.
-pub fn meta_extractor(context: &jsonrpc_ws_server::RequestContext) -> Meta {
+fn meta_extractor(context: &jsonrpc_ws_server::RequestContext) -> Meta {
Meta { session: Some(Arc::new(Session::new(context.sender()))) }
}
/// A mock implementation of the Mullvad frontend API. A very simplified explanation is that for
/// the real implementation `tunnel_is_up` should be replaced with some kind of handle (or proxy to
/// a handle) to the jsonrpc client talking with talpid_core.
-#[derive(Default)]
pub struct MockIpcApi {
- uid: atomic::AtomicUsize,
- active: Arc<RwLock<HashMap<SubscriptionId, pubsub::Sink<String>>>>,
+ next_subscription_id: atomic::AtomicUsize,
+ active: ActiveSubscriptions,
country: RwLock<CountryCode>,
tunnel_is_up: atomic::AtomicBool,
}
+impl MockIpcApi {
+ pub fn new(active: ActiveSubscriptions) -> Self {
+ MockIpcApi {
+ next_subscription_id: atomic::AtomicUsize::new(0),
+ active: active,
+ country: RwLock::new("se".to_owned()),
+ tunnel_is_up: atomic::AtomicBool::new(false),
+ }
+ }
+}
+
impl IpcApi for MockIpcApi {
type Metadata = Meta;
@@ -159,7 +203,7 @@ impl IpcApi for MockIpcApi {
}
fn subscribe(&self, _meta: Self::Metadata, subscriber: pubsub::Subscriber<String>) {
- let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
+ let id = self.next_subscription_id.fetch_add(1, atomic::Ordering::SeqCst);
let sub_id = SubscriptionId::Number(id as u64);
if let Ok(sink) = subscriber.assign_id(sub_id.clone()) {
debug!("Accepting new subscription with id {}", id);