diff options
| author | Linus Färnstrand <linus@mullvad.net> | 2017-06-12 10:25:26 +0200 |
|---|---|---|
| committer | Linus Färnstrand <linus@mullvad.net> | 2017-06-14 12:14:24 +0200 |
| commit | 26f4d41a8103039e07402ee1ff9d3e94f618a5e1 (patch) | |
| tree | ca4c48ba67e9b8245b1d6943b04a98ad77aee63a | |
| parent | 06cfc7b7d5ec0d8b47e81d48ced2ef9c4c54f363 (diff) | |
| download | mullvadvpn-26f4d41a8103039e07402ee1ff9d3e94f618a5e1.tar.xz mullvadvpn-26f4d41a8103039e07402ee1ff9d3e94f618a5e1.zip | |
Add Daemon and main event handling
| -rw-r--r-- | mullvad_daemon/Cargo.toml | 5 | ||||
| -rw-r--r-- | mullvad_daemon/src/main.rs | 305 |
2 files changed, 292 insertions, 18 deletions
diff --git a/mullvad_daemon/Cargo.toml b/mullvad_daemon/Cargo.toml index d98c32b0b1..ce2ffafa98 100644 --- a/mullvad_daemon/Cargo.toml +++ b/mullvad_daemon/Cargo.toml @@ -15,9 +15,10 @@ jsonrpc-macros = { git = "https://github.com/faern/jsonrpc", branch = "ws-close- jsonrpc-pubsub = { git = "https://github.com/faern/jsonrpc", branch = "ws-close-handle" } jsonrpc-ws-server = { git = "https://github.com/faern/jsonrpc", branch = "ws-close-handle" } uuid = { version = "0.5", features = ["v4"] } +lazy_static = "0.2" -[dependencies.talpid_ipc] -path = "../talpid_ipc" +talpid_core = { path = "../talpid_core" } +talpid_ipc = { path = "../talpid_ipc" } [dev-dependencies] assert_matches = "1.0" diff --git a/mullvad_daemon/src/main.rs b/mullvad_daemon/src/main.rs index 95033b1564..8ab172b464 100644 --- a/mullvad_daemon/src/main.rs +++ b/mullvad_daemon/src/main.rs @@ -8,37 +8,310 @@ extern crate serde; #[macro_use] extern crate serde_derive; -extern crate talpid_ipc; - extern crate jsonrpc_core; extern crate jsonrpc_pubsub; #[macro_use] extern crate jsonrpc_macros; extern crate jsonrpc_ws_server; +extern crate uuid; +#[macro_use] +extern crate lazy_static; + +extern crate talpid_core; +extern crate talpid_ipc; + +mod management_interface; +mod states; + +use management_interface::{ManagementInterfaceServer, TunnelCommand}; +use states::{SecurityState, TargetState}; + +use std::sync::{Arc, Mutex, mpsc}; +use std::thread; + +use talpid_core::net::RemoteAddr; +use talpid_core::tunnel::{self, TunnelEvent, TunnelMonitor}; + +error_chain!{ + errors { + /// The client is in the wrong state for the requested operation. Optimally the code should + /// be written in such a way so such states can't exist. + InvalidState { + description("Client is in an invalid state for the requested operation") + } + TunnelError(msg: &'static str) { + description("Error in the tunnel monitor") + display("Tunnel monitor error: {}", msg) + } + ManagementInterfaceError(msg: &'static str) { + description("Error in the management interface") + display("Management interface error: {}", msg) + } + } +} + +lazy_static! { + // Temporary store of hardcoded remotes. + static ref REMOTES: [RemoteAddr; 3] = [ + RemoteAddr::new("se5.mullvad.net", 1300), + RemoteAddr::new("se6.mullvad.net", 1300), + RemoteAddr::new("se7.mullvad.net", 1300), + ]; +} + +pub enum DaemonEvent { + TunnelEvent(TunnelEvent), + TunnelExit(tunnel::Result<()>), + ManagementInterfaceEvent(TunnelCommand), + ManagementInterfaceExit(talpid_ipc::Result<()>), +} + +impl From<TunnelEvent> for DaemonEvent { + fn from(tunnel_event: TunnelEvent) -> Self { + DaemonEvent::TunnelEvent(tunnel_event) + } +} + +impl From<TunnelCommand> for DaemonEvent { + fn from(tunnel_command: TunnelCommand) -> Self { + DaemonEvent::ManagementInterfaceEvent(tunnel_command) + } +} + +/// Represents the internal state of the actual tunnel. +#[derive(Debug, Eq, PartialEq)] +pub enum TunnelState { + /// No tunnel is running. + NotRunning, + /// The tunnel has been started, but it is not established/functional. + Down, + /// The tunnel is up and working. + Up, +} + +impl TunnelState { + pub fn as_security_state(&self) -> SecurityState { + match *self { + TunnelState::Up => SecurityState::Secured, + _ => SecurityState::Unsecured, + } + } +} + -pub mod ipc_api; -pub mod mock_ipc; +struct Daemon { + state: TunnelState, + last_broadcasted_state: SecurityState, + target_state: TargetState, + rx: mpsc::Receiver<DaemonEvent>, + tx: mpsc::Sender<DaemonEvent>, + tunnel_close_handle: Option<tunnel::CloseHandle>, + management_interface_subscribers: management_interface::EventBroadcaster, + + // Just for testing. A cyclic iterator iterating over the hardcoded remotes, + // picking a new one for each retry. + remote_iter: std::iter::Cycle<std::iter::Cloned<std::slice::Iter<'static, RemoteAddr>>>, +} + +impl Daemon { + pub fn new() -> Result<Self> { + let (tx, rx) = mpsc::channel(); + let management_interface_subscribers = Self::start_management_interface(tx.clone())?; + Ok( + Daemon { + state: TunnelState::NotRunning, + last_broadcasted_state: SecurityState::Unsecured, + target_state: TargetState::Unsecured, + rx, + tx, + tunnel_close_handle: None, + remote_iter: REMOTES.iter().cloned().cycle(), + management_interface_subscribers, + }, + ) + } + + // Starts the management interface and spawns a thread that will process it. + // Returns a handle that allows notifying all subscribers on events. + fn start_management_interface(event_tx: mpsc::Sender<DaemonEvent>) + -> Result<management_interface::EventBroadcaster> { + let server = Self::start_management_interface_server(event_tx.clone())?; + let event_broadcaster = server.event_broadcaster(); + Self::spawn_management_interface_wait_thread(server, event_tx); + Ok(event_broadcaster) + } + + fn start_management_interface_server(event_tx: mpsc::Sender<DaemonEvent>) + -> Result<ManagementInterfaceServer> { + let server = + ManagementInterfaceServer::start(event_tx.clone()) + .chain_err(|| ErrorKind::ManagementInterfaceError("Failed to start server"),)?; + info!( + "Mullvad management interface listening on {}", + server.address() + ); + Ok(server) + } + + fn spawn_management_interface_wait_thread(server: ManagementInterfaceServer, + exit_tx: mpsc::Sender<DaemonEvent>) { + thread::spawn( + move || { + let result = server.wait(); + debug!("Mullvad management interface shut down"); + let _ = exit_tx.send(DaemonEvent::ManagementInterfaceExit(result)); + }, + ); + } + + /// Consume the `Daemon` and run the main event loop. Blocks until an error happens. + pub fn run(mut self) -> Result<()> { + while let Ok(event) = self.rx.recv() { + self.handle_event(event)?; + } + Ok(()) + } + + fn handle_event(&mut self, event: DaemonEvent) -> Result<()> { + use DaemonEvent::*; + match event { + TunnelEvent(event) => Ok(self.handle_tunnel_event(event)), + TunnelExit(result) => self.handle_tunnel_exit(result), + ManagementInterfaceEvent(event) => self.handle_management_interface_event(event), + ManagementInterfaceExit(result) => self.handle_management_interface_exit(result), + } + } + + fn handle_tunnel_event(&mut self, tunnel_event: TunnelEvent) { + info!("Tunnel event: {:?}", tunnel_event); + let new_state = match tunnel_event { + TunnelEvent::Up => TunnelState::Up, + TunnelEvent::Down => TunnelState::Down, + }; + self.set_state(new_state); + } + + fn handle_tunnel_exit(&mut self, result: tunnel::Result<()>) -> Result<()> { + self.tunnel_close_handle = None; + if let Err(e) = result { + error!("Tunnel exited in an unexpected way:"); + for e in e.iter() { + error!("Caused by {}", e); + } + } + self.set_state(TunnelState::NotRunning); + if self.target_state == TargetState::Secured { + self.start_tunnel()?; + } + Ok(()) + } + + fn handle_management_interface_event(&mut self, event: TunnelCommand) -> Result<()> { + match event { + TunnelCommand::SetTargetState(state) => self.set_target_state(state)?, + TunnelCommand::GetState(tx) => { + if let Err(_) = tx.send(self.last_broadcasted_state) { + warn!("Unable to send current state to management interface client",); + } + } + } + Ok(()) + } + + fn handle_management_interface_exit(&self, result: talpid_ipc::Result<()>) -> Result<()> { + let error = ErrorKind::ManagementInterfaceError("Server exited unexpectedly"); + match result { + Ok(()) => Err(error.into()), + e => e.chain_err(|| error), + } + } + + /// Set the target state of the client. If it changed trigger the operations needed to progress + /// towards that state. + fn set_target_state(&mut self, new_state: TargetState) -> Result<()> { + if new_state != self.target_state { + self.target_state = new_state; + match self.target_state { + TargetState::Secured => { + if self.state == TunnelState::NotRunning { + debug!("Triggering tunnel start from management interface event"); + self.start_tunnel()?; + } + } + TargetState::Unsecured => { + if let Some(ref close_handle) = self.tunnel_close_handle { + debug!("Triggering tunnel stop from management interface event"); + close_handle + .close() + .chain_err(|| ErrorKind::TunnelError("Unable to kill tunnel"))?; + } + } + } + } + Ok(()) + } + + /// Update the state of the client. If it changed, notify the subscribers. + fn set_state(&mut self, new_state: TunnelState) { + if new_state != self.state { + self.state = new_state; + let new_security_state = self.state.as_security_state(); + if self.last_broadcasted_state != new_security_state { + self.last_broadcasted_state = new_security_state; + self.management_interface_subscribers.notify_new_state(new_security_state); + } + } + } + + fn start_tunnel(&mut self) -> Result<()> { + ensure!( + self.state == TunnelState::NotRunning, + ErrorKind::InvalidState + ); + let remote = self.remote_iter.next().unwrap(); + let tunnel_monitor = self.spawn_tunnel_monitor(remote)?; + self.tunnel_close_handle = Some(tunnel_monitor.close_handle()); + self.spawn_tunnel_monitor_wait_thread(tunnel_monitor); + + self.set_state(TunnelState::Down); + Ok(()) + } + + fn spawn_tunnel_monitor(&self, remote: RemoteAddr) -> Result<TunnelMonitor> { + // Must wrap the channel in a Mutex because TunnelMonitor forces the closure to be Sync + let event_tx = Arc::new(Mutex::new(self.tx.clone())); + let on_tunnel_event = move |event| { + let _ = event_tx.lock().unwrap().send(DaemonEvent::TunnelEvent(event)); + }; + TunnelMonitor::new(remote, on_tunnel_event) + .chain_err(|| ErrorKind::TunnelError("Unable to start tunnel monitor")) + } + + fn spawn_tunnel_monitor_wait_thread(&self, tunnel_monitor: TunnelMonitor) { + let error_tx = self.tx.clone(); + thread::spawn( + move || { + let result = tunnel_monitor.wait(); + let _ = error_tx.send(DaemonEvent::TunnelExit(result)); + trace!("Tunnel monitor thread exit"); + }, + ); + } +} -error_chain!{} quick_main!(run); fn run() -> Result<()> { init_logger()?; - let server = start_ipc()?; - info!("Mullvad daemon listening on {}", server.address()); - main_loop(server) + let daemon = Daemon::new().chain_err(|| "Unable to initialize daemon")?; + daemon.run()?; + + debug!("Mullvad daemon is quitting"); + Ok(()) } fn init_logger() -> Result<()> { env_logger::init().chain_err(|| "Failed to bootstrap logging system") } - -fn start_ipc() -> Result<mock_ipc::IpcServer> { - mock_ipc::IpcServer::start().chain_err(|| "Failed to start IPC server") -} - -fn main_loop(server: mock_ipc::IpcServer) -> Result<()> { - server.wait().chain_err(|| "Error while waiting for server to process") -} |
