summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--mullvad-daemon/src/tunnel_state_machine/mod.rs65
1 files changed, 45 insertions, 20 deletions
diff --git a/mullvad-daemon/src/tunnel_state_machine/mod.rs b/mullvad-daemon/src/tunnel_state_machine/mod.rs
index 316a000d77..3ab7b87475 100644
--- a/mullvad-daemon/src/tunnel_state_machine/mod.rs
+++ b/mullvad-daemon/src/tunnel_state_machine/mod.rs
@@ -8,11 +8,12 @@ mod disconnecting_state;
use std::fmt::{Debug, Formatter, Result as FmtResult};
use std::path::PathBuf;
+use std::sync::mpsc as sync_mpsc;
use std::thread;
use error_chain::ChainedError;
use futures::sync::mpsc;
-use futures::{Async, Poll, Stream};
+use futures::{Async, Future, Poll, Stream};
use tokio_core::reactor::Core;
use mullvad_types::account::AccountToken;
@@ -26,44 +27,68 @@ use self::disconnected_state::DisconnectedState;
use self::disconnecting_state::{AfterDisconnect, DisconnectingState};
use super::{OPENVPN_LOG_FILENAME, WIREGUARD_LOG_FILENAME};
-error_chain!{}
+error_chain! {
+ errors {
+ ReactorError {
+ description("Failed to initialize tunnel state machine event loop executor")
+ }
+ }
+}
/// Spawn the tunnel state machine thread, returning a channel for sending tunnel commands.
pub fn spawn<T>(
state_change_listener: IntoSender<TunnelStateTransition, T>,
-) -> mpsc::UnboundedSender<TunnelCommand>
+) -> Result<mpsc::UnboundedSender<TunnelCommand>>
where
T: From<TunnelStateTransition> + Send + 'static,
{
let (command_tx, command_rx) = mpsc::unbounded();
+ let (startup_result_tx, startup_result_rx) = sync_mpsc::channel();
- thread::spawn(move || {
- if let Err(error) = event_loop(command_rx, state_change_listener) {
- error!("{}", error.display_chain());
- }
- });
+ thread::spawn(
+ move || match create_event_loop(command_rx, state_change_listener) {
+ Ok((mut reactor, event_loop)) => {
+ startup_result_tx.send(Ok(())).expect(
+ "Tunnel state machine won't be started because the owner thread crashed",
+ );
+
+ if let Err(error) = reactor.run(event_loop) {
+ let chained_error =
+ Error::with_chain(error, "Tunnel state machine exited with an error");
+ error!("{}", chained_error.display_chain());
+ }
+ }
+ Err(startup_error) => {
+ startup_result_tx
+ .send(Err(startup_error))
+ .expect("Failed to send startup error");
+ }
+ },
+ );
- command_tx
+ startup_result_rx
+ .recv()
+ .expect("Failed to start tunnel state machine thread")
+ .map(|_| command_tx)
}
-fn event_loop<T>(
+fn create_event_loop<T>(
commands: mpsc::UnboundedReceiver<TunnelCommand>,
state_change_listener: IntoSender<TunnelStateTransition, T>,
-) -> Result<()>
+) -> Result<(Core, impl Future<Item = (), Error = Error>)>
where
T: From<TunnelStateTransition> + Send + 'static,
{
- let mut reactor =
- Core::new().chain_err(|| "Failed to initialize tunnel state machine event loop")?;
-
+ let reactor = Core::new().chain_err(|| ErrorKind::ReactorError)?;
let state_machine = TunnelStateMachine::new(commands);
- reactor
- .run(state_machine.for_each(|state_change_event| {
- state_change_listener
- .send(state_change_event)
- .chain_err(|| "Failed to send state change event to listener")
- })).chain_err(|| "Tunnel state machine finished with an error")
+ let future = state_machine.for_each(move |state_change_event| {
+ state_change_listener
+ .send(state_change_event)
+ .chain_err(|| "Failed to send state change event to listener")
+ });
+
+ Ok((reactor, future))
}
/// Representation of external commands for the tunnel state machine.