summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorDavid Lönnhager <david.l@mullvad.net>2020-08-18 16:04:33 +0200
committerDavid Lönnhager <david.l@mullvad.net>2020-09-01 14:17:21 +0200
commita4f5636ae7f593f6060fc7f993005d3fb5b57b4a (patch)
tree1ac587fd0cfc3c521d9f48017bb126df5c5e2222
parentbf0e1e1d845e75f67a320d3876a488aa1ef0db18 (diff)
downloadmullvadvpn-a4f5636ae7f593f6060fc7f993005d3fb5b57b4a.tar.xz
mullvadvpn-a4f5636ae7f593f6060fc7f993005d3fb5b57b4a.zip
Share tokio runtime between the management interface and daemon
-rw-r--r--Cargo.lock1
-rw-r--r--mullvad-daemon/src/lib.rs62
-rw-r--r--mullvad-daemon/src/main.rs56
-rw-r--r--mullvad-daemon/src/management_interface.rs29
-rw-r--r--mullvad-daemon/src/system_service.rs41
-rw-r--r--mullvad-problem-report/Cargo.toml1
-rw-r--r--mullvad-problem-report/src/lib.rs13
-rw-r--r--mullvad-rpc/Cargo.toml2
-rw-r--r--mullvad-rpc/src/bin/relay_list.rs12
-rw-r--r--mullvad-rpc/src/event_loop.rs14
-rw-r--r--mullvad-rpc/src/lib.rs30
11 files changed, 128 insertions, 133 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 25eefb80d7..d6a7fa6605 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1244,6 +1244,7 @@ dependencies = [
"regex 1.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
"rs-release 0.1.7 (git+https://github.com/mullvad/rs-release?branch=snailquote-unescape)",
"talpid-types 0.1.0",
+ "tokio 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)",
"uuid 0.7.4 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
"winres 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs
index 37cfbecf77..abb51942a6 100644
--- a/mullvad-daemon/src/lib.rs
+++ b/mullvad-daemon/src/lib.rs
@@ -481,7 +481,7 @@ impl<L> Daemon<L>
where
L: EventListener + Clone + Send + 'static,
{
- pub fn start(
+ pub async fn start(
log_dir: Option<PathBuf>,
resource_dir: PathBuf,
settings_dir: PathBuf,
@@ -493,8 +493,11 @@ where
let (tunnel_state_machine_shutdown_tx, tunnel_state_machine_shutdown_signal) =
oneshot::channel();
- let mut rpc_runtime = mullvad_rpc::MullvadRpcRuntime::with_cache_dir(&cache_dir)
- .map_err(Error::InitRpcFactory)?;
+ let mut rpc_runtime = mullvad_rpc::MullvadRpcRuntime::with_cache_dir(
+ tokio::runtime::Handle::current(),
+ &cache_dir,
+ )
+ .map_err(Error::InitRpcFactory)?;
let rpc_handle = rpc_runtime.mullvad_rest_handle();
let relay_list_listener = event_listener.clone();
@@ -525,7 +528,7 @@ where
app_version_info.clone(),
settings.show_beta_releases,
);
- rpc_runtime.runtime().spawn(version_updater.run());
+ tokio::spawn(version_updater.run());
let account_history =
account_history::AccountHistory::new(&cache_dir, &settings_dir, rpc_handle.clone())
.map_err(Error::LoadAccountHistory)?;
@@ -559,27 +562,26 @@ where
let tunnel_parameters_generator = MullvadTunnelParametersGenerator {
tx: internal_event_tx.clone(),
};
- let tunnel_command_tx = rpc_runtime
- .runtime()
- .block_on(tunnel_state_machine::spawn(
- settings.allow_lan,
- settings.block_when_disconnected,
- tunnel_parameters_generator,
- log_dir,
- resource_dir,
- cache_dir.clone(),
- internal_event_tx.to_specialized_sender(),
- tunnel_state_machine_shutdown_tx,
- #[cfg(target_os = "android")]
- android_context,
- ))
- .map_err(Error::TunnelError)?;
+ let tunnel_command_tx = tunnel_state_machine::spawn(
+ settings.allow_lan,
+ settings.block_when_disconnected,
+ tunnel_parameters_generator,
+ log_dir,
+ resource_dir,
+ cache_dir.clone(),
+ internal_event_tx.to_specialized_sender(),
+ tunnel_state_machine_shutdown_tx,
+ #[cfg(target_os = "android")]
+ android_context,
+ )
+ .await
+ .map_err(Error::TunnelError)?;
let wireguard_key_manager =
wireguard::KeyManager::new(internal_event_tx.clone(), rpc_handle.clone());
// Attempt to download a fresh relay list
- rpc_runtime.runtime().block_on(relay_selector.update());
+ relay_selector.update().await;
let initial_target_state = if settings.get_account_token().is_some() {
if settings.auto_connect {
@@ -667,7 +669,7 @@ where
cb();
}
- rpc_runtime.runtime().block_on(async {
+ rpc_runtime.handle().block_on(async {
let shutdown_signal = tokio::time::timeout(
TUNNEL_STATE_MACHINE_SHUTDOWN_TIMEOUT,
tunnel_state_machine_shutdown_signal,
@@ -979,14 +981,14 @@ where
F: std::future::Future + Send + 'static,
F::Output: Send,
{
- self.rpc_runtime.runtime().spawn(fut);
+ self.rpc_runtime.handle().spawn(fut);
}
fn block_on_future<F>(&mut self, fut: F) -> F::Output
where
F: std::future::Future,
{
- self.rpc_runtime.runtime().block_on(fut)
+ self.rpc_runtime.handle().block_on(fut)
}
@@ -1171,7 +1173,7 @@ where
match &self.tunnel_state {
Disconnected => {
let location = self.get_geo_location();
- self.rpc_runtime.runtime().spawn(async {
+ self.rpc_runtime.handle().spawn(async {
Self::oneshot_send(tx, location.await.ok(), "current location");
});
}
@@ -1184,7 +1186,7 @@ where
Connected { location, .. } => {
let relay_location = location.clone();
let location = self.get_geo_location();
- self.rpc_runtime.runtime().spawn(async {
+ self.rpc_runtime.handle().spawn(async {
Self::oneshot_send(
tx,
location.await.ok().map(|fetched_location| GeoIpLocation {
@@ -1259,7 +1261,7 @@ where
Ok(())
});
- self.rpc_runtime.runtime().spawn(async {
+ self.rpc_runtime.handle().spawn(async {
if future.compat().await.is_err() {
log::error!("Failed to spawn future for creating a new account");
}
@@ -1279,7 +1281,7 @@ where
.map(|expiry| AccountData { expiry });
Self::oneshot_send(tx, result, "account data");
};
- self.rpc_runtime.runtime().spawn(rpc_call);
+ self.rpc_runtime.handle().spawn(rpc_call);
}
fn on_get_www_auth_token(
@@ -1292,7 +1294,7 @@ where
let result = old_future.compat().await;
Self::oneshot_send(tx, result, "get_www_auth_token response");
};
- self.rpc_runtime.runtime().spawn(rpc_call);
+ self.rpc_runtime.handle().spawn(rpc_call);
}
}
@@ -1307,7 +1309,7 @@ where
let result = old_future.compat().await;
Self::oneshot_send(tx, result, "submit_voucher response");
};
- self.rpc_runtime.runtime().spawn(rpc_call);
+ self.rpc_runtime.handle().spawn(rpc_call);
}
}
@@ -1525,7 +1527,7 @@ where
if settings_changed {
self.event_listener
.notify_settings(self.settings.to_settings());
- let runtime = self.rpc_runtime.runtime();
+ let runtime = self.rpc_runtime.handle();
let mut handle = self.version_updater_handle.clone();
runtime.block_on(handle.set_show_beta_releases(enabled));
}
diff --git a/mullvad-daemon/src/main.rs b/mullvad-daemon/src/main.rs
index 9afd231c93..1f58db5caf 100644
--- a/mullvad-daemon/src/main.rs
+++ b/mullvad-daemon/src/main.rs
@@ -23,7 +23,17 @@ fn main() {
eprintln!("{}", error);
std::process::exit(1)
});
- let exit_code = match run_platform(config, log_dir) {
+
+ let mut runtime = tokio::runtime::Builder::new()
+ .threaded_scheduler()
+ .enable_all()
+ .build()
+ .unwrap_or_else(|error| {
+ eprintln!("{}", error.display_chain());
+ std::process::exit(1);
+ });
+
+ let exit_code = match runtime.block_on(run_platform(config, log_dir)) {
Ok(_) => 0,
Err(error) => {
error!("{}", error);
@@ -64,7 +74,7 @@ fn get_log_dir(config: &cli::Config) -> Result<Option<PathBuf>, String> {
}
#[cfg(windows)]
-fn run_platform(config: &cli::Config, log_dir: Option<PathBuf>) -> Result<(), String> {
+async fn run_platform(config: &cli::Config, log_dir: Option<PathBuf>) -> Result<(), String> {
if config.run_as_service {
system_service::run()
} else {
@@ -75,33 +85,26 @@ fn run_platform(config: &cli::Config, log_dir: Option<PathBuf>) -> Result<(), St
}
install_result
} else {
- run_standalone(log_dir)
+ run_standalone(log_dir).await
}
}
}
#[cfg(not(windows))]
-fn run_platform(_config: &cli::Config, log_dir: Option<PathBuf>) -> Result<(), String> {
- run_standalone(log_dir)
+async fn run_platform(_config: &cli::Config, log_dir: Option<PathBuf>) -> Result<(), String> {
+ run_standalone(log_dir).await
}
-fn run_standalone(log_dir: Option<PathBuf>) -> Result<(), String> {
- {
- let mut runtime = tokio::runtime::Builder::new()
- .basic_scheduler()
- .enable_all()
- .build()
- .map_err(|e| e.display_chain())?;
- if runtime.block_on(rpc_uniqueness_check::is_another_instance_running()) {
- return Err("Another instance of the daemon is already running".to_owned());
- }
+async fn run_standalone(log_dir: Option<PathBuf>) -> Result<(), String> {
+ if rpc_uniqueness_check::is_another_instance_running().await {
+ return Err("Another instance of the daemon is already running".to_owned());
}
if !running_as_admin() {
warn!("Running daemon as a non-administrator user, clients might refuse to connect");
}
- let daemon = create_daemon(log_dir)?;
+ let daemon = create_daemon(log_dir).await?;
let shutdown_handle = daemon.shutdown_handle();
shutdown::set_shutdown_signal_handler(move || shutdown_handle.shutdown())
@@ -114,7 +117,7 @@ fn run_standalone(log_dir: Option<PathBuf>) -> Result<(), String> {
Ok(())
}
-fn create_daemon(
+async fn create_daemon(
log_dir: Option<PathBuf>,
) -> Result<Daemon<ManagementInterfaceEventBroadcaster>, String> {
let resource_dir = mullvad_paths::get_resource_dir();
@@ -124,7 +127,7 @@ fn create_daemon(
.map_err(|e| e.display_chain_with_msg("Unable to get cache dir"))?;
let command_channel = DaemonCommandChannel::new();
- let event_listener = spawn_management_interface(command_channel.sender())?;
+ let event_listener = spawn_management_interface(command_channel.sender()).await?;
Daemon::start(
log_dir,
@@ -134,23 +137,22 @@ fn create_daemon(
event_listener,
command_channel,
)
+ .await
.map_err(|e| e.display_chain_with_msg("Unable to initialize daemon"))
}
-fn spawn_management_interface(
+async fn spawn_management_interface(
command_sender: DaemonCommandSender,
) -> Result<ManagementInterfaceEventBroadcaster, String> {
- let server = ManagementInterfaceServer::start(command_sender).map_err(|error| {
- error.display_chain_with_msg("Unable to start management interface server")
- })?;
+ let server = ManagementInterfaceServer::start(command_sender)
+ .await
+ .map_err(|error| {
+ error.display_chain_with_msg("Unable to start management interface server")
+ })?;
let event_broadcaster = server.event_broadcaster();
info!("Management interface listening on {}", server.socket_path());
-
- thread::spawn(|| {
- server.wait();
- info!("Management interface shut down");
- });
+ tokio::spawn(server.run());
Ok(event_broadcaster)
}
diff --git a/mullvad-daemon/src/management_interface.rs b/mullvad-daemon/src/management_interface.rs
index 5fe25fc15c..37c0f5dc12 100644
--- a/mullvad-daemon/src/management_interface.rs
+++ b/mullvad-daemon/src/management_interface.rs
@@ -35,10 +35,6 @@ pub enum Error {
// Unable to start the management interface server
#[error(display = "Unable to start management interface server")]
SetupError(#[error(source)] mullvad_management_interface::Error),
-
- // Unable to start the tokio runtime
- #[error(display = "Failed to create the tokio runtime")]
- TokioRuntimeError(#[error(source)] tokio::io::Error),
}
struct ManagementServiceImpl {
@@ -1441,7 +1437,6 @@ fn convert_proto_location(location: types::RelayLocation) -> Constraint<Location
pub struct ManagementInterfaceServer {
subscriptions: Arc<RwLock<Vec<EventsListenerSender>>>,
socket_path: String,
- runtime: tokio::runtime::Runtime,
server_abort_tx: triggered::Trigger,
server_join_handle: Option<
tokio::task::JoinHandle<std::result::Result<(), mullvad_management_interface::Error>>,
@@ -1449,15 +1444,7 @@ pub struct ManagementInterfaceServer {
}
impl ManagementInterfaceServer {
- pub fn start(tunnel_tx: DaemonCommandSender) -> Result<Self, Error> {
- // TODO: don't spawn a tokio runtime here; make this function async
- let mut runtime = tokio::runtime::Builder::new()
- .threaded_scheduler()
- .core_threads(1)
- .enable_all()
- .build()
- .map_err(Error::TokioRuntimeError)?;
-
+ pub async fn start(tunnel_tx: DaemonCommandSender) -> Result<Self, Error> {
let subscriptions = Arc::<RwLock<Vec<EventsListenerSender>>>::default();
let socket_path = mullvad_paths::get_rpc_socket_path()
@@ -1470,15 +1457,15 @@ impl ManagementInterfaceServer {
daemon_tx: tunnel_tx,
subscriptions: subscriptions.clone(),
};
- let server_join_handle = runtime.spawn(mullvad_management_interface::spawn_rpc_server(
+ let server_join_handle = tokio::spawn(mullvad_management_interface::spawn_rpc_server(
server,
start_tx,
server_abort_rx,
));
if let Err(_) = start_rx.recv() {
- return Err(runtime
- .block_on(server_join_handle)
+ return Err(server_join_handle
+ .await
.expect("Failed to resolve quit handle future")
.map_err(Error::SetupError)
.unwrap_err());
@@ -1487,7 +1474,6 @@ impl ManagementInterfaceServer {
Ok(ManagementInterfaceServer {
subscriptions,
socket_path,
- runtime,
server_abort_tx,
server_join_handle: Some(server_join_handle),
})
@@ -1499,7 +1485,6 @@ impl ManagementInterfaceServer {
pub fn event_broadcaster(&self) -> ManagementInterfaceEventBroadcaster {
ManagementInterfaceEventBroadcaster {
- runtime: self.runtime.handle().clone(),
subscriptions: self.subscriptions.clone(),
close_handle: self.server_abort_tx.clone(),
}
@@ -1507,11 +1492,12 @@ impl ManagementInterfaceServer {
/// Consumes the server and waits for it to finish. Returns an error if the server exited
/// due to an error.
- pub fn wait(mut self) {
+ pub async fn run(self) {
if let Some(server_join_handle) = self.server_join_handle {
- if let Err(error) = self.runtime.block_on(server_join_handle) {
+ if let Err(error) = server_join_handle.await {
log::error!("Management server panic: {:?}", error);
}
+ log::info!("Management interface shut down");
}
}
}
@@ -1519,7 +1505,6 @@ impl ManagementInterfaceServer {
/// A handle that allows broadcasting messages to all subscribers of the management interface.
#[derive(Clone)]
pub struct ManagementInterfaceEventBroadcaster {
- runtime: tokio::runtime::Handle,
subscriptions: Arc<RwLock<Vec<EventsListenerSender>>>,
close_handle: triggered::Trigger,
}
diff --git a/mullvad-daemon/src/system_service.rs b/mullvad-daemon/src/system_service.rs
index 8b40e8965c..ee7b680de8 100644
--- a/mullvad-daemon/src/system_service.rs
+++ b/mullvad-daemon/src/system_service.rs
@@ -102,21 +102,38 @@ fn run_service() -> Result<(), String> {
let clean_shutdown = Arc::new(AtomicBool::new(false));
let log_dir = crate::get_log_dir(cli::get_config()).expect("Log dir should be available here");
- let result = crate::create_daemon(log_dir).and_then(|daemon| {
- let shutdown_handle = daemon.shutdown_handle();
- // Register monitor that translates `ServiceControl` to Daemon events
- start_event_monitor(
- persistent_service_status.clone(),
- shutdown_handle,
- event_rx,
- clean_shutdown.clone(),
- );
+ let runtime = tokio::runtime::Builder::new()
+ .threaded_scheduler()
+ .enable_all()
+ .build();
+ let mut runtime = match runtime {
+ Err(error) => {
+ persistent_service_status
+ .set_stopped(ServiceExitCode::ServiceSpecific(1))
+ .unwrap();
+ return Err(error.display_chain());
+ }
+ Ok(runtime) => runtime,
+ };
+
+ let result = runtime
+ .block_on(crate::create_daemon(log_dir))
+ .and_then(|daemon| {
+ let shutdown_handle = daemon.shutdown_handle();
+
+ // Register monitor that translates `ServiceControl` to Daemon events
+ start_event_monitor(
+ persistent_service_status.clone(),
+ shutdown_handle,
+ event_rx,
+ clean_shutdown.clone(),
+ );
- persistent_service_status.set_running().unwrap();
+ persistent_service_status.set_running().unwrap();
- daemon.run().map_err(|e| e.display_chain())
- });
+ daemon.run().map_err(|e| e.display_chain())
+ });
let exit_code = match result {
Ok(()) => {
diff --git a/mullvad-problem-report/Cargo.toml b/mullvad-problem-report/Cargo.toml
index 2642fa6acb..68925c5f62 100644
--- a/mullvad-problem-report/Cargo.toml
+++ b/mullvad-problem-report/Cargo.toml
@@ -16,6 +16,7 @@ futures01 = { version = "0.1", package = "futures" }
lazy_static = "1.0"
regex = "1.0"
uuid = { version = "0.7", features = ["v4"] }
+tokio = { version = "0.2", features = [ "time", "rt-threaded", "net", "io-std", "io-driver" ] }
mullvad-paths = { path = "../mullvad-paths" }
mullvad-rpc = { path = "../mullvad-rpc" }
diff --git a/mullvad-problem-report/src/lib.rs b/mullvad-problem-report/src/lib.rs
index 019dd82f62..e89bd13aea 100644
--- a/mullvad-problem-report/src/lib.rs
+++ b/mullvad-problem-report/src/lib.rs
@@ -67,6 +67,9 @@ pub enum Error {
#[error(display = "Error during RPC call")]
SendRpcError(#[error(source)] mullvad_rpc::rest::Error),
+
+ #[error(display = "Unable to spawn Tokio runtime")]
+ CreateRuntime(#[error(source)] io::Error),
}
/// These are errors that can happen during problem report collection.
@@ -263,8 +266,14 @@ pub fn send_problem_report(
let metadata =
ProblemReport::parse_metadata(&report_content).unwrap_or_else(|| metadata::collect());
- let mut rpc_manager =
- mullvad_rpc::MullvadRpcRuntime::new().map_err(Error::CreateRpcClientError)?;
+ let runtime = tokio::runtime::Builder::new()
+ .threaded_scheduler()
+ .enable_all()
+ .build()
+ .map_err(Error::CreateRuntime)?;
+
+ let mut rpc_manager = mullvad_rpc::MullvadRpcRuntime::new(runtime.handle().clone())
+ .map_err(Error::CreateRpcClientError)?;
let rpc_client = mullvad_rpc::ProblemReportProxy::new(rpc_manager.mullvad_rest_handle());
rpc_client
diff --git a/mullvad-rpc/Cargo.toml b/mullvad-rpc/Cargo.toml
index e06985e34b..18fa0994e7 100644
--- a/mullvad-rpc/Cargo.toml
+++ b/mullvad-rpc/Cargo.toml
@@ -20,7 +20,7 @@ regex = "1"
serde = "1"
serde_json = "1.0"
hyper-rustls = "0.20"
-tokio = { version = "0.2", features = [ "time", "rt-threaded", "net", "io-std", "io-driver" ] }
+tokio = { version = "0.2", features = [ "macros", "time", "rt-threaded", "net", "io-std", "io-driver" ] }
tokio-rustls = "0.13"
urlencoding = "1"
webpki = { version = "0.21", features = [] }
diff --git a/mullvad-rpc/src/bin/relay_list.rs b/mullvad-rpc/src/bin/relay_list.rs
index 27601d4b21..8213d736da 100644
--- a/mullvad-rpc/src/bin/relay_list.rs
+++ b/mullvad-rpc/src/bin/relay_list.rs
@@ -4,12 +4,16 @@ use mullvad_rpc::{rest::Error as RestError, MullvadRpcRuntime, RelayListProxy};
use std::process;
use talpid_types::ErrorExt;
-fn main() {
- let mut runtime = MullvadRpcRuntime::new().expect("Failed to load runtime");
+#[tokio::main]
+async fn main() {
+ let mut runtime =
+ MullvadRpcRuntime::new(tokio::runtime::Handle::current()).expect("Failed to load runtime");
- let relay_list_request = RelayListProxy::new(runtime.mullvad_rest_handle()).relay_list();
+ let relay_list_request = RelayListProxy::new(runtime.mullvad_rest_handle())
+ .relay_list()
+ .await;
- let relay_list = match runtime.runtime().block_on(relay_list_request) {
+ let relay_list = match relay_list_request {
Ok(relay_list) => relay_list,
Err(RestError::TimeoutError(_)) => {
eprintln!("Request timed out");
diff --git a/mullvad-rpc/src/event_loop.rs b/mullvad-rpc/src/event_loop.rs
deleted file mode 100644
index ea93de2493..0000000000
--- a/mullvad-rpc/src/event_loop.rs
+++ /dev/null
@@ -1,14 +0,0 @@
-use tokio::runtime::{Builder, Runtime};
-
-/// Creates a new tokio runtime to be exclusively used for HTTP requests.
-// FIXME: Remove this once the daemon has migrated.
-pub fn create_runtime() -> Result<Runtime, crate::Error> {
- let runtime = Builder::new()
- .threaded_scheduler()
- .core_threads(2)
- .enable_all()
- .thread_name("mullvad-rpc-event-loop")
- .build();
-
- runtime.map_err(crate::Error::TokioRuntimeError)
-}
diff --git a/mullvad-rpc/src/lib.rs b/mullvad-rpc/src/lib.rs
index 796cb56553..5e2e6f80fa 100644
--- a/mullvad-rpc/src/lib.rs
+++ b/mullvad-rpc/src/lib.rs
@@ -16,7 +16,6 @@ use std::{
use talpid_types::net::wireguard;
-pub mod event_loop;
pub mod rest;
mod cached_dns_resolver;
@@ -44,29 +43,27 @@ const API_IP: IpAddr = IpAddr::V4(Ipv4Addr::new(193, 138, 218, 78));
pub struct MullvadRpcRuntime {
cached_dns_resolver: CachedDnsResolver,
https_connector: HttpsConnectorWithSni,
- runtime: tokio::runtime::Runtime,
+ handle: tokio::runtime::Handle,
}
#[derive(err_derive::Error, Debug)]
pub enum Error {
#[error(display = "Failed to construct a rest client")]
RestError(#[error(source)] rest::Error),
- #[error(display = "Failed to spawn a tokio runtime")]
- TokioRuntimeError(#[error(source)] tokio::io::Error),
}
impl MullvadRpcRuntime {
/// Create a new `MullvadRpcRuntime`.
- pub fn new() -> Result<Self, Error> {
+ pub fn new(handle: tokio::runtime::Handle) -> Result<Self, Error> {
Ok(MullvadRpcRuntime {
cached_dns_resolver: CachedDnsResolver::new(API_HOST.to_owned(), None, API_IP),
- runtime: event_loop::create_runtime()?,
https_connector: HttpsConnectorWithSni::new(),
+ handle,
})
}
/// Create a new `MullvadRpcRuntime` using the specified cache directory.
- pub fn with_cache_dir(cache_dir: &Path) -> Result<Self, Error> {
+ pub fn with_cache_dir(handle: tokio::runtime::Handle, cache_dir: &Path) -> Result<Self, Error> {
let cache_file = cache_dir.join(API_IP_CACHE_FILENAME);
let cached_dns_resolver =
CachedDnsResolver::new(API_HOST.to_owned(), Some(cache_file), API_IP);
@@ -75,8 +72,8 @@ impl MullvadRpcRuntime {
Ok(MullvadRpcRuntime {
cached_dns_resolver,
- runtime: event_loop::create_runtime()?,
https_connector,
+ handle,
})
}
@@ -85,9 +82,9 @@ impl MullvadRpcRuntime {
let mut https_connector = self.https_connector.clone();
https_connector.set_sni_hostname(sni_hostname);
- let service = rest::RequestService::new(https_connector, self.runtime.handle().clone());
+ let service = rest::RequestService::new(https_connector, self.handle.clone());
let handle = service.handle();
- self.runtime.spawn(service.into_future());
+ self.handle.spawn(service.into_future());
handle
}
@@ -106,17 +103,8 @@ impl MullvadRpcRuntime {
self.new_request_service(None)
}
- pub fn runtime(&mut self) -> &mut tokio::runtime::Runtime {
- &mut self.runtime
- }
-}
-
-impl Drop for MullvadRpcRuntime {
- fn drop(&mut self) {
- if let Ok(runtime) = event_loop::create_runtime() {
- let old_runtime = std::mem::replace(&mut self.runtime, runtime);
- old_runtime.shutdown_timeout(std::time::Duration::from_secs(1));
- }
+ pub fn handle(&mut self) -> &mut tokio::runtime::Handle {
+ &mut self.handle
}
}