diff options
| author | David Lönnhager <david.l@mullvad.net> | 2024-04-20 17:46:41 +0200 |
|---|---|---|
| committer | David Lönnhager <david.l@mullvad.net> | 2024-04-30 16:22:52 +0200 |
| commit | d2cc168d13df8fa2c84c0bf40dc48e67dbcfa675 (patch) | |
| tree | 4ea31774990515c6daa5dba92bf2e69d822a8ed8 | |
| parent | 5e947ca34ca69e234f8de240a321aac0b27f4419 (diff) | |
| download | mullvadvpn-d2cc168d13df8fa2c84c0bf40dc48e67dbcfa675.tar.xz mullvadvpn-d2cc168d13df8fa2c84c0bf40dc48e67dbcfa675.zip | |
Refactor process monitor
| -rw-r--r-- | talpid-core/src/split_tunnel/macos/process.rs | 178 |
1 files changed, 93 insertions, 85 deletions
diff --git a/talpid-core/src/split_tunnel/macos/process.rs b/talpid-core/src/split_tunnel/macos/process.rs index a69c05b55b..f913b9bb49 100644 --- a/talpid-core/src/split_tunnel/macos/process.rs +++ b/talpid-core/src/split_tunnel/macos/process.rs @@ -31,9 +31,6 @@ static MIN_OS_VERSION: Lazy<MacosVersion> = #[derive(thiserror::Error, Debug)] pub enum Error { - /// Failed to detect macOS version - #[error("Failed to detect macOS version")] - DetectMacosVersion(#[source] io::Error), /// Only macOS 13 and later is supported #[error("Unsupported macOS version: {actual}, expected at least {}", *MIN_OS_VERSION)] UnsupportedMacosVersion { @@ -73,89 +70,10 @@ impl ProcessMonitor { check_os_version_support()?; let states = ProcessStates::new()?; - let mut cmd = tokio::process::Command::new("/usr/bin/eslogger"); - cmd.args(["exec", "fork", "exit"]) - .kill_on_drop(true) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()); - - let mut proc = cmd.spawn().map_err(Error::StartMonitor)?; - - let stdout = proc.stdout.take().unwrap(); - let stderr = proc.stderr.take().unwrap(); - - let states_clone = states.clone(); - + let proc = spawn_eslogger()?; let (stop_proc_tx, stop_rx): (_, oneshot::Receiver<oneshot::Sender<_>>) = oneshot::channel(); - - let mut proc_task = tokio::spawn(async move { - tokio::spawn(async move { - let reader = BufReader::new(stdout); - let mut lines = reader.lines(); - - while let Ok(Some(line)) = lines.next_line().await { - // Each line from eslogger is a JSON object, one of several types of messages; - // see `ESMessage` - let val: ESMessage = match serde_json::from_str(&line) { - Ok(val) => val, - Err(error) => { - log::error!("Failed to parse eslogger message: {error}"); - continue; - } - }; - - let mut inner = states_clone.inner.lock().unwrap(); - inner.handle_message(val); - } - }); - let last_stderr = tokio::spawn(async move { - let reader = BufReader::new(stderr); - let mut lines = reader.lines(); - let mut last_error = None; - - while let Ok(Some(line)) = lines.next_line().await { - last_error = Some(line); - } - last_error - }); - - let result = tokio::select! { - result = proc.wait() => { - match result { - Ok(status) => { - if let Ok(Some(last_error)) = last_stderr.await { - log::error!("eslogger error: {last_error}"); - if let Some(error) = parse_eslogger_error(&last_error) { - return Err(error); - } - } - Err(Error::MonitorFailed(io::Error::other(format!("eslogger stopped unexpectedly: {status}")))) - } - Err(error) => Err(Error::MonitorFailed(error)), - } - } - Ok(response_tx) = stop_rx => { - if let Err(error) = proc.kill().await { - log::error!("Failed to kill eslogger: {error}"); - } - if tokio::time::timeout(SHUTDOWN_TIMEOUT, proc.wait()) - .await - .is_err() - { - log::error!("Failed to wait for ST process handler"); - } - let _ = response_tx.send(()); - - Ok(()) - } - }; - - log::debug!("Process monitor stopped"); - - result - }); + let mut proc_task = tokio::spawn(handle_eslogger_output(proc, states.clone(), stop_rx)); match tokio::time::timeout(EARLY_FAIL_TIMEOUT, &mut proc_task).await { // On timeout, all is well @@ -174,6 +92,96 @@ impl ProcessMonitor { } } +/// Run until the process exits or `stop_rx` is signaled +async fn handle_eslogger_output( + mut proc: tokio::process::Child, + states: ProcessStates, + stop_rx: oneshot::Receiver<oneshot::Sender<()>>, +) -> Result<(), Error> { + let stdout = proc.stdout.take().unwrap(); + let stderr = proc.stderr.take().unwrap(); + + // Parse each line from stdout as an ESMessage + tokio::spawn(async move { + let reader = BufReader::new(stdout); + let mut lines = reader.lines(); + + while let Ok(Some(line)) = lines.next_line().await { + // Each line from eslogger is a JSON object, one of several types of messages; + // see `ESMessage` + let val: ESMessage = match serde_json::from_str(&line) { + Ok(val) => val, + Err(error) => { + log::error!("Failed to parse eslogger message: {error}"); + continue; + } + }; + + let mut inner = states.inner.lock().unwrap(); + inner.handle_message(val); + } + }); + + // Store the most recent stderr line in case we need to return an error + let last_stderr = tokio::spawn(async move { + let reader = BufReader::new(stderr); + let mut lines = reader.lines(); + let mut last_error = None; + + while let Ok(Some(line)) = lines.next_line().await { + last_error = Some(line); + } + last_error + }); + + // Wait for a stop signal or process exit + let result = tokio::select! { + result = proc.wait() => { + match result { + Ok(status) => { + if let Ok(Some(last_error)) = last_stderr.await { + log::error!("eslogger error: {last_error}"); + if let Some(error) = parse_eslogger_error(&last_error) { + return Err(error); + } + } + Err(Error::MonitorFailed(io::Error::other(format!("eslogger stopped unexpectedly: {status}")))) + } + Err(error) => Err(Error::MonitorFailed(error)), + } + } + Ok(response_tx) = stop_rx => { + if let Err(error) = proc.kill().await { + log::error!("Failed to kill eslogger: {error}"); + } + if tokio::time::timeout(SHUTDOWN_TIMEOUT, proc.wait()) + .await + .is_err() + { + log::error!("Failed to wait for ST process handler"); + } + let _ = response_tx.send(()); + + Ok(()) + } + }; + + log::debug!("Process monitor stopped"); + + result +} + +/// Launch a new instance of `eslogger`, listening for exec, fork, and exit syscalls +fn spawn_eslogger() -> Result<tokio::process::Child, Error> { + let mut cmd = tokio::process::Command::new("/usr/bin/eslogger"); + cmd.args(["exec", "fork", "exit"]) + .kill_on_drop(true) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()); + cmd.spawn().map_err(Error::StartMonitor) +} + impl ProcessMonitorHandle { pub async fn shutdown(&mut self) { let Some(stop_tx) = self.stop_proc_tx.take() else { @@ -473,7 +481,7 @@ fn parse_eslogger_error(stderr_str: &str) -> Option<Error> { /// Check whether the current macOS version is supported, and return an error otherwise fn check_os_version_support() -> Result<(), Error> { - match MacosVersion::new().map_err(Error::DetectMacosVersion) { + match MacosVersion::new() { Ok(version) => check_os_version_support_inner(version), Err(error) => { log::error!("Failed to detect macOS version: {error}"); |
