summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock317
-rw-r--r--mullvad-daemon/Cargo.toml4
-rw-r--r--mullvad-daemon/src/account_history.rs113
-rw-r--r--mullvad-daemon/src/event_loop.rs38
-rw-r--r--mullvad-daemon/src/lib.rs646
-rw-r--r--mullvad-daemon/src/main.rs58
-rw-r--r--mullvad-daemon/src/management_interface.rs563
-rw-r--r--mullvad-daemon/src/relays.rs6
-rw-r--r--mullvad-daemon/src/runtime.rs11
-rw-r--r--mullvad-daemon/src/system_service.rs25
-rw-r--r--mullvad-daemon/src/version_check.rs6
-rw-r--r--mullvad-daemon/src/wireguard.rs42
-rw-r--r--mullvad-jni/Cargo.toml5
-rw-r--r--mullvad-jni/src/daemon_interface.rs51
-rw-r--r--mullvad-jni/src/lib.rs18
-rw-r--r--mullvad-problem-report/Cargo.toml1
-rw-r--r--mullvad-problem-report/src/lib.rs13
-rw-r--r--mullvad-rpc/Cargo.toml2
-rw-r--r--mullvad-rpc/src/bin/relay_list.rs12
-rw-r--r--mullvad-rpc/src/event_loop.rs14
-rw-r--r--mullvad-rpc/src/lib.rs30
-rw-r--r--mullvad-rpc/src/rest.rs13
-rw-r--r--talpid-core/Cargo.toml5
-rw-r--r--talpid-core/src/future_retry.rs4
-rw-r--r--talpid-core/src/offline/android.rs6
-rw-r--r--talpid-core/src/offline/linux.rs59
-rw-r--r--talpid-core/src/offline/macos.rs8
-rw-r--r--talpid-core/src/offline/mod.rs21
-rw-r--r--talpid-core/src/offline/windows.rs8
-rw-r--r--talpid-core/src/routing/linux.rs6
-rw-r--r--talpid-core/src/routing/macos.rs4
-rw-r--r--talpid-core/src/routing/unix.rs14
-rw-r--r--talpid-core/src/tunnel/openvpn.rs9
-rw-r--r--talpid-core/src/tunnel_state_machine/mod.rs126
-rw-r--r--talpid-openvpn-plugin/src/processing.rs1
35 files changed, 929 insertions, 1330 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 31d6dd3547..d530e53bd9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -182,15 +182,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "bytes"
-version = "0.4.12"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-dependencies = [
- "byteorder 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
- "iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
-]
-
-[[package]]
-name = "bytes"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -284,40 +275,6 @@ version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
-name = "crossbeam-deque"
-version = "0.7.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-dependencies = [
- "crossbeam-epoch 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)",
- "crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
- "maybe-uninit 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
-]
-
-[[package]]
-name = "crossbeam-epoch"
-version = "0.8.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-dependencies = [
- "autocfg 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
- "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
- "crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
- "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "maybe-uninit 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "memoffset 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)",
- "scopeguard 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
-]
-
-[[package]]
-name = "crossbeam-queue"
-version = "0.2.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-dependencies = [
- "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
- "crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
- "maybe-uninit 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
-]
-
-[[package]]
name = "crossbeam-utils"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -979,31 +936,6 @@ dependencies = [
]
[[package]]
-name = "jsonrpc-client-core"
-version = "0.5.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-dependencies = [
- "error-chain 0.12.4 (registry+https://github.com/rust-lang/crates.io-index)",
- "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
- "jsonrpc-core 8.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
- "log 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)",
- "serde 1.0.115 (registry+https://github.com/rust-lang/crates.io-index)",
- "serde_json 1.0.57 (registry+https://github.com/rust-lang/crates.io-index)",
-]
-
-[[package]]
-name = "jsonrpc-core"
-version = "8.0.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-dependencies = [
- "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
- "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
- "serde 1.0.115 (registry+https://github.com/rust-lang/crates.io-index)",
- "serde_derive 1.0.115 (registry+https://github.com/rust-lang/crates.io-index)",
- "serde_json 1.0.57 (registry+https://github.com/rust-lang/crates.io-index)",
-]
-
-[[package]]
name = "kernel32-sys"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1078,14 +1010,6 @@ version = "2.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
-name = "memoffset"
-version = "0.5.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-dependencies = [
- "autocfg 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
-]
-
-[[package]]
name = "miniz_oxide"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1238,8 +1162,6 @@ dependencies = [
"talpid-core 0.1.0",
"talpid-types 0.1.0",
"tokio 0.2.22 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-timer 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"triggered 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"uuid 0.7.4 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -1262,11 +1184,9 @@ name = "mullvad-jni"
version = "0.1.0"
dependencies = [
"err-derive 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)",
- "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"ipnetwork 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)",
"jnix 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
- "jsonrpc-client-core 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "jsonrpc-core 8.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)",
"log-panics 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -1279,6 +1199,7 @@ dependencies = [
"rand 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)",
"talpid-core 0.1.0",
"talpid-types 0.1.0",
+ "tokio 0.2.22 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -1324,6 +1245,7 @@ dependencies = [
"regex 1.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
"rs-release 0.1.7 (git+https://github.com/mullvad/rs-release?branch=snailquote-unescape)",
"talpid-types 0.1.0",
+ "tokio 0.2.22 (registry+https://github.com/rust-lang/crates.io-index)",
"uuid 0.7.4 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
"winres 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -2197,11 +2119,6 @@ dependencies = [
]
[[package]]
-name = "scoped-tls"
-version = "0.1.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-
-[[package]]
name = "scopeguard"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2322,11 +2239,6 @@ dependencies = [
[[package]]
name = "slab"
-version = "0.3.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-
-[[package]]
-name = "slab"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2526,9 +2438,6 @@ dependencies = [
"talpid-types 0.1.0",
"tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.2.22 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-executor 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-io 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)",
"tonic 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tonic-build 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"triggered 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -2648,29 +2557,6 @@ dependencies = [
[[package]]
name = "tokio"
-version = "0.1.22"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-dependencies = [
- "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
- "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
- "mio 0.6.22 (registry+https://github.com/rust-lang/crates.io-index)",
- "num_cpus 1.13.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-codec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-current-thread 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-executor 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-fs 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-io 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-reactor 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-sync 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-tcp 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-threadpool 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-timer 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-udp 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-uds 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)",
-]
-
-[[package]]
-name = "tokio"
version = "0.2.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
@@ -2693,72 +2579,6 @@ dependencies = [
]
[[package]]
-name = "tokio-codec"
-version = "0.1.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-dependencies = [
- "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
- "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-io 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)",
-]
-
-[[package]]
-name = "tokio-core"
-version = "0.1.17"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-dependencies = [
- "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
- "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
- "iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
- "log 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)",
- "mio 0.6.22 (registry+https://github.com/rust-lang/crates.io-index)",
- "scoped-tls 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-executor 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-io 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-reactor 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-timer 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)",
-]
-
-[[package]]
-name = "tokio-current-thread"
-version = "0.1.7"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-dependencies = [
- "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-executor 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
-]
-
-[[package]]
-name = "tokio-executor"
-version = "0.1.10"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-dependencies = [
- "crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
- "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
-]
-
-[[package]]
-name = "tokio-fs"
-version = "0.1.7"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-dependencies = [
- "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-io 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-threadpool 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
-]
-
-[[package]]
-name = "tokio-io"
-version = "0.1.13"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-dependencies = [
- "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
- "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
- "log 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)",
-]
-
-[[package]]
name = "tokio-macros"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2769,24 +2589,6 @@ dependencies = [
]
[[package]]
-name = "tokio-reactor"
-version = "0.1.12"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-dependencies = [
- "crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
- "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
- "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "log 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)",
- "mio 0.6.22 (registry+https://github.com/rust-lang/crates.io-index)",
- "num_cpus 1.13.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-executor 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-io 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-sync 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
-]
-
-[[package]]
name = "tokio-rustls"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2798,95 +2600,6 @@ dependencies = [
]
[[package]]
-name = "tokio-sync"
-version = "0.1.8"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-dependencies = [
- "fnv 1.0.7 (registry+https://github.com/rust-lang/crates.io-index)",
- "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
-]
-
-[[package]]
-name = "tokio-tcp"
-version = "0.1.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-dependencies = [
- "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
- "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
- "iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
- "mio 0.6.22 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-io 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-reactor 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
-]
-
-[[package]]
-name = "tokio-threadpool"
-version = "0.1.18"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-dependencies = [
- "crossbeam-deque 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)",
- "crossbeam-queue 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
- "crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
- "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
- "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "log 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)",
- "num_cpus 1.13.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-executor 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
-]
-
-[[package]]
-name = "tokio-timer"
-version = "0.1.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-dependencies = [
- "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
- "slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
-]
-
-[[package]]
-name = "tokio-timer"
-version = "0.2.13"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-dependencies = [
- "crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
- "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
- "slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-executor 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
-]
-
-[[package]]
-name = "tokio-udp"
-version = "0.1.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-dependencies = [
- "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
- "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
- "log 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)",
- "mio 0.6.22 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-codec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-io 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-reactor 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
-]
-
-[[package]]
-name = "tokio-uds"
-version = "0.2.7"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-dependencies = [
- "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
- "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
- "iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
- "libc 0.2.76 (registry+https://github.com/rust-lang/crates.io-index)",
- "log 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)",
- "mio 0.6.22 (registry+https://github.com/rust-lang/crates.io-index)",
- "mio-uds 0.6.8 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-codec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-io 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-reactor 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
-]
-
-[[package]]
name = "tokio-util"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -3506,7 +3219,6 @@ dependencies = [
"checksum blake2b_simd 0.5.10 (registry+https://github.com/rust-lang/crates.io-index)" = "d8fb2d74254a3a0b5cac33ac9f8ed0e44aa50378d9dbb2e5d83bd21ed1dc2c8a"
"checksum bumpalo 3.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2e8c087f005730276d1096a652e92a8bacee2e2472bcc9715a74d2bec38b5820"
"checksum byteorder 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de"
-"checksum bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)" = "206fdffcfa2df7cbe15601ef46c813fce0965eb3286db6b56c583b814b51c81c"
"checksum bytes 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)" = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38"
"checksum cc 1.0.59 (registry+https://github.com/rust-lang/crates.io-index)" = "66120af515773fb005778dc07c261bd201ec8ce50bd6e7144c927753fe013381"
"checksum cesu8 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c"
@@ -3519,9 +3231,6 @@ dependencies = [
"checksum constant_time_eq 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc"
"checksum core-foundation 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "57d24c7a13c43e870e37c1556b74555437870a04514f7685f5b354e090567171"
"checksum core-foundation-sys 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b3a71ab494c0b5b860bdc8407ae08978052417070c2ced38573a9157ad75b8ac"
-"checksum crossbeam-deque 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)" = "9f02af974daeee82218205558e51ec8768b48cf524bd01d550abe5573a608285"
-"checksum crossbeam-epoch 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace"
-"checksum crossbeam-queue 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "774ba60a54c213d409d5353bda12d49cd68d14e45036a285234c8d6f91f92570"
"checksum crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8"
"checksum ct-logs 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4d3686f5fa27dbc1d76c751300376e167c5a43387f44bb451fd1c24776e49113"
"checksum ctrlc 3.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "d0b676fa23f995faf587496dcd1c80fead847ed58d2da52ac1caca9a72790dd2"
@@ -3595,8 +3304,6 @@ dependencies = [
"checksum jnix 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "6354ae923ca4df982181ae2cd77eb4214f8c11d11d0c0cd8606c9347ac2abc57"
"checksum jnix-macros 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c78132fe420156f13b30518fcda9449b0ab8ae3b5584e8a1c53ce390fe770b44"
"checksum js-sys 0.3.44 (registry+https://github.com/rust-lang/crates.io-index)" = "85a7e2c92a4804dd459b86c339278d0fe87cf93757fae222c3fa3ae75458bc73"
-"checksum jsonrpc-client-core 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f29cb249837420fb0cee7fb0fbf1d22679e121b160e71bb5e0d90b9df241c23e"
-"checksum jsonrpc-core 8.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "ddf83704f4e79979a424d1082dd2c1e52683058056c9280efa19ac5f6bc9033c"
"checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
"checksum lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
"checksum lazycell 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
@@ -3608,7 +3315,6 @@ dependencies = [
"checksum log-panics 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ae0136257df209261daa18d6c16394757c63e032e27aafd8b07788b051082bef"
"checksum maybe-uninit 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00"
"checksum memchr 2.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3728d817d99e5ac407411fa471ff9800a778d88a24685968b36824eaf4bee400"
-"checksum memoffset 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)" = "c198b026e1bbf08a937e94c6c60f9ec4a2267f5b0d2eec9c1b21b061ce2be55f"
"checksum miniz_oxide 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "be0f75932c1f6cfae3c04000e40114adf955636e19040f9c0a2c380702aa1c7f"
"checksum mio 0.6.22 (registry+https://github.com/rust-lang/crates.io-index)" = "fce347092656428bc8eaf6201042cb551b8d67855af7374542a92a0fbfcac430"
"checksum mio-extras 2.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "52403fe290012ce777c4626790c8951324a2b9e3316b3143779c72b029742f19"
@@ -3707,7 +3413,6 @@ dependencies = [
"checksum ryu 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e"
"checksum same-file 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502"
"checksum schannel 0.1.19 (registry+https://github.com/rust-lang/crates.io-index)" = "8f05ba609c234e60bee0d547fe94a4c7e9da733d1c962cf6e59efa4cd9c8bc75"
-"checksum scoped-tls 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "332ffa32bf586782a3efaeb58f127980944bbc8c4d6913a86107ac2a5ab24b28"
"checksum scopeguard 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
"checksum sct 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e3042af939fca8c3453b7af0f1c66e533a15a86169e39de2657310ade8f98d3c"
"checksum security-framework 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "64808902d7d99f78eaddd2b4e2509713babc3dc3c85ad6f4c447680f3c01e535"
@@ -3722,7 +3427,6 @@ dependencies = [
"checksum shell-escape 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "45bb67a18fa91266cc7807181f62f9178a6873bfad7dc788c42e6430db40184f"
"checksum signal-hook-registry 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a3e12110bc539e657a646068aaf5eb5b63af9d0c1f7b29c97113fad80e15f035"
"checksum simple-signal 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "53f7da44adcc42667d57483bd93f81295f27d66897804b757573b61b6f13288b"
-"checksum slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b4fcaed89ab08ef143da37bc52adbcc04d4a69014f4c1208d6b51f0c47bc23"
"checksum slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
"checksum smallvec 0.6.13 (registry+https://github.com/rust-lang/crates.io-index)" = "f7b0758c52e15a8b5e3691eae6cc559f08eee9406e548a4477ba4e67770a82b6"
"checksum snailquote 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "fc3e2894a343234fb8a8653cf9d49ef6aea44e6581612ca311c91c4bd356dec4"
@@ -3749,24 +3453,9 @@ dependencies = [
"checksum thiserror-impl 1.0.20 (registry+https://github.com/rust-lang/crates.io-index)" = "bd80fc12f73063ac132ac92aceea36734f04a1d93c1240c6944e23a3b8841793"
"checksum thread_local 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14"
"checksum time 0.1.43 (registry+https://github.com/rust-lang/crates.io-index)" = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438"
-"checksum tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)" = "5a09c0b5bb588872ab2f09afa13ee6e9dac11e10a0ec9e8e3ba39a5a5d530af6"
"checksum tokio 0.2.22 (registry+https://github.com/rust-lang/crates.io-index)" = "5d34ca54d84bf2b5b4d7d31e901a8464f7b60ac145a284fba25ceb801f2ddccd"
-"checksum tokio-codec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "25b2998660ba0e70d18684de5d06b70b70a3a747469af9dea7618cc59e75976b"
-"checksum tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)" = "aeeffbbb94209023feaef3c196a41cbcdafa06b4a6f893f68779bb5e53796f71"
-"checksum tokio-current-thread 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "b1de0e32a83f131e002238d7ccde18211c0a5397f60cbfffcb112868c2e0e20e"
-"checksum tokio-executor 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "fb2d1b8f4548dbf5e1f7818512e9c406860678f29c300cdf0ebac72d1a3a1671"
-"checksum tokio-fs 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "297a1206e0ca6302a0eed35b700d292b275256f596e2f3fea7729d5e629b6ff4"
-"checksum tokio-io 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)" = "57fc868aae093479e3131e3d165c93b1c7474109d13c90ec0dda2a1bbfff0674"
"checksum tokio-macros 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "f0c3acc6aa564495a0f2e1d59fab677cd7f81a19994cfc7f3ad0e64301560389"
-"checksum tokio-reactor 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "09bc590ec4ba8ba87652da2068d150dcada2cfa2e07faae270a5e0409aa51351"
"checksum tokio-rustls 0.13.1 (registry+https://github.com/rust-lang/crates.io-index)" = "15cb62a0d2770787abc96e99c1cd98fcf17f94959f3af63ca85bdfb203f051b4"
-"checksum tokio-sync 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "edfe50152bc8164fcc456dab7891fa9bf8beaf01c5ee7e1dd43a397c3cf87dee"
-"checksum tokio-tcp 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "98df18ed66e3b72e742f185882a9e201892407957e45fbff8da17ae7a7c51f72"
-"checksum tokio-threadpool 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)" = "df720b6581784c118f0eb4310796b12b1d242a7eb95f716a8367855325c25f89"
-"checksum tokio-timer 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6131e780037787ff1b3f8aad9da83bca02438b72277850dd6ad0d455e0e20efc"
-"checksum tokio-timer 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)" = "93044f2d313c95ff1cb7809ce9a7a05735b012288a888b62d4434fd58c94f296"
-"checksum tokio-udp 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "e2a0b10e610b39c38b031a2fcab08e4b82f16ece36504988dcbd81dbba650d82"
-"checksum tokio-uds 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)" = "ab57a4ac4111c8c9dbcf70779f6fc8bc35ae4b2454809febac840ad19bd7e4e0"
"checksum tokio-util 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "571da51182ec208780505a32528fc5512a8fe1443ab960b3f2f3ef093cd16930"
"checksum tokio-util 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "be8242891f2b6cbef26a2d7e8605133c2c554cd35b3e4948ea892d6d68436499"
"checksum toml 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)" = "ffc92d160b1eef40665be3a05630d003936a3bc7da7421277846c2613e92c71a"
diff --git a/mullvad-daemon/Cargo.toml b/mullvad-daemon/Cargo.toml
index 86abaabfaf..4847bb7465 100644
--- a/mullvad-daemon/Cargo.toml
+++ b/mullvad-daemon/Cargo.toml
@@ -24,9 +24,7 @@ rand = "0.7"
regex = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
-tokio02 = { package = "tokio", version = "0.2", features = [ "io-util", "process", "rt-core", "rt-threaded", "stream", "fs"] }
-tokio-core = "0.1"
-tokio-timer = "0.1"
+tokio = { version = "0.2", features = [ "io-util", "process", "rt-core", "rt-threaded", "stream", "fs"] }
uuid = { version = "0.7", features = ["v4"] }
mullvad-paths = { path = "../mullvad-paths" }
diff --git a/mullvad-daemon/src/account_history.rs b/mullvad-daemon/src/account_history.rs
index 05ea1c7d3b..8cceecda42 100644
--- a/mullvad-daemon/src/account_history.rs
+++ b/mullvad-daemon/src/account_history.rs
@@ -6,6 +6,7 @@ use std::{
future::Future,
io::{self, Seek, Write},
path::Path,
+ sync::{Arc, Mutex},
};
use talpid_types::ErrorExt;
@@ -22,6 +23,9 @@ pub enum Error {
#[error(display = "Unable to write account history file")]
Write(#[error(source)] io::Error),
+
+ #[error(display = "Write task panicked or was cancelled")]
+ WriteCancelled(#[error(source)] tokio::task::JoinError),
}
static ACCOUNT_HISTORY_FILE: &str = "account-history.json";
@@ -29,19 +33,19 @@ static ACCOUNT_HISTORY_LIMIT: usize = 3;
/// A trivial MRU cache of account data
pub struct AccountHistory {
- file: io::BufWriter<fs::File>,
- accounts: VecDeque<AccountEntry>,
+ file: Arc<Mutex<io::BufWriter<fs::File>>>,
+ accounts: Arc<Mutex<VecDeque<AccountEntry>>>,
rpc_handle: MullvadRestHandle,
}
impl AccountHistory {
- pub fn new(
+ pub async fn new(
cache_dir: &Path,
settings_dir: &Path,
rpc_handle: MullvadRestHandle,
) -> Result<AccountHistory> {
- Self::migrate_from_old_file_location(cache_dir, settings_dir);
+ Self::migrate_from_old_file_location(cache_dir, settings_dir).await;
let mut options = fs::OpenOptions::new();
#[cfg(unix)]
@@ -83,30 +87,32 @@ impl AccountHistory {
};
let file = io::BufWriter::new(reader.into_inner());
let mut history = AccountHistory {
- file,
- accounts,
+ file: Arc::new(Mutex::new(file)),
+ accounts: Arc::new(Mutex::new(accounts)),
rpc_handle,
};
- if let Err(e) = history.save_to_disk() {
+ if let Err(e) = history.save_to_disk().await {
log::error!("Failed to save account cache after opening it: {}", e);
}
Ok(history)
}
- fn migrate_from_old_file_location(old_dir: &Path, new_dir: &Path) {
+ async fn migrate_from_old_file_location(old_dir: &Path, new_dir: &Path) {
+ use tokio::fs;
+
let old_path = old_dir.join(ACCOUNT_HISTORY_FILE);
let new_path = new_dir.join(ACCOUNT_HISTORY_FILE);
if !old_path.exists() || new_path.exists() || new_path == old_path {
return;
}
- if let Err(error) = fs::copy(&old_path, &new_path) {
+ if let Err(error) = fs::copy(&old_path, &new_path).await {
log::error!(
"{}",
error.display_chain_with_msg("Failed to migrate account history file location")
);
} else {
- let _ = fs::remove_file(old_path);
+ let _ = fs::remove_file(old_path).await;
}
}
@@ -123,9 +129,11 @@ impl AccountHistory {
/// Gets account data for a certain account id and bumps it's entry to the top of the list if
/// it isn't there already. Returns None if the account entry is not available.
- pub fn get(&mut self, account: &AccountToken) -> Result<Option<AccountEntry>> {
+ pub async fn get(&mut self, account: &AccountToken) -> Result<Option<AccountEntry>> {
let (idx, entry) = match self
.accounts
+ .lock()
+ .unwrap()
.iter()
.enumerate()
.find(|(_idx, entry)| &entry.account == account)
@@ -139,19 +147,19 @@ impl AccountHistory {
if idx == 0 {
return Ok(Some(entry));
}
- self.insert(entry.clone())?;
+ self.insert(entry.clone()).await?;
Ok(Some(entry))
}
/// Bumps history of an account token. If the account token is not in history, it will be
/// added.
- pub fn bump_history(&mut self, account: &AccountToken) -> Result<()> {
- if self.get(account)?.is_none() {
+ pub async fn bump_history(&mut self, account: &AccountToken) -> Result<()> {
+ if self.get(account).await?.is_none() {
let new_entry = AccountEntry {
account: account.to_string(),
wireguard: None,
};
- self.insert(new_entry)?;
+ self.insert(new_entry).await?;
}
Ok(())
}
@@ -173,58 +181,58 @@ impl AccountHistory {
}
/// Always inserts a new entry at the start of the list
- pub fn insert(&mut self, new_entry: AccountEntry) -> Result<()> {
- self.accounts
- .retain(|entry| entry.account != new_entry.account);
-
- self.accounts.push_front(new_entry);
+ pub async fn insert(&mut self, new_entry: AccountEntry) -> Result<()> {
+ let mut accounts = self.accounts.lock().unwrap();
+ accounts.retain(|entry| entry.account != new_entry.account);
+ accounts.push_front(new_entry);
- if self.accounts.len() > ACCOUNT_HISTORY_LIMIT {
- let last_entry = self.accounts.pop_back().unwrap();
+ if accounts.len() > ACCOUNT_HISTORY_LIMIT {
+ let last_entry = accounts.pop_back().unwrap();
if let Some(wg_data) = last_entry.wireguard {
- self.rpc_handle
- .service()
- .spawn(self.create_remove_wg_key_rpc(&last_entry.account, &wg_data));
+ tokio::spawn(self.create_remove_wg_key_rpc(&last_entry.account, &wg_data));
}
}
- self.save_to_disk()
+ std::mem::drop(accounts);
+ self.save_to_disk().await
}
/// Retrieve account history.
pub fn get_account_history(&self) -> Vec<AccountToken> {
self.accounts
+ .lock()
+ .unwrap()
.iter()
.map(|entry| entry.account.clone())
.collect()
}
/// Remove account data
- pub fn remove_account(&mut self, account: &str) -> Result<()> {
- let entry = self.get(&String::from(account))?;
+ pub async fn remove_account(&mut self, account: &str) -> Result<()> {
+ let entry = self.get(&String::from(account)).await?;
let entry = match entry {
Some(entry) => entry,
None => return Ok(()),
};
if let Some(wg_data) = entry.wireguard {
- self.rpc_handle
- .service()
- .spawn(self.create_remove_wg_key_rpc(account, &wg_data))
+ tokio::spawn(self.create_remove_wg_key_rpc(account, &wg_data));
}
- let _ = self.accounts.pop_front();
- self.save_to_disk()
+ let _ = self.accounts.lock().unwrap().pop_front();
+ self.save_to_disk().await
}
/// Remove account history
- pub fn clear(&mut self) -> Result<()> {
+ pub async fn clear(&mut self) -> Result<()> {
log::debug!("account_history::clear");
let rpc = WireguardKeyProxy::new(self.rpc_handle.clone());
let removal: Vec<_> = self
.accounts
+ .lock()
+ .unwrap()
.drain(0..)
.filter_map(move |entry| {
let account = entry.account.clone();
@@ -241,21 +249,34 @@ impl AccountHistory {
.collect();
- let joined_futs = futures::future::join_all(removal);
- self.rpc_handle.service().block_on(joined_futs);
+ futures::future::join_all(removal).await;
+
+ self.accounts = Arc::new(Mutex::new(VecDeque::new()));
+ self.save_to_disk().await
+ }
+
+ async fn save_to_disk(&mut self) -> Result<()> {
+ let file = self.file.clone();
+ let accounts = self.accounts.clone();
- self.accounts = VecDeque::new();
- self.save_to_disk()
+ tokio::task::spawn_blocking(move || {
+ let mut file = file.lock().unwrap();
+ let accounts = accounts.lock().unwrap();
+ Self::save_to_disk_inner(&mut *file, &*accounts)
+ })
+ .await
+ .map_err(Error::WriteCancelled)?
}
- fn save_to_disk(&mut self) -> Result<()> {
- self.file.get_mut().set_len(0).map_err(Error::Write)?;
- self.file
- .seek(io::SeekFrom::Start(0))
- .map_err(Error::Write)?;
- serde_json::to_writer_pretty(&mut self.file, &self.accounts).map_err(Error::Serialize)?;
- self.file.flush().map_err(Error::Write)?;
- self.file.get_mut().sync_all().map_err(Error::Write)
+ fn save_to_disk_inner(
+ mut file: &mut io::BufWriter<fs::File>,
+ accounts: &VecDeque<AccountEntry>,
+ ) -> Result<()> {
+ file.get_mut().set_len(0).map_err(Error::Write)?;
+ file.seek(io::SeekFrom::Start(0)).map_err(Error::Write)?;
+ serde_json::to_writer_pretty(&mut file, accounts).map_err(Error::Serialize)?;
+ file.flush().map_err(Error::Write)?;
+ file.get_mut().sync_all().map_err(Error::Write)
}
}
diff --git a/mullvad-daemon/src/event_loop.rs b/mullvad-daemon/src/event_loop.rs
deleted file mode 100644
index 58f638a2ac..0000000000
--- a/mullvad-daemon/src/event_loop.rs
+++ /dev/null
@@ -1,38 +0,0 @@
-use futures01::{sync::oneshot, Future};
-use std::thread;
-use tokio_core::reactor::{Core, Remote};
-
-pub struct CoreHandle {
- /// Remote used to spawn futures on the daemon's event loop.
- pub remote: Remote,
- /// A sender that will cause the event loop to stop once it's dropped.
- shutdown_tx: Option<oneshot::Sender<()>>,
-}
-
-impl Drop for CoreHandle {
- fn drop(&mut self) {
- if let Some(shutdown_tx) = self.shutdown_tx.take() {
- if shutdown_tx.send(()).is_err() {
- log::error!("Core already shut down");
- }
- }
- }
-}
-
-/// Panics if a new tokio event loop can't be spawned.
-pub fn spawn() -> CoreHandle {
- let (tx, rx) = oneshot::channel();
- let (shutdown_tx, shutdown_rx) = oneshot::channel();
- thread::spawn(move || {
- let mut core = Core::new().expect("Failed to spawn event loop");
- let remote = core.remote();
- let _ = tx.send(remote);
- let _ = core.run(shutdown_rx);
- });
- let remote = rx.wait().expect("Failed to spawn event loop");
-
- CoreHandle {
- remote,
- shutdown_tx: Some(shutdown_tx),
- }
-}
diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs
index 414d64e267..e159e33be7 100644
--- a/mullvad-daemon/src/lib.rs
+++ b/mullvad-daemon/src/lib.rs
@@ -14,20 +14,18 @@ pub mod management_interface;
mod relays;
#[cfg(not(target_os = "android"))]
pub mod rpc_uniqueness_check;
+pub mod runtime;
mod settings;
pub mod version;
mod version_check;
-use futures::future::{abortable, AbortHandle};
-use futures01::{
- future::{self, Executor},
- stream::Wait,
- sync::{
- mpsc::{UnboundedReceiver, UnboundedSender},
- oneshot,
- },
- Future, Stream,
+use futures::{
+ channel::{mpsc, oneshot},
+ compat::Future01CompatExt,
+ future::{abortable, AbortHandle, Future},
+ StreamExt,
};
+use futures01::Future as Future01;
use log::{debug, error, info, warn};
use mullvad_rpc::AccountsProxy;
use mullvad_types::{
@@ -53,7 +51,7 @@ use std::{
marker::PhantomData,
mem,
path::PathBuf,
- sync::{mpsc, Arc, Weak},
+ sync::{mpsc as sync_mpsc, Arc, Weak},
time::Duration,
};
#[cfg(target_os = "linux")]
@@ -74,12 +72,6 @@ use talpid_types::{
mod wireguard;
const TARGET_START_STATE_FILE: &str = "target-start-state.json";
-mod event_loop;
-
-/// FIXME(linus): This is here just because the futures crate has deprecated it and jsonrpc_core
-/// did not introduce their own yet (https://github.com/paritytech/jsonrpc/pull/196).
-/// Remove this and use the one in jsonrpc_core when that is released.
-type BoxFuture<T, E> = Box<dyn Future<Item = T, Error = E> + Send>;
const TUNNEL_STATE_MACHINE_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
@@ -151,24 +143,24 @@ pub enum Error {
/// Enum representing commands that can be sent to the daemon.
pub enum DaemonCommand {
/// Set target state. Does nothing if the daemon already has the state that is being set.
- SetTargetState(oneshot::Sender<std::result::Result<(), ()>>, TargetState),
+ SetTargetState(oneshot::Sender<()>, TargetState),
/// Reconnect the tunnel, if one is connecting/connected.
Reconnect,
/// Request the current state.
GetState(oneshot::Sender<TunnelState>),
/// Get the current geographical location.
GetCurrentLocation(oneshot::Sender<Option<GeoIpLocation>>),
- CreateNewAccount(oneshot::Sender<std::result::Result<String, mullvad_rpc::rest::Error>>),
+ CreateNewAccount(oneshot::Sender<Result<String, mullvad_rpc::rest::Error>>),
/// Request the metadata for an account.
GetAccountData(
- oneshot::Sender<BoxFuture<AccountData, mullvad_rpc::rest::Error>>,
+ oneshot::Sender<Result<AccountData, mullvad_rpc::rest::Error>>,
AccountToken,
),
/// Request www auth token for an account
- GetWwwAuthToken(oneshot::Sender<BoxFuture<String, mullvad_rpc::rest::Error>>),
+ GetWwwAuthToken(oneshot::Sender<Result<String, mullvad_rpc::rest::Error>>),
/// Submit voucher to add time to the current account. Returns time added in seconds
SubmitVoucher(
- oneshot::Sender<BoxFuture<VoucherSubmission, mullvad_rpc::rest::Error>>,
+ oneshot::Sender<Result<VoucherSubmission, mullvad_rpc::rest::Error>>,
String,
),
/// Request account history
@@ -197,15 +189,9 @@ pub enum DaemonCommand {
/// Set the mssfix argument for OpenVPN
SetOpenVpnMssfix(oneshot::Sender<()>, Option<u16>),
/// Set proxy details for OpenVPN
- SetBridgeSettings(
- oneshot::Sender<std::result::Result<(), settings::Error>>,
- BridgeSettings,
- ),
+ SetBridgeSettings(oneshot::Sender<Result<(), settings::Error>>, BridgeSettings),
/// Set proxy state
- SetBridgeState(
- oneshot::Sender<std::result::Result<(), settings::Error>>,
- BridgeState,
- ),
+ SetBridgeState(oneshot::Sender<Result<(), settings::Error>>, BridgeState),
/// Set if IPv6 should be enabled in the tunnel
SetEnableIpv6(oneshot::Sender<()>, bool),
/// Set MTU for wireguard tunnels
@@ -252,7 +238,7 @@ pub(crate) enum InternalDaemonEvent {
TunnelStateTransition(TunnelStateTransition),
/// Request from the `MullvadTunnelParametersGenerator` to obtain a new relay.
GenerateTunnelParameters(
- mpsc::Sender<Result<TunnelParameters, ParameterGenerationError>>,
+ sync_mpsc::Sender<Result<TunnelParameters, ParameterGenerationError>>,
u32,
),
/// A command sent to the daemon.
@@ -338,12 +324,12 @@ impl DaemonExecutionState {
pub struct DaemonCommandChannel {
sender: DaemonCommandSender,
- receiver: UnboundedReceiver<InternalDaemonEvent>,
+ receiver: mpsc::UnboundedReceiver<InternalDaemonEvent>,
}
impl DaemonCommandChannel {
pub fn new() -> Self {
- let (untracked_sender, receiver) = futures01::sync::mpsc::unbounded();
+ let (untracked_sender, receiver) = mpsc::unbounded();
let sender = DaemonCommandSender(Arc::new(untracked_sender));
Self { sender, receiver }
@@ -353,7 +339,12 @@ impl DaemonCommandChannel {
self.sender.clone()
}
- fn destructure(self) -> (DaemonEventSender, UnboundedReceiver<InternalDaemonEvent>) {
+ fn destructure(
+ self,
+ ) -> (
+ DaemonEventSender,
+ mpsc::UnboundedReceiver<InternalDaemonEvent>,
+ ) {
let event_sender = DaemonEventSender::new(Arc::downgrade(&self.sender.0));
(event_sender, self.receiver)
@@ -361,7 +352,7 @@ impl DaemonCommandChannel {
}
#[derive(Clone)]
-pub struct DaemonCommandSender(Arc<UnboundedSender<InternalDaemonEvent>>);
+pub struct DaemonCommandSender(Arc<mpsc::UnboundedSender<InternalDaemonEvent>>);
impl DaemonCommandSender {
pub fn send(&self, command: DaemonCommand) -> Result<(), Error> {
@@ -372,7 +363,7 @@ impl DaemonCommandSender {
}
pub(crate) struct DaemonEventSender<E = InternalDaemonEvent> {
- sender: Weak<UnboundedSender<InternalDaemonEvent>>,
+ sender: Weak<mpsc::UnboundedSender<InternalDaemonEvent>>,
_event: PhantomData<E>,
}
@@ -389,7 +380,7 @@ where
}
impl DaemonEventSender {
- pub fn new(sender: Weak<UnboundedSender<InternalDaemonEvent>>) -> Self {
+ pub fn new(sender: Weak<mpsc::UnboundedSender<InternalDaemonEvent>>) -> Self {
DaemonEventSender {
sender,
_event: PhantomData,
@@ -454,13 +445,13 @@ pub trait EventListener {
}
pub struct Daemon<L: EventListener> {
- tunnel_command_tx: Arc<UnboundedSender<TunnelCommand>>,
+ tunnel_command_tx: Arc<mpsc::UnboundedSender<TunnelCommand>>,
tunnel_state: TunnelState,
target_state: TargetState,
state: DaemonExecutionState,
#[cfg(target_os = "linux")]
exclude_pids: split_tunnel::PidManager,
- rx: Wait<UnboundedReceiver<InternalDaemonEvent>>,
+ rx: mpsc::UnboundedReceiver<InternalDaemonEvent>,
tx: DaemonEventSender,
reconnection_job: Option<AbortHandle>,
event_listener: L,
@@ -471,7 +462,6 @@ pub struct Daemon<L: EventListener> {
rpc_handle: mullvad_rpc::rest::MullvadRestHandle,
wireguard_key_manager: wireguard::KeyManager,
version_updater_handle: version_check::VersionUpdaterHandle,
- core_handle: event_loop::CoreHandle,
relay_selector: relays::RelaySelector,
last_generated_relay: Option<Relay>,
last_generated_bridge_relay: Option<Relay>,
@@ -486,7 +476,7 @@ impl<L> Daemon<L>
where
L: EventListener + Clone + Send + 'static,
{
- pub fn start(
+ pub async fn start(
log_dir: Option<PathBuf>,
resource_dir: PathBuf,
settings_dir: PathBuf,
@@ -498,12 +488,13 @@ where
let (tunnel_state_machine_shutdown_tx, tunnel_state_machine_shutdown_signal) =
oneshot::channel();
- let mut rpc_runtime = mullvad_rpc::MullvadRpcRuntime::with_cache_dir(&cache_dir)
- .map_err(Error::InitRpcFactory)?;
+ let mut rpc_runtime = mullvad_rpc::MullvadRpcRuntime::with_cache_dir(
+ tokio::runtime::Handle::current(),
+ &cache_dir,
+ )
+ .map_err(Error::InitRpcFactory)?;
let rpc_handle = rpc_runtime.mullvad_rest_handle();
- let core_handle = event_loop::spawn();
-
let relay_list_listener = event_listener.clone();
let on_relay_list_update = move |relay_list: &RelayList| {
relay_list_listener.notify_relay_list(relay_list.clone());
@@ -532,9 +523,10 @@ where
app_version_info.clone(),
settings.show_beta_releases,
);
- rpc_runtime.runtime().spawn(version_updater.run());
+ tokio::spawn(version_updater.run());
let account_history =
account_history::AccountHistory::new(&cache_dir, &settings_dir, rpc_handle.clone())
+ .await
.map_err(Error::LoadAccountHistory)?;
// Restore the tunnel to a previous state
@@ -578,13 +570,14 @@ where
#[cfg(target_os = "android")]
android_context,
)
+ .await
.map_err(Error::TunnelError)?;
let wireguard_key_manager =
wireguard::KeyManager::new(internal_event_tx.clone(), rpc_handle.clone());
// Attempt to download a fresh relay list
- rpc_runtime.runtime().block_on(relay_selector.update());
+ relay_selector.update().await;
let initial_target_state = if settings.get_account_token().is_some() {
if settings.auto_connect {
@@ -605,7 +598,7 @@ where
state: DaemonExecutionState::Running,
#[cfg(target_os = "linux")]
exclude_pids: split_tunnel::PidManager::new().map_err(Error::InitSplitTunneling)?,
- rx: internal_event_rx.wait(),
+ rx: internal_event_rx,
tx: internal_event_tx,
reconnection_job: None,
event_listener,
@@ -616,7 +609,6 @@ where
rpc_handle,
wireguard_key_manager,
version_updater_handle,
- core_handle,
relay_selector,
last_generated_relay: None,
last_generated_bridge_relay: None,
@@ -626,19 +618,22 @@ where
cache_dir,
};
- daemon.ensure_wireguard_keys_for_current_account();
+ daemon.ensure_wireguard_keys_for_current_account().await;
if let Some(token) = daemon.settings.get_account_token() {
- daemon.wireguard_key_manager.set_rotation_interval(
- &mut daemon.account_history,
- token,
- daemon
- .settings
- .tunnel_options
- .wireguard
- .automatic_rotation
- .map(|hours| Duration::from_secs(60u64 * 60u64 * hours as u64)),
- );
+ daemon
+ .wireguard_key_manager
+ .set_rotation_interval(
+ &mut daemon.account_history,
+ token,
+ daemon
+ .settings
+ .tunnel_options
+ .wireguard
+ .automatic_rotation
+ .map(|hours| Duration::from_secs(60u64 * 60u64 * hours as u64)),
+ )
+ .await;
}
Ok(daemon)
@@ -646,90 +641,101 @@ where
/// Consume the `Daemon` and run the main event loop. Blocks until an error happens or a
/// shutdown event is received.
- pub fn run(mut self) -> Result<(), Error> {
+ pub async fn run(mut self) -> Result<(), Error> {
if self.target_state == TargetState::Secured {
self.connect_tunnel();
}
- while let Some(Ok(event)) = self.rx.next() {
- self.handle_event(event);
+
+ while let Some(event) = self.rx.next().await {
+ self.handle_event(event).await;
if self.state == DaemonExecutionState::Finished {
break;
}
}
- self.finalize();
+ self.finalize().await;
Ok(())
}
- fn finalize(self) {
- let (event_listener, shutdown_callbacks, tunnel_state_machine_shutdown_signal) =
+ async fn finalize(self) {
+ let (event_listener, shutdown_callbacks, rpc_runtime, tunnel_state_machine_shutdown_signal) =
self.shutdown();
for cb in shutdown_callbacks {
cb();
}
- let state_machine_shutdown = tokio_timer::Timer::default().timeout(
- // the oneshot::Canceled error type does not play well with the timer error, as such,
- // it has to be cast away.
- tunnel_state_machine_shutdown_signal.map_err(|_| {
- log::error!("Tunnel state machine already shut down");
- }),
+ let shutdown_signal = tokio::time::timeout(
TUNNEL_STATE_MACHINE_SHUTDOWN_TIMEOUT,
+ tunnel_state_machine_shutdown_signal,
);
-
- match state_machine_shutdown.wait() {
- Ok(_) => {
- log::info!("Tunnel state machine shut down");
- }
- Err(_) => {
- log::error!("Tunnel state machine did not shut down in time, shutting down anyway");
- }
+ match shutdown_signal.await {
+ Ok(_) => log::info!("Tunnel state machine shut down"),
+ Err(_) => log::error!("Tunnel state machine did not shut down gracefully"),
}
+
mem::drop(event_listener);
+ mem::drop(rpc_runtime);
}
/// Shuts down the daemon without shutting down the underlying event listener and the shutdown
/// callbacks
- fn shutdown(self) -> (L, Vec<Box<dyn FnOnce()>>, oneshot::Receiver<()>) {
+ fn shutdown(
+ self,
+ ) -> (
+ L,
+ Vec<Box<dyn FnOnce()>>,
+ mullvad_rpc::MullvadRpcRuntime,
+ oneshot::Receiver<()>,
+ ) {
let Daemon {
event_listener,
shutdown_callbacks,
+ rpc_runtime,
tunnel_state_machine_shutdown_signal,
..
} = self;
(
event_listener,
shutdown_callbacks,
+ rpc_runtime,
tunnel_state_machine_shutdown_signal,
)
}
- fn handle_event(&mut self, event: InternalDaemonEvent) {
+ async fn handle_event(&mut self, event: InternalDaemonEvent) {
use self::InternalDaemonEvent::*;
match event {
- TunnelStateTransition(transition) => self.handle_tunnel_state_transition(transition),
+ TunnelStateTransition(transition) => {
+ self.handle_tunnel_state_transition(transition).await
+ }
GenerateTunnelParameters(tunnel_parameters_tx, retry_attempt) => {
self.handle_generate_tunnel_parameters(&tunnel_parameters_tx, retry_attempt)
+ .await
}
- Command(command) => self.handle_command(command),
+ Command(command) => self.handle_command(command).await,
TriggerShutdown => self.trigger_shutdown_event(),
- WgKeyEvent(key_event) => self.handle_wireguard_key_event(key_event),
- NewAccountEvent(account_token, tx) => self.handle_new_account_event(account_token, tx),
+ WgKeyEvent(key_event) => self.handle_wireguard_key_event(key_event).await,
+ NewAccountEvent(account_token, tx) => {
+ self.handle_new_account_event(account_token, tx).await
+ }
NewAppVersionInfo(app_version_info) => {
self.handle_new_app_version_info(app_version_info)
}
}
}
- fn handle_tunnel_state_transition(&mut self, tunnel_state_transition: TunnelStateTransition) {
+ async fn handle_tunnel_state_transition(
+ &mut self,
+ tunnel_state_transition: TunnelStateTransition,
+ ) {
match &tunnel_state_transition {
TunnelStateTransition::Disconnected
| TunnelStateTransition::Connected(_)
| TunnelStateTransition::Error(_) => {
// Reset the RPCs so that they fail immediately after the underlying socket gets
// invalidated due to the tunnel either coming up or breaking.
- self.rpc_handle.service().reset();
+ self.rpc_handle.service().reset().await;
}
_ => (),
};
@@ -770,7 +776,7 @@ where
}
if let ErrorStateCause::AuthFailed(_) = error_state.cause() {
- self.schedule_reconnect(Duration::from_secs(60))
+ self.schedule_reconnect(Duration::from_secs(60)).await
}
}
_ => {}
@@ -780,9 +786,11 @@ where
self.event_listener.notify_new_state(tunnel_state);
}
- fn handle_generate_tunnel_parameters(
+ async fn handle_generate_tunnel_parameters(
&mut self,
- tunnel_parameters_tx: &mpsc::Sender<Result<TunnelParameters, ParameterGenerationError>>,
+ tunnel_parameters_tx: &sync_mpsc::Sender<
+ Result<TunnelParameters, ParameterGenerationError>,
+ >,
retry_attempt: u32,
) {
if let Some(account_token) = self.settings.get_account_token() {
@@ -797,26 +805,30 @@ where
ParameterGenerationError::CustomTunnelHostResultionError
})
}
- RelaySettings::Normal(constraints) => self
- .relay_selector
- .get_tunnel_endpoint(
- &constraints,
- self.settings.get_bridge_state(),
- retry_attempt,
- self.account_history
- .get(&account_token)
- .unwrap_or(None)
- .and_then(|entry| entry.wireguard)
- .is_some(),
- )
- .map_err(|_| ParameterGenerationError::NoMatchingRelay)
- .and_then(|(relay, endpoint)| {
- let result = self.create_tunnel_parameters(
- &relay,
- endpoint,
- account_token,
+ RelaySettings::Normal(constraints) => {
+ let endpoint = self
+ .relay_selector
+ .get_tunnel_endpoint(
+ &constraints,
+ self.settings.get_bridge_state(),
retry_attempt,
- );
+ self.account_history
+ .get(&account_token)
+ .await
+ .unwrap_or(None)
+ .and_then(|entry| entry.wireguard)
+ .is_some(),
+ )
+ .ok();
+ if let Some((relay, endpoint)) = endpoint {
+ let result = self
+ .create_tunnel_parameters(
+ &relay,
+ endpoint,
+ account_token,
+ retry_attempt,
+ )
+ .await;
self.last_generated_relay = Some(relay);
match result {
Ok(result) => Ok(result),
@@ -836,7 +848,10 @@ where
Err(ParameterGenerationError::NoMatchingRelay)
}
}
- }),
+ } else {
+ Err(ParameterGenerationError::NoMatchingRelay)
+ }
+ }
};
if tunnel_parameters_tx.send(result).is_err() {
log::error!("Failed to send tunnel parameters");
@@ -846,7 +861,7 @@ where
}
}
- fn create_tunnel_parameters(
+ async fn create_tunnel_parameters(
&mut self,
relay: &Relay,
endpoint: MullvadEndpoint,
@@ -927,6 +942,7 @@ where
let wg_data = self
.account_history
.get(&account_token)
+ .await
.map_err(Error::AccountHistory)?
.and_then(|entry| entry.wireguard)
.ok_or(Error::NoKeyAvailable)?;
@@ -952,15 +968,15 @@ where
}
}
- fn schedule_reconnect(&mut self, delay: Duration) {
+ async fn schedule_reconnect(&mut self, delay: Duration) {
let tunnel_command_tx = self.tx.to_specialized_sender();
let (future, abort_handle) = abortable(Box::pin(async move {
- tokio02::time::delay_for(delay).await;
+ tokio::time::delay_for(delay).await;
log::debug!("Attempting to reconnect");
let _ = tunnel_command_tx.send(DaemonCommand::Reconnect);
}));
- self.spawn_future(future);
+ tokio::spawn(future);
self.reconnection_job = Some(abort_handle);
}
@@ -970,23 +986,8 @@ where
}
}
- fn spawn_future<F>(&mut self, fut: F)
- where
- F: std::future::Future + Send + 'static,
- F::Output: Send,
- {
- self.rpc_runtime.runtime().spawn(fut);
- }
-
- fn block_on_future<F>(&mut self, fut: F) -> F::Output
- where
- F: std::future::Future,
- {
- self.rpc_runtime.runtime().block_on(fut)
- }
-
- fn handle_command(&mut self, command: DaemonCommand) {
+ async fn handle_command(&mut self, command: DaemonCommand) {
use self::DaemonCommand::*;
if !self.state.is_running() {
log::trace!("Dropping daemon command because the daemon is shutting down",);
@@ -996,22 +997,22 @@ where
SetTargetState(tx, state) => self.on_set_target_state(tx, state),
Reconnect => self.on_reconnect(),
GetState(tx) => self.on_get_state(tx),
- GetCurrentLocation(tx) => self.on_get_current_location(tx),
- CreateNewAccount(tx) => self.on_create_new_account(tx),
- GetAccountData(tx, account_token) => self.on_get_account_data(tx, account_token),
- GetWwwAuthToken(tx) => self.on_get_www_auth_token(tx),
- SubmitVoucher(tx, voucher) => self.on_submit_voucher(tx, voucher),
+ GetCurrentLocation(tx) => self.on_get_current_location(tx).await,
+ CreateNewAccount(tx) => self.on_create_new_account(tx).await,
+ GetAccountData(tx, account_token) => self.on_get_account_data(tx, account_token).await,
+ GetWwwAuthToken(tx) => self.on_get_www_auth_token(tx).await,
+ SubmitVoucher(tx, voucher) => self.on_submit_voucher(tx, voucher).await,
GetRelayLocations(tx) => self.on_get_relay_locations(tx),
- UpdateRelayLocations => self.on_update_relay_locations(),
- SetAccount(tx, account_token) => self.on_set_account(tx, account_token),
+ UpdateRelayLocations => self.on_update_relay_locations().await,
+ SetAccount(tx, account_token) => self.on_set_account(tx, account_token).await,
GetAccountHistory(tx) => self.on_get_account_history(tx),
RemoveAccountFromHistory(tx, account_token) => {
- self.on_remove_account_from_history(tx, account_token)
+ self.on_remove_account_from_history(tx, account_token).await
}
- ClearAccountHistory(tx) => self.on_clear_account_history(tx),
+ ClearAccountHistory(tx) => self.on_clear_account_history(tx).await,
UpdateRelaySettings(tx, update) => self.on_update_relay_settings(tx, update),
SetAllowLan(tx, allow_lan) => self.on_set_allow_lan(tx, allow_lan),
- SetShowBetaReleases(tx, enabled) => self.on_set_show_beta_releases(tx, enabled),
+ SetShowBetaReleases(tx, enabled) => self.on_set_show_beta_releases(tx, enabled).await,
SetBlockWhenDisconnected(tx, block_when_disconnected) => {
self.on_set_block_when_disconnected(tx, block_when_disconnected)
}
@@ -1024,16 +1025,16 @@ where
SetEnableIpv6(tx, enable_ipv6) => self.on_set_enable_ipv6(tx, enable_ipv6),
SetWireguardMtu(tx, mtu) => self.on_set_wireguard_mtu(tx, mtu),
SetWireguardRotationInterval(tx, interval) => {
- self.on_set_wireguard_rotation_interval(tx, interval)
+ self.on_set_wireguard_rotation_interval(tx, interval).await
}
GetSettings(tx) => self.on_get_settings(tx),
- GenerateWireguardKey(tx) => self.on_generate_wireguard_key(tx),
- GetWireguardKey(tx) => self.on_get_wireguard_key(tx),
- VerifyWireguardKey(tx) => self.on_verify_wireguard_key(tx),
+ GenerateWireguardKey(tx) => self.on_generate_wireguard_key(tx).await,
+ GetWireguardKey(tx) => self.on_get_wireguard_key(tx).await,
+ VerifyWireguardKey(tx) => self.on_verify_wireguard_key(tx).await,
GetVersionInfo(tx) => self.on_get_version_info(tx),
GetCurrentVersion(tx) => self.on_get_current_version(tx),
#[cfg(not(target_os = "android"))]
- FactoryReset(tx) => self.on_factory_reset(tx),
+ FactoryReset(tx) => self.on_factory_reset(tx).await,
#[cfg(target_os = "linux")]
GetSplitTunnelProcesses(tx) => self.on_get_split_tunnel_processes(tx),
#[cfg(target_os = "linux")]
@@ -1047,7 +1048,7 @@ where
}
}
- fn handle_wireguard_key_event(
+ async fn handle_wireguard_key_event(
&mut self,
event: (
AccountToken,
@@ -1073,6 +1074,7 @@ where
let mut account_entry = self
.account_history
.get(&account)
+ .await
.ok()
.and_then(|entry| entry)
.unwrap_or_else(|| account_history::AccountEntry {
@@ -1080,10 +1082,10 @@ where
wireguard: None,
});
account_entry.wireguard = Some(data);
- match self.account_history.insert(account_entry) {
+ match self.account_history.insert(account_entry).await {
Ok(_) => {
if let Some(TunnelType::Wireguard) = self.get_connected_tunnel_type() {
- self.schedule_reconnect(WG_RECONNECT_DELAY);
+ self.schedule_reconnect(WG_RECONNECT_DELAY).await;
}
self.event_listener
.notify_key_event(KeygenEvent::NewKey(public_key))
@@ -1115,12 +1117,12 @@ where
}
}
- fn handle_new_account_event(
+ async fn handle_new_account_event(
&mut self,
new_token: AccountToken,
tx: oneshot::Sender<Result<String, mullvad_rpc::rest::Error>>,
) {
- match self.set_account(Some(new_token.clone())) {
+ match self.set_account(Some(new_token.clone())).await {
Ok(_) => {
self.set_target_state(TargetState::Unsecured);
let _ = tx.send(Ok(new_token));
@@ -1136,17 +1138,13 @@ where
self.event_listener.notify_app_version(app_version_info);
}
- fn on_set_target_state(
- &mut self,
- tx: oneshot::Sender<Result<(), ()>>,
- new_target_state: TargetState,
- ) {
+ fn on_set_target_state(&mut self, tx: oneshot::Sender<()>, new_target_state: TargetState) {
if self.state.is_running() {
self.set_target_state(new_target_state);
} else {
warn!("Ignoring target state change request due to shutdown");
}
- Self::oneshot_send(tx, Ok(()), "target state");
+ Self::oneshot_send(tx, (), "target state");
}
fn on_reconnect(&mut self) {
@@ -1161,42 +1159,56 @@ where
Self::oneshot_send(tx, self.tunnel_state.clone(), "current state");
}
- fn on_get_current_location(&mut self, tx: oneshot::Sender<Option<GeoIpLocation>>) {
+ async fn on_get_current_location(&mut self, tx: oneshot::Sender<Option<GeoIpLocation>>) {
use self::TunnelState::*;
- let get_location: Box<dyn Future<Item = Option<GeoIpLocation>, Error = ()> + Send> =
- match &self.tunnel_state {
- Disconnected => Box::new(self.get_geo_location().map(Some)),
- Connecting { location, .. } => Box::new(future::result(Ok(location.clone()))),
- Disconnecting(..) => Box::new(future::result(Ok(self.build_location_from_relay()))),
- Connected { location, .. } => {
- let relay_location = location.clone();
- Box::new(
- self.get_geo_location()
- .map(|fetched_location| GeoIpLocation {
- ipv4: fetched_location.ipv4,
- ipv6: fetched_location.ipv6,
- ..relay_location.unwrap_or(fetched_location)
- })
- .map(Some),
- )
- }
- Error(..) => {
- // We are not online at all at this stage so no location data is available.
- Box::new(future::result(Ok(None)))
- }
- };
- self.core_handle.remote.spawn(move |_| {
- get_location.map(|location| Self::oneshot_send(tx, location, "current location"))
- });
+ match &self.tunnel_state {
+ Disconnected => {
+ let location = self.get_geo_location();
+ tokio::spawn(async {
+ Self::oneshot_send(tx, location.await.ok(), "current location");
+ });
+ }
+ Connecting { location, .. } => {
+ Self::oneshot_send(tx, location.clone(), "current location")
+ }
+ Disconnecting(..) => {
+ Self::oneshot_send(tx, self.build_location_from_relay(), "current location")
+ }
+ Connected { location, .. } => {
+ let relay_location = location.clone();
+ let location_future = self.get_geo_location();
+ tokio::spawn(async {
+ let location = location_future.await;
+ Self::oneshot_send(
+ tx,
+ location.ok().map(|fetched_location| GeoIpLocation {
+ ipv4: fetched_location.ipv4,
+ ipv6: fetched_location.ipv6,
+ ..relay_location.unwrap_or(fetched_location)
+ }),
+ "current location",
+ );
+ });
+ }
+ Error(_) => {
+ // We are not online at all at this stage so no location data is available.
+ Self::oneshot_send(tx, None, "current location");
+ }
+ }
}
- fn get_geo_location(&mut self) -> impl Future<Item = GeoIpLocation, Error = ()> {
+ fn get_geo_location(&mut self) -> impl Future<Output = Result<GeoIpLocation, ()>> {
let https_handle = self.rpc_runtime.rest_handle();
- geoip::send_location_request(https_handle).map_err(|e| {
- warn!("Unable to fetch GeoIP location: {}", e.display_chain());
- })
+ async {
+ geoip::send_location_request(https_handle)
+ .map_err(|e| {
+ warn!("Unable to fetch GeoIP location: {}", e.display_chain());
+ })
+ .compat()
+ .await
+ }
}
fn build_location_from_relay(&self) -> Option<GeoIpLocation> {
@@ -1221,7 +1233,7 @@ where
})
}
- fn on_create_new_account(
+ async fn on_create_new_account(
&mut self,
tx: oneshot::Sender<Result<String, mullvad_rpc::rest::Error>>,
) {
@@ -1242,41 +1254,55 @@ where
Ok(())
});
- if self.core_handle.remote.execute(future).is_err() {
- log::error!("Failed to spawn future for creating a new account");
- }
+ tokio::spawn(async {
+ if future.compat().await.is_err() {
+ log::error!("Failed to spawn future for creating a new account");
+ }
+ });
}
- fn on_get_account_data(
+ async fn on_get_account_data(
&mut self,
- tx: oneshot::Sender<BoxFuture<AccountData, mullvad_rpc::rest::Error>>,
+ tx: oneshot::Sender<Result<AccountData, mullvad_rpc::rest::Error>>,
account_token: AccountToken,
) {
- let rpc_call = self
- .accounts_proxy
- .get_expiry(account_token)
- .map(|expiry| AccountData { expiry });
- Self::oneshot_send(tx, Box::new(rpc_call), "account data")
+ let expiry_old_fut = self.accounts_proxy.get_expiry(account_token);
+ let rpc_call = async {
+ let result = expiry_old_fut
+ .compat()
+ .await
+ .map(|expiry| AccountData { expiry });
+ Self::oneshot_send(tx, result, "account data");
+ };
+ tokio::spawn(rpc_call);
}
- fn on_get_www_auth_token(
+ async fn on_get_www_auth_token(
&mut self,
- tx: oneshot::Sender<BoxFuture<String, mullvad_rpc::rest::Error>>,
+ tx: oneshot::Sender<Result<String, mullvad_rpc::rest::Error>>,
) {
if let Some(account_token) = self.settings.get_account_token() {
- let rpc_call = self.accounts_proxy.get_www_auth_token(account_token);
- Self::oneshot_send(tx, Box::new(rpc_call), "get_www_auth_token response")
+ let old_future = self.accounts_proxy.get_www_auth_token(account_token);
+ let rpc_call = async {
+ let result = old_future.compat().await;
+ Self::oneshot_send(tx, result, "get_www_auth_token response");
+ };
+ tokio::spawn(rpc_call);
}
}
- fn on_submit_voucher(
+ async fn on_submit_voucher(
&mut self,
- tx: oneshot::Sender<BoxFuture<VoucherSubmission, mullvad_rpc::rest::Error>>,
+ tx: oneshot::Sender<Result<VoucherSubmission, mullvad_rpc::rest::Error>>,
voucher: String,
) {
if let Some(account_token) = self.settings.get_account_token() {
- let rpc_call = self.accounts_proxy.submit_voucher(account_token, voucher);
- Self::oneshot_send(tx, Box::new(rpc_call), "submit_voucher response");
+ let old_future = self.accounts_proxy.submit_voucher(account_token, voucher);
+ let rpc_call = async {
+ let result = old_future.compat().await;
+ Self::oneshot_send(tx, result, "submit_voucher response");
+ };
+ tokio::spawn(rpc_call);
}
}
@@ -1284,13 +1310,12 @@ where
Self::oneshot_send(tx, self.relay_selector.get_locations(), "relay locations");
}
- fn on_update_relay_locations(&mut self) {
- let update_future = self.relay_selector.update();
- self.block_on_future(update_future);
+ async fn on_update_relay_locations(&mut self) {
+ self.relay_selector.update().await;
}
- fn on_set_account(&mut self, tx: oneshot::Sender<()>, account_token: Option<String>) {
- match self.set_account(account_token.clone()) {
+ async fn on_set_account(&mut self, tx: oneshot::Sender<()>, account_token: Option<String>) {
+ match self.set_account(account_token.clone()).await {
Ok(account_changed) => {
if account_changed {
match account_token {
@@ -1312,7 +1337,10 @@ where
}
}
- fn set_account(&mut self, account_token: Option<String>) -> Result<bool, settings::Error> {
+ async fn set_account(
+ &mut self,
+ account_token: Option<String>,
+ ) -> Result<bool, settings::Error> {
let account_changed = self.settings.set_account_token(account_token.clone())?;
if account_changed {
self.event_listener
@@ -1320,17 +1348,18 @@ where
// Bump account history if a token was set
if let Some(token) = account_token.clone() {
- if let Err(e) = self.account_history.bump_history(&token) {
+ if let Err(e) = self.account_history.bump_history(&token).await {
log::error!("Failed to bump account history: {}", e);
}
}
- self.ensure_wireguard_keys_for_current_account();
+ self.ensure_wireguard_keys_for_current_account().await;
if let Some(token) = account_token {
// update automatic rotation
self.wireguard_key_manager
- .reset_rotation(&mut self.account_history, token);
+ .reset_rotation(&mut self.account_history, token)
+ .await;
}
}
Ok(account_changed)
@@ -1344,18 +1373,23 @@ where
);
}
- fn on_remove_account_from_history(
+ async fn on_remove_account_from_history(
&mut self,
tx: oneshot::Sender<()>,
account_token: AccountToken,
) {
- if self.account_history.remove_account(&account_token).is_ok() {
+ if self
+ .account_history
+ .remove_account(&account_token)
+ .await
+ .is_ok()
+ {
Self::oneshot_send(tx, (), "remove_account_from_history response");
}
}
- fn on_clear_account_history(&mut self, tx: oneshot::Sender<()>) {
- match self.account_history.clear() {
+ async fn on_clear_account_history(&mut self, tx: oneshot::Sender<()>) {
+ match self.account_history.clear().await {
Ok(_) => {
self.set_target_state(TargetState::Unsecured);
Self::oneshot_send(tx, (), "clear_account_history response");
@@ -1384,7 +1418,7 @@ where
}
#[cfg(not(target_os = "android"))]
- fn on_factory_reset(&mut self, tx: oneshot::Sender<()>) {
+ async fn on_factory_reset(&mut self, tx: oneshot::Sender<()>) {
let mut failed = false;
@@ -1393,7 +1427,7 @@ where
failed = true;
}
- if let Err(e) = self.account_history.clear() {
+ if let Err(e) = self.account_history.clear().await {
log::error!("Failed to clear account history - {}", e);
failed = true;
}
@@ -1486,7 +1520,7 @@ where
}
}
- fn on_set_show_beta_releases(&mut self, tx: oneshot::Sender<()>, enabled: bool) {
+ async fn on_set_show_beta_releases(&mut self, tx: oneshot::Sender<()>, enabled: bool) {
let save_result = self.settings.set_show_beta_releases(enabled);
match save_result {
Ok(settings_changed) => {
@@ -1494,9 +1528,8 @@ where
if settings_changed {
self.event_listener
.notify_settings(self.settings.to_settings());
- let runtime = self.rpc_runtime.runtime();
let mut handle = self.version_updater_handle.clone();
- runtime.block_on(async { handle.set_show_beta_releases(enabled).await });
+ handle.set_show_beta_releases(enabled).await;
}
}
Err(e) => error!("{}", e.display_chain_with_msg("Unable to save settings")),
@@ -1648,7 +1681,7 @@ where
}
}
- fn on_set_wireguard_rotation_interval(
+ async fn on_set_wireguard_rotation_interval(
&mut self,
tx: oneshot::Sender<()>,
interval: Option<u32>,
@@ -1661,11 +1694,14 @@ where
let account_token = self.settings.get_account_token();
if let Some(token) = account_token {
- self.wireguard_key_manager.set_rotation_interval(
- &mut self.account_history,
- token,
- interval.map(|hours| Duration::from_secs(60u64 * 60u64 * hours as u64)),
- );
+ self.wireguard_key_manager
+ .set_rotation_interval(
+ &mut self.account_history,
+ token,
+ interval
+ .map(|hours| Duration::from_secs(60u64 * 60u64 * hours as u64)),
+ )
+ .await;
}
self.event_listener
@@ -1676,68 +1712,89 @@ where
}
}
- fn ensure_wireguard_keys_for_current_account(&mut self) {
+ async fn ensure_wireguard_keys_for_current_account(&mut self) {
if let Some(account) = self.settings.get_account_token() {
if self
.account_history
.get(&account)
+ .await
.map(|entry| entry.map(|e| e.wireguard.is_none()).unwrap_or(true))
.unwrap_or(true)
{
log::info!("Automatically generating new wireguard key for account");
self.wireguard_key_manager
- .generate_key_async(account, Some(FIRST_KEY_PUSH_TIMEOUT));
+ .generate_key_async(account, Some(FIRST_KEY_PUSH_TIMEOUT))
+ .await;
} else {
log::info!("Account already has wireguard key");
}
}
}
- fn on_generate_wireguard_key(&mut self, tx: oneshot::Sender<KeygenEvent>) {
- let mut result = || -> Result<KeygenEvent, String> {
- let account_token = self
- .settings
- .get_account_token()
- .ok_or_else(|| "No account token set".to_owned())?;
+ async fn on_generate_wireguard_key(&mut self, tx: oneshot::Sender<KeygenEvent>) {
+ match self.on_generate_wireguard_key_inner().await {
+ Ok(key_event) => {
+ Self::oneshot_send(tx, key_event, "generate_wireguard_key response");
+ }
+ Err(e) => {
+ log::error!("Failed to generate new wireguard key - {}", e);
+ }
+ }
+ }
- let mut account_entry = self
- .account_history
- .get(&account_token)
- .map_err(|e| format!("Failed to read account entry from history: {}", e))
- .map(|data| {
- data.unwrap_or_else(|| {
- log::error!("Account token set in settings but not in account history");
- account_history::AccountEntry {
- account: account_token.clone(),
- wireguard: None,
- }
- })
- })?;
+ async fn on_generate_wireguard_key_inner(&mut self) -> Result<KeygenEvent, String> {
+ let account_token = self
+ .settings
+ .get_account_token()
+ .ok_or_else(|| "No account token set".to_owned())?;
- let gen_result = match &account_entry.wireguard {
- Some(wireguard_data) => self
- .wireguard_key_manager
- .replace_key(account_token.clone(), wireguard_data.get_public_key()),
- None => self
- .wireguard_key_manager
- .generate_key_sync(account_token.clone()),
- };
+ let mut account_entry = self
+ .account_history
+ .get(&account_token)
+ .await
+ .map_err(|e| format!("Failed to read account entry from history: {}", e))
+ .map(|data| {
+ data.unwrap_or_else(|| {
+ log::error!("Account token set in settings but not in account history");
+ account_history::AccountEntry {
+ account: account_token.clone(),
+ wireguard: None,
+ }
+ })
+ })?;
+
+ let gen_result = match &account_entry.wireguard {
+ Some(wireguard_data) => {
+ self.wireguard_key_manager
+ .replace_key(account_token.clone(), wireguard_data.get_public_key())
+ .await
+ }
+ None => {
+ self.wireguard_key_manager
+ .generate_key_sync(account_token.clone())
+ .await
+ }
+ };
- match gen_result {
- Ok(new_data) => {
- let public_key = new_data.get_public_key();
- account_entry.wireguard = Some(new_data);
- self.account_history.insert(account_entry).map_err(|e| {
+ match gen_result {
+ Ok(new_data) => {
+ let public_key = new_data.get_public_key();
+ account_entry.wireguard = Some(new_data);
+ self.account_history
+ .insert(account_entry)
+ .await
+ .map_err(|e| {
format!("Failed to add new wireguard key to account data: {}", e)
})?;
- if let Some(TunnelType::Wireguard) = self.get_connected_tunnel_type() {
- self.reconnect_tunnel();
- }
- let keygen_event = KeygenEvent::NewKey(public_key);
- self.event_listener.notify_key_event(keygen_event.clone());
+ if let Some(TunnelType::Wireguard) = self.get_connected_tunnel_type() {
+ self.reconnect_tunnel();
+ }
+ let keygen_event = KeygenEvent::NewKey(public_key);
+ self.event_listener.notify_key_event(keygen_event.clone());
- // update automatic rotation
- self.wireguard_key_manager.set_rotation_interval(
+ // update automatic rotation
+ self.wireguard_key_manager
+ .set_rotation_interval(
&mut self.account_history,
account_token,
self.settings
@@ -1745,39 +1802,31 @@ where
.wireguard
.automatic_rotation
.map(|hours| Duration::from_secs(60u64 * 60u64 * hours as u64)),
- );
-
- Ok(keygen_event)
- }
- Err(wireguard::Error::TooManyKeys) => Ok(KeygenEvent::TooManyKeys),
- Err(e) => Err(format!(
- "Failed to generate new key - {}",
- e.display_chain_with_msg("Failed to generate new wireguard key:")
- )),
- }
- };
+ )
+ .await;
- match result() {
- Ok(key_event) => {
- Self::oneshot_send(tx, key_event, "generate_wireguard_key response");
- }
- Err(e) => {
- log::error!("Failed to generate new wireguard key - {}", e);
+ Ok(keygen_event)
}
+ Err(wireguard::Error::TooManyKeys) => Ok(KeygenEvent::TooManyKeys),
+ Err(e) => Err(format!(
+ "Failed to generate new key - {}",
+ e.display_chain_with_msg("Failed to generate new wireguard key:")
+ )),
}
}
- fn on_get_wireguard_key(&mut self, tx: oneshot::Sender<Option<wireguard::PublicKey>>) {
- let key = self
- .settings
- .get_account_token()
- .and_then(|account| self.account_history.get(&account).ok()?)
- .and_then(|account_entry| account_entry.wireguard.map(|wg| wg.get_public_key()));
-
- Self::oneshot_send(tx, key, "get_wireguard_key response");
+ async fn on_get_wireguard_key(&mut self, tx: oneshot::Sender<Option<wireguard::PublicKey>>) {
+ let token = self.settings.get_account_token();
+ if let Some(token) = token {
+ let entry = self.account_history.get(&token).await;
+ if let Ok(Some(entry)) = entry {
+ let key = entry.wireguard.map(|wg| wg.get_public_key());
+ Self::oneshot_send(tx, key, "get_wireguard_key response");
+ }
+ }
}
- fn on_verify_wireguard_key(&mut self, tx: oneshot::Sender<bool>) {
+ async fn on_verify_wireguard_key(&mut self, tx: oneshot::Sender<bool>) {
let account = match self.settings.get_account_token() {
Some(account) => account,
None => {
@@ -1789,6 +1838,7 @@ where
let key = self
.account_history
.get(&account)
+ .await
.map(|entry| entry.and_then(|e| e.wireguard.map(|wg| wg.private_key.public_key())));
let public_key = match key {
@@ -1807,7 +1857,7 @@ where
.wireguard_key_manager
.verify_wireguard_key(account, public_key);
- self.rpc_handle.service().spawn(async move {
+ tokio::spawn(async move {
match verification_rpc.await {
Ok(is_valid) => {
Self::oneshot_send(tx, is_valid, "verify_wireguard_key response");
@@ -1982,7 +2032,7 @@ impl TunnelParametersGenerator for MullvadTunnelParametersGenerator {
&mut self,
retry_attempt: u32,
) -> Result<TunnelParameters, ParameterGenerationError> {
- let (response_tx, response_rx) = mpsc::channel();
+ let (response_tx, response_rx) = sync_mpsc::channel();
if self
.tx
.send(InternalDaemonEvent::GenerateTunnelParameters(
diff --git a/mullvad-daemon/src/main.rs b/mullvad-daemon/src/main.rs
index adb7c39f0a..4e055183b9 100644
--- a/mullvad-daemon/src/main.rs
+++ b/mullvad-daemon/src/main.rs
@@ -4,7 +4,9 @@ use log::{debug, error, info, warn};
use mullvad_daemon::{
logging,
management_interface::{ManagementInterfaceEventBroadcaster, ManagementInterfaceServer},
- rpc_uniqueness_check, version, Daemon, DaemonCommandChannel, DaemonCommandSender,
+ rpc_uniqueness_check,
+ runtime::new_runtime_builder,
+ version, Daemon, DaemonCommandChannel, DaemonCommandSender,
};
use std::{path::PathBuf, thread, time::Duration};
use talpid_types::ErrorExt;
@@ -23,7 +25,13 @@ fn main() {
eprintln!("{}", error);
std::process::exit(1)
});
- let exit_code = match run_platform(config, log_dir) {
+
+ let mut runtime = new_runtime_builder().build().unwrap_or_else(|error| {
+ eprintln!("{}", error.display_chain());
+ std::process::exit(1);
+ });
+
+ let exit_code = match runtime.block_on(run_platform(config, log_dir)) {
Ok(_) => 0,
Err(error) => {
error!("{}", error);
@@ -64,7 +72,7 @@ fn get_log_dir(config: &cli::Config) -> Result<Option<PathBuf>, String> {
}
#[cfg(windows)]
-fn run_platform(config: &cli::Config, log_dir: Option<PathBuf>) -> Result<(), String> {
+async fn run_platform(config: &cli::Config, log_dir: Option<PathBuf>) -> Result<(), String> {
if config.run_as_service {
system_service::run()
} else {
@@ -75,46 +83,39 @@ fn run_platform(config: &cli::Config, log_dir: Option<PathBuf>) -> Result<(), St
}
install_result
} else {
- run_standalone(log_dir)
+ run_standalone(log_dir).await
}
}
}
#[cfg(not(windows))]
-fn run_platform(_config: &cli::Config, log_dir: Option<PathBuf>) -> Result<(), String> {
- run_standalone(log_dir)
+async fn run_platform(_config: &cli::Config, log_dir: Option<PathBuf>) -> Result<(), String> {
+ run_standalone(log_dir).await
}
-fn run_standalone(log_dir: Option<PathBuf>) -> Result<(), String> {
- {
- let mut runtime = tokio02::runtime::Builder::new()
- .basic_scheduler()
- .enable_all()
- .build()
- .map_err(|e| e.display_chain())?;
- if runtime.block_on(rpc_uniqueness_check::is_another_instance_running()) {
- return Err("Another instance of the daemon is already running".to_owned());
- }
+async fn run_standalone(log_dir: Option<PathBuf>) -> Result<(), String> {
+ if rpc_uniqueness_check::is_another_instance_running().await {
+ return Err("Another instance of the daemon is already running".to_owned());
}
if !running_as_admin() {
warn!("Running daemon as a non-administrator user, clients might refuse to connect");
}
- let daemon = create_daemon(log_dir)?;
+ let daemon = create_daemon(log_dir).await?;
let shutdown_handle = daemon.shutdown_handle();
shutdown::set_shutdown_signal_handler(move || shutdown_handle.shutdown())
.map_err(|e| e.display_chain())?;
- daemon.run().map_err(|e| e.display_chain())?;
+ daemon.run().await.map_err(|e| e.display_chain())?;
info!("Mullvad daemon is quitting");
thread::sleep(Duration::from_millis(500));
Ok(())
}
-fn create_daemon(
+async fn create_daemon(
log_dir: Option<PathBuf>,
) -> Result<Daemon<ManagementInterfaceEventBroadcaster>, String> {
let resource_dir = mullvad_paths::get_resource_dir();
@@ -124,7 +125,7 @@ fn create_daemon(
.map_err(|e| e.display_chain_with_msg("Unable to get cache dir"))?;
let command_channel = DaemonCommandChannel::new();
- let event_listener = spawn_management_interface(command_channel.sender())?;
+ let event_listener = spawn_management_interface(command_channel.sender()).await?;
Daemon::start(
log_dir,
@@ -134,23 +135,22 @@ fn create_daemon(
event_listener,
command_channel,
)
+ .await
.map_err(|e| e.display_chain_with_msg("Unable to initialize daemon"))
}
-fn spawn_management_interface(
+async fn spawn_management_interface(
command_sender: DaemonCommandSender,
) -> Result<ManagementInterfaceEventBroadcaster, String> {
- let server = ManagementInterfaceServer::start(command_sender).map_err(|error| {
- error.display_chain_with_msg("Unable to start management interface server")
- })?;
+ let server = ManagementInterfaceServer::start(command_sender)
+ .await
+ .map_err(|error| {
+ error.display_chain_with_msg("Unable to start management interface server")
+ })?;
let event_broadcaster = server.event_broadcaster();
info!("Management interface listening on {}", server.socket_path());
-
- thread::spawn(|| {
- server.wait();
- info!("Management interface shut down");
- });
+ tokio::spawn(server.run());
Ok(event_broadcaster)
}
diff --git a/mullvad-daemon/src/management_interface.rs b/mullvad-daemon/src/management_interface.rs
index dee8cb3eea..cba6e96b43 100644
--- a/mullvad-daemon/src/management_interface.rs
+++ b/mullvad-daemon/src/management_interface.rs
@@ -1,6 +1,5 @@
use crate::{DaemonCommand, DaemonCommandSender, EventListener};
-use futures::compat::Future01CompatExt;
-use futures01::{future, sync, Future};
+use futures::channel::oneshot;
use mullvad_management_interface::{
types::{self, daemon_event, management_service_server::ManagementService},
Code, Request, Response, Status,
@@ -36,10 +35,6 @@ pub enum Error {
// Unable to start the management interface server
#[error(display = "Unable to start management interface server")]
SetupError(#[error(source)] mullvad_management_interface::Error),
-
- // Unable to start the tokio runtime
- #[error(display = "Failed to create the tokio runtime")]
- TokioRuntimeError(#[error(source)] tokio02::io::Error),
}
struct ManagementServiceImpl {
@@ -49,20 +44,17 @@ struct ManagementServiceImpl {
pub type ServiceResult<T> = std::result::Result<Response<T>, Status>;
type EventsListenerReceiver =
- tokio02::sync::mpsc::UnboundedReceiver<Result<types::DaemonEvent, Status>>;
-type EventsListenerSender =
- tokio02::sync::mpsc::UnboundedSender<Result<types::DaemonEvent, Status>>;
+ tokio::sync::mpsc::UnboundedReceiver<Result<types::DaemonEvent, Status>>;
+type EventsListenerSender = tokio::sync::mpsc::UnboundedSender<Result<types::DaemonEvent, Status>>;
-const INVALID_ACCOUNT_TOKEN_MESSAGE: &str = "No valid account token configured";
const INVALID_VOUCHER_MESSAGE: &str = "This voucher code is invalid";
const USED_VOUCHER_MESSAGE: &str = "This voucher code has already been used";
#[mullvad_management_interface::async_trait]
impl ManagementService for ManagementServiceImpl {
type GetRelayLocationsStream =
- tokio02::sync::mpsc::Receiver<Result<types::RelayListCountry, Status>>;
- type GetSplitTunnelProcessesStream =
- tokio02::sync::mpsc::UnboundedReceiver<Result<i32, Status>>;
+ tokio::sync::mpsc::Receiver<Result<types::RelayListCountry, Status>>;
+ type GetSplitTunnelProcessesStream = tokio::sync::mpsc::UnboundedReceiver<Result<i32, Status>>;
type EventsListenStream = EventsListenerReceiver;
// Control and get the tunnel state
@@ -71,53 +63,39 @@ impl ManagementService for ManagementServiceImpl {
async fn connect_tunnel(&self, _: Request<()>) -> ServiceResult<()> {
log::debug!("connect_tunnel");
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::SetTargetState(tx, TargetState::Secured))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
- .and_then(|result| match result {
- Ok(()) => Ok(Response::new(())),
- Err(()) => Err(Status::new(
- Code::Unauthenticated,
- INVALID_ACCOUNT_TOKEN_MESSAGE,
- )),
- })
- .compat()
- .await
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::SetTargetState(tx, TargetState::Secured))?;
+ rx.await.map_err(|_| Status::internal("internal error"))?;
+ Ok(Response::new(()))
}
async fn disconnect_tunnel(&self, _: Request<()>) -> ServiceResult<()> {
log::debug!("disconnect_tunnel");
- let (tx, _) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::SetTargetState(tx, TargetState::Unsecured))
- .then(|_| Ok(Response::new(())))
- .compat()
- .await
+ let (tx, _) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::SetTargetState(tx, TargetState::Unsecured))?;
+ Ok(Response::new(()))
}
async fn reconnect_tunnel(&self, _: Request<()>) -> ServiceResult<()> {
log::debug!("reconnect_tunnel");
- self.send_command_to_daemon(DaemonCommand::Reconnect)
- .map(Response::new)
- .compat()
- .await
+ self.send_command_to_daemon(DaemonCommand::Reconnect)?;
+ Ok(Response::new(()))
}
async fn get_tunnel_state(&self, _: Request<()>) -> ServiceResult<types::TunnelState> {
log::debug!("get_tunnel_state");
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::GetState(tx))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
- .and_then(|state| Ok(Response::new(convert_state(state))))
- .compat()
- .await
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::GetState(tx))?;
+ let state = rx.await.map_err(|_| Status::internal("internal error"))?;
+ Ok(Response::new(convert_state(state)))
}
// Control the daemon and receive events
//
async fn events_listen(&self, _: Request<()>) -> ServiceResult<Self::EventsListenStream> {
- let (tx, rx) = tokio02::sync::mpsc::unbounded_channel();
+ let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let mut subscriptions = self.subscriptions.write();
subscriptions.push(tx);
@@ -127,30 +105,25 @@ impl ManagementService for ManagementServiceImpl {
async fn prepare_restart(&self, _: Request<()>) -> ServiceResult<()> {
log::debug!("prepare_restart");
- self.send_command_to_daemon(DaemonCommand::PrepareRestart)
- .map(Response::new)
- .compat()
- .await
+ self.send_command_to_daemon(DaemonCommand::PrepareRestart)?;
+ Ok(Response::new(()))
}
async fn shutdown(&self, _: Request<()>) -> ServiceResult<()> {
log::debug!("shutdown");
- self.send_command_to_daemon(DaemonCommand::Shutdown)
- .map(Response::new)
- .compat()
- .await
+ self.send_command_to_daemon(DaemonCommand::Shutdown)?;
+ Ok(Response::new(()))
}
async fn factory_reset(&self, _: Request<()>) -> ServiceResult<()> {
#[cfg(not(target_os = "android"))]
{
log::debug!("factory_reset");
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::FactoryReset(tx))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::FactoryReset(tx))?;
+ rx.await
.map(Response::new)
- .compat()
- .await
+ .map_err(|_| Status::internal("internal error"))
}
#[cfg(target_os = "android")]
{
@@ -160,25 +133,22 @@ impl ManagementService for ManagementServiceImpl {
async fn get_current_version(&self, _: Request<()>) -> ServiceResult<String> {
log::debug!("get_current_version");
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::GetCurrentVersion(tx))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::GetCurrentVersion(tx))?;
+ rx.await
.map(Response::new)
- .compat()
- .await
+ .map_err(|_| Status::internal("internal error"))
}
async fn get_version_info(&self, _: Request<()>) -> ServiceResult<types::AppVersionInfo> {
log::debug!("get_version_info");
- let (tx, rx) = sync::oneshot::channel();
- let app_version_info = self
- .send_command_to_daemon(DaemonCommand::GetVersionInfo(tx))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
- .compat()
- .await?;
-
- Ok(Response::new(convert_version_info(&app_version_info)))
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::GetVersionInfo(tx))?;
+ rx.await
+ .map_err(|_| Status::internal("internal error"))
+ .map(|version_info| convert_version_info(&version_info))
+ .map(Response::new)
}
// Relays and tunnel constraints
@@ -186,10 +156,8 @@ impl ManagementService for ManagementServiceImpl {
async fn update_relay_locations(&self, _: Request<()>) -> ServiceResult<()> {
log::debug!("update_relay_locations");
- self.send_command_to_daemon(DaemonCommand::UpdateRelayLocations)
- .compat()
- .await
- .map(Response::new)
+ self.send_command_to_daemon(DaemonCommand::UpdateRelayLocations)?;
+ Ok(Response::new(()))
}
async fn update_relay_settings(
@@ -197,15 +165,14 @@ impl ManagementService for ManagementServiceImpl {
request: Request<types::RelaySettingsUpdate>,
) -> ServiceResult<()> {
log::debug!("update_relay_settings");
- let (tx, rx) = sync::oneshot::channel();
+ let (tx, rx) = oneshot::channel();
let constraints_update = convert_relay_settings_update(&request.into_inner())?;
let message = DaemonCommand::UpdateRelaySettings(tx, constraints_update);
- self.send_command_to_daemon(message)
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
+ self.send_command_to_daemon(message)?;
+ rx.await
.map(Response::new)
- .compat()
- .await
+ .map_err(|_| Status::internal("internal error"))
}
async fn get_relay_locations(
@@ -214,17 +181,14 @@ impl ManagementService for ManagementServiceImpl {
) -> ServiceResult<Self::GetRelayLocationsStream> {
log::debug!("get_relay_locations");
- let (tx, rx) = sync::oneshot::channel();
- let locations = self
- .send_command_to_daemon(DaemonCommand::GetRelayLocations(tx))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
- .compat()
- .await?;
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::GetRelayLocations(tx))?;
+ let locations = rx.await.map_err(|_| Status::internal("internal error"))?;
let (mut stream_tx, stream_rx) =
- tokio02::sync::mpsc::channel(cmp::max(1, locations.countries.len()));
+ tokio::sync::mpsc::channel(cmp::max(1, locations.countries.len()));
- tokio02::spawn(async move {
+ tokio::spawn(async move {
for country in &locations.countries {
if let Err(error) = stream_tx
.send(Ok(convert_relay_list_country(country)))
@@ -243,18 +207,13 @@ impl ManagementService for ManagementServiceImpl {
async fn get_current_location(&self, _: Request<()>) -> ServiceResult<types::GeoIpLocation> {
log::debug!("get_current_location");
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::GetCurrentLocation(tx))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
- .and_then(|geoip| {
- if let Some(geoip) = geoip {
- Ok(Response::new(convert_geoip_location(geoip)))
- } else {
- Err(Status::not_found("no location was found"))
- }
- })
- .compat()
- .await
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::GetCurrentLocation(tx))?;
+ let result = rx.await.map_err(|_| Status::internal("internal error"))?;
+ match result {
+ Some(geoip) => Ok(Response::new(convert_geoip_location(geoip))),
+ None => Err(Status::not_found("no location was found")),
+ }
}
async fn set_bridge_settings(
@@ -328,15 +287,12 @@ impl ManagementService for ManagementServiceImpl {
log::debug!("set_bridge_settings({:?})", settings);
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::SetBridgeSettings(tx, settings))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
- .and_then(|settings_result| {
- settings_result.map_err(|_| Status::internal("internal error"))
- })
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::SetBridgeSettings(tx, settings))?;
+ let settings_result = rx.await.map_err(|_| Status::internal("internal error"))?;
+ settings_result
.map(Response::new)
- .compat()
- .await
+ .map_err(|_| Status::internal("internal error"))
}
async fn set_bridge_state(&self, request: Request<types::BridgeState>) -> ServiceResult<()> {
@@ -350,15 +306,12 @@ impl ManagementService for ManagementServiceImpl {
};
log::debug!("set_bridge_state({:?})", bridge_state);
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::SetBridgeState(tx, bridge_state))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
- .and_then(|settings_result| {
- settings_result.map_err(|_| Status::internal("internal error"))
- })
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::SetBridgeState(tx, bridge_state))?;
+ let settings_result = rx.await.map_err(|_| Status::internal("internal error"))?;
+ settings_result
.map(Response::new)
- .compat()
- .await
+ .map_err(|_| Status::internal("internal error"))
}
// Settings
@@ -366,59 +319,54 @@ impl ManagementService for ManagementServiceImpl {
async fn get_settings(&self, _: Request<()>) -> ServiceResult<types::Settings> {
log::debug!("get_settings");
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::GetSettings(tx))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::GetSettings(tx))?;
+ rx.await
.map(|settings| Response::new(convert_settings(&settings)))
- .compat()
- .await
+ .map_err(|_| Status::internal("internal error"))
}
async fn set_allow_lan(&self, request: Request<bool>) -> ServiceResult<()> {
let allow_lan = request.into_inner();
log::debug!("set_allow_lan({})", allow_lan);
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::SetAllowLan(tx, allow_lan))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::SetAllowLan(tx, allow_lan))?;
+ rx.await
.map(Response::new)
- .compat()
- .await
+ .map_err(|_| Status::internal("internal error"))
}
async fn set_show_beta_releases(&self, request: Request<bool>) -> ServiceResult<()> {
let enabled = request.into_inner();
log::debug!("set_show_beta_releases({})", enabled);
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::SetShowBetaReleases(tx, enabled))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::SetShowBetaReleases(tx, enabled))?;
+ rx.await
.map(Response::new)
- .compat()
- .await
+ .map_err(|_| Status::internal("internal error"))
}
async fn set_block_when_disconnected(&self, request: Request<bool>) -> ServiceResult<()> {
let block_when_disconnected = request.into_inner();
log::debug!("set_block_when_disconnected({})", block_when_disconnected);
- let (tx, rx) = sync::oneshot::channel();
+ let (tx, rx) = oneshot::channel();
self.send_command_to_daemon(DaemonCommand::SetBlockWhenDisconnected(
tx,
block_when_disconnected,
- ))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
- .map(Response::new)
- .compat()
- .await
+ ))?;
+ rx.await
+ .map(Response::new)
+ .map_err(|_| Status::internal("internal error"))
}
async fn set_auto_connect(&self, request: Request<bool>) -> ServiceResult<()> {
let auto_connect = request.into_inner();
log::debug!("set_auto_connect({})", auto_connect);
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::SetAutoConnect(tx, auto_connect))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::SetAutoConnect(tx, auto_connect))?;
+ rx.await
.map(Response::new)
- .compat()
- .await
+ .map_err(|_| Status::internal("internal error"))
}
async fn set_openvpn_mssfix(&self, request: Request<u32>) -> ServiceResult<()> {
@@ -429,50 +377,45 @@ impl ManagementService for ManagementServiceImpl {
None
};
log::debug!("set_openvpn_mssfix({:?})", mssfix);
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::SetOpenVpnMssfix(tx, mssfix))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::SetOpenVpnMssfix(tx, mssfix))?;
+ rx.await
.map(Response::new)
- .compat()
- .await
+ .map_err(|_| Status::internal("internal error"))
}
async fn set_wireguard_mtu(&self, request: Request<u32>) -> ServiceResult<()> {
let mtu = request.into_inner();
let mtu = if mtu != 0 { Some(mtu as u16) } else { None };
log::debug!("set_wireguard_mtu({:?})", mtu);
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::SetWireguardMtu(tx, mtu))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::SetWireguardMtu(tx, mtu))?;
+ rx.await
.map(Response::new)
- .compat()
- .await
+ .map_err(|_| Status::internal("internal error"))
}
async fn set_enable_ipv6(&self, request: Request<bool>) -> ServiceResult<()> {
let enable_ipv6 = request.into_inner();
log::debug!("set_enable_ipv6({})", enable_ipv6);
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::SetEnableIpv6(tx, enable_ipv6))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::SetEnableIpv6(tx, enable_ipv6))?;
+ rx.await
.map(Response::new)
- .compat()
- .await
+ .map_err(|_| Status::internal("internal error"))
}
// Account management
//
async fn create_new_account(&self, _: Request<()>) -> ServiceResult<String> {
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::CreateNewAccount(tx))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
- .and_then(|result| match result {
- Ok(account_token) => Ok(Response::new(account_token)),
- Err(_) => Err(Status::internal("internal error")),
- })
- .compat()
- .await
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::CreateNewAccount(tx))?;
+ let result = rx.await.map_err(|_| Status::internal("internal error"))?;
+ match result {
+ Ok(account_token) => Ok(Response::new(account_token)),
+ Err(_) => Err(Status::internal("internal error")),
+ }
}
async fn set_account(&self, request: Request<AccountToken>) -> ServiceResult<()> {
@@ -483,14 +426,11 @@ impl ManagementService for ManagementServiceImpl {
} else {
Some(account_token)
};
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::SetAccount(tx, account_token))
- .and_then(|_| {
- rx.map(Response::new)
- .map_err(|_| Status::internal("internal error"))
- })
- .compat()
- .await
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::SetAccount(tx, account_token))?;
+ rx.await
+ .map(Response::new)
+ .map_err(|_| Status::internal("internal error"))
}
async fn get_account_data(
@@ -499,40 +439,35 @@ impl ManagementService for ManagementServiceImpl {
) -> ServiceResult<types::AccountData> {
log::debug!("get_account_data");
let account_token = request.into_inner();
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::GetAccountData(tx, account_token))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
- .and_then(|rpc_future| {
- rpc_future
- .map(|account_data| {
- Response::new(types::AccountData {
- expiry: Some(types::Timestamp {
- seconds: account_data.expiry.timestamp(),
- nanos: 0,
- }),
- })
- })
- .map_err(|error: RestError| {
- log::error!(
- "Unable to get account data from API: {}",
- error.display_chain()
- );
- map_rest_account_error(error)
- })
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::GetAccountData(tx, account_token))?;
+ let result = rx.await.map_err(|_| Status::internal("internal error"))?;
+ result
+ .map(|account_data| {
+ Response::new(types::AccountData {
+ expiry: Some(types::Timestamp {
+ seconds: account_data.expiry.timestamp(),
+ nanos: 0,
+ }),
+ })
+ })
+ .map_err(|error: RestError| {
+ log::error!(
+ "Unable to get account data from API: {}",
+ error.display_chain()
+ );
+ map_rest_account_error(error)
})
- .compat()
- .await
}
async fn get_account_history(&self, _: Request<()>) -> ServiceResult<types::AccountHistory> {
// TODO: this might be a stream
log::debug!("get_account_history");
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::GetAccountHistory(tx))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::GetAccountHistory(tx))?;
+ rx.await
+ .map_err(|_| Status::internal("internal error"))
.map(|history| Response::new(types::AccountHistory { token: history }))
- .compat()
- .await
}
async fn remove_account_from_history(
@@ -541,42 +476,36 @@ impl ManagementService for ManagementServiceImpl {
) -> ServiceResult<()> {
log::debug!("remove_account_from_history");
let account_token = request.into_inner();
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::RemoveAccountFromHistory(tx, account_token))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::RemoveAccountFromHistory(tx, account_token))?;
+ rx.await
.map(Response::new)
- .compat()
- .await
+ .map_err(|_| Status::internal("internal error"))
}
async fn clear_account_history(&self, _: Request<()>) -> ServiceResult<()> {
log::debug!("clear_account_history");
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::ClearAccountHistory(tx))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::ClearAccountHistory(tx))?;
+ rx.await
.map(Response::new)
- .compat()
- .await
+ .map_err(|_| Status::internal("internal error"))
}
async fn get_www_auth_token(&self, _: Request<()>) -> ServiceResult<String> {
log::debug!("get_www_auth_token");
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::GetWwwAuthToken(tx))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
- .and_then(|rpc_future| {
- rpc_future
- .map(Response::new)
- .map_err(|error: mullvad_rpc::rest::Error| {
- log::error!(
- "Unable to get account data from API: {}",
- error.display_chain()
- );
- map_rest_account_error(error)
- })
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::GetWwwAuthToken(tx))?;
+ let result = rx.await.map_err(|_| Status::internal("internal error"))?;
+ result
+ .map(Response::new)
+ .map_err(|error: mullvad_rpc::rest::Error| {
+ log::error!(
+ "Unable to get account data from API: {}",
+ error.display_chain()
+ );
+ map_rest_account_error(error)
})
- .compat()
- .await
}
async fn submit_voucher(
@@ -585,38 +514,33 @@ impl ManagementService for ManagementServiceImpl {
) -> ServiceResult<types::VoucherSubmission> {
log::debug!("submit_voucher");
let voucher = request.into_inner();
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::SubmitVoucher(tx, voucher))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
- .and_then(|f| {
- f.map(|submission| {
- Response::new(types::VoucherSubmission {
- seconds_added: submission.time_added,
- new_expiry: Some(types::Timestamp {
- seconds: submission.new_expiry.timestamp(),
- nanos: 0,
- }),
- })
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::SubmitVoucher(tx, voucher))?;
+ let result = rx.await.map_err(|_| Status::internal("internal error"))?;
+ result
+ .map(|submission| {
+ Response::new(types::VoucherSubmission {
+ seconds_added: submission.time_added,
+ new_expiry: Some(types::Timestamp {
+ seconds: submission.new_expiry.timestamp(),
+ nanos: 0,
+ }),
})
- .map_err(|e| match e {
- RestError::ApiError(StatusCode::BAD_REQUEST, message) => {
- match &message.as_str() {
- &mullvad_rpc::INVALID_VOUCHER => {
- Status::new(Code::NotFound, INVALID_VOUCHER_MESSAGE)
- }
-
- &mullvad_rpc::VOUCHER_USED => {
- Status::new(Code::ResourceExhausted, USED_VOUCHER_MESSAGE)
- }
+ })
+ .map_err(|e| match e {
+ RestError::ApiError(StatusCode::BAD_REQUEST, message) => match &message.as_str() {
+ &mullvad_rpc::INVALID_VOUCHER => {
+ Status::new(Code::NotFound, INVALID_VOUCHER_MESSAGE)
+ }
- _ => Status::internal("internal error"),
- }
+ &mullvad_rpc::VOUCHER_USED => {
+ Status::new(Code::ResourceExhausted, USED_VOUCHER_MESSAGE)
}
+
_ => Status::internal("internal error"),
- })
+ },
+ _ => Status::internal("internal error"),
})
- .compat()
- .await
}
// WireGuard key management
@@ -626,61 +550,55 @@ impl ManagementService for ManagementServiceImpl {
let interval = request.into_inner();
log::debug!("set_wireguard_rotation_interval({:?})", interval);
- let (tx, rx) = sync::oneshot::channel();
+ let (tx, rx) = oneshot::channel();
self.send_command_to_daemon(DaemonCommand::SetWireguardRotationInterval(
tx,
Some(interval),
- ))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
- .map(Response::new)
- .compat()
- .await
+ ))?;
+ rx.await
+ .map_err(|_| Status::internal("internal error"))
+ .map(Response::new)
}
async fn reset_wireguard_rotation_interval(&self, _: Request<()>) -> ServiceResult<()> {
log::debug!("reset_wireguard_rotation_interval");
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::SetWireguardRotationInterval(tx, None))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::SetWireguardRotationInterval(tx, None))?;
+ rx.await
+ .map_err(|_| Status::internal("internal error"))
.map(Response::new)
- .compat()
- .await
}
async fn generate_wireguard_key(&self, _: Request<()>) -> ServiceResult<types::KeygenEvent> {
// TODO: return error for TooManyKeys, GenerationFailure
// on success, simply return the new key or nil
log::debug!("generate_wireguard_key");
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::GenerateWireguardKey(tx))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::GenerateWireguardKey(tx))?;
+ rx.await
+ .map_err(|_| Status::internal("internal error"))
.map(|event| Response::new(convert_wireguard_key_event(&event)))
- .compat()
- .await
}
async fn get_wireguard_key(&self, _: Request<()>) -> ServiceResult<types::PublicKey> {
log::debug!("get_wireguard_key");
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::GetWireguardKey(tx))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
- .then(|response| match response {
- Ok(Some(key)) => Ok(Response::new(convert_public_key(&key))),
- Ok(None) => Err(Status::not_found("no WireGuard key was found")),
- Err(e) => Err(e),
- })
- .compat()
- .await
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::GetWireguardKey(tx))?;
+ let response = rx.await.map_err(|_| Status::internal("internal error"));
+ match response {
+ Ok(Some(key)) => Ok(Response::new(convert_public_key(&key))),
+ Ok(None) => Err(Status::not_found("no WireGuard key was found")),
+ Err(e) => Err(e),
+ }
}
async fn verify_wireguard_key(&self, _: Request<()>) -> ServiceResult<bool> {
log::debug!("verify_wireguard_key");
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::VerifyWireguardKey(tx))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::VerifyWireguardKey(tx))?;
+ rx.await
+ .map_err(|_| Status::internal("internal error"))
.map(Response::new)
- .compat()
- .await
}
// Split tunneling
@@ -693,15 +611,12 @@ impl ManagementService for ManagementServiceImpl {
#[cfg(target_os = "linux")]
{
log::debug!("get_split_tunnel_processes");
- let (tx, rx) = sync::oneshot::channel();
- let pids = self
- .send_command_to_daemon(DaemonCommand::GetSplitTunnelProcesses(tx))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
- .compat()
- .await?;
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::GetSplitTunnelProcesses(tx))?;
+ let pids = rx.await.map_err(|_| Status::internal("internal error"))?;
- let (tx, rx) = tokio02::sync::mpsc::unbounded_channel();
- tokio02::spawn(async move {
+ let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
+ tokio::spawn(async move {
for pid in pids {
let _ = tx.send(Ok(pid));
}
@@ -711,7 +626,7 @@ impl ManagementService for ManagementServiceImpl {
}
#[cfg(not(target_os = "linux"))]
{
- let (_, rx) = tokio02::sync::mpsc::unbounded_channel();
+ let (_, rx) = tokio::sync::mpsc::unbounded_channel();
Ok(Response::new(rx))
}
}
@@ -720,12 +635,11 @@ impl ManagementService for ManagementServiceImpl {
async fn add_split_tunnel_process(&self, request: Request<i32>) -> ServiceResult<()> {
let pid = request.into_inner();
log::debug!("add_split_tunnel_process");
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::AddSplitTunnelProcess(tx, pid))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::AddSplitTunnelProcess(tx, pid))?;
+ rx.await
+ .map_err(|_| Status::internal("internal error"))
.map(Response::new)
- .compat()
- .await
}
#[cfg(not(target_os = "linux"))]
async fn add_split_tunnel_process(&self, _: Request<i32>) -> ServiceResult<()> {
@@ -736,12 +650,11 @@ impl ManagementService for ManagementServiceImpl {
async fn remove_split_tunnel_process(&self, request: Request<i32>) -> ServiceResult<()> {
let pid = request.into_inner();
log::debug!("remove_split_tunnel_process");
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::RemoveSplitTunnelProcess(tx, pid))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::RemoveSplitTunnelProcess(tx, pid))?;
+ rx.await
+ .map_err(|_| Status::internal("internal error"))
.map(Response::new)
- .compat()
- .await
}
#[cfg(not(target_os = "linux"))]
async fn remove_split_tunnel_process(&self, _: Request<i32>) -> ServiceResult<()> {
@@ -752,12 +665,11 @@ impl ManagementService for ManagementServiceImpl {
#[cfg(target_os = "linux")]
{
log::debug!("clear_split_tunnel_processes");
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::ClearSplitTunnelProcesses(tx))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::ClearSplitTunnelProcesses(tx))?;
+ rx.await
+ .map_err(|_| Status::internal("internal error"))
.map(Response::new)
- .compat()
- .await
}
#[cfg(not(target_os = "linux"))]
{
@@ -768,15 +680,10 @@ impl ManagementService for ManagementServiceImpl {
impl ManagementServiceImpl {
/// Sends a command to the daemon and maps the error to an RPC error.
- fn send_command_to_daemon(
- &self,
- command: DaemonCommand,
- ) -> impl Future<Item = (), Error = Status> {
- future::result(
- self.daemon_tx
- .send(command)
- .map_err(|_| Status::internal("internal error")),
- )
+ fn send_command_to_daemon(&self, command: DaemonCommand) -> Result<(), Status> {
+ self.daemon_tx
+ .send(command)
+ .map_err(|_| Status::internal("internal error"))
}
}
@@ -1527,23 +1434,14 @@ fn convert_proto_location(location: types::RelayLocation) -> Constraint<Location
pub struct ManagementInterfaceServer {
subscriptions: Arc<RwLock<Vec<EventsListenerSender>>>,
socket_path: String,
- runtime: tokio02::runtime::Runtime,
server_abort_tx: triggered::Trigger,
server_join_handle: Option<
- tokio02::task::JoinHandle<std::result::Result<(), mullvad_management_interface::Error>>,
+ tokio::task::JoinHandle<std::result::Result<(), mullvad_management_interface::Error>>,
>,
}
impl ManagementInterfaceServer {
- pub fn start(tunnel_tx: DaemonCommandSender) -> Result<Self, Error> {
- // TODO: don't spawn a tokio runtime here; make this function async
- let mut runtime = tokio02::runtime::Builder::new()
- .threaded_scheduler()
- .core_threads(1)
- .enable_all()
- .build()
- .map_err(Error::TokioRuntimeError)?;
-
+ pub async fn start(tunnel_tx: DaemonCommandSender) -> Result<Self, Error> {
let subscriptions = Arc::<RwLock<Vec<EventsListenerSender>>>::default();
let socket_path = mullvad_paths::get_rpc_socket_path()
@@ -1556,15 +1454,15 @@ impl ManagementInterfaceServer {
daemon_tx: tunnel_tx,
subscriptions: subscriptions.clone(),
};
- let server_join_handle = runtime.spawn(mullvad_management_interface::spawn_rpc_server(
+ let server_join_handle = tokio::spawn(mullvad_management_interface::spawn_rpc_server(
server,
start_tx,
server_abort_rx,
));
if let Err(_) = start_rx.recv() {
- return Err(runtime
- .block_on(server_join_handle)
+ return Err(server_join_handle
+ .await
.expect("Failed to resolve quit handle future")
.map_err(Error::SetupError)
.unwrap_err());
@@ -1573,7 +1471,6 @@ impl ManagementInterfaceServer {
Ok(ManagementInterfaceServer {
subscriptions,
socket_path,
- runtime,
server_abort_tx,
server_join_handle: Some(server_join_handle),
})
@@ -1585,19 +1482,18 @@ impl ManagementInterfaceServer {
pub fn event_broadcaster(&self) -> ManagementInterfaceEventBroadcaster {
ManagementInterfaceEventBroadcaster {
- runtime: self.runtime.handle().clone(),
subscriptions: self.subscriptions.clone(),
close_handle: self.server_abort_tx.clone(),
}
}
- /// Consumes the server and waits for it to finish. Returns an error if the server exited
- /// due to an error.
- pub fn wait(mut self) {
+ /// Consumes the server and waits for it to finish.
+ pub async fn run(self) {
if let Some(server_join_handle) = self.server_join_handle {
- if let Err(error) = self.runtime.block_on(server_join_handle) {
+ if let Err(error) = server_join_handle.await {
log::error!("Management server panic: {:?}", error);
}
+ log::info!("Management interface shut down");
}
}
}
@@ -1605,7 +1501,6 @@ impl ManagementInterfaceServer {
/// A handle that allows broadcasting messages to all subscribers of the management interface.
#[derive(Clone)]
pub struct ManagementInterfaceEventBroadcaster {
- runtime: tokio02::runtime::Handle,
subscriptions: Arc<RwLock<Vec<EventsListenerSender>>>,
close_handle: triggered::Trigger,
}
diff --git a/mullvad-daemon/src/relays.rs b/mullvad-daemon/src/relays.rs
index 78e0292023..ce880a3d19 100644
--- a/mullvad-daemon/src/relays.rs
+++ b/mullvad-daemon/src/relays.rs
@@ -33,7 +33,7 @@ use talpid_types::{
net::{all_of_the_internet, openvpn::ProxySettings, wireguard, TransportProtocol, TunnelType},
ErrorExt,
};
-use tokio02::fs::File;
+use tokio::fs::File;
const DATE_TIME_FORMAT_STR: &str = "%Y-%m-%d %H:%M:%S%.3f";
const RELAYS_FILENAME: &str = "relays.json";
@@ -807,7 +807,7 @@ impl RelayListUpdater {
}
async fn run(mut self, mut cmd_rx: mpsc::Receiver<()>) {
- let mut check_interval = tokio02::time::interval(UPDATE_CHECK_INTERVAL).fuse();
+ let mut check_interval = tokio::time::interval(UPDATE_CHECK_INTERVAL).fuse();
let mut download_future = Box::pin(Fuse::terminated());
loop {
futures::select! {
@@ -918,7 +918,7 @@ impl RelayListUpdater {
.map_err(Error::OpenRelayCache)?;
let bytes = serde_json::to_vec_pretty(relays).map_err(Error::Serialize)?;
let mut slice: &[u8] = bytes.as_slice();
- let _ = tokio02::io::copy(&mut slice, &mut file)
+ let _ = tokio::io::copy(&mut slice, &mut file)
.await
.map_err(Error::WriteRelayCache)?;
Ok(())
diff --git a/mullvad-daemon/src/runtime.rs b/mullvad-daemon/src/runtime.rs
new file mode 100644
index 0000000000..3c60b133e8
--- /dev/null
+++ b/mullvad-daemon/src/runtime.rs
@@ -0,0 +1,11 @@
+use tokio::runtime;
+
+pub fn new_runtime_builder() -> runtime::Builder {
+ let mut builder = runtime::Builder::new();
+ builder
+ .threaded_scheduler()
+ .core_threads(4)
+ .max_threads(8)
+ .enable_all();
+ builder
+}
diff --git a/mullvad-daemon/src/system_service.rs b/mullvad-daemon/src/system_service.rs
index 8b40e8965c..cb27a443e8 100644
--- a/mullvad-daemon/src/system_service.rs
+++ b/mullvad-daemon/src/system_service.rs
@@ -1,5 +1,5 @@
use crate::cli;
-use mullvad_daemon::DaemonShutdownHandle;
+use mullvad_daemon::{runtime::new_runtime_builder, DaemonShutdownHandle};
use std::{
env,
ffi::OsString,
@@ -102,7 +102,20 @@ fn run_service() -> Result<(), String> {
let clean_shutdown = Arc::new(AtomicBool::new(false));
let log_dir = crate::get_log_dir(cli::get_config()).expect("Log dir should be available here");
- let result = crate::create_daemon(log_dir).and_then(|daemon| {
+
+ let runtime = new_runtime_builder().build();
+ let mut runtime = match runtime {
+ Err(error) => {
+ persistent_service_status
+ .set_stopped(ServiceExitCode::ServiceSpecific(1))
+ .unwrap();
+ return Err(error.display_chain());
+ }
+ Ok(runtime) => runtime,
+ };
+
+ let result = runtime.block_on(crate::create_daemon(log_dir));
+ let result = if let Ok(daemon) = result {
let shutdown_handle = daemon.shutdown_handle();
// Register monitor that translates `ServiceControl` to Daemon events
@@ -115,8 +128,12 @@ fn run_service() -> Result<(), String> {
persistent_service_status.set_running().unwrap();
- daemon.run().map_err(|e| e.display_chain())
- });
+ runtime
+ .block_on(daemon.run())
+ .map_err(|e| e.display_chain())
+ } else {
+ result.map(|_| ())
+ };
let exit_code = match result {
Ok(()) => {
diff --git a/mullvad-daemon/src/version_check.rs b/mullvad-daemon/src/version_check.rs
index 6092c5b0de..183082f457 100644
--- a/mullvad-daemon/src/version_check.rs
+++ b/mullvad-daemon/src/version_check.rs
@@ -17,7 +17,7 @@ use std::{
};
use talpid_core::mpsc::Sender;
use talpid_types::ErrorExt;
-use tokio02::fs::File;
+use tokio::fs::File;
const VERSION_INFO_FILENAME: &str = "version-info.json";
@@ -165,7 +165,7 @@ impl VersionUpdater {
let mut buf = serde_json::to_vec_pretty(&cached_app_version).map_err(Error::Serialize)?;
let mut read_buf: &[u8] = buf.as_mut();
- let _ = tokio02::io::copy(&mut read_buf, &mut file)
+ let _ = tokio::io::copy(&mut read_buf, &mut file)
.await
.map_err(Error::WriteVersionCache)?;
Ok(())
@@ -218,7 +218,7 @@ impl VersionUpdater {
pub async fn run(mut self) {
let mut rx = self.rx.take().unwrap().fuse();
- let next_delay = || tokio02::time::delay_for(UPDATE_CHECK_INTERVAL).fuse();
+ let next_delay = || tokio::time::delay_for(UPDATE_CHECK_INTERVAL).fuse();
let mut check_delay = next_delay();
let mut version_check = futures::future::Fuse::terminated();
diff --git a/mullvad-daemon/src/wireguard.rs b/mullvad-daemon/src/wireguard.rs
index 2ebd5fae50..189d1f5a6a 100644
--- a/mullvad-daemon/src/wireguard.rs
+++ b/mullvad-daemon/src/wireguard.rs
@@ -19,7 +19,6 @@ pub use talpid_types::net::wireguard::{
ConnectionConfig, PrivateKey, TunnelConfig, TunnelParameters,
};
use talpid_types::ErrorExt;
-use tokio_timer;
/// Default automatic key rotation
const DEFAULT_AUTOMATIC_KEY_ROTATION: Duration = Duration::from_secs(7 * 24 * 60 * 60);
@@ -35,8 +34,6 @@ pub enum Error {
RestError(#[error(source)] mullvad_rpc::rest::Error),
#[error(display = "Account already has maximum number of keys")]
TooManyKeys,
- #[error(display = "Failed to create rotation timer")]
- RotationScheduleError(#[error(source)] tokio_timer::TimerError),
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -63,16 +60,19 @@ impl KeyManager {
/// Reset key rotation, cancelling the current one and starting a new one for the specified
/// account
- pub fn reset_rotation(
+ pub async fn reset_rotation(
&mut self,
account_history: &mut AccountHistory,
account_token: AccountToken,
) {
match account_history
.get(&account_token)
+ .await
.map(|entry| entry.map(|entry| entry.wireguard.map(|wg| wg.get_public_key())))
{
- Ok(Some(Some(public_key))) => self.run_automatic_rotation(account_token, public_key),
+ Ok(Some(Some(public_key))) => {
+ self.run_automatic_rotation(account_token, public_key).await
+ }
Ok(Some(None)) => {
log::error!("reset_rotation: failed to obtain public key for account entry.")
}
@@ -84,7 +84,7 @@ impl KeyManager {
/// Update automatic key rotation interval
/// Passing `None` for the interval will use the default value.
/// A duration of `0` disables automatic key rotation.
- pub fn set_rotation_interval(
+ pub async fn set_rotation_interval(
&mut self,
account_history: &mut AccountHistory,
account_token: AccountToken,
@@ -93,7 +93,7 @@ impl KeyManager {
self.auto_rotation_interval =
auto_rotation_interval.unwrap_or(DEFAULT_AUTOMATIC_KEY_ROTATION);
- self.reset_rotation(account_history, account_token);
+ self.reset_rotation(account_history, account_token).await;
}
/// Stop current key generation
@@ -104,19 +104,18 @@ impl KeyManager {
}
/// Generate a new private key
- pub fn generate_key_sync(&mut self, account: AccountToken) -> Result<WireguardData> {
+ pub async fn generate_key_sync(&mut self, account: AccountToken) -> Result<WireguardData> {
self.reset();
let private_key = PrivateKey::new_from_random();
- self.http_handle
- .service()
- .block_on(self.push_future_generator(account, private_key, None)())
+ self.push_future_generator(account, private_key, None)()
+ .await
.map_err(Self::map_rpc_error)
}
/// Replace a key for an account synchronously
- pub fn replace_key(
+ pub async fn replace_key(
&mut self,
account: AccountToken,
old_key: PublicKey,
@@ -124,12 +123,7 @@ impl KeyManager {
self.reset();
let new_key = PrivateKey::new_from_random();
- self.http_handle.service().block_on(Self::replace_key_rpc(
- self.http_handle.clone(),
- account,
- old_key,
- new_key,
- ))
+ Self::replace_key_rpc(self.http_handle.clone(), account, old_key, new_key).await
}
/// Verifies whether a key is valid or not.
@@ -154,7 +148,7 @@ impl KeyManager {
/// Generate a new private key asynchronously. The new keys will be sent to the daemon channel.
- pub fn generate_key_async(&mut self, account: AccountToken, timeout: Option<Duration>) {
+ pub async fn generate_key_async(&mut self, account: AccountToken, timeout: Option<Duration>) {
self.reset();
let private_key = PrivateKey::new_from_random();
@@ -222,7 +216,7 @@ impl KeyManager {
};
- self.http_handle.service().spawn(Box::pin(future));
+ tokio::spawn(Box::pin(future));
self.current_job = Some(abort_handle);
}
@@ -291,7 +285,7 @@ impl KeyManager {
}
async fn key_rotation_timer(key: PublicKey, rotation_interval_secs: u64) {
- let mut interval = tokio02::time::interval(KEY_CHECK_INTERVAL);
+ let mut interval = tokio::time::interval(KEY_CHECK_INTERVAL);
loop {
interval.tick().await;
if (Utc::now().signed_duration_since(key.created)).num_seconds() as u64
@@ -342,7 +336,7 @@ impl KeyManager {
rotation_interval_secs: u64,
account_token: AccountToken,
) {
- let mut interval = tokio02::time::interval_at(
+ let mut interval = tokio::time::interval_at(
(Instant::now() + AUTOMATIC_ROTATION_RETRY_DELAY).into(),
AUTOMATIC_ROTATION_RETRY_DELAY,
);
@@ -375,7 +369,7 @@ impl KeyManager {
}
}
- fn run_automatic_rotation(&mut self, account_token: AccountToken, public_key: PublicKey) {
+ async fn run_automatic_rotation(&mut self, account_token: AccountToken, public_key: PublicKey) {
self.stop_automatic_rotation();
if self.auto_rotation_interval == Duration::new(0, 0) {
@@ -394,7 +388,7 @@ impl KeyManager {
);
let (request, abort_handle) = abortable(Box::pin(fut));
- self.http_handle.service().spawn(request);
+ tokio::spawn(request);
self.abort_scheduler_tx = Some(abort_handle);
}
diff --git a/mullvad-jni/Cargo.toml b/mullvad-jni/Cargo.toml
index b65909d669..d30a6502f7 100644
--- a/mullvad-jni/Cargo.toml
+++ b/mullvad-jni/Cargo.toml
@@ -12,16 +12,15 @@ crate_type = ["cdylib"]
[target.'cfg(target_os = "android")'.dependencies]
err-derive = "0.2.1"
-futures = "0.1"
+futures = "0.3"
ipnetwork = "0.16"
jnix = { version = "0.2.3", features = ["derive"] }
-jsonrpc-client-core = "0.5"
-jsonrpc-core = "8"
lazy_static = "1"
log = "0.4"
log-panics = "2"
nix = "0.17"
rand = "0.7"
+tokio = "0.2"
mullvad-daemon = { path = "../mullvad-daemon" }
mullvad-paths = { path = "../mullvad-paths" }
diff --git a/mullvad-jni/src/daemon_interface.rs b/mullvad-jni/src/daemon_interface.rs
index c8294fa744..d8a3b980cf 100644
--- a/mullvad-jni/src/daemon_interface.rs
+++ b/mullvad-jni/src/daemon_interface.rs
@@ -1,4 +1,4 @@
-use futures::{sync::oneshot, Future};
+use futures::{channel::oneshot, executor::block_on};
use mullvad_daemon::{DaemonCommand, DaemonCommandSender};
use mullvad_types::{
account::{AccountData, VoucherSubmission},
@@ -42,9 +42,7 @@ impl DaemonInterface {
self.send_command(DaemonCommand::SetTargetState(tx, TargetState::Secured))?;
- rx.wait().map_err(|_| Error::NoResponse)?.unwrap();
-
- Ok(())
+ block_on(rx).map_err(|_| Error::NoResponse)
}
pub fn create_new_account(&self) -> Result<String> {
@@ -52,7 +50,7 @@ impl DaemonInterface {
self.send_command(DaemonCommand::CreateNewAccount(tx))?;
- rx.wait()
+ block_on(rx)
.map_err(|_| Error::NoResponse)?
.map_err(Error::RpcError)
}
@@ -62,9 +60,7 @@ impl DaemonInterface {
self.send_command(DaemonCommand::SetTargetState(tx, TargetState::Unsecured))?;
- rx.wait().map_err(|_| Error::NoResponse)?.unwrap();
-
- Ok(())
+ block_on(rx).map_err(|_| Error::NoResponse)
}
pub fn generate_wireguard_key(&self) -> Result<KeygenEvent> {
@@ -72,7 +68,7 @@ impl DaemonInterface {
self.send_command(DaemonCommand::GenerateWireguardKey(tx))?;
- rx.wait().map_err(|_| Error::NoResponse)
+ block_on(rx).map_err(|_| Error::NoResponse)
}
pub fn get_account_data(&self, account_token: String) -> Result<AccountData> {
@@ -80,9 +76,8 @@ impl DaemonInterface {
self.send_command(DaemonCommand::GetAccountData(tx, account_token))?;
- rx.wait()
+ block_on(rx)
.map_err(|_| Error::NoResponse)?
- .wait()
.map_err(Error::RpcError)
}
@@ -91,7 +86,7 @@ impl DaemonInterface {
self.send_command(DaemonCommand::GetAccountHistory(tx))?;
- rx.wait().map_err(|_| Error::NoResponse)
+ block_on(rx).map_err(|_| Error::NoResponse)
}
pub fn get_www_auth_token(&self) -> Result<String> {
@@ -99,9 +94,8 @@ impl DaemonInterface {
self.send_command(DaemonCommand::GetWwwAuthToken(tx))?;
- rx.wait()
+ block_on(rx)
.map_err(|_| Error::NoResponse)?
- .wait()
.map_err(Error::RpcError)
}
@@ -110,7 +104,7 @@ impl DaemonInterface {
self.send_command(DaemonCommand::GetCurrentLocation(tx))?;
- Ok(rx.wait().map_err(|_| Error::NoResponse)?)
+ Ok(block_on(rx).map_err(|_| Error::NoResponse)?)
}
pub fn get_current_version(&self) -> Result<String> {
@@ -118,7 +112,7 @@ impl DaemonInterface {
self.send_command(DaemonCommand::GetCurrentVersion(tx))?;
- Ok(rx.wait().map_err(|_| Error::NoResponse)?)
+ Ok(block_on(rx).map_err(|_| Error::NoResponse)?)
}
pub fn get_relay_locations(&self) -> Result<RelayList> {
@@ -126,7 +120,7 @@ impl DaemonInterface {
self.send_command(DaemonCommand::GetRelayLocations(tx))?;
- Ok(rx.wait().map_err(|_| Error::NoResponse)?)
+ Ok(block_on(rx).map_err(|_| Error::NoResponse)?)
}
pub fn get_settings(&self) -> Result<Settings> {
@@ -134,7 +128,7 @@ impl DaemonInterface {
self.send_command(DaemonCommand::GetSettings(tx))?;
- Ok(rx.wait().map_err(|_| Error::NoResponse)?)
+ Ok(block_on(rx).map_err(|_| Error::NoResponse)?)
}
pub fn get_state(&self) -> Result<TunnelState> {
@@ -142,7 +136,7 @@ impl DaemonInterface {
self.send_command(DaemonCommand::GetState(tx))?;
- Ok(rx.wait().map_err(|_| Error::NoResponse)?)
+ Ok(block_on(rx).map_err(|_| Error::NoResponse)?)
}
pub fn get_version_info(&self) -> Result<AppVersionInfo> {
@@ -150,7 +144,7 @@ impl DaemonInterface {
self.send_command(DaemonCommand::GetVersionInfo(tx))?;
- rx.wait().map_err(|_| Error::NoResponse)
+ block_on(rx).map_err(|_| Error::NoResponse)
}
pub fn reconnect(&self) -> Result<()> {
@@ -164,14 +158,14 @@ impl DaemonInterface {
self.send_command(DaemonCommand::GetWireguardKey(tx))?;
- rx.wait().map_err(|_| Error::NoResponse)
+ block_on(rx).map_err(|_| Error::NoResponse)
}
pub fn verify_wireguard_key(&self) -> Result<bool> {
let (tx, rx) = oneshot::channel();
self.send_command(DaemonCommand::VerifyWireguardKey(tx))?;
- rx.wait().map_err(|_| Error::NoResponse)
+ block_on(rx).map_err(|_| Error::NoResponse)
}
pub fn set_account(&self, account_token: Option<String>) -> Result<()> {
@@ -179,7 +173,7 @@ impl DaemonInterface {
self.send_command(DaemonCommand::SetAccount(tx, account_token))?;
- rx.wait().map_err(|_| Error::NoResponse)
+ block_on(rx).map_err(|_| Error::NoResponse)
}
pub fn set_allow_lan(&self, allow_lan: bool) -> Result<()> {
@@ -187,7 +181,7 @@ impl DaemonInterface {
self.send_command(DaemonCommand::SetAllowLan(tx, allow_lan))?;
- rx.wait().map_err(|_| Error::NoResponse)
+ block_on(rx).map_err(|_| Error::NoResponse)
}
pub fn set_auto_connect(&self, auto_connect: bool) -> Result<()> {
@@ -195,7 +189,7 @@ impl DaemonInterface {
self.send_command(DaemonCommand::SetAutoConnect(tx, auto_connect))?;
- rx.wait().map_err(|_| Error::NoResponse)
+ block_on(rx).map_err(|_| Error::NoResponse)
}
pub fn set_wireguard_mtu(&self, wireguard_mtu: Option<u16>) -> Result<()> {
@@ -203,7 +197,7 @@ impl DaemonInterface {
self.send_command(DaemonCommand::SetWireguardMtu(tx, wireguard_mtu))?;
- rx.wait().map_err(|_| Error::NoResponse)
+ block_on(rx).map_err(|_| Error::NoResponse)
}
pub fn shutdown(&self) -> Result<()> {
@@ -215,9 +209,8 @@ impl DaemonInterface {
self.send_command(DaemonCommand::SubmitVoucher(tx, voucher))?;
- rx.wait()
+ block_on(rx)
.map_err(|_| Error::NoResponse)?
- .wait()
.map_err(Error::RpcError)
}
@@ -226,7 +219,7 @@ impl DaemonInterface {
self.send_command(DaemonCommand::UpdateRelaySettings(tx, update))?;
- rx.wait().map_err(|_| Error::NoResponse)
+ block_on(rx).map_err(|_| Error::NoResponse)
}
fn send_command(&self, command: DaemonCommand) -> Result<()> {
diff --git a/mullvad-jni/src/lib.rs b/mullvad-jni/src/lib.rs
index 78cee57d1e..1abd4d093d 100644
--- a/mullvad-jni/src/lib.rs
+++ b/mullvad-jni/src/lib.rs
@@ -17,10 +17,13 @@ use jnix::{
},
FromJava, IntoJava, JnixEnv,
};
-use mullvad_daemon::{exception_logging, logging, version, Daemon, DaemonCommandChannel};
+use mullvad_daemon::{
+ exception_logging, logging, runtime::new_runtime_builder, version, Daemon, DaemonCommandChannel,
+};
use mullvad_rpc::{rest::Error as RestError, StatusCode};
use mullvad_types::account::{AccountData, VoucherSubmission};
use std::{
+ io,
path::{Path, PathBuf},
ptr,
sync::{mpsc, Arc, Once},
@@ -46,6 +49,9 @@ pub enum Error {
#[error(display = "Failed to initialize the mullvad daemon")]
InitializeDaemon(#[error(source)] mullvad_daemon::Error),
+ #[error(display = "Failed to spawn the tokio runtime")]
+ InitializeTokioRuntime(#[error(source)] io::Error),
+
#[error(display = "Failed to spawn the JNI event listener")]
SpawnJniEventListener(#[error(source)] jni_event_listener::Error),
}
@@ -202,9 +208,13 @@ fn spawn_daemon(
.map_err(Error::CreateGlobalReference)?;
let (tx, rx) = mpsc::channel();
+ let mut runtime = new_runtime_builder()
+ .build()
+ .map_err(Error::InitializeTokioRuntime)?;
+
thread::spawn(move || {
let jvm = android_context.jvm.clone();
- let daemon = Daemon::start(
+ let daemon = runtime.block_on(Daemon::start(
Some(resource_dir.clone()),
resource_dir.clone(),
resource_dir,
@@ -212,12 +222,12 @@ fn spawn_daemon(
listener,
command_channel,
android_context,
- );
+ ));
match daemon {
Ok(daemon) => {
let _ = tx.send(Ok(()));
- match daemon.run() {
+ match runtime.block_on(daemon.run()) {
Ok(()) => log::info!("Mullvad daemon has stopped"),
Err(error) => log::error!("{}", error.display_chain()),
}
diff --git a/mullvad-problem-report/Cargo.toml b/mullvad-problem-report/Cargo.toml
index 2642fa6acb..68925c5f62 100644
--- a/mullvad-problem-report/Cargo.toml
+++ b/mullvad-problem-report/Cargo.toml
@@ -16,6 +16,7 @@ futures01 = { version = "0.1", package = "futures" }
lazy_static = "1.0"
regex = "1.0"
uuid = { version = "0.7", features = ["v4"] }
+tokio = { version = "0.2", features = [ "time", "rt-threaded", "net", "io-std", "io-driver" ] }
mullvad-paths = { path = "../mullvad-paths" }
mullvad-rpc = { path = "../mullvad-rpc" }
diff --git a/mullvad-problem-report/src/lib.rs b/mullvad-problem-report/src/lib.rs
index 019dd82f62..4265d70268 100644
--- a/mullvad-problem-report/src/lib.rs
+++ b/mullvad-problem-report/src/lib.rs
@@ -67,6 +67,9 @@ pub enum Error {
#[error(display = "Error during RPC call")]
SendRpcError(#[error(source)] mullvad_rpc::rest::Error),
+
+ #[error(display = "Unable to spawn Tokio runtime")]
+ CreateRuntime(#[error(source)] io::Error),
}
/// These are errors that can happen during problem report collection.
@@ -263,8 +266,14 @@ pub fn send_problem_report(
let metadata =
ProblemReport::parse_metadata(&report_content).unwrap_or_else(|| metadata::collect());
- let mut rpc_manager =
- mullvad_rpc::MullvadRpcRuntime::new().map_err(Error::CreateRpcClientError)?;
+ let runtime = tokio::runtime::Builder::new()
+ .basic_scheduler()
+ .enable_all()
+ .build()
+ .map_err(Error::CreateRuntime)?;
+
+ let mut rpc_manager = mullvad_rpc::MullvadRpcRuntime::new(runtime.handle().clone())
+ .map_err(Error::CreateRpcClientError)?;
let rpc_client = mullvad_rpc::ProblemReportProxy::new(rpc_manager.mullvad_rest_handle());
rpc_client
diff --git a/mullvad-rpc/Cargo.toml b/mullvad-rpc/Cargo.toml
index e06985e34b..18fa0994e7 100644
--- a/mullvad-rpc/Cargo.toml
+++ b/mullvad-rpc/Cargo.toml
@@ -20,7 +20,7 @@ regex = "1"
serde = "1"
serde_json = "1.0"
hyper-rustls = "0.20"
-tokio = { version = "0.2", features = [ "time", "rt-threaded", "net", "io-std", "io-driver" ] }
+tokio = { version = "0.2", features = [ "macros", "time", "rt-threaded", "net", "io-std", "io-driver" ] }
tokio-rustls = "0.13"
urlencoding = "1"
webpki = { version = "0.21", features = [] }
diff --git a/mullvad-rpc/src/bin/relay_list.rs b/mullvad-rpc/src/bin/relay_list.rs
index 27601d4b21..8213d736da 100644
--- a/mullvad-rpc/src/bin/relay_list.rs
+++ b/mullvad-rpc/src/bin/relay_list.rs
@@ -4,12 +4,16 @@ use mullvad_rpc::{rest::Error as RestError, MullvadRpcRuntime, RelayListProxy};
use std::process;
use talpid_types::ErrorExt;
-fn main() {
- let mut runtime = MullvadRpcRuntime::new().expect("Failed to load runtime");
+#[tokio::main]
+async fn main() {
+ let mut runtime =
+ MullvadRpcRuntime::new(tokio::runtime::Handle::current()).expect("Failed to load runtime");
- let relay_list_request = RelayListProxy::new(runtime.mullvad_rest_handle()).relay_list();
+ let relay_list_request = RelayListProxy::new(runtime.mullvad_rest_handle())
+ .relay_list()
+ .await;
- let relay_list = match runtime.runtime().block_on(relay_list_request) {
+ let relay_list = match relay_list_request {
Ok(relay_list) => relay_list,
Err(RestError::TimeoutError(_)) => {
eprintln!("Request timed out");
diff --git a/mullvad-rpc/src/event_loop.rs b/mullvad-rpc/src/event_loop.rs
deleted file mode 100644
index ea93de2493..0000000000
--- a/mullvad-rpc/src/event_loop.rs
+++ /dev/null
@@ -1,14 +0,0 @@
-use tokio::runtime::{Builder, Runtime};
-
-/// Creates a new tokio runtime to be exclusively used for HTTP requests.
-// FIXME: Remove this once the daemon has migrated.
-pub fn create_runtime() -> Result<Runtime, crate::Error> {
- let runtime = Builder::new()
- .threaded_scheduler()
- .core_threads(2)
- .enable_all()
- .thread_name("mullvad-rpc-event-loop")
- .build();
-
- runtime.map_err(crate::Error::TokioRuntimeError)
-}
diff --git a/mullvad-rpc/src/lib.rs b/mullvad-rpc/src/lib.rs
index 796cb56553..5e2e6f80fa 100644
--- a/mullvad-rpc/src/lib.rs
+++ b/mullvad-rpc/src/lib.rs
@@ -16,7 +16,6 @@ use std::{
use talpid_types::net::wireguard;
-pub mod event_loop;
pub mod rest;
mod cached_dns_resolver;
@@ -44,29 +43,27 @@ const API_IP: IpAddr = IpAddr::V4(Ipv4Addr::new(193, 138, 218, 78));
pub struct MullvadRpcRuntime {
cached_dns_resolver: CachedDnsResolver,
https_connector: HttpsConnectorWithSni,
- runtime: tokio::runtime::Runtime,
+ handle: tokio::runtime::Handle,
}
#[derive(err_derive::Error, Debug)]
pub enum Error {
#[error(display = "Failed to construct a rest client")]
RestError(#[error(source)] rest::Error),
- #[error(display = "Failed to spawn a tokio runtime")]
- TokioRuntimeError(#[error(source)] tokio::io::Error),
}
impl MullvadRpcRuntime {
/// Create a new `MullvadRpcRuntime`.
- pub fn new() -> Result<Self, Error> {
+ pub fn new(handle: tokio::runtime::Handle) -> Result<Self, Error> {
Ok(MullvadRpcRuntime {
cached_dns_resolver: CachedDnsResolver::new(API_HOST.to_owned(), None, API_IP),
- runtime: event_loop::create_runtime()?,
https_connector: HttpsConnectorWithSni::new(),
+ handle,
})
}
/// Create a new `MullvadRpcRuntime` using the specified cache directory.
- pub fn with_cache_dir(cache_dir: &Path) -> Result<Self, Error> {
+ pub fn with_cache_dir(handle: tokio::runtime::Handle, cache_dir: &Path) -> Result<Self, Error> {
let cache_file = cache_dir.join(API_IP_CACHE_FILENAME);
let cached_dns_resolver =
CachedDnsResolver::new(API_HOST.to_owned(), Some(cache_file), API_IP);
@@ -75,8 +72,8 @@ impl MullvadRpcRuntime {
Ok(MullvadRpcRuntime {
cached_dns_resolver,
- runtime: event_loop::create_runtime()?,
https_connector,
+ handle,
})
}
@@ -85,9 +82,9 @@ impl MullvadRpcRuntime {
let mut https_connector = self.https_connector.clone();
https_connector.set_sni_hostname(sni_hostname);
- let service = rest::RequestService::new(https_connector, self.runtime.handle().clone());
+ let service = rest::RequestService::new(https_connector, self.handle.clone());
let handle = service.handle();
- self.runtime.spawn(service.into_future());
+ self.handle.spawn(service.into_future());
handle
}
@@ -106,17 +103,8 @@ impl MullvadRpcRuntime {
self.new_request_service(None)
}
- pub fn runtime(&mut self) -> &mut tokio::runtime::Runtime {
- &mut self.runtime
- }
-}
-
-impl Drop for MullvadRpcRuntime {
- fn drop(&mut self) {
- if let Ok(runtime) = event_loop::create_runtime() {
- let old_runtime = std::mem::replace(&mut self.runtime, runtime);
- old_runtime.shutdown_timeout(std::time::Duration::from_secs(1));
- }
+ pub fn handle(&mut self) -> &mut tokio::runtime::Handle {
+ &mut self.handle
}
}
diff --git a/mullvad-rpc/src/rest.rs b/mullvad-rpc/src/rest.rs
index 205f350ea5..0e8994b1c3 100644
--- a/mullvad-rpc/src/rest.rs
+++ b/mullvad-rpc/src/rest.rs
@@ -176,12 +176,10 @@ pub struct RequestServiceHandle {
impl RequestServiceHandle {
/// Resets the corresponding RequestService, dropping all in-flight requests.
- pub fn reset(&self) {
+ pub async fn reset(&self) {
let mut tx = self.tx.clone();
- self.handle.block_on(async move {
- let _ = tx.send(RequestCommand::Reset).await;
- });
+ let _ = tx.send(RequestCommand::Reset).await;
}
/// Submits a `RestRequest` for exectuion to the request service.
@@ -216,13 +214,6 @@ impl RequestServiceHandle {
pub fn spawn<T: Send + 'static>(&self, future: impl Future<Output = T> + Send + 'static) {
let _ = self.handle.spawn(future);
}
-
- pub fn block_on<T: Send + 'static>(
- &self,
- future: impl Future<Output = T> + Send + 'static,
- ) -> T {
- self.handle.block_on(future)
- }
}
#[derive(Debug)]
diff --git a/talpid-core/Cargo.toml b/talpid-core/Cargo.toml
index f135e52b26..5222c5d26d 100644
--- a/talpid-core/Cargo.toml
+++ b/talpid-core/Cargo.toml
@@ -24,12 +24,10 @@ parking_lot = "0.9"
regex = "1.1.0"
shell-escape = "0.1"
talpid-types = { path = "../talpid-types" }
-tokio-core = "0.1"
-tokio-executor = "0.1"
uuid = { version = "0.7", features = ["v4"] }
zeroize = "1"
chrono = "0.4"
-tokio02 = { package = "tokio", version = "0.2", features = [ "io-util", "process", "rt-core", "rt-threaded", "stream"] }
+tokio = { version = "0.2", features = [ "io-util", "process", "rt-core", "rt-threaded", "stream"] }
rand = "0.7"
@@ -42,7 +40,6 @@ prost = "0.6"
[target.'cfg(unix)'.dependencies]
nix = "0.17"
-tokio-io = "0.1"
[target.'cfg(target_os = "android")'.dependencies]
diff --git a/talpid-core/src/future_retry.rs b/talpid-core/src/future_retry.rs
index bf992117bd..cc1fb1ed20 100644
--- a/talpid-core/src/future_retry.rs
+++ b/talpid-core/src/future_retry.rs
@@ -32,10 +32,10 @@ pub async fn retry_future_with_backoff<
async fn sleep(mut delay: Duration) {
while delay > MAX_SINGLE_DELAY {
delay -= MAX_SINGLE_DELAY;
- tokio02::time::delay_for(MAX_SINGLE_DELAY).await;
+ tokio::time::delay_for(MAX_SINGLE_DELAY).await;
}
- tokio02::time::delay_for(delay).await;
+ tokio::time::delay_for(delay).await;
}
/// Provides an exponential back-off timer to delay the next retry of a failed operation.
diff --git a/talpid-core/src/offline/android.rs b/talpid-core/src/offline/android.rs
index 3863415cfb..fefe2556cf 100644
--- a/talpid-core/src/offline/android.rs
+++ b/talpid-core/src/offline/android.rs
@@ -1,5 +1,5 @@
use crate::tunnel_state_machine::TunnelCommand;
-use futures01::sync::mpsc::UnboundedSender;
+use futures::channel::mpsc::UnboundedSender;
use jnix::{
jni::{
self,
@@ -96,7 +96,7 @@ impl MonitorHandle {
})
}
- pub fn is_offline(&self) -> bool {
+ pub async fn is_offline(&self) -> bool {
match self.get_is_connected() {
Ok(is_connected) => !is_connected,
Err(error) => {
@@ -205,7 +205,7 @@ unsafe fn get_sender_from_address(address: jlong) -> Box<Weak<UnboundedSender<Tu
Box::from_raw(address as *mut Weak<UnboundedSender<TunnelCommand>>)
}
-pub fn spawn_monitor(
+pub async fn spawn_monitor(
sender: Weak<UnboundedSender<TunnelCommand>>,
android_context: AndroidContext,
) -> Result<MonitorHandle, Error> {
diff --git a/talpid-core/src/offline/linux.rs b/talpid-core/src/offline/linux.rs
index 0b6526f0e5..5e4a4fa7e2 100644
--- a/talpid-core/src/offline/linux.rs
+++ b/talpid-core/src/offline/linux.rs
@@ -1,6 +1,8 @@
use crate::tunnel_state_machine::TunnelCommand;
-use futures::{StreamExt, TryStreamExt};
-use futures01::sync::mpsc::UnboundedSender;
+use futures::{
+ channel::{mpsc::UnboundedSender, oneshot},
+ FutureExt, StreamExt, TryStreamExt,
+};
use netlink_packet_route::{
constants::{ARPHRD_LOOPBACK, ARPHRD_NONE, IFF_LOWER_UP, IFF_UP},
rtnl::link::nlas::{Info as LinkInfo, InfoKind, Nla as LinkNla},
@@ -15,8 +17,6 @@ use std::{collections::BTreeSet, io, sync::Weak};
pub type Result<T> = std::result::Result<T, Error>;
-const EVENT_LOOP_THREAD_NAME: &str = "mullvad-offline-detection-event-loop";
-
#[derive(err_derive::Error, Debug)]
#[error(no_from)]
pub enum Error {
@@ -47,12 +47,12 @@ pub enum Error {
pub struct MonitorHandle {
handle: rtnetlink::Handle,
- runtime: tokio02::runtime::Runtime,
+ _stop_connection_tx: oneshot::Sender<()>,
}
impl MonitorHandle {
- pub fn is_offline(&mut self) -> bool {
- match self.runtime.block_on(check_offline_state(&self.handle)) {
+ pub async fn is_offline(&mut self) -> bool {
+ match check_offline_state(&self.handle).await {
Ok(is_offline) => is_offline,
Err(err) => {
log::error!(
@@ -65,41 +65,36 @@ impl MonitorHandle {
}
}
-pub fn spawn_monitor(sender: Weak<UnboundedSender<TunnelCommand>>) -> Result<MonitorHandle> {
- let mut runtime = tokio02::runtime::Builder::new()
- .threaded_scheduler()
- .core_threads(1)
- .enable_all()
- .thread_name(EVENT_LOOP_THREAD_NAME)
- .build()
- .map_err(Error::EventLoopError)?;
-
- let (connection, handle, mut messages) = runtime.block_on(async move {
- let (mut connection, handle, messages) =
- rtnetlink::new_connection().map_err(Error::NetlinkConnectionError)?;
+pub async fn spawn_monitor(sender: Weak<UnboundedSender<TunnelCommand>>) -> Result<MonitorHandle> {
+ let (mut connection, handle, mut messages) =
+ rtnetlink::new_connection().map_err(Error::NetlinkConnectionError)?;
- let mgroup_flags = RTMGRP_IPV4_IFADDR | RTMGRP_IPV6_IFADDR | RTMGRP_LINK | RTMGRP_NOTIFY;
- let addr = SocketAddr::new(0, mgroup_flags);
+ let mgroup_flags = RTMGRP_IPV4_IFADDR | RTMGRP_IPV6_IFADDR | RTMGRP_LINK | RTMGRP_NOTIFY;
+ let addr = SocketAddr::new(0, mgroup_flags);
- connection
- .socket_mut()
- .bind(&addr)
- .map_err(Error::BindError)?;
+ connection
+ .socket_mut()
+ .bind(&addr)
+ .map_err(Error::BindError)?;
- Ok((connection, handle, messages))
- })?;
+ let (stop_connection_tx, stop_rx) = oneshot::channel();
- // Connection will be closed once the runtime is dropped
- let _ = runtime.spawn(connection);
- let mut is_offline = runtime.block_on(check_offline_state(&handle))?;
+ // Connection will be closed once the channel is dropped
+ tokio::spawn(async {
+ futures::select! {
+ _ = connection.fuse() => (),
+ _ = stop_rx.fuse() => (),
+ }
+ });
+ let mut is_offline = check_offline_state(&handle).await?;
let monitor_handle = MonitorHandle {
handle: handle.clone(),
- runtime,
+ _stop_connection_tx: stop_connection_tx,
};
- let _ = monitor_handle.runtime.spawn(async move {
+ tokio::spawn(async move {
while let Some(_new_message) = messages.next().await {
match sender.upgrade() {
Some(sender) => {
diff --git a/talpid-core/src/offline/macos.rs b/talpid-core/src/offline/macos.rs
index 602da6cda9..2569fa06c6 100644
--- a/talpid-core/src/offline/macos.rs
+++ b/talpid-core/src/offline/macos.rs
@@ -1,5 +1,5 @@
use crate::tunnel_state_machine::TunnelCommand;
-use futures01::sync::mpsc::UnboundedSender;
+use futures::channel::mpsc::UnboundedSender;
use std::{
net::{Ipv4Addr, SocketAddr},
sync::{
@@ -44,7 +44,7 @@ pub struct MonitorHandle;
impl MonitorHandle {
/// Host is considered to be offline if the IPv4 internet is considered to be unreachable by the
/// given reachability flags *or* there are no active physical interfaces.
- pub fn is_offline(&self) -> bool {
+ pub async fn is_offline(&self) -> bool {
let reachability = SCNetworkReachability::from(ipv4_internet());
let store = SCDynamicStoreBuilder::new("talpid-offline-check").build();
reachability
@@ -54,7 +54,9 @@ impl MonitorHandle {
}
}
-pub fn spawn_monitor(sender: Weak<UnboundedSender<TunnelCommand>>) -> Result<MonitorHandle, Error> {
+pub async fn spawn_monitor(
+ sender: Weak<UnboundedSender<TunnelCommand>>,
+) -> Result<MonitorHandle, Error> {
let (result_tx, result_rx) = mpsc::channel();
thread::spawn(move || {
let mut reachability_ref = SCNetworkReachability::from(ipv4_internet());
diff --git a/talpid-core/src/offline/mod.rs b/talpid-core/src/offline/mod.rs
index 5cda6290a3..9a6ce1dae5 100644
--- a/talpid-core/src/offline/mod.rs
+++ b/talpid-core/src/offline/mod.rs
@@ -1,5 +1,5 @@
use crate::tunnel_state_machine::TunnelCommand;
-use futures01::sync::mpsc::UnboundedSender;
+use futures::channel::mpsc::UnboundedSender;
use std::sync::Weak;
#[cfg(target_os = "android")]
use talpid_types::android::AndroidContext;
@@ -25,18 +25,21 @@ pub use self::imp::Error;
pub struct MonitorHandle(imp::MonitorHandle);
impl MonitorHandle {
- pub fn is_offline(&mut self) -> bool {
- self.0.is_offline()
+ pub async fn is_offline(&mut self) -> bool {
+ self.0.is_offline().await
}
}
-pub fn spawn_monitor(
+pub async fn spawn_monitor(
sender: Weak<UnboundedSender<TunnelCommand>>,
#[cfg(target_os = "android")] android_context: AndroidContext,
) -> Result<MonitorHandle, Error> {
- Ok(MonitorHandle(imp::spawn_monitor(
- sender,
- #[cfg(target_os = "android")]
- android_context,
- )?))
+ Ok(MonitorHandle(
+ imp::spawn_monitor(
+ sender,
+ #[cfg(target_os = "android")]
+ android_context,
+ )
+ .await?,
+ ))
}
diff --git a/talpid-core/src/offline/windows.rs b/talpid-core/src/offline/windows.rs
index c7a86e4073..d9e5c7782d 100644
--- a/talpid-core/src/offline/windows.rs
+++ b/talpid-core/src/offline/windows.rs
@@ -1,5 +1,5 @@
use crate::{logging::windows::log_sink, tunnel_state_machine::TunnelCommand, winnet};
-use futures01::sync::mpsc::UnboundedSender;
+use futures::channel::mpsc::UnboundedSender;
use parking_lot::Mutex;
use std::{
ffi::c_void,
@@ -203,7 +203,7 @@ impl BroadcastListener {
state.apply_change(StateChange::NetworkConnectivity(connectivity));
}
- pub fn is_offline(&self) -> bool {
+ pub async fn is_offline(&self) -> bool {
let state = self._system_state.lock();
state.is_offline_currently().unwrap_or(false)
}
@@ -264,7 +264,9 @@ impl SystemState {
pub type MonitorHandle = BroadcastListener;
-pub fn spawn_monitor(sender: Weak<UnboundedSender<TunnelCommand>>) -> Result<MonitorHandle, Error> {
+pub async fn spawn_monitor(
+ sender: Weak<UnboundedSender<TunnelCommand>>,
+) -> Result<MonitorHandle, Error> {
BroadcastListener::start(sender)
}
diff --git a/talpid-core/src/routing/linux.rs b/talpid-core/src/routing/linux.rs
index ba5ffb66a6..566e1c027b 100644
--- a/talpid-core/src/routing/linux.rs
+++ b/talpid-core/src/routing/linux.rs
@@ -124,7 +124,7 @@ impl RouteManagerImpl {
.bind(&addr)
.map_err(Error::BindError)?;
- tokio02::spawn(connection);
+ tokio::spawn(connection);
let iface_map = Self::initialize_link_map(&handle).await?;
let split_table_id = Self::initialize_exclusions_table().await?;
@@ -903,7 +903,7 @@ mod test {
/// Tests if dropping inside a tokio runtime panics
#[test]
fn test_drop_in_executor() {
- let mut runtime = tokio02::runtime::Runtime::new().expect("Failed to initialize runtime");
+ let mut runtime = tokio::runtime::Runtime::new().expect("Failed to initialize runtime");
runtime.block_on(async {
let manager = RouteManagerImpl::new(HashSet::new())
.await
@@ -915,7 +915,7 @@ mod test {
/// Tests if dropping outside a runtime panics
#[test]
fn test_drop() {
- let mut runtime = tokio02::runtime::Runtime::new().expect("Failed to initialize runtime");
+ let mut runtime = tokio::runtime::Runtime::new().expect("Failed to initialize runtime");
let manager = runtime.block_on(async {
RouteManagerImpl::new(HashSet::new())
.await
diff --git a/talpid-core/src/routing/macos.rs b/talpid-core/src/routing/macos.rs
index 6a4512d144..6304dcc24f 100644
--- a/talpid-core/src/routing/macos.rs
+++ b/talpid-core/src/routing/macos.rs
@@ -12,7 +12,7 @@ use std::{
net::IpAddr,
process::{ExitStatus, Stdio},
};
-use tokio02::{io::AsyncBufReadExt, process::Command};
+use tokio::{io::AsyncBufReadExt, process::Command};
pub type Result<T> = std::result::Result<T, Error>;
@@ -310,7 +310,7 @@ async fn listen_for_default_route_changes() -> Result<impl Stream<Item = std::io
let mut process = cmd.spawn().map_err(Error::FailedToMonitorRoutes)?;
- let reader = tokio02::io::BufReader::new(process.stdout.take().unwrap());
+ let reader = tokio::io::BufReader::new(process.stdout.take().unwrap());
let lines = reader.lines();
// route -n monitor will produce netlink messages in the following format
diff --git a/talpid-core/src/routing/unix.rs b/talpid-core/src/routing/unix.rs
index 38896a4392..a59ee6ed27 100644
--- a/talpid-core/src/routing/unix.rs
+++ b/talpid-core/src/routing/unix.rs
@@ -7,7 +7,7 @@ use futures::channel::{
mpsc::{self, UnboundedSender},
oneshot,
};
-use std::collections::HashSet;
+use std::{collections::HashSet, io};
use talpid_types::ErrorExt;
#[cfg(target_os = "linux")]
@@ -39,6 +39,9 @@ pub enum Error {
/// Failed to spawn route manager future
#[error(display = "Failed to spawn route manager on the provided executor")]
FailedToSpawnManager,
+ /// Failed to spawn route manager runtime
+ #[error(display = "Failed to spawn route manager runtime")]
+ FailedToSpawnRuntime(#[error(source)] io::Error),
/// Attempt to use route manager that has been dropped
#[error(display = "Cannot send message to route manager since it is down")]
RouteManagerDown,
@@ -69,7 +72,7 @@ pub enum RouteManagerCommand {
/// the route will be adjusted dynamically when the default route changes.
pub struct RouteManager {
manage_tx: Option<UnboundedSender<RouteManagerCommand>>,
- runtime: tokio02::runtime::Runtime,
+ runtime: tokio::runtime::Runtime,
}
impl RouteManager {
@@ -78,7 +81,12 @@ impl RouteManager {
/// routes.
pub fn new(required_routes: HashSet<RequiredRoute>) -> Result<Self, Error> {
let (manage_tx, manage_rx) = mpsc::unbounded();
- let mut runtime = tokio02::runtime::Runtime::new().expect("Failed to spawn runtime");
+ let mut runtime = tokio::runtime::Builder::new()
+ .threaded_scheduler()
+ .core_threads(1)
+ .max_threads(1)
+ .enable_all()
+ .build()?;
let manager = runtime.block_on(imp::RouteManagerImpl::new(required_routes))?;
runtime.handle().spawn(manager.run(manage_rx));
diff --git a/talpid-core/src/tunnel/openvpn.rs b/talpid-core/src/tunnel/openvpn.rs
index 5dc14ba8d9..02ff92f4e7 100644
--- a/talpid-core/src/tunnel/openvpn.rs
+++ b/talpid-core/src/tunnel/openvpn.rs
@@ -21,7 +21,7 @@ use std::{
time::Duration,
};
use talpid_types::net::openvpn;
-use tokio02::task;
+use tokio::task;
#[cfg(target_os = "linux")]
use which;
@@ -132,7 +132,7 @@ pub struct OpenVpnMonitor<C: OpenVpnBuilder = OpenVpnCommand> {
/// Keep the 'TempFile' for the proxy user-pass file in the struct, so it's removed on drop.
_proxy_auth_file: Option<mktemp::TempFile>,
- runtime: tokio02::runtime::Runtime,
+ runtime: tokio::runtime::Runtime,
event_server_abort_tx: triggered::Trigger,
server_join_handle: Option<task::JoinHandle<std::result::Result<(), event_server::Error>>>,
}
@@ -239,9 +239,10 @@ impl<C: OpenVpnBuilder + 'static> OpenVpnMonitor<C> {
let (event_server_abort_tx, event_server_abort_rx) = triggered::trigger();
- let mut runtime = tokio02::runtime::Builder::new()
+ let mut runtime = tokio::runtime::Builder::new()
.threaded_scheduler()
.core_threads(1)
+ .max_threads(1)
.enable_all()
.build()
.map_err(Error::RuntimeError)?;
@@ -619,7 +620,7 @@ mod event_server {
pin::Pin,
task::{Context, Poll},
};
- use tokio02::io::{AsyncRead, AsyncWrite};
+ use tokio::io::{AsyncRead, AsyncWrite};
use tonic::{
self,
transport::{server::Connected, Server},
diff --git a/talpid-core/src/tunnel_state_machine/mod.rs b/talpid-core/src/tunnel_state_machine/mod.rs
index b0222ade67..4d96a2fe3c 100644
--- a/talpid-core/src/tunnel_state_machine/mod.rs
+++ b/talpid-core/src/tunnel_state_machine/mod.rs
@@ -23,25 +23,23 @@ use crate::{
tunnel::tun_provider::TunProvider,
};
-use futures01::{
- sync::{mpsc, oneshot},
- Async, Future, Poll, Stream,
+use futures::{
+ channel::{mpsc, oneshot},
+ StreamExt,
};
+use futures01::{sync::mpsc as old_mpsc, Async, Poll, Stream};
use std::{
collections::HashSet,
io,
path::{Path, PathBuf},
sync::{mpsc as sync_mpsc, Arc},
- thread,
};
#[cfg(target_os = "android")]
-use talpid_types::android::AndroidContext;
+use talpid_types::{android::AndroidContext, ErrorExt};
use talpid_types::{
net::TunnelParameters,
tunnel::{ErrorStateCause, ParameterGenerationError, TunnelStateTransition},
- ErrorExt,
};
-use tokio_core::reactor::Core;
/// Errors that can happen when setting up or using the state machine.
#[derive(err_derive::Error, Debug)]
@@ -77,7 +75,7 @@ pub enum Error {
}
/// Spawn the tunnel state machine thread, returning a channel for sending tunnel commands.
-pub fn spawn(
+pub async fn spawn(
allow_lan: bool,
block_when_disconnected: bool,
tunnel_parameters_generator: impl TunnelParametersGenerator,
@@ -88,15 +86,16 @@ pub fn spawn(
shutdown_tx: oneshot::Sender<()>,
#[cfg(target_os = "android")] android_context: AndroidContext,
) -> Result<Arc<mpsc::UnboundedSender<TunnelCommand>>, Error> {
- let (command_tx, command_rx) = mpsc::unbounded();
+ let (command_tx, mut command_rx) = mpsc::unbounded();
let command_tx = Arc::new(command_tx);
let mut offline_monitor = offline::spawn_monitor(
Arc::downgrade(&command_tx),
#[cfg(target_os = "android")]
android_context.clone(),
)
+ .await
.map_err(Error::OfflineMonitorError)?;
- let is_offline = offline_monitor.is_offline();
+ let is_offline = offline_monitor.is_offline().await;
let tun_provider = TunProvider::new(
#[cfg(target_os = "android")]
@@ -105,9 +104,19 @@ pub fn spawn(
allow_lan,
);
+ // Hide internal 0.1 futures from the client
+ let (command_adapter_tx, command_adapter_rx) = old_mpsc::unbounded();
+ tokio::spawn(async move {
+ while let Some(command) = command_rx.next().await {
+ if command_adapter_tx.unbounded_send(command).is_err() {
+ log::error!("Failed to forward daemon command");
+ }
+ }
+ });
+
let (startup_result_tx, startup_result_rx) = sync_mpsc::channel();
- thread::spawn(move || {
- match create_event_loop(
+ std::thread::spawn(move || {
+ let state_machine = TunnelStateMachine::new(
allow_lan,
block_when_disconnected,
is_offline,
@@ -116,28 +125,33 @@ pub fn spawn(
log_dir,
resource_dir,
cache_dir,
- command_rx,
- state_change_listener,
- shutdown_tx,
- ) {
- Ok((mut reactor, event_loop)) => {
- startup_result_tx.send(Ok(())).expect(
- "Tunnel state machine won't be started because the owner thread crashed",
- );
-
- if let Err(e) = reactor.run(event_loop) {
- log::error!(
- "{}",
- e.display_chain_with_msg("Tunnel state machine exited with an error")
- );
- }
+ command_adapter_rx,
+ );
+ let state_machine = match state_machine {
+ Ok(state_machine) => {
+ startup_result_tx.send(Ok(())).unwrap();
+ state_machine
+ }
+ Err(error) => {
+ startup_result_tx.send(Err(error)).unwrap();
+ return;
}
- Err(startup_error) => {
- startup_result_tx
- .send(Err(startup_error))
- .expect("Failed to send startup error");
+ };
+
+ let mut iter = state_machine.wait();
+ while let Some(Ok(change_event)) = iter.next() {
+ if let Err(error) = state_change_listener
+ .send(change_event)
+ .map_err(|_| Error::SendStateChange)
+ {
+ log::error!("{}", error);
+ break;
}
}
+ if shutdown_tx.send(()).is_err() {
+ log::error!("Can't send shutdown completion to daemon");
+ }
+
std::mem::drop(offline_monitor);
});
@@ -147,48 +161,6 @@ pub fn spawn(
Ok(command_tx)
}
-fn create_event_loop(
- allow_lan: bool,
- block_when_disconnected: bool,
- is_offline: bool,
- tunnel_parameters_generator: impl TunnelParametersGenerator,
- tun_provider: TunProvider,
- log_dir: Option<PathBuf>,
- resource_dir: PathBuf,
- cache_dir: impl AsRef<Path>,
- commands: mpsc::UnboundedReceiver<TunnelCommand>,
- state_change_listener: impl Sender<TunnelStateTransition>,
- shutdown_tx: oneshot::Sender<()>,
-) -> Result<(Core, impl Future<Item = (), Error = Error>), Error> {
- let reactor = Core::new().map_err(Error::ReactorError)?;
- let state_machine = TunnelStateMachine::new(
- allow_lan,
- block_when_disconnected,
- is_offline,
- tunnel_parameters_generator,
- tun_provider,
- log_dir,
- resource_dir,
- cache_dir,
- commands,
- )?;
-
- let future = state_machine
- .for_each(move |state_change_event| {
- state_change_listener
- .send(state_change_event)
- .map_err(|_| Error::SendStateChange)
- })
- .then(move |_| {
- if shutdown_tx.send(()).is_err() {
- log::error!("Can't send shutdown completion to daemon");
- }
- Ok(())
- });
-
- Ok((reactor, future))
-}
-
/// Representation of external commands for the tunnel state machine.
pub enum TunnelCommand {
/// Enable or disable LAN access in the firewall.
@@ -213,7 +185,7 @@ pub enum TunnelCommand {
/// by the stream.
struct TunnelStateMachine {
current_state: Option<TunnelStateWrapper>,
- commands: mpsc::UnboundedReceiver<TunnelCommand>,
+ commands: old_mpsc::UnboundedReceiver<TunnelCommand>,
shared_values: SharedTunnelStateValues,
}
@@ -227,7 +199,7 @@ impl TunnelStateMachine {
log_dir: Option<PathBuf>,
resource_dir: PathBuf,
cache_dir: impl AsRef<Path>,
- commands: mpsc::UnboundedReceiver<TunnelCommand>,
+ commands: old_mpsc::UnboundedReceiver<TunnelCommand>,
) -> Result<Self, Error> {
let args = if block_when_disconnected {
FirewallArguments {
@@ -432,7 +404,7 @@ trait TunnelState: Into<TunnelStateWrapper> + Sized {
/// [`EventConsequence`]: enum.EventConsequence.html
fn handle_event(
self,
- commands: &mut mpsc::UnboundedReceiver<TunnelCommand>,
+ commands: &mut old_mpsc::UnboundedReceiver<TunnelCommand>,
shared_values: &mut SharedTunnelStateValues,
) -> EventConsequence<Self>;
}
@@ -456,7 +428,7 @@ macro_rules! state_wrapper {
impl $wrapper_name {
fn handle_event(
self,
- commands: &mut mpsc::UnboundedReceiver<TunnelCommand>,
+ commands: &mut old_mpsc::UnboundedReceiver<TunnelCommand>,
shared_values: &mut SharedTunnelStateValues,
) -> TunnelStateMachineAction {
match self {
diff --git a/talpid-openvpn-plugin/src/processing.rs b/talpid-openvpn-plugin/src/processing.rs
index f03442e2fb..291ad41092 100644
--- a/talpid-openvpn-plugin/src/processing.rs
+++ b/talpid-openvpn-plugin/src/processing.rs
@@ -28,6 +28,7 @@ impl EventProcessor {
let mut runtime = runtime::Builder::new()
.basic_scheduler()
.core_threads(1)
+ .max_threads(1)
.enable_all()
.build()
.map_err(Error::CreateRuntime)?;