summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorDavid Lönnhager <david.l@mullvad.net>2024-04-20 17:46:41 +0200
committerDavid Lönnhager <david.l@mullvad.net>2024-04-30 16:22:52 +0200
commitd2cc168d13df8fa2c84c0bf40dc48e67dbcfa675 (patch)
tree4ea31774990515c6daa5dba92bf2e69d822a8ed8
parent5e947ca34ca69e234f8de240a321aac0b27f4419 (diff)
downloadmullvadvpn-d2cc168d13df8fa2c84c0bf40dc48e67dbcfa675.tar.xz
mullvadvpn-d2cc168d13df8fa2c84c0bf40dc48e67dbcfa675.zip
Refactor process monitor
-rw-r--r--talpid-core/src/split_tunnel/macos/process.rs178
1 files changed, 93 insertions, 85 deletions
diff --git a/talpid-core/src/split_tunnel/macos/process.rs b/talpid-core/src/split_tunnel/macos/process.rs
index a69c05b55b..f913b9bb49 100644
--- a/talpid-core/src/split_tunnel/macos/process.rs
+++ b/talpid-core/src/split_tunnel/macos/process.rs
@@ -31,9 +31,6 @@ static MIN_OS_VERSION: Lazy<MacosVersion> =
#[derive(thiserror::Error, Debug)]
pub enum Error {
- /// Failed to detect macOS version
- #[error("Failed to detect macOS version")]
- DetectMacosVersion(#[source] io::Error),
/// Only macOS 13 and later is supported
#[error("Unsupported macOS version: {actual}, expected at least {}", *MIN_OS_VERSION)]
UnsupportedMacosVersion {
@@ -73,89 +70,10 @@ impl ProcessMonitor {
check_os_version_support()?;
let states = ProcessStates::new()?;
- let mut cmd = tokio::process::Command::new("/usr/bin/eslogger");
- cmd.args(["exec", "fork", "exit"])
- .kill_on_drop(true)
- .stdin(Stdio::piped())
- .stdout(Stdio::piped())
- .stderr(Stdio::piped());
-
- let mut proc = cmd.spawn().map_err(Error::StartMonitor)?;
-
- let stdout = proc.stdout.take().unwrap();
- let stderr = proc.stderr.take().unwrap();
-
- let states_clone = states.clone();
-
+ let proc = spawn_eslogger()?;
let (stop_proc_tx, stop_rx): (_, oneshot::Receiver<oneshot::Sender<_>>) =
oneshot::channel();
-
- let mut proc_task = tokio::spawn(async move {
- tokio::spawn(async move {
- let reader = BufReader::new(stdout);
- let mut lines = reader.lines();
-
- while let Ok(Some(line)) = lines.next_line().await {
- // Each line from eslogger is a JSON object, one of several types of messages;
- // see `ESMessage`
- let val: ESMessage = match serde_json::from_str(&line) {
- Ok(val) => val,
- Err(error) => {
- log::error!("Failed to parse eslogger message: {error}");
- continue;
- }
- };
-
- let mut inner = states_clone.inner.lock().unwrap();
- inner.handle_message(val);
- }
- });
- let last_stderr = tokio::spawn(async move {
- let reader = BufReader::new(stderr);
- let mut lines = reader.lines();
- let mut last_error = None;
-
- while let Ok(Some(line)) = lines.next_line().await {
- last_error = Some(line);
- }
- last_error
- });
-
- let result = tokio::select! {
- result = proc.wait() => {
- match result {
- Ok(status) => {
- if let Ok(Some(last_error)) = last_stderr.await {
- log::error!("eslogger error: {last_error}");
- if let Some(error) = parse_eslogger_error(&last_error) {
- return Err(error);
- }
- }
- Err(Error::MonitorFailed(io::Error::other(format!("eslogger stopped unexpectedly: {status}"))))
- }
- Err(error) => Err(Error::MonitorFailed(error)),
- }
- }
- Ok(response_tx) = stop_rx => {
- if let Err(error) = proc.kill().await {
- log::error!("Failed to kill eslogger: {error}");
- }
- if tokio::time::timeout(SHUTDOWN_TIMEOUT, proc.wait())
- .await
- .is_err()
- {
- log::error!("Failed to wait for ST process handler");
- }
- let _ = response_tx.send(());
-
- Ok(())
- }
- };
-
- log::debug!("Process monitor stopped");
-
- result
- });
+ let mut proc_task = tokio::spawn(handle_eslogger_output(proc, states.clone(), stop_rx));
match tokio::time::timeout(EARLY_FAIL_TIMEOUT, &mut proc_task).await {
// On timeout, all is well
@@ -174,6 +92,96 @@ impl ProcessMonitor {
}
}
+/// Run until the process exits or `stop_rx` is signaled
+async fn handle_eslogger_output(
+ mut proc: tokio::process::Child,
+ states: ProcessStates,
+ stop_rx: oneshot::Receiver<oneshot::Sender<()>>,
+) -> Result<(), Error> {
+ let stdout = proc.stdout.take().unwrap();
+ let stderr = proc.stderr.take().unwrap();
+
+ // Parse each line from stdout as an ESMessage
+ tokio::spawn(async move {
+ let reader = BufReader::new(stdout);
+ let mut lines = reader.lines();
+
+ while let Ok(Some(line)) = lines.next_line().await {
+ // Each line from eslogger is a JSON object, one of several types of messages;
+ // see `ESMessage`
+ let val: ESMessage = match serde_json::from_str(&line) {
+ Ok(val) => val,
+ Err(error) => {
+ log::error!("Failed to parse eslogger message: {error}");
+ continue;
+ }
+ };
+
+ let mut inner = states.inner.lock().unwrap();
+ inner.handle_message(val);
+ }
+ });
+
+ // Store the most recent stderr line in case we need to return an error
+ let last_stderr = tokio::spawn(async move {
+ let reader = BufReader::new(stderr);
+ let mut lines = reader.lines();
+ let mut last_error = None;
+
+ while let Ok(Some(line)) = lines.next_line().await {
+ last_error = Some(line);
+ }
+ last_error
+ });
+
+ // Wait for a stop signal or process exit
+ let result = tokio::select! {
+ result = proc.wait() => {
+ match result {
+ Ok(status) => {
+ if let Ok(Some(last_error)) = last_stderr.await {
+ log::error!("eslogger error: {last_error}");
+ if let Some(error) = parse_eslogger_error(&last_error) {
+ return Err(error);
+ }
+ }
+ Err(Error::MonitorFailed(io::Error::other(format!("eslogger stopped unexpectedly: {status}"))))
+ }
+ Err(error) => Err(Error::MonitorFailed(error)),
+ }
+ }
+ Ok(response_tx) = stop_rx => {
+ if let Err(error) = proc.kill().await {
+ log::error!("Failed to kill eslogger: {error}");
+ }
+ if tokio::time::timeout(SHUTDOWN_TIMEOUT, proc.wait())
+ .await
+ .is_err()
+ {
+ log::error!("Failed to wait for ST process handler");
+ }
+ let _ = response_tx.send(());
+
+ Ok(())
+ }
+ };
+
+ log::debug!("Process monitor stopped");
+
+ result
+}
+
+/// Launch a new instance of `eslogger`, listening for exec, fork, and exit syscalls
+fn spawn_eslogger() -> Result<tokio::process::Child, Error> {
+ let mut cmd = tokio::process::Command::new("/usr/bin/eslogger");
+ cmd.args(["exec", "fork", "exit"])
+ .kill_on_drop(true)
+ .stdin(Stdio::piped())
+ .stdout(Stdio::piped())
+ .stderr(Stdio::piped());
+ cmd.spawn().map_err(Error::StartMonitor)
+}
+
impl ProcessMonitorHandle {
pub async fn shutdown(&mut self) {
let Some(stop_tx) = self.stop_proc_tx.take() else {
@@ -473,7 +481,7 @@ fn parse_eslogger_error(stderr_str: &str) -> Option<Error> {
/// Check whether the current macOS version is supported, and return an error otherwise
fn check_os_version_support() -> Result<(), Error> {
- match MacosVersion::new().map_err(Error::DetectMacosVersion) {
+ match MacosVersion::new() {
Ok(version) => check_os_version_support_inner(version),
Err(error) => {
log::error!("Failed to detect macOS version: {error}");