diff options
| author | David Lönnhager <david.l@mullvad.net> | 2020-09-01 15:08:19 +0200 |
|---|---|---|
| committer | David Lönnhager <david.l@mullvad.net> | 2020-09-01 15:08:19 +0200 |
| commit | bb8173aefb3c75d7d21354ea3aec6142c68357dc (patch) | |
| tree | 7d41d4fd31cd22a16538ec467cd4196756369cd1 | |
| parent | be42b66a39ea97653414b32443cd49e858846bde (diff) | |
| parent | 76e3a149d9bcf9def332ab8d56f51d4f66958f17 (diff) | |
| download | mullvadvpn-bb8173aefb3c75d7d21354ea3aec6142c68357dc.tar.xz mullvadvpn-bb8173aefb3c75d7d21354ea3aec6142c68357dc.zip | |
Merge branch 'daemon-tokio-updates'
35 files changed, 929 insertions, 1330 deletions
diff --git a/Cargo.lock b/Cargo.lock index 31d6dd3547..d530e53bd9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -182,15 +182,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "bytes" -version = "0.4.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "byteorder 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)", - "iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] -name = "bytes" version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -284,40 +275,6 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] -name = "crossbeam-deque" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "crossbeam-epoch 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)", - "crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", - "maybe-uninit 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] -name = "crossbeam-epoch" -version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "autocfg 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", - "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", - "crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", - "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", - "maybe-uninit 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", - "memoffset 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)", - "scopeguard 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] -name = "crossbeam-queue" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", - "crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", - "maybe-uninit 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] name = "crossbeam-utils" version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -979,31 +936,6 @@ dependencies = [ ] [[package]] -name = "jsonrpc-client-core" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "error-chain 0.12.4 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", - "jsonrpc-core 8.0.1 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)", - "serde 1.0.115 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_json 1.0.57 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] -name = "jsonrpc-core" -version = "8.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", - "serde 1.0.115 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_derive 1.0.115 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_json 1.0.57 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] name = "kernel32-sys" version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1078,14 +1010,6 @@ version = "2.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] -name = "memoffset" -version = "0.5.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "autocfg 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] name = "miniz_oxide" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1238,8 +1162,6 @@ dependencies = [ "talpid-core 0.1.0", "talpid-types 0.1.0", "tokio 0.2.22 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-timer 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "triggered 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "uuid 0.7.4 (registry+https://github.com/rust-lang/crates.io-index)", "winapi 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1262,11 +1184,9 @@ name = "mullvad-jni" version = "0.1.0" dependencies = [ "err-derive 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "ipnetwork 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)", "jnix 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", - "jsonrpc-client-core 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", - "jsonrpc-core 8.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)", "log-panics 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1279,6 +1199,7 @@ dependencies = [ "rand 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)", "talpid-core 0.1.0", "talpid-types 0.1.0", + "tokio 0.2.22 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1324,6 +1245,7 @@ dependencies = [ "regex 1.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "rs-release 0.1.7 (git+https://github.com/mullvad/rs-release?branch=snailquote-unescape)", "talpid-types 0.1.0", + "tokio 0.2.22 (registry+https://github.com/rust-lang/crates.io-index)", "uuid 0.7.4 (registry+https://github.com/rust-lang/crates.io-index)", "winapi 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "winres 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2197,11 +2119,6 @@ dependencies = [ ] [[package]] -name = "scoped-tls" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" - -[[package]] name = "scopeguard" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2322,11 +2239,6 @@ dependencies = [ [[package]] name = "slab" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" - -[[package]] -name = "slab" version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2526,9 +2438,6 @@ dependencies = [ "talpid-types 0.1.0", "tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.2.22 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-executor 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-io 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)", "tonic 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "tonic-build 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "triggered 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2648,29 +2557,6 @@ dependencies = [ [[package]] name = "tokio" -version = "0.1.22" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", - "mio 0.6.22 (registry+https://github.com/rust-lang/crates.io-index)", - "num_cpus 1.13.0 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-codec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-current-thread 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-executor 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-fs 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-io 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-reactor 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-sync 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-tcp 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-threadpool 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-timer 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-udp 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-uds 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] -name = "tokio" version = "0.2.22" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ @@ -2693,72 +2579,6 @@ dependencies = [ ] [[package]] -name = "tokio-codec" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-io 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] -name = "tokio-core" -version = "0.1.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", - "iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)", - "mio 0.6.22 (registry+https://github.com/rust-lang/crates.io-index)", - "scoped-tls 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-executor 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-io 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-reactor 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-timer 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] -name = "tokio-current-thread" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-executor 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] -name = "tokio-executor" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] -name = "tokio-fs" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-io 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-threadpool 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] -name = "tokio-io" -version = "0.1.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] name = "tokio-macros" version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2769,24 +2589,6 @@ dependencies = [ ] [[package]] -name = "tokio-reactor" -version = "0.1.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", - "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)", - "mio 0.6.22 (registry+https://github.com/rust-lang/crates.io-index)", - "num_cpus 1.13.0 (registry+https://github.com/rust-lang/crates.io-index)", - "parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", - "slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-executor 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-io 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-sync 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] name = "tokio-rustls" version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2798,95 +2600,6 @@ dependencies = [ ] [[package]] -name = "tokio-sync" -version = "0.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "fnv 1.0.7 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] -name = "tokio-tcp" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", - "iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", - "mio 0.6.22 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-io 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-reactor 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] -name = "tokio-threadpool" -version = "0.1.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "crossbeam-deque 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)", - "crossbeam-queue 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", - "crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", - "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)", - "num_cpus 1.13.0 (registry+https://github.com/rust-lang/crates.io-index)", - "slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-executor 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] -name = "tokio-timer" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", - "slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] -name = "tokio-timer" -version = "0.2.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", - "slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-executor 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] -name = "tokio-udp" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)", - "mio 0.6.22 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-codec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-io 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-reactor 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] -name = "tokio-uds" -version = "0.2.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", - "iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.76 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)", - "mio 0.6.22 (registry+https://github.com/rust-lang/crates.io-index)", - "mio-uds 0.6.8 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-codec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-io 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-reactor 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] name = "tokio-util" version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -3506,7 +3219,6 @@ dependencies = [ "checksum blake2b_simd 0.5.10 (registry+https://github.com/rust-lang/crates.io-index)" = "d8fb2d74254a3a0b5cac33ac9f8ed0e44aa50378d9dbb2e5d83bd21ed1dc2c8a" "checksum bumpalo 3.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2e8c087f005730276d1096a652e92a8bacee2e2472bcc9715a74d2bec38b5820" "checksum byteorder 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de" -"checksum bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)" = "206fdffcfa2df7cbe15601ef46c813fce0965eb3286db6b56c583b814b51c81c" "checksum bytes 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)" = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38" "checksum cc 1.0.59 (registry+https://github.com/rust-lang/crates.io-index)" = "66120af515773fb005778dc07c261bd201ec8ce50bd6e7144c927753fe013381" "checksum cesu8 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c" @@ -3519,9 +3231,6 @@ dependencies = [ "checksum constant_time_eq 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" "checksum core-foundation 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "57d24c7a13c43e870e37c1556b74555437870a04514f7685f5b354e090567171" "checksum core-foundation-sys 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b3a71ab494c0b5b860bdc8407ae08978052417070c2ced38573a9157ad75b8ac" -"checksum crossbeam-deque 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)" = "9f02af974daeee82218205558e51ec8768b48cf524bd01d550abe5573a608285" -"checksum crossbeam-epoch 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace" -"checksum crossbeam-queue 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "774ba60a54c213d409d5353bda12d49cd68d14e45036a285234c8d6f91f92570" "checksum crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" "checksum ct-logs 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4d3686f5fa27dbc1d76c751300376e167c5a43387f44bb451fd1c24776e49113" "checksum ctrlc 3.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "d0b676fa23f995faf587496dcd1c80fead847ed58d2da52ac1caca9a72790dd2" @@ -3595,8 +3304,6 @@ dependencies = [ "checksum jnix 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "6354ae923ca4df982181ae2cd77eb4214f8c11d11d0c0cd8606c9347ac2abc57" "checksum jnix-macros 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c78132fe420156f13b30518fcda9449b0ab8ae3b5584e8a1c53ce390fe770b44" "checksum js-sys 0.3.44 (registry+https://github.com/rust-lang/crates.io-index)" = "85a7e2c92a4804dd459b86c339278d0fe87cf93757fae222c3fa3ae75458bc73" -"checksum jsonrpc-client-core 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f29cb249837420fb0cee7fb0fbf1d22679e121b160e71bb5e0d90b9df241c23e" -"checksum jsonrpc-core 8.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "ddf83704f4e79979a424d1082dd2c1e52683058056c9280efa19ac5f6bc9033c" "checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" "checksum lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" "checksum lazycell 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" @@ -3608,7 +3315,6 @@ dependencies = [ "checksum log-panics 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ae0136257df209261daa18d6c16394757c63e032e27aafd8b07788b051082bef" "checksum maybe-uninit 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00" "checksum memchr 2.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3728d817d99e5ac407411fa471ff9800a778d88a24685968b36824eaf4bee400" -"checksum memoffset 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)" = "c198b026e1bbf08a937e94c6c60f9ec4a2267f5b0d2eec9c1b21b061ce2be55f" "checksum miniz_oxide 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "be0f75932c1f6cfae3c04000e40114adf955636e19040f9c0a2c380702aa1c7f" "checksum mio 0.6.22 (registry+https://github.com/rust-lang/crates.io-index)" = "fce347092656428bc8eaf6201042cb551b8d67855af7374542a92a0fbfcac430" "checksum mio-extras 2.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "52403fe290012ce777c4626790c8951324a2b9e3316b3143779c72b029742f19" @@ -3707,7 +3413,6 @@ dependencies = [ "checksum ryu 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e" "checksum same-file 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" "checksum schannel 0.1.19 (registry+https://github.com/rust-lang/crates.io-index)" = "8f05ba609c234e60bee0d547fe94a4c7e9da733d1c962cf6e59efa4cd9c8bc75" -"checksum scoped-tls 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "332ffa32bf586782a3efaeb58f127980944bbc8c4d6913a86107ac2a5ab24b28" "checksum scopeguard 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" "checksum sct 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e3042af939fca8c3453b7af0f1c66e533a15a86169e39de2657310ade8f98d3c" "checksum security-framework 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "64808902d7d99f78eaddd2b4e2509713babc3dc3c85ad6f4c447680f3c01e535" @@ -3722,7 +3427,6 @@ dependencies = [ "checksum shell-escape 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "45bb67a18fa91266cc7807181f62f9178a6873bfad7dc788c42e6430db40184f" "checksum signal-hook-registry 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a3e12110bc539e657a646068aaf5eb5b63af9d0c1f7b29c97113fad80e15f035" "checksum simple-signal 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "53f7da44adcc42667d57483bd93f81295f27d66897804b757573b61b6f13288b" -"checksum slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b4fcaed89ab08ef143da37bc52adbcc04d4a69014f4c1208d6b51f0c47bc23" "checksum slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" "checksum smallvec 0.6.13 (registry+https://github.com/rust-lang/crates.io-index)" = "f7b0758c52e15a8b5e3691eae6cc559f08eee9406e548a4477ba4e67770a82b6" "checksum snailquote 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "fc3e2894a343234fb8a8653cf9d49ef6aea44e6581612ca311c91c4bd356dec4" @@ -3749,24 +3453,9 @@ dependencies = [ "checksum thiserror-impl 1.0.20 (registry+https://github.com/rust-lang/crates.io-index)" = "bd80fc12f73063ac132ac92aceea36734f04a1d93c1240c6944e23a3b8841793" "checksum thread_local 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14" "checksum time 0.1.43 (registry+https://github.com/rust-lang/crates.io-index)" = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438" -"checksum tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)" = "5a09c0b5bb588872ab2f09afa13ee6e9dac11e10a0ec9e8e3ba39a5a5d530af6" "checksum tokio 0.2.22 (registry+https://github.com/rust-lang/crates.io-index)" = "5d34ca54d84bf2b5b4d7d31e901a8464f7b60ac145a284fba25ceb801f2ddccd" -"checksum tokio-codec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "25b2998660ba0e70d18684de5d06b70b70a3a747469af9dea7618cc59e75976b" -"checksum tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)" = "aeeffbbb94209023feaef3c196a41cbcdafa06b4a6f893f68779bb5e53796f71" -"checksum tokio-current-thread 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "b1de0e32a83f131e002238d7ccde18211c0a5397f60cbfffcb112868c2e0e20e" -"checksum tokio-executor 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "fb2d1b8f4548dbf5e1f7818512e9c406860678f29c300cdf0ebac72d1a3a1671" -"checksum tokio-fs 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "297a1206e0ca6302a0eed35b700d292b275256f596e2f3fea7729d5e629b6ff4" -"checksum tokio-io 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)" = "57fc868aae093479e3131e3d165c93b1c7474109d13c90ec0dda2a1bbfff0674" "checksum tokio-macros 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "f0c3acc6aa564495a0f2e1d59fab677cd7f81a19994cfc7f3ad0e64301560389" -"checksum tokio-reactor 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "09bc590ec4ba8ba87652da2068d150dcada2cfa2e07faae270a5e0409aa51351" "checksum tokio-rustls 0.13.1 (registry+https://github.com/rust-lang/crates.io-index)" = "15cb62a0d2770787abc96e99c1cd98fcf17f94959f3af63ca85bdfb203f051b4" -"checksum tokio-sync 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "edfe50152bc8164fcc456dab7891fa9bf8beaf01c5ee7e1dd43a397c3cf87dee" -"checksum tokio-tcp 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "98df18ed66e3b72e742f185882a9e201892407957e45fbff8da17ae7a7c51f72" -"checksum tokio-threadpool 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)" = "df720b6581784c118f0eb4310796b12b1d242a7eb95f716a8367855325c25f89" -"checksum tokio-timer 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6131e780037787ff1b3f8aad9da83bca02438b72277850dd6ad0d455e0e20efc" -"checksum tokio-timer 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)" = "93044f2d313c95ff1cb7809ce9a7a05735b012288a888b62d4434fd58c94f296" -"checksum tokio-udp 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "e2a0b10e610b39c38b031a2fcab08e4b82f16ece36504988dcbd81dbba650d82" -"checksum tokio-uds 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)" = "ab57a4ac4111c8c9dbcf70779f6fc8bc35ae4b2454809febac840ad19bd7e4e0" "checksum tokio-util 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "571da51182ec208780505a32528fc5512a8fe1443ab960b3f2f3ef093cd16930" "checksum tokio-util 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "be8242891f2b6cbef26a2d7e8605133c2c554cd35b3e4948ea892d6d68436499" "checksum toml 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)" = "ffc92d160b1eef40665be3a05630d003936a3bc7da7421277846c2613e92c71a" diff --git a/mullvad-daemon/Cargo.toml b/mullvad-daemon/Cargo.toml index 86abaabfaf..4847bb7465 100644 --- a/mullvad-daemon/Cargo.toml +++ b/mullvad-daemon/Cargo.toml @@ -24,9 +24,7 @@ rand = "0.7" regex = "1.0" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -tokio02 = { package = "tokio", version = "0.2", features = [ "io-util", "process", "rt-core", "rt-threaded", "stream", "fs"] } -tokio-core = "0.1" -tokio-timer = "0.1" +tokio = { version = "0.2", features = [ "io-util", "process", "rt-core", "rt-threaded", "stream", "fs"] } uuid = { version = "0.7", features = ["v4"] } mullvad-paths = { path = "../mullvad-paths" } diff --git a/mullvad-daemon/src/account_history.rs b/mullvad-daemon/src/account_history.rs index 05ea1c7d3b..8cceecda42 100644 --- a/mullvad-daemon/src/account_history.rs +++ b/mullvad-daemon/src/account_history.rs @@ -6,6 +6,7 @@ use std::{ future::Future, io::{self, Seek, Write}, path::Path, + sync::{Arc, Mutex}, }; use talpid_types::ErrorExt; @@ -22,6 +23,9 @@ pub enum Error { #[error(display = "Unable to write account history file")] Write(#[error(source)] io::Error), + + #[error(display = "Write task panicked or was cancelled")] + WriteCancelled(#[error(source)] tokio::task::JoinError), } static ACCOUNT_HISTORY_FILE: &str = "account-history.json"; @@ -29,19 +33,19 @@ static ACCOUNT_HISTORY_LIMIT: usize = 3; /// A trivial MRU cache of account data pub struct AccountHistory { - file: io::BufWriter<fs::File>, - accounts: VecDeque<AccountEntry>, + file: Arc<Mutex<io::BufWriter<fs::File>>>, + accounts: Arc<Mutex<VecDeque<AccountEntry>>>, rpc_handle: MullvadRestHandle, } impl AccountHistory { - pub fn new( + pub async fn new( cache_dir: &Path, settings_dir: &Path, rpc_handle: MullvadRestHandle, ) -> Result<AccountHistory> { - Self::migrate_from_old_file_location(cache_dir, settings_dir); + Self::migrate_from_old_file_location(cache_dir, settings_dir).await; let mut options = fs::OpenOptions::new(); #[cfg(unix)] @@ -83,30 +87,32 @@ impl AccountHistory { }; let file = io::BufWriter::new(reader.into_inner()); let mut history = AccountHistory { - file, - accounts, + file: Arc::new(Mutex::new(file)), + accounts: Arc::new(Mutex::new(accounts)), rpc_handle, }; - if let Err(e) = history.save_to_disk() { + if let Err(e) = history.save_to_disk().await { log::error!("Failed to save account cache after opening it: {}", e); } Ok(history) } - fn migrate_from_old_file_location(old_dir: &Path, new_dir: &Path) { + async fn migrate_from_old_file_location(old_dir: &Path, new_dir: &Path) { + use tokio::fs; + let old_path = old_dir.join(ACCOUNT_HISTORY_FILE); let new_path = new_dir.join(ACCOUNT_HISTORY_FILE); if !old_path.exists() || new_path.exists() || new_path == old_path { return; } - if let Err(error) = fs::copy(&old_path, &new_path) { + if let Err(error) = fs::copy(&old_path, &new_path).await { log::error!( "{}", error.display_chain_with_msg("Failed to migrate account history file location") ); } else { - let _ = fs::remove_file(old_path); + let _ = fs::remove_file(old_path).await; } } @@ -123,9 +129,11 @@ impl AccountHistory { /// Gets account data for a certain account id and bumps it's entry to the top of the list if /// it isn't there already. Returns None if the account entry is not available. - pub fn get(&mut self, account: &AccountToken) -> Result<Option<AccountEntry>> { + pub async fn get(&mut self, account: &AccountToken) -> Result<Option<AccountEntry>> { let (idx, entry) = match self .accounts + .lock() + .unwrap() .iter() .enumerate() .find(|(_idx, entry)| &entry.account == account) @@ -139,19 +147,19 @@ impl AccountHistory { if idx == 0 { return Ok(Some(entry)); } - self.insert(entry.clone())?; + self.insert(entry.clone()).await?; Ok(Some(entry)) } /// Bumps history of an account token. If the account token is not in history, it will be /// added. - pub fn bump_history(&mut self, account: &AccountToken) -> Result<()> { - if self.get(account)?.is_none() { + pub async fn bump_history(&mut self, account: &AccountToken) -> Result<()> { + if self.get(account).await?.is_none() { let new_entry = AccountEntry { account: account.to_string(), wireguard: None, }; - self.insert(new_entry)?; + self.insert(new_entry).await?; } Ok(()) } @@ -173,58 +181,58 @@ impl AccountHistory { } /// Always inserts a new entry at the start of the list - pub fn insert(&mut self, new_entry: AccountEntry) -> Result<()> { - self.accounts - .retain(|entry| entry.account != new_entry.account); - - self.accounts.push_front(new_entry); + pub async fn insert(&mut self, new_entry: AccountEntry) -> Result<()> { + let mut accounts = self.accounts.lock().unwrap(); + accounts.retain(|entry| entry.account != new_entry.account); + accounts.push_front(new_entry); - if self.accounts.len() > ACCOUNT_HISTORY_LIMIT { - let last_entry = self.accounts.pop_back().unwrap(); + if accounts.len() > ACCOUNT_HISTORY_LIMIT { + let last_entry = accounts.pop_back().unwrap(); if let Some(wg_data) = last_entry.wireguard { - self.rpc_handle - .service() - .spawn(self.create_remove_wg_key_rpc(&last_entry.account, &wg_data)); + tokio::spawn(self.create_remove_wg_key_rpc(&last_entry.account, &wg_data)); } } - self.save_to_disk() + std::mem::drop(accounts); + self.save_to_disk().await } /// Retrieve account history. pub fn get_account_history(&self) -> Vec<AccountToken> { self.accounts + .lock() + .unwrap() .iter() .map(|entry| entry.account.clone()) .collect() } /// Remove account data - pub fn remove_account(&mut self, account: &str) -> Result<()> { - let entry = self.get(&String::from(account))?; + pub async fn remove_account(&mut self, account: &str) -> Result<()> { + let entry = self.get(&String::from(account)).await?; let entry = match entry { Some(entry) => entry, None => return Ok(()), }; if let Some(wg_data) = entry.wireguard { - self.rpc_handle - .service() - .spawn(self.create_remove_wg_key_rpc(account, &wg_data)) + tokio::spawn(self.create_remove_wg_key_rpc(account, &wg_data)); } - let _ = self.accounts.pop_front(); - self.save_to_disk() + let _ = self.accounts.lock().unwrap().pop_front(); + self.save_to_disk().await } /// Remove account history - pub fn clear(&mut self) -> Result<()> { + pub async fn clear(&mut self) -> Result<()> { log::debug!("account_history::clear"); let rpc = WireguardKeyProxy::new(self.rpc_handle.clone()); let removal: Vec<_> = self .accounts + .lock() + .unwrap() .drain(0..) .filter_map(move |entry| { let account = entry.account.clone(); @@ -241,21 +249,34 @@ impl AccountHistory { .collect(); - let joined_futs = futures::future::join_all(removal); - self.rpc_handle.service().block_on(joined_futs); + futures::future::join_all(removal).await; + + self.accounts = Arc::new(Mutex::new(VecDeque::new())); + self.save_to_disk().await + } + + async fn save_to_disk(&mut self) -> Result<()> { + let file = self.file.clone(); + let accounts = self.accounts.clone(); - self.accounts = VecDeque::new(); - self.save_to_disk() + tokio::task::spawn_blocking(move || { + let mut file = file.lock().unwrap(); + let accounts = accounts.lock().unwrap(); + Self::save_to_disk_inner(&mut *file, &*accounts) + }) + .await + .map_err(Error::WriteCancelled)? } - fn save_to_disk(&mut self) -> Result<()> { - self.file.get_mut().set_len(0).map_err(Error::Write)?; - self.file - .seek(io::SeekFrom::Start(0)) - .map_err(Error::Write)?; - serde_json::to_writer_pretty(&mut self.file, &self.accounts).map_err(Error::Serialize)?; - self.file.flush().map_err(Error::Write)?; - self.file.get_mut().sync_all().map_err(Error::Write) + fn save_to_disk_inner( + mut file: &mut io::BufWriter<fs::File>, + accounts: &VecDeque<AccountEntry>, + ) -> Result<()> { + file.get_mut().set_len(0).map_err(Error::Write)?; + file.seek(io::SeekFrom::Start(0)).map_err(Error::Write)?; + serde_json::to_writer_pretty(&mut file, accounts).map_err(Error::Serialize)?; + file.flush().map_err(Error::Write)?; + file.get_mut().sync_all().map_err(Error::Write) } } diff --git a/mullvad-daemon/src/event_loop.rs b/mullvad-daemon/src/event_loop.rs deleted file mode 100644 index 58f638a2ac..0000000000 --- a/mullvad-daemon/src/event_loop.rs +++ /dev/null @@ -1,38 +0,0 @@ -use futures01::{sync::oneshot, Future}; -use std::thread; -use tokio_core::reactor::{Core, Remote}; - -pub struct CoreHandle { - /// Remote used to spawn futures on the daemon's event loop. - pub remote: Remote, - /// A sender that will cause the event loop to stop once it's dropped. - shutdown_tx: Option<oneshot::Sender<()>>, -} - -impl Drop for CoreHandle { - fn drop(&mut self) { - if let Some(shutdown_tx) = self.shutdown_tx.take() { - if shutdown_tx.send(()).is_err() { - log::error!("Core already shut down"); - } - } - } -} - -/// Panics if a new tokio event loop can't be spawned. -pub fn spawn() -> CoreHandle { - let (tx, rx) = oneshot::channel(); - let (shutdown_tx, shutdown_rx) = oneshot::channel(); - thread::spawn(move || { - let mut core = Core::new().expect("Failed to spawn event loop"); - let remote = core.remote(); - let _ = tx.send(remote); - let _ = core.run(shutdown_rx); - }); - let remote = rx.wait().expect("Failed to spawn event loop"); - - CoreHandle { - remote, - shutdown_tx: Some(shutdown_tx), - } -} diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs index 414d64e267..e159e33be7 100644 --- a/mullvad-daemon/src/lib.rs +++ b/mullvad-daemon/src/lib.rs @@ -14,20 +14,18 @@ pub mod management_interface; mod relays; #[cfg(not(target_os = "android"))] pub mod rpc_uniqueness_check; +pub mod runtime; mod settings; pub mod version; mod version_check; -use futures::future::{abortable, AbortHandle}; -use futures01::{ - future::{self, Executor}, - stream::Wait, - sync::{ - mpsc::{UnboundedReceiver, UnboundedSender}, - oneshot, - }, - Future, Stream, +use futures::{ + channel::{mpsc, oneshot}, + compat::Future01CompatExt, + future::{abortable, AbortHandle, Future}, + StreamExt, }; +use futures01::Future as Future01; use log::{debug, error, info, warn}; use mullvad_rpc::AccountsProxy; use mullvad_types::{ @@ -53,7 +51,7 @@ use std::{ marker::PhantomData, mem, path::PathBuf, - sync::{mpsc, Arc, Weak}, + sync::{mpsc as sync_mpsc, Arc, Weak}, time::Duration, }; #[cfg(target_os = "linux")] @@ -74,12 +72,6 @@ use talpid_types::{ mod wireguard; const TARGET_START_STATE_FILE: &str = "target-start-state.json"; -mod event_loop; - -/// FIXME(linus): This is here just because the futures crate has deprecated it and jsonrpc_core -/// did not introduce their own yet (https://github.com/paritytech/jsonrpc/pull/196). -/// Remove this and use the one in jsonrpc_core when that is released. -type BoxFuture<T, E> = Box<dyn Future<Item = T, Error = E> + Send>; const TUNNEL_STATE_MACHINE_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5); @@ -151,24 +143,24 @@ pub enum Error { /// Enum representing commands that can be sent to the daemon. pub enum DaemonCommand { /// Set target state. Does nothing if the daemon already has the state that is being set. - SetTargetState(oneshot::Sender<std::result::Result<(), ()>>, TargetState), + SetTargetState(oneshot::Sender<()>, TargetState), /// Reconnect the tunnel, if one is connecting/connected. Reconnect, /// Request the current state. GetState(oneshot::Sender<TunnelState>), /// Get the current geographical location. GetCurrentLocation(oneshot::Sender<Option<GeoIpLocation>>), - CreateNewAccount(oneshot::Sender<std::result::Result<String, mullvad_rpc::rest::Error>>), + CreateNewAccount(oneshot::Sender<Result<String, mullvad_rpc::rest::Error>>), /// Request the metadata for an account. GetAccountData( - oneshot::Sender<BoxFuture<AccountData, mullvad_rpc::rest::Error>>, + oneshot::Sender<Result<AccountData, mullvad_rpc::rest::Error>>, AccountToken, ), /// Request www auth token for an account - GetWwwAuthToken(oneshot::Sender<BoxFuture<String, mullvad_rpc::rest::Error>>), + GetWwwAuthToken(oneshot::Sender<Result<String, mullvad_rpc::rest::Error>>), /// Submit voucher to add time to the current account. Returns time added in seconds SubmitVoucher( - oneshot::Sender<BoxFuture<VoucherSubmission, mullvad_rpc::rest::Error>>, + oneshot::Sender<Result<VoucherSubmission, mullvad_rpc::rest::Error>>, String, ), /// Request account history @@ -197,15 +189,9 @@ pub enum DaemonCommand { /// Set the mssfix argument for OpenVPN SetOpenVpnMssfix(oneshot::Sender<()>, Option<u16>), /// Set proxy details for OpenVPN - SetBridgeSettings( - oneshot::Sender<std::result::Result<(), settings::Error>>, - BridgeSettings, - ), + SetBridgeSettings(oneshot::Sender<Result<(), settings::Error>>, BridgeSettings), /// Set proxy state - SetBridgeState( - oneshot::Sender<std::result::Result<(), settings::Error>>, - BridgeState, - ), + SetBridgeState(oneshot::Sender<Result<(), settings::Error>>, BridgeState), /// Set if IPv6 should be enabled in the tunnel SetEnableIpv6(oneshot::Sender<()>, bool), /// Set MTU for wireguard tunnels @@ -252,7 +238,7 @@ pub(crate) enum InternalDaemonEvent { TunnelStateTransition(TunnelStateTransition), /// Request from the `MullvadTunnelParametersGenerator` to obtain a new relay. GenerateTunnelParameters( - mpsc::Sender<Result<TunnelParameters, ParameterGenerationError>>, + sync_mpsc::Sender<Result<TunnelParameters, ParameterGenerationError>>, u32, ), /// A command sent to the daemon. @@ -338,12 +324,12 @@ impl DaemonExecutionState { pub struct DaemonCommandChannel { sender: DaemonCommandSender, - receiver: UnboundedReceiver<InternalDaemonEvent>, + receiver: mpsc::UnboundedReceiver<InternalDaemonEvent>, } impl DaemonCommandChannel { pub fn new() -> Self { - let (untracked_sender, receiver) = futures01::sync::mpsc::unbounded(); + let (untracked_sender, receiver) = mpsc::unbounded(); let sender = DaemonCommandSender(Arc::new(untracked_sender)); Self { sender, receiver } @@ -353,7 +339,12 @@ impl DaemonCommandChannel { self.sender.clone() } - fn destructure(self) -> (DaemonEventSender, UnboundedReceiver<InternalDaemonEvent>) { + fn destructure( + self, + ) -> ( + DaemonEventSender, + mpsc::UnboundedReceiver<InternalDaemonEvent>, + ) { let event_sender = DaemonEventSender::new(Arc::downgrade(&self.sender.0)); (event_sender, self.receiver) @@ -361,7 +352,7 @@ impl DaemonCommandChannel { } #[derive(Clone)] -pub struct DaemonCommandSender(Arc<UnboundedSender<InternalDaemonEvent>>); +pub struct DaemonCommandSender(Arc<mpsc::UnboundedSender<InternalDaemonEvent>>); impl DaemonCommandSender { pub fn send(&self, command: DaemonCommand) -> Result<(), Error> { @@ -372,7 +363,7 @@ impl DaemonCommandSender { } pub(crate) struct DaemonEventSender<E = InternalDaemonEvent> { - sender: Weak<UnboundedSender<InternalDaemonEvent>>, + sender: Weak<mpsc::UnboundedSender<InternalDaemonEvent>>, _event: PhantomData<E>, } @@ -389,7 +380,7 @@ where } impl DaemonEventSender { - pub fn new(sender: Weak<UnboundedSender<InternalDaemonEvent>>) -> Self { + pub fn new(sender: Weak<mpsc::UnboundedSender<InternalDaemonEvent>>) -> Self { DaemonEventSender { sender, _event: PhantomData, @@ -454,13 +445,13 @@ pub trait EventListener { } pub struct Daemon<L: EventListener> { - tunnel_command_tx: Arc<UnboundedSender<TunnelCommand>>, + tunnel_command_tx: Arc<mpsc::UnboundedSender<TunnelCommand>>, tunnel_state: TunnelState, target_state: TargetState, state: DaemonExecutionState, #[cfg(target_os = "linux")] exclude_pids: split_tunnel::PidManager, - rx: Wait<UnboundedReceiver<InternalDaemonEvent>>, + rx: mpsc::UnboundedReceiver<InternalDaemonEvent>, tx: DaemonEventSender, reconnection_job: Option<AbortHandle>, event_listener: L, @@ -471,7 +462,6 @@ pub struct Daemon<L: EventListener> { rpc_handle: mullvad_rpc::rest::MullvadRestHandle, wireguard_key_manager: wireguard::KeyManager, version_updater_handle: version_check::VersionUpdaterHandle, - core_handle: event_loop::CoreHandle, relay_selector: relays::RelaySelector, last_generated_relay: Option<Relay>, last_generated_bridge_relay: Option<Relay>, @@ -486,7 +476,7 @@ impl<L> Daemon<L> where L: EventListener + Clone + Send + 'static, { - pub fn start( + pub async fn start( log_dir: Option<PathBuf>, resource_dir: PathBuf, settings_dir: PathBuf, @@ -498,12 +488,13 @@ where let (tunnel_state_machine_shutdown_tx, tunnel_state_machine_shutdown_signal) = oneshot::channel(); - let mut rpc_runtime = mullvad_rpc::MullvadRpcRuntime::with_cache_dir(&cache_dir) - .map_err(Error::InitRpcFactory)?; + let mut rpc_runtime = mullvad_rpc::MullvadRpcRuntime::with_cache_dir( + tokio::runtime::Handle::current(), + &cache_dir, + ) + .map_err(Error::InitRpcFactory)?; let rpc_handle = rpc_runtime.mullvad_rest_handle(); - let core_handle = event_loop::spawn(); - let relay_list_listener = event_listener.clone(); let on_relay_list_update = move |relay_list: &RelayList| { relay_list_listener.notify_relay_list(relay_list.clone()); @@ -532,9 +523,10 @@ where app_version_info.clone(), settings.show_beta_releases, ); - rpc_runtime.runtime().spawn(version_updater.run()); + tokio::spawn(version_updater.run()); let account_history = account_history::AccountHistory::new(&cache_dir, &settings_dir, rpc_handle.clone()) + .await .map_err(Error::LoadAccountHistory)?; // Restore the tunnel to a previous state @@ -578,13 +570,14 @@ where #[cfg(target_os = "android")] android_context, ) + .await .map_err(Error::TunnelError)?; let wireguard_key_manager = wireguard::KeyManager::new(internal_event_tx.clone(), rpc_handle.clone()); // Attempt to download a fresh relay list - rpc_runtime.runtime().block_on(relay_selector.update()); + relay_selector.update().await; let initial_target_state = if settings.get_account_token().is_some() { if settings.auto_connect { @@ -605,7 +598,7 @@ where state: DaemonExecutionState::Running, #[cfg(target_os = "linux")] exclude_pids: split_tunnel::PidManager::new().map_err(Error::InitSplitTunneling)?, - rx: internal_event_rx.wait(), + rx: internal_event_rx, tx: internal_event_tx, reconnection_job: None, event_listener, @@ -616,7 +609,6 @@ where rpc_handle, wireguard_key_manager, version_updater_handle, - core_handle, relay_selector, last_generated_relay: None, last_generated_bridge_relay: None, @@ -626,19 +618,22 @@ where cache_dir, }; - daemon.ensure_wireguard_keys_for_current_account(); + daemon.ensure_wireguard_keys_for_current_account().await; if let Some(token) = daemon.settings.get_account_token() { - daemon.wireguard_key_manager.set_rotation_interval( - &mut daemon.account_history, - token, - daemon - .settings - .tunnel_options - .wireguard - .automatic_rotation - .map(|hours| Duration::from_secs(60u64 * 60u64 * hours as u64)), - ); + daemon + .wireguard_key_manager + .set_rotation_interval( + &mut daemon.account_history, + token, + daemon + .settings + .tunnel_options + .wireguard + .automatic_rotation + .map(|hours| Duration::from_secs(60u64 * 60u64 * hours as u64)), + ) + .await; } Ok(daemon) @@ -646,90 +641,101 @@ where /// Consume the `Daemon` and run the main event loop. Blocks until an error happens or a /// shutdown event is received. - pub fn run(mut self) -> Result<(), Error> { + pub async fn run(mut self) -> Result<(), Error> { if self.target_state == TargetState::Secured { self.connect_tunnel(); } - while let Some(Ok(event)) = self.rx.next() { - self.handle_event(event); + + while let Some(event) = self.rx.next().await { + self.handle_event(event).await; if self.state == DaemonExecutionState::Finished { break; } } - self.finalize(); + self.finalize().await; Ok(()) } - fn finalize(self) { - let (event_listener, shutdown_callbacks, tunnel_state_machine_shutdown_signal) = + async fn finalize(self) { + let (event_listener, shutdown_callbacks, rpc_runtime, tunnel_state_machine_shutdown_signal) = self.shutdown(); for cb in shutdown_callbacks { cb(); } - let state_machine_shutdown = tokio_timer::Timer::default().timeout( - // the oneshot::Canceled error type does not play well with the timer error, as such, - // it has to be cast away. - tunnel_state_machine_shutdown_signal.map_err(|_| { - log::error!("Tunnel state machine already shut down"); - }), + let shutdown_signal = tokio::time::timeout( TUNNEL_STATE_MACHINE_SHUTDOWN_TIMEOUT, + tunnel_state_machine_shutdown_signal, ); - - match state_machine_shutdown.wait() { - Ok(_) => { - log::info!("Tunnel state machine shut down"); - } - Err(_) => { - log::error!("Tunnel state machine did not shut down in time, shutting down anyway"); - } + match shutdown_signal.await { + Ok(_) => log::info!("Tunnel state machine shut down"), + Err(_) => log::error!("Tunnel state machine did not shut down gracefully"), } + mem::drop(event_listener); + mem::drop(rpc_runtime); } /// Shuts down the daemon without shutting down the underlying event listener and the shutdown /// callbacks - fn shutdown(self) -> (L, Vec<Box<dyn FnOnce()>>, oneshot::Receiver<()>) { + fn shutdown( + self, + ) -> ( + L, + Vec<Box<dyn FnOnce()>>, + mullvad_rpc::MullvadRpcRuntime, + oneshot::Receiver<()>, + ) { let Daemon { event_listener, shutdown_callbacks, + rpc_runtime, tunnel_state_machine_shutdown_signal, .. } = self; ( event_listener, shutdown_callbacks, + rpc_runtime, tunnel_state_machine_shutdown_signal, ) } - fn handle_event(&mut self, event: InternalDaemonEvent) { + async fn handle_event(&mut self, event: InternalDaemonEvent) { use self::InternalDaemonEvent::*; match event { - TunnelStateTransition(transition) => self.handle_tunnel_state_transition(transition), + TunnelStateTransition(transition) => { + self.handle_tunnel_state_transition(transition).await + } GenerateTunnelParameters(tunnel_parameters_tx, retry_attempt) => { self.handle_generate_tunnel_parameters(&tunnel_parameters_tx, retry_attempt) + .await } - Command(command) => self.handle_command(command), + Command(command) => self.handle_command(command).await, TriggerShutdown => self.trigger_shutdown_event(), - WgKeyEvent(key_event) => self.handle_wireguard_key_event(key_event), - NewAccountEvent(account_token, tx) => self.handle_new_account_event(account_token, tx), + WgKeyEvent(key_event) => self.handle_wireguard_key_event(key_event).await, + NewAccountEvent(account_token, tx) => { + self.handle_new_account_event(account_token, tx).await + } NewAppVersionInfo(app_version_info) => { self.handle_new_app_version_info(app_version_info) } } } - fn handle_tunnel_state_transition(&mut self, tunnel_state_transition: TunnelStateTransition) { + async fn handle_tunnel_state_transition( + &mut self, + tunnel_state_transition: TunnelStateTransition, + ) { match &tunnel_state_transition { TunnelStateTransition::Disconnected | TunnelStateTransition::Connected(_) | TunnelStateTransition::Error(_) => { // Reset the RPCs so that they fail immediately after the underlying socket gets // invalidated due to the tunnel either coming up or breaking. - self.rpc_handle.service().reset(); + self.rpc_handle.service().reset().await; } _ => (), }; @@ -770,7 +776,7 @@ where } if let ErrorStateCause::AuthFailed(_) = error_state.cause() { - self.schedule_reconnect(Duration::from_secs(60)) + self.schedule_reconnect(Duration::from_secs(60)).await } } _ => {} @@ -780,9 +786,11 @@ where self.event_listener.notify_new_state(tunnel_state); } - fn handle_generate_tunnel_parameters( + async fn handle_generate_tunnel_parameters( &mut self, - tunnel_parameters_tx: &mpsc::Sender<Result<TunnelParameters, ParameterGenerationError>>, + tunnel_parameters_tx: &sync_mpsc::Sender< + Result<TunnelParameters, ParameterGenerationError>, + >, retry_attempt: u32, ) { if let Some(account_token) = self.settings.get_account_token() { @@ -797,26 +805,30 @@ where ParameterGenerationError::CustomTunnelHostResultionError }) } - RelaySettings::Normal(constraints) => self - .relay_selector - .get_tunnel_endpoint( - &constraints, - self.settings.get_bridge_state(), - retry_attempt, - self.account_history - .get(&account_token) - .unwrap_or(None) - .and_then(|entry| entry.wireguard) - .is_some(), - ) - .map_err(|_| ParameterGenerationError::NoMatchingRelay) - .and_then(|(relay, endpoint)| { - let result = self.create_tunnel_parameters( - &relay, - endpoint, - account_token, + RelaySettings::Normal(constraints) => { + let endpoint = self + .relay_selector + .get_tunnel_endpoint( + &constraints, + self.settings.get_bridge_state(), retry_attempt, - ); + self.account_history + .get(&account_token) + .await + .unwrap_or(None) + .and_then(|entry| entry.wireguard) + .is_some(), + ) + .ok(); + if let Some((relay, endpoint)) = endpoint { + let result = self + .create_tunnel_parameters( + &relay, + endpoint, + account_token, + retry_attempt, + ) + .await; self.last_generated_relay = Some(relay); match result { Ok(result) => Ok(result), @@ -836,7 +848,10 @@ where Err(ParameterGenerationError::NoMatchingRelay) } } - }), + } else { + Err(ParameterGenerationError::NoMatchingRelay) + } + } }; if tunnel_parameters_tx.send(result).is_err() { log::error!("Failed to send tunnel parameters"); @@ -846,7 +861,7 @@ where } } - fn create_tunnel_parameters( + async fn create_tunnel_parameters( &mut self, relay: &Relay, endpoint: MullvadEndpoint, @@ -927,6 +942,7 @@ where let wg_data = self .account_history .get(&account_token) + .await .map_err(Error::AccountHistory)? .and_then(|entry| entry.wireguard) .ok_or(Error::NoKeyAvailable)?; @@ -952,15 +968,15 @@ where } } - fn schedule_reconnect(&mut self, delay: Duration) { + async fn schedule_reconnect(&mut self, delay: Duration) { let tunnel_command_tx = self.tx.to_specialized_sender(); let (future, abort_handle) = abortable(Box::pin(async move { - tokio02::time::delay_for(delay).await; + tokio::time::delay_for(delay).await; log::debug!("Attempting to reconnect"); let _ = tunnel_command_tx.send(DaemonCommand::Reconnect); })); - self.spawn_future(future); + tokio::spawn(future); self.reconnection_job = Some(abort_handle); } @@ -970,23 +986,8 @@ where } } - fn spawn_future<F>(&mut self, fut: F) - where - F: std::future::Future + Send + 'static, - F::Output: Send, - { - self.rpc_runtime.runtime().spawn(fut); - } - - fn block_on_future<F>(&mut self, fut: F) -> F::Output - where - F: std::future::Future, - { - self.rpc_runtime.runtime().block_on(fut) - } - - fn handle_command(&mut self, command: DaemonCommand) { + async fn handle_command(&mut self, command: DaemonCommand) { use self::DaemonCommand::*; if !self.state.is_running() { log::trace!("Dropping daemon command because the daemon is shutting down",); @@ -996,22 +997,22 @@ where SetTargetState(tx, state) => self.on_set_target_state(tx, state), Reconnect => self.on_reconnect(), GetState(tx) => self.on_get_state(tx), - GetCurrentLocation(tx) => self.on_get_current_location(tx), - CreateNewAccount(tx) => self.on_create_new_account(tx), - GetAccountData(tx, account_token) => self.on_get_account_data(tx, account_token), - GetWwwAuthToken(tx) => self.on_get_www_auth_token(tx), - SubmitVoucher(tx, voucher) => self.on_submit_voucher(tx, voucher), + GetCurrentLocation(tx) => self.on_get_current_location(tx).await, + CreateNewAccount(tx) => self.on_create_new_account(tx).await, + GetAccountData(tx, account_token) => self.on_get_account_data(tx, account_token).await, + GetWwwAuthToken(tx) => self.on_get_www_auth_token(tx).await, + SubmitVoucher(tx, voucher) => self.on_submit_voucher(tx, voucher).await, GetRelayLocations(tx) => self.on_get_relay_locations(tx), - UpdateRelayLocations => self.on_update_relay_locations(), - SetAccount(tx, account_token) => self.on_set_account(tx, account_token), + UpdateRelayLocations => self.on_update_relay_locations().await, + SetAccount(tx, account_token) => self.on_set_account(tx, account_token).await, GetAccountHistory(tx) => self.on_get_account_history(tx), RemoveAccountFromHistory(tx, account_token) => { - self.on_remove_account_from_history(tx, account_token) + self.on_remove_account_from_history(tx, account_token).await } - ClearAccountHistory(tx) => self.on_clear_account_history(tx), + ClearAccountHistory(tx) => self.on_clear_account_history(tx).await, UpdateRelaySettings(tx, update) => self.on_update_relay_settings(tx, update), SetAllowLan(tx, allow_lan) => self.on_set_allow_lan(tx, allow_lan), - SetShowBetaReleases(tx, enabled) => self.on_set_show_beta_releases(tx, enabled), + SetShowBetaReleases(tx, enabled) => self.on_set_show_beta_releases(tx, enabled).await, SetBlockWhenDisconnected(tx, block_when_disconnected) => { self.on_set_block_when_disconnected(tx, block_when_disconnected) } @@ -1024,16 +1025,16 @@ where SetEnableIpv6(tx, enable_ipv6) => self.on_set_enable_ipv6(tx, enable_ipv6), SetWireguardMtu(tx, mtu) => self.on_set_wireguard_mtu(tx, mtu), SetWireguardRotationInterval(tx, interval) => { - self.on_set_wireguard_rotation_interval(tx, interval) + self.on_set_wireguard_rotation_interval(tx, interval).await } GetSettings(tx) => self.on_get_settings(tx), - GenerateWireguardKey(tx) => self.on_generate_wireguard_key(tx), - GetWireguardKey(tx) => self.on_get_wireguard_key(tx), - VerifyWireguardKey(tx) => self.on_verify_wireguard_key(tx), + GenerateWireguardKey(tx) => self.on_generate_wireguard_key(tx).await, + GetWireguardKey(tx) => self.on_get_wireguard_key(tx).await, + VerifyWireguardKey(tx) => self.on_verify_wireguard_key(tx).await, GetVersionInfo(tx) => self.on_get_version_info(tx), GetCurrentVersion(tx) => self.on_get_current_version(tx), #[cfg(not(target_os = "android"))] - FactoryReset(tx) => self.on_factory_reset(tx), + FactoryReset(tx) => self.on_factory_reset(tx).await, #[cfg(target_os = "linux")] GetSplitTunnelProcesses(tx) => self.on_get_split_tunnel_processes(tx), #[cfg(target_os = "linux")] @@ -1047,7 +1048,7 @@ where } } - fn handle_wireguard_key_event( + async fn handle_wireguard_key_event( &mut self, event: ( AccountToken, @@ -1073,6 +1074,7 @@ where let mut account_entry = self .account_history .get(&account) + .await .ok() .and_then(|entry| entry) .unwrap_or_else(|| account_history::AccountEntry { @@ -1080,10 +1082,10 @@ where wireguard: None, }); account_entry.wireguard = Some(data); - match self.account_history.insert(account_entry) { + match self.account_history.insert(account_entry).await { Ok(_) => { if let Some(TunnelType::Wireguard) = self.get_connected_tunnel_type() { - self.schedule_reconnect(WG_RECONNECT_DELAY); + self.schedule_reconnect(WG_RECONNECT_DELAY).await; } self.event_listener .notify_key_event(KeygenEvent::NewKey(public_key)) @@ -1115,12 +1117,12 @@ where } } - fn handle_new_account_event( + async fn handle_new_account_event( &mut self, new_token: AccountToken, tx: oneshot::Sender<Result<String, mullvad_rpc::rest::Error>>, ) { - match self.set_account(Some(new_token.clone())) { + match self.set_account(Some(new_token.clone())).await { Ok(_) => { self.set_target_state(TargetState::Unsecured); let _ = tx.send(Ok(new_token)); @@ -1136,17 +1138,13 @@ where self.event_listener.notify_app_version(app_version_info); } - fn on_set_target_state( - &mut self, - tx: oneshot::Sender<Result<(), ()>>, - new_target_state: TargetState, - ) { + fn on_set_target_state(&mut self, tx: oneshot::Sender<()>, new_target_state: TargetState) { if self.state.is_running() { self.set_target_state(new_target_state); } else { warn!("Ignoring target state change request due to shutdown"); } - Self::oneshot_send(tx, Ok(()), "target state"); + Self::oneshot_send(tx, (), "target state"); } fn on_reconnect(&mut self) { @@ -1161,42 +1159,56 @@ where Self::oneshot_send(tx, self.tunnel_state.clone(), "current state"); } - fn on_get_current_location(&mut self, tx: oneshot::Sender<Option<GeoIpLocation>>) { + async fn on_get_current_location(&mut self, tx: oneshot::Sender<Option<GeoIpLocation>>) { use self::TunnelState::*; - let get_location: Box<dyn Future<Item = Option<GeoIpLocation>, Error = ()> + Send> = - match &self.tunnel_state { - Disconnected => Box::new(self.get_geo_location().map(Some)), - Connecting { location, .. } => Box::new(future::result(Ok(location.clone()))), - Disconnecting(..) => Box::new(future::result(Ok(self.build_location_from_relay()))), - Connected { location, .. } => { - let relay_location = location.clone(); - Box::new( - self.get_geo_location() - .map(|fetched_location| GeoIpLocation { - ipv4: fetched_location.ipv4, - ipv6: fetched_location.ipv6, - ..relay_location.unwrap_or(fetched_location) - }) - .map(Some), - ) - } - Error(..) => { - // We are not online at all at this stage so no location data is available. - Box::new(future::result(Ok(None))) - } - }; - self.core_handle.remote.spawn(move |_| { - get_location.map(|location| Self::oneshot_send(tx, location, "current location")) - }); + match &self.tunnel_state { + Disconnected => { + let location = self.get_geo_location(); + tokio::spawn(async { + Self::oneshot_send(tx, location.await.ok(), "current location"); + }); + } + Connecting { location, .. } => { + Self::oneshot_send(tx, location.clone(), "current location") + } + Disconnecting(..) => { + Self::oneshot_send(tx, self.build_location_from_relay(), "current location") + } + Connected { location, .. } => { + let relay_location = location.clone(); + let location_future = self.get_geo_location(); + tokio::spawn(async { + let location = location_future.await; + Self::oneshot_send( + tx, + location.ok().map(|fetched_location| GeoIpLocation { + ipv4: fetched_location.ipv4, + ipv6: fetched_location.ipv6, + ..relay_location.unwrap_or(fetched_location) + }), + "current location", + ); + }); + } + Error(_) => { + // We are not online at all at this stage so no location data is available. + Self::oneshot_send(tx, None, "current location"); + } + } } - fn get_geo_location(&mut self) -> impl Future<Item = GeoIpLocation, Error = ()> { + fn get_geo_location(&mut self) -> impl Future<Output = Result<GeoIpLocation, ()>> { let https_handle = self.rpc_runtime.rest_handle(); - geoip::send_location_request(https_handle).map_err(|e| { - warn!("Unable to fetch GeoIP location: {}", e.display_chain()); - }) + async { + geoip::send_location_request(https_handle) + .map_err(|e| { + warn!("Unable to fetch GeoIP location: {}", e.display_chain()); + }) + .compat() + .await + } } fn build_location_from_relay(&self) -> Option<GeoIpLocation> { @@ -1221,7 +1233,7 @@ where }) } - fn on_create_new_account( + async fn on_create_new_account( &mut self, tx: oneshot::Sender<Result<String, mullvad_rpc::rest::Error>>, ) { @@ -1242,41 +1254,55 @@ where Ok(()) }); - if self.core_handle.remote.execute(future).is_err() { - log::error!("Failed to spawn future for creating a new account"); - } + tokio::spawn(async { + if future.compat().await.is_err() { + log::error!("Failed to spawn future for creating a new account"); + } + }); } - fn on_get_account_data( + async fn on_get_account_data( &mut self, - tx: oneshot::Sender<BoxFuture<AccountData, mullvad_rpc::rest::Error>>, + tx: oneshot::Sender<Result<AccountData, mullvad_rpc::rest::Error>>, account_token: AccountToken, ) { - let rpc_call = self - .accounts_proxy - .get_expiry(account_token) - .map(|expiry| AccountData { expiry }); - Self::oneshot_send(tx, Box::new(rpc_call), "account data") + let expiry_old_fut = self.accounts_proxy.get_expiry(account_token); + let rpc_call = async { + let result = expiry_old_fut + .compat() + .await + .map(|expiry| AccountData { expiry }); + Self::oneshot_send(tx, result, "account data"); + }; + tokio::spawn(rpc_call); } - fn on_get_www_auth_token( + async fn on_get_www_auth_token( &mut self, - tx: oneshot::Sender<BoxFuture<String, mullvad_rpc::rest::Error>>, + tx: oneshot::Sender<Result<String, mullvad_rpc::rest::Error>>, ) { if let Some(account_token) = self.settings.get_account_token() { - let rpc_call = self.accounts_proxy.get_www_auth_token(account_token); - Self::oneshot_send(tx, Box::new(rpc_call), "get_www_auth_token response") + let old_future = self.accounts_proxy.get_www_auth_token(account_token); + let rpc_call = async { + let result = old_future.compat().await; + Self::oneshot_send(tx, result, "get_www_auth_token response"); + }; + tokio::spawn(rpc_call); } } - fn on_submit_voucher( + async fn on_submit_voucher( &mut self, - tx: oneshot::Sender<BoxFuture<VoucherSubmission, mullvad_rpc::rest::Error>>, + tx: oneshot::Sender<Result<VoucherSubmission, mullvad_rpc::rest::Error>>, voucher: String, ) { if let Some(account_token) = self.settings.get_account_token() { - let rpc_call = self.accounts_proxy.submit_voucher(account_token, voucher); - Self::oneshot_send(tx, Box::new(rpc_call), "submit_voucher response"); + let old_future = self.accounts_proxy.submit_voucher(account_token, voucher); + let rpc_call = async { + let result = old_future.compat().await; + Self::oneshot_send(tx, result, "submit_voucher response"); + }; + tokio::spawn(rpc_call); } } @@ -1284,13 +1310,12 @@ where Self::oneshot_send(tx, self.relay_selector.get_locations(), "relay locations"); } - fn on_update_relay_locations(&mut self) { - let update_future = self.relay_selector.update(); - self.block_on_future(update_future); + async fn on_update_relay_locations(&mut self) { + self.relay_selector.update().await; } - fn on_set_account(&mut self, tx: oneshot::Sender<()>, account_token: Option<String>) { - match self.set_account(account_token.clone()) { + async fn on_set_account(&mut self, tx: oneshot::Sender<()>, account_token: Option<String>) { + match self.set_account(account_token.clone()).await { Ok(account_changed) => { if account_changed { match account_token { @@ -1312,7 +1337,10 @@ where } } - fn set_account(&mut self, account_token: Option<String>) -> Result<bool, settings::Error> { + async fn set_account( + &mut self, + account_token: Option<String>, + ) -> Result<bool, settings::Error> { let account_changed = self.settings.set_account_token(account_token.clone())?; if account_changed { self.event_listener @@ -1320,17 +1348,18 @@ where // Bump account history if a token was set if let Some(token) = account_token.clone() { - if let Err(e) = self.account_history.bump_history(&token) { + if let Err(e) = self.account_history.bump_history(&token).await { log::error!("Failed to bump account history: {}", e); } } - self.ensure_wireguard_keys_for_current_account(); + self.ensure_wireguard_keys_for_current_account().await; if let Some(token) = account_token { // update automatic rotation self.wireguard_key_manager - .reset_rotation(&mut self.account_history, token); + .reset_rotation(&mut self.account_history, token) + .await; } } Ok(account_changed) @@ -1344,18 +1373,23 @@ where ); } - fn on_remove_account_from_history( + async fn on_remove_account_from_history( &mut self, tx: oneshot::Sender<()>, account_token: AccountToken, ) { - if self.account_history.remove_account(&account_token).is_ok() { + if self + .account_history + .remove_account(&account_token) + .await + .is_ok() + { Self::oneshot_send(tx, (), "remove_account_from_history response"); } } - fn on_clear_account_history(&mut self, tx: oneshot::Sender<()>) { - match self.account_history.clear() { + async fn on_clear_account_history(&mut self, tx: oneshot::Sender<()>) { + match self.account_history.clear().await { Ok(_) => { self.set_target_state(TargetState::Unsecured); Self::oneshot_send(tx, (), "clear_account_history response"); @@ -1384,7 +1418,7 @@ where } #[cfg(not(target_os = "android"))] - fn on_factory_reset(&mut self, tx: oneshot::Sender<()>) { + async fn on_factory_reset(&mut self, tx: oneshot::Sender<()>) { let mut failed = false; @@ -1393,7 +1427,7 @@ where failed = true; } - if let Err(e) = self.account_history.clear() { + if let Err(e) = self.account_history.clear().await { log::error!("Failed to clear account history - {}", e); failed = true; } @@ -1486,7 +1520,7 @@ where } } - fn on_set_show_beta_releases(&mut self, tx: oneshot::Sender<()>, enabled: bool) { + async fn on_set_show_beta_releases(&mut self, tx: oneshot::Sender<()>, enabled: bool) { let save_result = self.settings.set_show_beta_releases(enabled); match save_result { Ok(settings_changed) => { @@ -1494,9 +1528,8 @@ where if settings_changed { self.event_listener .notify_settings(self.settings.to_settings()); - let runtime = self.rpc_runtime.runtime(); let mut handle = self.version_updater_handle.clone(); - runtime.block_on(async { handle.set_show_beta_releases(enabled).await }); + handle.set_show_beta_releases(enabled).await; } } Err(e) => error!("{}", e.display_chain_with_msg("Unable to save settings")), @@ -1648,7 +1681,7 @@ where } } - fn on_set_wireguard_rotation_interval( + async fn on_set_wireguard_rotation_interval( &mut self, tx: oneshot::Sender<()>, interval: Option<u32>, @@ -1661,11 +1694,14 @@ where let account_token = self.settings.get_account_token(); if let Some(token) = account_token { - self.wireguard_key_manager.set_rotation_interval( - &mut self.account_history, - token, - interval.map(|hours| Duration::from_secs(60u64 * 60u64 * hours as u64)), - ); + self.wireguard_key_manager + .set_rotation_interval( + &mut self.account_history, + token, + interval + .map(|hours| Duration::from_secs(60u64 * 60u64 * hours as u64)), + ) + .await; } self.event_listener @@ -1676,68 +1712,89 @@ where } } - fn ensure_wireguard_keys_for_current_account(&mut self) { + async fn ensure_wireguard_keys_for_current_account(&mut self) { if let Some(account) = self.settings.get_account_token() { if self .account_history .get(&account) + .await .map(|entry| entry.map(|e| e.wireguard.is_none()).unwrap_or(true)) .unwrap_or(true) { log::info!("Automatically generating new wireguard key for account"); self.wireguard_key_manager - .generate_key_async(account, Some(FIRST_KEY_PUSH_TIMEOUT)); + .generate_key_async(account, Some(FIRST_KEY_PUSH_TIMEOUT)) + .await; } else { log::info!("Account already has wireguard key"); } } } - fn on_generate_wireguard_key(&mut self, tx: oneshot::Sender<KeygenEvent>) { - let mut result = || -> Result<KeygenEvent, String> { - let account_token = self - .settings - .get_account_token() - .ok_or_else(|| "No account token set".to_owned())?; + async fn on_generate_wireguard_key(&mut self, tx: oneshot::Sender<KeygenEvent>) { + match self.on_generate_wireguard_key_inner().await { + Ok(key_event) => { + Self::oneshot_send(tx, key_event, "generate_wireguard_key response"); + } + Err(e) => { + log::error!("Failed to generate new wireguard key - {}", e); + } + } + } - let mut account_entry = self - .account_history - .get(&account_token) - .map_err(|e| format!("Failed to read account entry from history: {}", e)) - .map(|data| { - data.unwrap_or_else(|| { - log::error!("Account token set in settings but not in account history"); - account_history::AccountEntry { - account: account_token.clone(), - wireguard: None, - } - }) - })?; + async fn on_generate_wireguard_key_inner(&mut self) -> Result<KeygenEvent, String> { + let account_token = self + .settings + .get_account_token() + .ok_or_else(|| "No account token set".to_owned())?; - let gen_result = match &account_entry.wireguard { - Some(wireguard_data) => self - .wireguard_key_manager - .replace_key(account_token.clone(), wireguard_data.get_public_key()), - None => self - .wireguard_key_manager - .generate_key_sync(account_token.clone()), - }; + let mut account_entry = self + .account_history + .get(&account_token) + .await + .map_err(|e| format!("Failed to read account entry from history: {}", e)) + .map(|data| { + data.unwrap_or_else(|| { + log::error!("Account token set in settings but not in account history"); + account_history::AccountEntry { + account: account_token.clone(), + wireguard: None, + } + }) + })?; + + let gen_result = match &account_entry.wireguard { + Some(wireguard_data) => { + self.wireguard_key_manager + .replace_key(account_token.clone(), wireguard_data.get_public_key()) + .await + } + None => { + self.wireguard_key_manager + .generate_key_sync(account_token.clone()) + .await + } + }; - match gen_result { - Ok(new_data) => { - let public_key = new_data.get_public_key(); - account_entry.wireguard = Some(new_data); - self.account_history.insert(account_entry).map_err(|e| { + match gen_result { + Ok(new_data) => { + let public_key = new_data.get_public_key(); + account_entry.wireguard = Some(new_data); + self.account_history + .insert(account_entry) + .await + .map_err(|e| { format!("Failed to add new wireguard key to account data: {}", e) })?; - if let Some(TunnelType::Wireguard) = self.get_connected_tunnel_type() { - self.reconnect_tunnel(); - } - let keygen_event = KeygenEvent::NewKey(public_key); - self.event_listener.notify_key_event(keygen_event.clone()); + if let Some(TunnelType::Wireguard) = self.get_connected_tunnel_type() { + self.reconnect_tunnel(); + } + let keygen_event = KeygenEvent::NewKey(public_key); + self.event_listener.notify_key_event(keygen_event.clone()); - // update automatic rotation - self.wireguard_key_manager.set_rotation_interval( + // update automatic rotation + self.wireguard_key_manager + .set_rotation_interval( &mut self.account_history, account_token, self.settings @@ -1745,39 +1802,31 @@ where .wireguard .automatic_rotation .map(|hours| Duration::from_secs(60u64 * 60u64 * hours as u64)), - ); - - Ok(keygen_event) - } - Err(wireguard::Error::TooManyKeys) => Ok(KeygenEvent::TooManyKeys), - Err(e) => Err(format!( - "Failed to generate new key - {}", - e.display_chain_with_msg("Failed to generate new wireguard key:") - )), - } - }; + ) + .await; - match result() { - Ok(key_event) => { - Self::oneshot_send(tx, key_event, "generate_wireguard_key response"); - } - Err(e) => { - log::error!("Failed to generate new wireguard key - {}", e); + Ok(keygen_event) } + Err(wireguard::Error::TooManyKeys) => Ok(KeygenEvent::TooManyKeys), + Err(e) => Err(format!( + "Failed to generate new key - {}", + e.display_chain_with_msg("Failed to generate new wireguard key:") + )), } } - fn on_get_wireguard_key(&mut self, tx: oneshot::Sender<Option<wireguard::PublicKey>>) { - let key = self - .settings - .get_account_token() - .and_then(|account| self.account_history.get(&account).ok()?) - .and_then(|account_entry| account_entry.wireguard.map(|wg| wg.get_public_key())); - - Self::oneshot_send(tx, key, "get_wireguard_key response"); + async fn on_get_wireguard_key(&mut self, tx: oneshot::Sender<Option<wireguard::PublicKey>>) { + let token = self.settings.get_account_token(); + if let Some(token) = token { + let entry = self.account_history.get(&token).await; + if let Ok(Some(entry)) = entry { + let key = entry.wireguard.map(|wg| wg.get_public_key()); + Self::oneshot_send(tx, key, "get_wireguard_key response"); + } + } } - fn on_verify_wireguard_key(&mut self, tx: oneshot::Sender<bool>) { + async fn on_verify_wireguard_key(&mut self, tx: oneshot::Sender<bool>) { let account = match self.settings.get_account_token() { Some(account) => account, None => { @@ -1789,6 +1838,7 @@ where let key = self .account_history .get(&account) + .await .map(|entry| entry.and_then(|e| e.wireguard.map(|wg| wg.private_key.public_key()))); let public_key = match key { @@ -1807,7 +1857,7 @@ where .wireguard_key_manager .verify_wireguard_key(account, public_key); - self.rpc_handle.service().spawn(async move { + tokio::spawn(async move { match verification_rpc.await { Ok(is_valid) => { Self::oneshot_send(tx, is_valid, "verify_wireguard_key response"); @@ -1982,7 +2032,7 @@ impl TunnelParametersGenerator for MullvadTunnelParametersGenerator { &mut self, retry_attempt: u32, ) -> Result<TunnelParameters, ParameterGenerationError> { - let (response_tx, response_rx) = mpsc::channel(); + let (response_tx, response_rx) = sync_mpsc::channel(); if self .tx .send(InternalDaemonEvent::GenerateTunnelParameters( diff --git a/mullvad-daemon/src/main.rs b/mullvad-daemon/src/main.rs index adb7c39f0a..4e055183b9 100644 --- a/mullvad-daemon/src/main.rs +++ b/mullvad-daemon/src/main.rs @@ -4,7 +4,9 @@ use log::{debug, error, info, warn}; use mullvad_daemon::{ logging, management_interface::{ManagementInterfaceEventBroadcaster, ManagementInterfaceServer}, - rpc_uniqueness_check, version, Daemon, DaemonCommandChannel, DaemonCommandSender, + rpc_uniqueness_check, + runtime::new_runtime_builder, + version, Daemon, DaemonCommandChannel, DaemonCommandSender, }; use std::{path::PathBuf, thread, time::Duration}; use talpid_types::ErrorExt; @@ -23,7 +25,13 @@ fn main() { eprintln!("{}", error); std::process::exit(1) }); - let exit_code = match run_platform(config, log_dir) { + + let mut runtime = new_runtime_builder().build().unwrap_or_else(|error| { + eprintln!("{}", error.display_chain()); + std::process::exit(1); + }); + + let exit_code = match runtime.block_on(run_platform(config, log_dir)) { Ok(_) => 0, Err(error) => { error!("{}", error); @@ -64,7 +72,7 @@ fn get_log_dir(config: &cli::Config) -> Result<Option<PathBuf>, String> { } #[cfg(windows)] -fn run_platform(config: &cli::Config, log_dir: Option<PathBuf>) -> Result<(), String> { +async fn run_platform(config: &cli::Config, log_dir: Option<PathBuf>) -> Result<(), String> { if config.run_as_service { system_service::run() } else { @@ -75,46 +83,39 @@ fn run_platform(config: &cli::Config, log_dir: Option<PathBuf>) -> Result<(), St } install_result } else { - run_standalone(log_dir) + run_standalone(log_dir).await } } } #[cfg(not(windows))] -fn run_platform(_config: &cli::Config, log_dir: Option<PathBuf>) -> Result<(), String> { - run_standalone(log_dir) +async fn run_platform(_config: &cli::Config, log_dir: Option<PathBuf>) -> Result<(), String> { + run_standalone(log_dir).await } -fn run_standalone(log_dir: Option<PathBuf>) -> Result<(), String> { - { - let mut runtime = tokio02::runtime::Builder::new() - .basic_scheduler() - .enable_all() - .build() - .map_err(|e| e.display_chain())?; - if runtime.block_on(rpc_uniqueness_check::is_another_instance_running()) { - return Err("Another instance of the daemon is already running".to_owned()); - } +async fn run_standalone(log_dir: Option<PathBuf>) -> Result<(), String> { + if rpc_uniqueness_check::is_another_instance_running().await { + return Err("Another instance of the daemon is already running".to_owned()); } if !running_as_admin() { warn!("Running daemon as a non-administrator user, clients might refuse to connect"); } - let daemon = create_daemon(log_dir)?; + let daemon = create_daemon(log_dir).await?; let shutdown_handle = daemon.shutdown_handle(); shutdown::set_shutdown_signal_handler(move || shutdown_handle.shutdown()) .map_err(|e| e.display_chain())?; - daemon.run().map_err(|e| e.display_chain())?; + daemon.run().await.map_err(|e| e.display_chain())?; info!("Mullvad daemon is quitting"); thread::sleep(Duration::from_millis(500)); Ok(()) } -fn create_daemon( +async fn create_daemon( log_dir: Option<PathBuf>, ) -> Result<Daemon<ManagementInterfaceEventBroadcaster>, String> { let resource_dir = mullvad_paths::get_resource_dir(); @@ -124,7 +125,7 @@ fn create_daemon( .map_err(|e| e.display_chain_with_msg("Unable to get cache dir"))?; let command_channel = DaemonCommandChannel::new(); - let event_listener = spawn_management_interface(command_channel.sender())?; + let event_listener = spawn_management_interface(command_channel.sender()).await?; Daemon::start( log_dir, @@ -134,23 +135,22 @@ fn create_daemon( event_listener, command_channel, ) + .await .map_err(|e| e.display_chain_with_msg("Unable to initialize daemon")) } -fn spawn_management_interface( +async fn spawn_management_interface( command_sender: DaemonCommandSender, ) -> Result<ManagementInterfaceEventBroadcaster, String> { - let server = ManagementInterfaceServer::start(command_sender).map_err(|error| { - error.display_chain_with_msg("Unable to start management interface server") - })?; + let server = ManagementInterfaceServer::start(command_sender) + .await + .map_err(|error| { + error.display_chain_with_msg("Unable to start management interface server") + })?; let event_broadcaster = server.event_broadcaster(); info!("Management interface listening on {}", server.socket_path()); - - thread::spawn(|| { - server.wait(); - info!("Management interface shut down"); - }); + tokio::spawn(server.run()); Ok(event_broadcaster) } diff --git a/mullvad-daemon/src/management_interface.rs b/mullvad-daemon/src/management_interface.rs index dee8cb3eea..cba6e96b43 100644 --- a/mullvad-daemon/src/management_interface.rs +++ b/mullvad-daemon/src/management_interface.rs @@ -1,6 +1,5 @@ use crate::{DaemonCommand, DaemonCommandSender, EventListener}; -use futures::compat::Future01CompatExt; -use futures01::{future, sync, Future}; +use futures::channel::oneshot; use mullvad_management_interface::{ types::{self, daemon_event, management_service_server::ManagementService}, Code, Request, Response, Status, @@ -36,10 +35,6 @@ pub enum Error { // Unable to start the management interface server #[error(display = "Unable to start management interface server")] SetupError(#[error(source)] mullvad_management_interface::Error), - - // Unable to start the tokio runtime - #[error(display = "Failed to create the tokio runtime")] - TokioRuntimeError(#[error(source)] tokio02::io::Error), } struct ManagementServiceImpl { @@ -49,20 +44,17 @@ struct ManagementServiceImpl { pub type ServiceResult<T> = std::result::Result<Response<T>, Status>; type EventsListenerReceiver = - tokio02::sync::mpsc::UnboundedReceiver<Result<types::DaemonEvent, Status>>; -type EventsListenerSender = - tokio02::sync::mpsc::UnboundedSender<Result<types::DaemonEvent, Status>>; + tokio::sync::mpsc::UnboundedReceiver<Result<types::DaemonEvent, Status>>; +type EventsListenerSender = tokio::sync::mpsc::UnboundedSender<Result<types::DaemonEvent, Status>>; -const INVALID_ACCOUNT_TOKEN_MESSAGE: &str = "No valid account token configured"; const INVALID_VOUCHER_MESSAGE: &str = "This voucher code is invalid"; const USED_VOUCHER_MESSAGE: &str = "This voucher code has already been used"; #[mullvad_management_interface::async_trait] impl ManagementService for ManagementServiceImpl { type GetRelayLocationsStream = - tokio02::sync::mpsc::Receiver<Result<types::RelayListCountry, Status>>; - type GetSplitTunnelProcessesStream = - tokio02::sync::mpsc::UnboundedReceiver<Result<i32, Status>>; + tokio::sync::mpsc::Receiver<Result<types::RelayListCountry, Status>>; + type GetSplitTunnelProcessesStream = tokio::sync::mpsc::UnboundedReceiver<Result<i32, Status>>; type EventsListenStream = EventsListenerReceiver; // Control and get the tunnel state @@ -71,53 +63,39 @@ impl ManagementService for ManagementServiceImpl { async fn connect_tunnel(&self, _: Request<()>) -> ServiceResult<()> { log::debug!("connect_tunnel"); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::SetTargetState(tx, TargetState::Secured)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) - .and_then(|result| match result { - Ok(()) => Ok(Response::new(())), - Err(()) => Err(Status::new( - Code::Unauthenticated, - INVALID_ACCOUNT_TOKEN_MESSAGE, - )), - }) - .compat() - .await + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::SetTargetState(tx, TargetState::Secured))?; + rx.await.map_err(|_| Status::internal("internal error"))?; + Ok(Response::new(())) } async fn disconnect_tunnel(&self, _: Request<()>) -> ServiceResult<()> { log::debug!("disconnect_tunnel"); - let (tx, _) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::SetTargetState(tx, TargetState::Unsecured)) - .then(|_| Ok(Response::new(()))) - .compat() - .await + let (tx, _) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::SetTargetState(tx, TargetState::Unsecured))?; + Ok(Response::new(())) } async fn reconnect_tunnel(&self, _: Request<()>) -> ServiceResult<()> { log::debug!("reconnect_tunnel"); - self.send_command_to_daemon(DaemonCommand::Reconnect) - .map(Response::new) - .compat() - .await + self.send_command_to_daemon(DaemonCommand::Reconnect)?; + Ok(Response::new(())) } async fn get_tunnel_state(&self, _: Request<()>) -> ServiceResult<types::TunnelState> { log::debug!("get_tunnel_state"); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::GetState(tx)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) - .and_then(|state| Ok(Response::new(convert_state(state)))) - .compat() - .await + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::GetState(tx))?; + let state = rx.await.map_err(|_| Status::internal("internal error"))?; + Ok(Response::new(convert_state(state))) } // Control the daemon and receive events // async fn events_listen(&self, _: Request<()>) -> ServiceResult<Self::EventsListenStream> { - let (tx, rx) = tokio02::sync::mpsc::unbounded_channel(); + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); let mut subscriptions = self.subscriptions.write(); subscriptions.push(tx); @@ -127,30 +105,25 @@ impl ManagementService for ManagementServiceImpl { async fn prepare_restart(&self, _: Request<()>) -> ServiceResult<()> { log::debug!("prepare_restart"); - self.send_command_to_daemon(DaemonCommand::PrepareRestart) - .map(Response::new) - .compat() - .await + self.send_command_to_daemon(DaemonCommand::PrepareRestart)?; + Ok(Response::new(())) } async fn shutdown(&self, _: Request<()>) -> ServiceResult<()> { log::debug!("shutdown"); - self.send_command_to_daemon(DaemonCommand::Shutdown) - .map(Response::new) - .compat() - .await + self.send_command_to_daemon(DaemonCommand::Shutdown)?; + Ok(Response::new(())) } async fn factory_reset(&self, _: Request<()>) -> ServiceResult<()> { #[cfg(not(target_os = "android"))] { log::debug!("factory_reset"); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::FactoryReset(tx)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::FactoryReset(tx))?; + rx.await .map(Response::new) - .compat() - .await + .map_err(|_| Status::internal("internal error")) } #[cfg(target_os = "android")] { @@ -160,25 +133,22 @@ impl ManagementService for ManagementServiceImpl { async fn get_current_version(&self, _: Request<()>) -> ServiceResult<String> { log::debug!("get_current_version"); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::GetCurrentVersion(tx)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::GetCurrentVersion(tx))?; + rx.await .map(Response::new) - .compat() - .await + .map_err(|_| Status::internal("internal error")) } async fn get_version_info(&self, _: Request<()>) -> ServiceResult<types::AppVersionInfo> { log::debug!("get_version_info"); - let (tx, rx) = sync::oneshot::channel(); - let app_version_info = self - .send_command_to_daemon(DaemonCommand::GetVersionInfo(tx)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) - .compat() - .await?; - - Ok(Response::new(convert_version_info(&app_version_info))) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::GetVersionInfo(tx))?; + rx.await + .map_err(|_| Status::internal("internal error")) + .map(|version_info| convert_version_info(&version_info)) + .map(Response::new) } // Relays and tunnel constraints @@ -186,10 +156,8 @@ impl ManagementService for ManagementServiceImpl { async fn update_relay_locations(&self, _: Request<()>) -> ServiceResult<()> { log::debug!("update_relay_locations"); - self.send_command_to_daemon(DaemonCommand::UpdateRelayLocations) - .compat() - .await - .map(Response::new) + self.send_command_to_daemon(DaemonCommand::UpdateRelayLocations)?; + Ok(Response::new(())) } async fn update_relay_settings( @@ -197,15 +165,14 @@ impl ManagementService for ManagementServiceImpl { request: Request<types::RelaySettingsUpdate>, ) -> ServiceResult<()> { log::debug!("update_relay_settings"); - let (tx, rx) = sync::oneshot::channel(); + let (tx, rx) = oneshot::channel(); let constraints_update = convert_relay_settings_update(&request.into_inner())?; let message = DaemonCommand::UpdateRelaySettings(tx, constraints_update); - self.send_command_to_daemon(message) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) + self.send_command_to_daemon(message)?; + rx.await .map(Response::new) - .compat() - .await + .map_err(|_| Status::internal("internal error")) } async fn get_relay_locations( @@ -214,17 +181,14 @@ impl ManagementService for ManagementServiceImpl { ) -> ServiceResult<Self::GetRelayLocationsStream> { log::debug!("get_relay_locations"); - let (tx, rx) = sync::oneshot::channel(); - let locations = self - .send_command_to_daemon(DaemonCommand::GetRelayLocations(tx)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) - .compat() - .await?; + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::GetRelayLocations(tx))?; + let locations = rx.await.map_err(|_| Status::internal("internal error"))?; let (mut stream_tx, stream_rx) = - tokio02::sync::mpsc::channel(cmp::max(1, locations.countries.len())); + tokio::sync::mpsc::channel(cmp::max(1, locations.countries.len())); - tokio02::spawn(async move { + tokio::spawn(async move { for country in &locations.countries { if let Err(error) = stream_tx .send(Ok(convert_relay_list_country(country))) @@ -243,18 +207,13 @@ impl ManagementService for ManagementServiceImpl { async fn get_current_location(&self, _: Request<()>) -> ServiceResult<types::GeoIpLocation> { log::debug!("get_current_location"); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::GetCurrentLocation(tx)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) - .and_then(|geoip| { - if let Some(geoip) = geoip { - Ok(Response::new(convert_geoip_location(geoip))) - } else { - Err(Status::not_found("no location was found")) - } - }) - .compat() - .await + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::GetCurrentLocation(tx))?; + let result = rx.await.map_err(|_| Status::internal("internal error"))?; + match result { + Some(geoip) => Ok(Response::new(convert_geoip_location(geoip))), + None => Err(Status::not_found("no location was found")), + } } async fn set_bridge_settings( @@ -328,15 +287,12 @@ impl ManagementService for ManagementServiceImpl { log::debug!("set_bridge_settings({:?})", settings); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::SetBridgeSettings(tx, settings)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) - .and_then(|settings_result| { - settings_result.map_err(|_| Status::internal("internal error")) - }) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::SetBridgeSettings(tx, settings))?; + let settings_result = rx.await.map_err(|_| Status::internal("internal error"))?; + settings_result .map(Response::new) - .compat() - .await + .map_err(|_| Status::internal("internal error")) } async fn set_bridge_state(&self, request: Request<types::BridgeState>) -> ServiceResult<()> { @@ -350,15 +306,12 @@ impl ManagementService for ManagementServiceImpl { }; log::debug!("set_bridge_state({:?})", bridge_state); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::SetBridgeState(tx, bridge_state)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) - .and_then(|settings_result| { - settings_result.map_err(|_| Status::internal("internal error")) - }) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::SetBridgeState(tx, bridge_state))?; + let settings_result = rx.await.map_err(|_| Status::internal("internal error"))?; + settings_result .map(Response::new) - .compat() - .await + .map_err(|_| Status::internal("internal error")) } // Settings @@ -366,59 +319,54 @@ impl ManagementService for ManagementServiceImpl { async fn get_settings(&self, _: Request<()>) -> ServiceResult<types::Settings> { log::debug!("get_settings"); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::GetSettings(tx)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::GetSettings(tx))?; + rx.await .map(|settings| Response::new(convert_settings(&settings))) - .compat() - .await + .map_err(|_| Status::internal("internal error")) } async fn set_allow_lan(&self, request: Request<bool>) -> ServiceResult<()> { let allow_lan = request.into_inner(); log::debug!("set_allow_lan({})", allow_lan); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::SetAllowLan(tx, allow_lan)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::SetAllowLan(tx, allow_lan))?; + rx.await .map(Response::new) - .compat() - .await + .map_err(|_| Status::internal("internal error")) } async fn set_show_beta_releases(&self, request: Request<bool>) -> ServiceResult<()> { let enabled = request.into_inner(); log::debug!("set_show_beta_releases({})", enabled); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::SetShowBetaReleases(tx, enabled)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::SetShowBetaReleases(tx, enabled))?; + rx.await .map(Response::new) - .compat() - .await + .map_err(|_| Status::internal("internal error")) } async fn set_block_when_disconnected(&self, request: Request<bool>) -> ServiceResult<()> { let block_when_disconnected = request.into_inner(); log::debug!("set_block_when_disconnected({})", block_when_disconnected); - let (tx, rx) = sync::oneshot::channel(); + let (tx, rx) = oneshot::channel(); self.send_command_to_daemon(DaemonCommand::SetBlockWhenDisconnected( tx, block_when_disconnected, - )) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) - .map(Response::new) - .compat() - .await + ))?; + rx.await + .map(Response::new) + .map_err(|_| Status::internal("internal error")) } async fn set_auto_connect(&self, request: Request<bool>) -> ServiceResult<()> { let auto_connect = request.into_inner(); log::debug!("set_auto_connect({})", auto_connect); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::SetAutoConnect(tx, auto_connect)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::SetAutoConnect(tx, auto_connect))?; + rx.await .map(Response::new) - .compat() - .await + .map_err(|_| Status::internal("internal error")) } async fn set_openvpn_mssfix(&self, request: Request<u32>) -> ServiceResult<()> { @@ -429,50 +377,45 @@ impl ManagementService for ManagementServiceImpl { None }; log::debug!("set_openvpn_mssfix({:?})", mssfix); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::SetOpenVpnMssfix(tx, mssfix)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::SetOpenVpnMssfix(tx, mssfix))?; + rx.await .map(Response::new) - .compat() - .await + .map_err(|_| Status::internal("internal error")) } async fn set_wireguard_mtu(&self, request: Request<u32>) -> ServiceResult<()> { let mtu = request.into_inner(); let mtu = if mtu != 0 { Some(mtu as u16) } else { None }; log::debug!("set_wireguard_mtu({:?})", mtu); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::SetWireguardMtu(tx, mtu)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::SetWireguardMtu(tx, mtu))?; + rx.await .map(Response::new) - .compat() - .await + .map_err(|_| Status::internal("internal error")) } async fn set_enable_ipv6(&self, request: Request<bool>) -> ServiceResult<()> { let enable_ipv6 = request.into_inner(); log::debug!("set_enable_ipv6({})", enable_ipv6); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::SetEnableIpv6(tx, enable_ipv6)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::SetEnableIpv6(tx, enable_ipv6))?; + rx.await .map(Response::new) - .compat() - .await + .map_err(|_| Status::internal("internal error")) } // Account management // async fn create_new_account(&self, _: Request<()>) -> ServiceResult<String> { - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::CreateNewAccount(tx)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) - .and_then(|result| match result { - Ok(account_token) => Ok(Response::new(account_token)), - Err(_) => Err(Status::internal("internal error")), - }) - .compat() - .await + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::CreateNewAccount(tx))?; + let result = rx.await.map_err(|_| Status::internal("internal error"))?; + match result { + Ok(account_token) => Ok(Response::new(account_token)), + Err(_) => Err(Status::internal("internal error")), + } } async fn set_account(&self, request: Request<AccountToken>) -> ServiceResult<()> { @@ -483,14 +426,11 @@ impl ManagementService for ManagementServiceImpl { } else { Some(account_token) }; - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::SetAccount(tx, account_token)) - .and_then(|_| { - rx.map(Response::new) - .map_err(|_| Status::internal("internal error")) - }) - .compat() - .await + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::SetAccount(tx, account_token))?; + rx.await + .map(Response::new) + .map_err(|_| Status::internal("internal error")) } async fn get_account_data( @@ -499,40 +439,35 @@ impl ManagementService for ManagementServiceImpl { ) -> ServiceResult<types::AccountData> { log::debug!("get_account_data"); let account_token = request.into_inner(); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::GetAccountData(tx, account_token)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) - .and_then(|rpc_future| { - rpc_future - .map(|account_data| { - Response::new(types::AccountData { - expiry: Some(types::Timestamp { - seconds: account_data.expiry.timestamp(), - nanos: 0, - }), - }) - }) - .map_err(|error: RestError| { - log::error!( - "Unable to get account data from API: {}", - error.display_chain() - ); - map_rest_account_error(error) - }) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::GetAccountData(tx, account_token))?; + let result = rx.await.map_err(|_| Status::internal("internal error"))?; + result + .map(|account_data| { + Response::new(types::AccountData { + expiry: Some(types::Timestamp { + seconds: account_data.expiry.timestamp(), + nanos: 0, + }), + }) + }) + .map_err(|error: RestError| { + log::error!( + "Unable to get account data from API: {}", + error.display_chain() + ); + map_rest_account_error(error) }) - .compat() - .await } async fn get_account_history(&self, _: Request<()>) -> ServiceResult<types::AccountHistory> { // TODO: this might be a stream log::debug!("get_account_history"); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::GetAccountHistory(tx)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::GetAccountHistory(tx))?; + rx.await + .map_err(|_| Status::internal("internal error")) .map(|history| Response::new(types::AccountHistory { token: history })) - .compat() - .await } async fn remove_account_from_history( @@ -541,42 +476,36 @@ impl ManagementService for ManagementServiceImpl { ) -> ServiceResult<()> { log::debug!("remove_account_from_history"); let account_token = request.into_inner(); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::RemoveAccountFromHistory(tx, account_token)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::RemoveAccountFromHistory(tx, account_token))?; + rx.await .map(Response::new) - .compat() - .await + .map_err(|_| Status::internal("internal error")) } async fn clear_account_history(&self, _: Request<()>) -> ServiceResult<()> { log::debug!("clear_account_history"); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::ClearAccountHistory(tx)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::ClearAccountHistory(tx))?; + rx.await .map(Response::new) - .compat() - .await + .map_err(|_| Status::internal("internal error")) } async fn get_www_auth_token(&self, _: Request<()>) -> ServiceResult<String> { log::debug!("get_www_auth_token"); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::GetWwwAuthToken(tx)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) - .and_then(|rpc_future| { - rpc_future - .map(Response::new) - .map_err(|error: mullvad_rpc::rest::Error| { - log::error!( - "Unable to get account data from API: {}", - error.display_chain() - ); - map_rest_account_error(error) - }) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::GetWwwAuthToken(tx))?; + let result = rx.await.map_err(|_| Status::internal("internal error"))?; + result + .map(Response::new) + .map_err(|error: mullvad_rpc::rest::Error| { + log::error!( + "Unable to get account data from API: {}", + error.display_chain() + ); + map_rest_account_error(error) }) - .compat() - .await } async fn submit_voucher( @@ -585,38 +514,33 @@ impl ManagementService for ManagementServiceImpl { ) -> ServiceResult<types::VoucherSubmission> { log::debug!("submit_voucher"); let voucher = request.into_inner(); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::SubmitVoucher(tx, voucher)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) - .and_then(|f| { - f.map(|submission| { - Response::new(types::VoucherSubmission { - seconds_added: submission.time_added, - new_expiry: Some(types::Timestamp { - seconds: submission.new_expiry.timestamp(), - nanos: 0, - }), - }) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::SubmitVoucher(tx, voucher))?; + let result = rx.await.map_err(|_| Status::internal("internal error"))?; + result + .map(|submission| { + Response::new(types::VoucherSubmission { + seconds_added: submission.time_added, + new_expiry: Some(types::Timestamp { + seconds: submission.new_expiry.timestamp(), + nanos: 0, + }), }) - .map_err(|e| match e { - RestError::ApiError(StatusCode::BAD_REQUEST, message) => { - match &message.as_str() { - &mullvad_rpc::INVALID_VOUCHER => { - Status::new(Code::NotFound, INVALID_VOUCHER_MESSAGE) - } - - &mullvad_rpc::VOUCHER_USED => { - Status::new(Code::ResourceExhausted, USED_VOUCHER_MESSAGE) - } + }) + .map_err(|e| match e { + RestError::ApiError(StatusCode::BAD_REQUEST, message) => match &message.as_str() { + &mullvad_rpc::INVALID_VOUCHER => { + Status::new(Code::NotFound, INVALID_VOUCHER_MESSAGE) + } - _ => Status::internal("internal error"), - } + &mullvad_rpc::VOUCHER_USED => { + Status::new(Code::ResourceExhausted, USED_VOUCHER_MESSAGE) } + _ => Status::internal("internal error"), - }) + }, + _ => Status::internal("internal error"), }) - .compat() - .await } // WireGuard key management @@ -626,61 +550,55 @@ impl ManagementService for ManagementServiceImpl { let interval = request.into_inner(); log::debug!("set_wireguard_rotation_interval({:?})", interval); - let (tx, rx) = sync::oneshot::channel(); + let (tx, rx) = oneshot::channel(); self.send_command_to_daemon(DaemonCommand::SetWireguardRotationInterval( tx, Some(interval), - )) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) - .map(Response::new) - .compat() - .await + ))?; + rx.await + .map_err(|_| Status::internal("internal error")) + .map(Response::new) } async fn reset_wireguard_rotation_interval(&self, _: Request<()>) -> ServiceResult<()> { log::debug!("reset_wireguard_rotation_interval"); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::SetWireguardRotationInterval(tx, None)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::SetWireguardRotationInterval(tx, None))?; + rx.await + .map_err(|_| Status::internal("internal error")) .map(Response::new) - .compat() - .await } async fn generate_wireguard_key(&self, _: Request<()>) -> ServiceResult<types::KeygenEvent> { // TODO: return error for TooManyKeys, GenerationFailure // on success, simply return the new key or nil log::debug!("generate_wireguard_key"); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::GenerateWireguardKey(tx)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::GenerateWireguardKey(tx))?; + rx.await + .map_err(|_| Status::internal("internal error")) .map(|event| Response::new(convert_wireguard_key_event(&event))) - .compat() - .await } async fn get_wireguard_key(&self, _: Request<()>) -> ServiceResult<types::PublicKey> { log::debug!("get_wireguard_key"); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::GetWireguardKey(tx)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) - .then(|response| match response { - Ok(Some(key)) => Ok(Response::new(convert_public_key(&key))), - Ok(None) => Err(Status::not_found("no WireGuard key was found")), - Err(e) => Err(e), - }) - .compat() - .await + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::GetWireguardKey(tx))?; + let response = rx.await.map_err(|_| Status::internal("internal error")); + match response { + Ok(Some(key)) => Ok(Response::new(convert_public_key(&key))), + Ok(None) => Err(Status::not_found("no WireGuard key was found")), + Err(e) => Err(e), + } } async fn verify_wireguard_key(&self, _: Request<()>) -> ServiceResult<bool> { log::debug!("verify_wireguard_key"); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::VerifyWireguardKey(tx)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::VerifyWireguardKey(tx))?; + rx.await + .map_err(|_| Status::internal("internal error")) .map(Response::new) - .compat() - .await } // Split tunneling @@ -693,15 +611,12 @@ impl ManagementService for ManagementServiceImpl { #[cfg(target_os = "linux")] { log::debug!("get_split_tunnel_processes"); - let (tx, rx) = sync::oneshot::channel(); - let pids = self - .send_command_to_daemon(DaemonCommand::GetSplitTunnelProcesses(tx)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) - .compat() - .await?; + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::GetSplitTunnelProcesses(tx))?; + let pids = rx.await.map_err(|_| Status::internal("internal error"))?; - let (tx, rx) = tokio02::sync::mpsc::unbounded_channel(); - tokio02::spawn(async move { + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + tokio::spawn(async move { for pid in pids { let _ = tx.send(Ok(pid)); } @@ -711,7 +626,7 @@ impl ManagementService for ManagementServiceImpl { } #[cfg(not(target_os = "linux"))] { - let (_, rx) = tokio02::sync::mpsc::unbounded_channel(); + let (_, rx) = tokio::sync::mpsc::unbounded_channel(); Ok(Response::new(rx)) } } @@ -720,12 +635,11 @@ impl ManagementService for ManagementServiceImpl { async fn add_split_tunnel_process(&self, request: Request<i32>) -> ServiceResult<()> { let pid = request.into_inner(); log::debug!("add_split_tunnel_process"); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::AddSplitTunnelProcess(tx, pid)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::AddSplitTunnelProcess(tx, pid))?; + rx.await + .map_err(|_| Status::internal("internal error")) .map(Response::new) - .compat() - .await } #[cfg(not(target_os = "linux"))] async fn add_split_tunnel_process(&self, _: Request<i32>) -> ServiceResult<()> { @@ -736,12 +650,11 @@ impl ManagementService for ManagementServiceImpl { async fn remove_split_tunnel_process(&self, request: Request<i32>) -> ServiceResult<()> { let pid = request.into_inner(); log::debug!("remove_split_tunnel_process"); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::RemoveSplitTunnelProcess(tx, pid)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::RemoveSplitTunnelProcess(tx, pid))?; + rx.await + .map_err(|_| Status::internal("internal error")) .map(Response::new) - .compat() - .await } #[cfg(not(target_os = "linux"))] async fn remove_split_tunnel_process(&self, _: Request<i32>) -> ServiceResult<()> { @@ -752,12 +665,11 @@ impl ManagementService for ManagementServiceImpl { #[cfg(target_os = "linux")] { log::debug!("clear_split_tunnel_processes"); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::ClearSplitTunnelProcesses(tx)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::ClearSplitTunnelProcesses(tx))?; + rx.await + .map_err(|_| Status::internal("internal error")) .map(Response::new) - .compat() - .await } #[cfg(not(target_os = "linux"))] { @@ -768,15 +680,10 @@ impl ManagementService for ManagementServiceImpl { impl ManagementServiceImpl { /// Sends a command to the daemon and maps the error to an RPC error. - fn send_command_to_daemon( - &self, - command: DaemonCommand, - ) -> impl Future<Item = (), Error = Status> { - future::result( - self.daemon_tx - .send(command) - .map_err(|_| Status::internal("internal error")), - ) + fn send_command_to_daemon(&self, command: DaemonCommand) -> Result<(), Status> { + self.daemon_tx + .send(command) + .map_err(|_| Status::internal("internal error")) } } @@ -1527,23 +1434,14 @@ fn convert_proto_location(location: types::RelayLocation) -> Constraint<Location pub struct ManagementInterfaceServer { subscriptions: Arc<RwLock<Vec<EventsListenerSender>>>, socket_path: String, - runtime: tokio02::runtime::Runtime, server_abort_tx: triggered::Trigger, server_join_handle: Option< - tokio02::task::JoinHandle<std::result::Result<(), mullvad_management_interface::Error>>, + tokio::task::JoinHandle<std::result::Result<(), mullvad_management_interface::Error>>, >, } impl ManagementInterfaceServer { - pub fn start(tunnel_tx: DaemonCommandSender) -> Result<Self, Error> { - // TODO: don't spawn a tokio runtime here; make this function async - let mut runtime = tokio02::runtime::Builder::new() - .threaded_scheduler() - .core_threads(1) - .enable_all() - .build() - .map_err(Error::TokioRuntimeError)?; - + pub async fn start(tunnel_tx: DaemonCommandSender) -> Result<Self, Error> { let subscriptions = Arc::<RwLock<Vec<EventsListenerSender>>>::default(); let socket_path = mullvad_paths::get_rpc_socket_path() @@ -1556,15 +1454,15 @@ impl ManagementInterfaceServer { daemon_tx: tunnel_tx, subscriptions: subscriptions.clone(), }; - let server_join_handle = runtime.spawn(mullvad_management_interface::spawn_rpc_server( + let server_join_handle = tokio::spawn(mullvad_management_interface::spawn_rpc_server( server, start_tx, server_abort_rx, )); if let Err(_) = start_rx.recv() { - return Err(runtime - .block_on(server_join_handle) + return Err(server_join_handle + .await .expect("Failed to resolve quit handle future") .map_err(Error::SetupError) .unwrap_err()); @@ -1573,7 +1471,6 @@ impl ManagementInterfaceServer { Ok(ManagementInterfaceServer { subscriptions, socket_path, - runtime, server_abort_tx, server_join_handle: Some(server_join_handle), }) @@ -1585,19 +1482,18 @@ impl ManagementInterfaceServer { pub fn event_broadcaster(&self) -> ManagementInterfaceEventBroadcaster { ManagementInterfaceEventBroadcaster { - runtime: self.runtime.handle().clone(), subscriptions: self.subscriptions.clone(), close_handle: self.server_abort_tx.clone(), } } - /// Consumes the server and waits for it to finish. Returns an error if the server exited - /// due to an error. - pub fn wait(mut self) { + /// Consumes the server and waits for it to finish. + pub async fn run(self) { if let Some(server_join_handle) = self.server_join_handle { - if let Err(error) = self.runtime.block_on(server_join_handle) { + if let Err(error) = server_join_handle.await { log::error!("Management server panic: {:?}", error); } + log::info!("Management interface shut down"); } } } @@ -1605,7 +1501,6 @@ impl ManagementInterfaceServer { /// A handle that allows broadcasting messages to all subscribers of the management interface. #[derive(Clone)] pub struct ManagementInterfaceEventBroadcaster { - runtime: tokio02::runtime::Handle, subscriptions: Arc<RwLock<Vec<EventsListenerSender>>>, close_handle: triggered::Trigger, } diff --git a/mullvad-daemon/src/relays.rs b/mullvad-daemon/src/relays.rs index 78e0292023..ce880a3d19 100644 --- a/mullvad-daemon/src/relays.rs +++ b/mullvad-daemon/src/relays.rs @@ -33,7 +33,7 @@ use talpid_types::{ net::{all_of_the_internet, openvpn::ProxySettings, wireguard, TransportProtocol, TunnelType}, ErrorExt, }; -use tokio02::fs::File; +use tokio::fs::File; const DATE_TIME_FORMAT_STR: &str = "%Y-%m-%d %H:%M:%S%.3f"; const RELAYS_FILENAME: &str = "relays.json"; @@ -807,7 +807,7 @@ impl RelayListUpdater { } async fn run(mut self, mut cmd_rx: mpsc::Receiver<()>) { - let mut check_interval = tokio02::time::interval(UPDATE_CHECK_INTERVAL).fuse(); + let mut check_interval = tokio::time::interval(UPDATE_CHECK_INTERVAL).fuse(); let mut download_future = Box::pin(Fuse::terminated()); loop { futures::select! { @@ -918,7 +918,7 @@ impl RelayListUpdater { .map_err(Error::OpenRelayCache)?; let bytes = serde_json::to_vec_pretty(relays).map_err(Error::Serialize)?; let mut slice: &[u8] = bytes.as_slice(); - let _ = tokio02::io::copy(&mut slice, &mut file) + let _ = tokio::io::copy(&mut slice, &mut file) .await .map_err(Error::WriteRelayCache)?; Ok(()) diff --git a/mullvad-daemon/src/runtime.rs b/mullvad-daemon/src/runtime.rs new file mode 100644 index 0000000000..3c60b133e8 --- /dev/null +++ b/mullvad-daemon/src/runtime.rs @@ -0,0 +1,11 @@ +use tokio::runtime; + +pub fn new_runtime_builder() -> runtime::Builder { + let mut builder = runtime::Builder::new(); + builder + .threaded_scheduler() + .core_threads(4) + .max_threads(8) + .enable_all(); + builder +} diff --git a/mullvad-daemon/src/system_service.rs b/mullvad-daemon/src/system_service.rs index 8b40e8965c..cb27a443e8 100644 --- a/mullvad-daemon/src/system_service.rs +++ b/mullvad-daemon/src/system_service.rs @@ -1,5 +1,5 @@ use crate::cli; -use mullvad_daemon::DaemonShutdownHandle; +use mullvad_daemon::{runtime::new_runtime_builder, DaemonShutdownHandle}; use std::{ env, ffi::OsString, @@ -102,7 +102,20 @@ fn run_service() -> Result<(), String> { let clean_shutdown = Arc::new(AtomicBool::new(false)); let log_dir = crate::get_log_dir(cli::get_config()).expect("Log dir should be available here"); - let result = crate::create_daemon(log_dir).and_then(|daemon| { + + let runtime = new_runtime_builder().build(); + let mut runtime = match runtime { + Err(error) => { + persistent_service_status + .set_stopped(ServiceExitCode::ServiceSpecific(1)) + .unwrap(); + return Err(error.display_chain()); + } + Ok(runtime) => runtime, + }; + + let result = runtime.block_on(crate::create_daemon(log_dir)); + let result = if let Ok(daemon) = result { let shutdown_handle = daemon.shutdown_handle(); // Register monitor that translates `ServiceControl` to Daemon events @@ -115,8 +128,12 @@ fn run_service() -> Result<(), String> { persistent_service_status.set_running().unwrap(); - daemon.run().map_err(|e| e.display_chain()) - }); + runtime + .block_on(daemon.run()) + .map_err(|e| e.display_chain()) + } else { + result.map(|_| ()) + }; let exit_code = match result { Ok(()) => { diff --git a/mullvad-daemon/src/version_check.rs b/mullvad-daemon/src/version_check.rs index 6092c5b0de..183082f457 100644 --- a/mullvad-daemon/src/version_check.rs +++ b/mullvad-daemon/src/version_check.rs @@ -17,7 +17,7 @@ use std::{ }; use talpid_core::mpsc::Sender; use talpid_types::ErrorExt; -use tokio02::fs::File; +use tokio::fs::File; const VERSION_INFO_FILENAME: &str = "version-info.json"; @@ -165,7 +165,7 @@ impl VersionUpdater { let mut buf = serde_json::to_vec_pretty(&cached_app_version).map_err(Error::Serialize)?; let mut read_buf: &[u8] = buf.as_mut(); - let _ = tokio02::io::copy(&mut read_buf, &mut file) + let _ = tokio::io::copy(&mut read_buf, &mut file) .await .map_err(Error::WriteVersionCache)?; Ok(()) @@ -218,7 +218,7 @@ impl VersionUpdater { pub async fn run(mut self) { let mut rx = self.rx.take().unwrap().fuse(); - let next_delay = || tokio02::time::delay_for(UPDATE_CHECK_INTERVAL).fuse(); + let next_delay = || tokio::time::delay_for(UPDATE_CHECK_INTERVAL).fuse(); let mut check_delay = next_delay(); let mut version_check = futures::future::Fuse::terminated(); diff --git a/mullvad-daemon/src/wireguard.rs b/mullvad-daemon/src/wireguard.rs index 2ebd5fae50..189d1f5a6a 100644 --- a/mullvad-daemon/src/wireguard.rs +++ b/mullvad-daemon/src/wireguard.rs @@ -19,7 +19,6 @@ pub use talpid_types::net::wireguard::{ ConnectionConfig, PrivateKey, TunnelConfig, TunnelParameters, }; use talpid_types::ErrorExt; -use tokio_timer; /// Default automatic key rotation const DEFAULT_AUTOMATIC_KEY_ROTATION: Duration = Duration::from_secs(7 * 24 * 60 * 60); @@ -35,8 +34,6 @@ pub enum Error { RestError(#[error(source)] mullvad_rpc::rest::Error), #[error(display = "Account already has maximum number of keys")] TooManyKeys, - #[error(display = "Failed to create rotation timer")] - RotationScheduleError(#[error(source)] tokio_timer::TimerError), } pub type Result<T> = std::result::Result<T, Error>; @@ -63,16 +60,19 @@ impl KeyManager { /// Reset key rotation, cancelling the current one and starting a new one for the specified /// account - pub fn reset_rotation( + pub async fn reset_rotation( &mut self, account_history: &mut AccountHistory, account_token: AccountToken, ) { match account_history .get(&account_token) + .await .map(|entry| entry.map(|entry| entry.wireguard.map(|wg| wg.get_public_key()))) { - Ok(Some(Some(public_key))) => self.run_automatic_rotation(account_token, public_key), + Ok(Some(Some(public_key))) => { + self.run_automatic_rotation(account_token, public_key).await + } Ok(Some(None)) => { log::error!("reset_rotation: failed to obtain public key for account entry.") } @@ -84,7 +84,7 @@ impl KeyManager { /// Update automatic key rotation interval /// Passing `None` for the interval will use the default value. /// A duration of `0` disables automatic key rotation. - pub fn set_rotation_interval( + pub async fn set_rotation_interval( &mut self, account_history: &mut AccountHistory, account_token: AccountToken, @@ -93,7 +93,7 @@ impl KeyManager { self.auto_rotation_interval = auto_rotation_interval.unwrap_or(DEFAULT_AUTOMATIC_KEY_ROTATION); - self.reset_rotation(account_history, account_token); + self.reset_rotation(account_history, account_token).await; } /// Stop current key generation @@ -104,19 +104,18 @@ impl KeyManager { } /// Generate a new private key - pub fn generate_key_sync(&mut self, account: AccountToken) -> Result<WireguardData> { + pub async fn generate_key_sync(&mut self, account: AccountToken) -> Result<WireguardData> { self.reset(); let private_key = PrivateKey::new_from_random(); - self.http_handle - .service() - .block_on(self.push_future_generator(account, private_key, None)()) + self.push_future_generator(account, private_key, None)() + .await .map_err(Self::map_rpc_error) } /// Replace a key for an account synchronously - pub fn replace_key( + pub async fn replace_key( &mut self, account: AccountToken, old_key: PublicKey, @@ -124,12 +123,7 @@ impl KeyManager { self.reset(); let new_key = PrivateKey::new_from_random(); - self.http_handle.service().block_on(Self::replace_key_rpc( - self.http_handle.clone(), - account, - old_key, - new_key, - )) + Self::replace_key_rpc(self.http_handle.clone(), account, old_key, new_key).await } /// Verifies whether a key is valid or not. @@ -154,7 +148,7 @@ impl KeyManager { /// Generate a new private key asynchronously. The new keys will be sent to the daemon channel. - pub fn generate_key_async(&mut self, account: AccountToken, timeout: Option<Duration>) { + pub async fn generate_key_async(&mut self, account: AccountToken, timeout: Option<Duration>) { self.reset(); let private_key = PrivateKey::new_from_random(); @@ -222,7 +216,7 @@ impl KeyManager { }; - self.http_handle.service().spawn(Box::pin(future)); + tokio::spawn(Box::pin(future)); self.current_job = Some(abort_handle); } @@ -291,7 +285,7 @@ impl KeyManager { } async fn key_rotation_timer(key: PublicKey, rotation_interval_secs: u64) { - let mut interval = tokio02::time::interval(KEY_CHECK_INTERVAL); + let mut interval = tokio::time::interval(KEY_CHECK_INTERVAL); loop { interval.tick().await; if (Utc::now().signed_duration_since(key.created)).num_seconds() as u64 @@ -342,7 +336,7 @@ impl KeyManager { rotation_interval_secs: u64, account_token: AccountToken, ) { - let mut interval = tokio02::time::interval_at( + let mut interval = tokio::time::interval_at( (Instant::now() + AUTOMATIC_ROTATION_RETRY_DELAY).into(), AUTOMATIC_ROTATION_RETRY_DELAY, ); @@ -375,7 +369,7 @@ impl KeyManager { } } - fn run_automatic_rotation(&mut self, account_token: AccountToken, public_key: PublicKey) { + async fn run_automatic_rotation(&mut self, account_token: AccountToken, public_key: PublicKey) { self.stop_automatic_rotation(); if self.auto_rotation_interval == Duration::new(0, 0) { @@ -394,7 +388,7 @@ impl KeyManager { ); let (request, abort_handle) = abortable(Box::pin(fut)); - self.http_handle.service().spawn(request); + tokio::spawn(request); self.abort_scheduler_tx = Some(abort_handle); } diff --git a/mullvad-jni/Cargo.toml b/mullvad-jni/Cargo.toml index b65909d669..d30a6502f7 100644 --- a/mullvad-jni/Cargo.toml +++ b/mullvad-jni/Cargo.toml @@ -12,16 +12,15 @@ crate_type = ["cdylib"] [target.'cfg(target_os = "android")'.dependencies] err-derive = "0.2.1" -futures = "0.1" +futures = "0.3" ipnetwork = "0.16" jnix = { version = "0.2.3", features = ["derive"] } -jsonrpc-client-core = "0.5" -jsonrpc-core = "8" lazy_static = "1" log = "0.4" log-panics = "2" nix = "0.17" rand = "0.7" +tokio = "0.2" mullvad-daemon = { path = "../mullvad-daemon" } mullvad-paths = { path = "../mullvad-paths" } diff --git a/mullvad-jni/src/daemon_interface.rs b/mullvad-jni/src/daemon_interface.rs index c8294fa744..d8a3b980cf 100644 --- a/mullvad-jni/src/daemon_interface.rs +++ b/mullvad-jni/src/daemon_interface.rs @@ -1,4 +1,4 @@ -use futures::{sync::oneshot, Future}; +use futures::{channel::oneshot, executor::block_on}; use mullvad_daemon::{DaemonCommand, DaemonCommandSender}; use mullvad_types::{ account::{AccountData, VoucherSubmission}, @@ -42,9 +42,7 @@ impl DaemonInterface { self.send_command(DaemonCommand::SetTargetState(tx, TargetState::Secured))?; - rx.wait().map_err(|_| Error::NoResponse)?.unwrap(); - - Ok(()) + block_on(rx).map_err(|_| Error::NoResponse) } pub fn create_new_account(&self) -> Result<String> { @@ -52,7 +50,7 @@ impl DaemonInterface { self.send_command(DaemonCommand::CreateNewAccount(tx))?; - rx.wait() + block_on(rx) .map_err(|_| Error::NoResponse)? .map_err(Error::RpcError) } @@ -62,9 +60,7 @@ impl DaemonInterface { self.send_command(DaemonCommand::SetTargetState(tx, TargetState::Unsecured))?; - rx.wait().map_err(|_| Error::NoResponse)?.unwrap(); - - Ok(()) + block_on(rx).map_err(|_| Error::NoResponse) } pub fn generate_wireguard_key(&self) -> Result<KeygenEvent> { @@ -72,7 +68,7 @@ impl DaemonInterface { self.send_command(DaemonCommand::GenerateWireguardKey(tx))?; - rx.wait().map_err(|_| Error::NoResponse) + block_on(rx).map_err(|_| Error::NoResponse) } pub fn get_account_data(&self, account_token: String) -> Result<AccountData> { @@ -80,9 +76,8 @@ impl DaemonInterface { self.send_command(DaemonCommand::GetAccountData(tx, account_token))?; - rx.wait() + block_on(rx) .map_err(|_| Error::NoResponse)? - .wait() .map_err(Error::RpcError) } @@ -91,7 +86,7 @@ impl DaemonInterface { self.send_command(DaemonCommand::GetAccountHistory(tx))?; - rx.wait().map_err(|_| Error::NoResponse) + block_on(rx).map_err(|_| Error::NoResponse) } pub fn get_www_auth_token(&self) -> Result<String> { @@ -99,9 +94,8 @@ impl DaemonInterface { self.send_command(DaemonCommand::GetWwwAuthToken(tx))?; - rx.wait() + block_on(rx) .map_err(|_| Error::NoResponse)? - .wait() .map_err(Error::RpcError) } @@ -110,7 +104,7 @@ impl DaemonInterface { self.send_command(DaemonCommand::GetCurrentLocation(tx))?; - Ok(rx.wait().map_err(|_| Error::NoResponse)?) + Ok(block_on(rx).map_err(|_| Error::NoResponse)?) } pub fn get_current_version(&self) -> Result<String> { @@ -118,7 +112,7 @@ impl DaemonInterface { self.send_command(DaemonCommand::GetCurrentVersion(tx))?; - Ok(rx.wait().map_err(|_| Error::NoResponse)?) + Ok(block_on(rx).map_err(|_| Error::NoResponse)?) } pub fn get_relay_locations(&self) -> Result<RelayList> { @@ -126,7 +120,7 @@ impl DaemonInterface { self.send_command(DaemonCommand::GetRelayLocations(tx))?; - Ok(rx.wait().map_err(|_| Error::NoResponse)?) + Ok(block_on(rx).map_err(|_| Error::NoResponse)?) } pub fn get_settings(&self) -> Result<Settings> { @@ -134,7 +128,7 @@ impl DaemonInterface { self.send_command(DaemonCommand::GetSettings(tx))?; - Ok(rx.wait().map_err(|_| Error::NoResponse)?) + Ok(block_on(rx).map_err(|_| Error::NoResponse)?) } pub fn get_state(&self) -> Result<TunnelState> { @@ -142,7 +136,7 @@ impl DaemonInterface { self.send_command(DaemonCommand::GetState(tx))?; - Ok(rx.wait().map_err(|_| Error::NoResponse)?) + Ok(block_on(rx).map_err(|_| Error::NoResponse)?) } pub fn get_version_info(&self) -> Result<AppVersionInfo> { @@ -150,7 +144,7 @@ impl DaemonInterface { self.send_command(DaemonCommand::GetVersionInfo(tx))?; - rx.wait().map_err(|_| Error::NoResponse) + block_on(rx).map_err(|_| Error::NoResponse) } pub fn reconnect(&self) -> Result<()> { @@ -164,14 +158,14 @@ impl DaemonInterface { self.send_command(DaemonCommand::GetWireguardKey(tx))?; - rx.wait().map_err(|_| Error::NoResponse) + block_on(rx).map_err(|_| Error::NoResponse) } pub fn verify_wireguard_key(&self) -> Result<bool> { let (tx, rx) = oneshot::channel(); self.send_command(DaemonCommand::VerifyWireguardKey(tx))?; - rx.wait().map_err(|_| Error::NoResponse) + block_on(rx).map_err(|_| Error::NoResponse) } pub fn set_account(&self, account_token: Option<String>) -> Result<()> { @@ -179,7 +173,7 @@ impl DaemonInterface { self.send_command(DaemonCommand::SetAccount(tx, account_token))?; - rx.wait().map_err(|_| Error::NoResponse) + block_on(rx).map_err(|_| Error::NoResponse) } pub fn set_allow_lan(&self, allow_lan: bool) -> Result<()> { @@ -187,7 +181,7 @@ impl DaemonInterface { self.send_command(DaemonCommand::SetAllowLan(tx, allow_lan))?; - rx.wait().map_err(|_| Error::NoResponse) + block_on(rx).map_err(|_| Error::NoResponse) } pub fn set_auto_connect(&self, auto_connect: bool) -> Result<()> { @@ -195,7 +189,7 @@ impl DaemonInterface { self.send_command(DaemonCommand::SetAutoConnect(tx, auto_connect))?; - rx.wait().map_err(|_| Error::NoResponse) + block_on(rx).map_err(|_| Error::NoResponse) } pub fn set_wireguard_mtu(&self, wireguard_mtu: Option<u16>) -> Result<()> { @@ -203,7 +197,7 @@ impl DaemonInterface { self.send_command(DaemonCommand::SetWireguardMtu(tx, wireguard_mtu))?; - rx.wait().map_err(|_| Error::NoResponse) + block_on(rx).map_err(|_| Error::NoResponse) } pub fn shutdown(&self) -> Result<()> { @@ -215,9 +209,8 @@ impl DaemonInterface { self.send_command(DaemonCommand::SubmitVoucher(tx, voucher))?; - rx.wait() + block_on(rx) .map_err(|_| Error::NoResponse)? - .wait() .map_err(Error::RpcError) } @@ -226,7 +219,7 @@ impl DaemonInterface { self.send_command(DaemonCommand::UpdateRelaySettings(tx, update))?; - rx.wait().map_err(|_| Error::NoResponse) + block_on(rx).map_err(|_| Error::NoResponse) } fn send_command(&self, command: DaemonCommand) -> Result<()> { diff --git a/mullvad-jni/src/lib.rs b/mullvad-jni/src/lib.rs index 78cee57d1e..1abd4d093d 100644 --- a/mullvad-jni/src/lib.rs +++ b/mullvad-jni/src/lib.rs @@ -17,10 +17,13 @@ use jnix::{ }, FromJava, IntoJava, JnixEnv, }; -use mullvad_daemon::{exception_logging, logging, version, Daemon, DaemonCommandChannel}; +use mullvad_daemon::{ + exception_logging, logging, runtime::new_runtime_builder, version, Daemon, DaemonCommandChannel, +}; use mullvad_rpc::{rest::Error as RestError, StatusCode}; use mullvad_types::account::{AccountData, VoucherSubmission}; use std::{ + io, path::{Path, PathBuf}, ptr, sync::{mpsc, Arc, Once}, @@ -46,6 +49,9 @@ pub enum Error { #[error(display = "Failed to initialize the mullvad daemon")] InitializeDaemon(#[error(source)] mullvad_daemon::Error), + #[error(display = "Failed to spawn the tokio runtime")] + InitializeTokioRuntime(#[error(source)] io::Error), + #[error(display = "Failed to spawn the JNI event listener")] SpawnJniEventListener(#[error(source)] jni_event_listener::Error), } @@ -202,9 +208,13 @@ fn spawn_daemon( .map_err(Error::CreateGlobalReference)?; let (tx, rx) = mpsc::channel(); + let mut runtime = new_runtime_builder() + .build() + .map_err(Error::InitializeTokioRuntime)?; + thread::spawn(move || { let jvm = android_context.jvm.clone(); - let daemon = Daemon::start( + let daemon = runtime.block_on(Daemon::start( Some(resource_dir.clone()), resource_dir.clone(), resource_dir, @@ -212,12 +222,12 @@ fn spawn_daemon( listener, command_channel, android_context, - ); + )); match daemon { Ok(daemon) => { let _ = tx.send(Ok(())); - match daemon.run() { + match runtime.block_on(daemon.run()) { Ok(()) => log::info!("Mullvad daemon has stopped"), Err(error) => log::error!("{}", error.display_chain()), } diff --git a/mullvad-problem-report/Cargo.toml b/mullvad-problem-report/Cargo.toml index 2642fa6acb..68925c5f62 100644 --- a/mullvad-problem-report/Cargo.toml +++ b/mullvad-problem-report/Cargo.toml @@ -16,6 +16,7 @@ futures01 = { version = "0.1", package = "futures" } lazy_static = "1.0" regex = "1.0" uuid = { version = "0.7", features = ["v4"] } +tokio = { version = "0.2", features = [ "time", "rt-threaded", "net", "io-std", "io-driver" ] } mullvad-paths = { path = "../mullvad-paths" } mullvad-rpc = { path = "../mullvad-rpc" } diff --git a/mullvad-problem-report/src/lib.rs b/mullvad-problem-report/src/lib.rs index 019dd82f62..4265d70268 100644 --- a/mullvad-problem-report/src/lib.rs +++ b/mullvad-problem-report/src/lib.rs @@ -67,6 +67,9 @@ pub enum Error { #[error(display = "Error during RPC call")] SendRpcError(#[error(source)] mullvad_rpc::rest::Error), + + #[error(display = "Unable to spawn Tokio runtime")] + CreateRuntime(#[error(source)] io::Error), } /// These are errors that can happen during problem report collection. @@ -263,8 +266,14 @@ pub fn send_problem_report( let metadata = ProblemReport::parse_metadata(&report_content).unwrap_or_else(|| metadata::collect()); - let mut rpc_manager = - mullvad_rpc::MullvadRpcRuntime::new().map_err(Error::CreateRpcClientError)?; + let runtime = tokio::runtime::Builder::new() + .basic_scheduler() + .enable_all() + .build() + .map_err(Error::CreateRuntime)?; + + let mut rpc_manager = mullvad_rpc::MullvadRpcRuntime::new(runtime.handle().clone()) + .map_err(Error::CreateRpcClientError)?; let rpc_client = mullvad_rpc::ProblemReportProxy::new(rpc_manager.mullvad_rest_handle()); rpc_client diff --git a/mullvad-rpc/Cargo.toml b/mullvad-rpc/Cargo.toml index e06985e34b..18fa0994e7 100644 --- a/mullvad-rpc/Cargo.toml +++ b/mullvad-rpc/Cargo.toml @@ -20,7 +20,7 @@ regex = "1" serde = "1" serde_json = "1.0" hyper-rustls = "0.20" -tokio = { version = "0.2", features = [ "time", "rt-threaded", "net", "io-std", "io-driver" ] } +tokio = { version = "0.2", features = [ "macros", "time", "rt-threaded", "net", "io-std", "io-driver" ] } tokio-rustls = "0.13" urlencoding = "1" webpki = { version = "0.21", features = [] } diff --git a/mullvad-rpc/src/bin/relay_list.rs b/mullvad-rpc/src/bin/relay_list.rs index 27601d4b21..8213d736da 100644 --- a/mullvad-rpc/src/bin/relay_list.rs +++ b/mullvad-rpc/src/bin/relay_list.rs @@ -4,12 +4,16 @@ use mullvad_rpc::{rest::Error as RestError, MullvadRpcRuntime, RelayListProxy}; use std::process; use talpid_types::ErrorExt; -fn main() { - let mut runtime = MullvadRpcRuntime::new().expect("Failed to load runtime"); +#[tokio::main] +async fn main() { + let mut runtime = + MullvadRpcRuntime::new(tokio::runtime::Handle::current()).expect("Failed to load runtime"); - let relay_list_request = RelayListProxy::new(runtime.mullvad_rest_handle()).relay_list(); + let relay_list_request = RelayListProxy::new(runtime.mullvad_rest_handle()) + .relay_list() + .await; - let relay_list = match runtime.runtime().block_on(relay_list_request) { + let relay_list = match relay_list_request { Ok(relay_list) => relay_list, Err(RestError::TimeoutError(_)) => { eprintln!("Request timed out"); diff --git a/mullvad-rpc/src/event_loop.rs b/mullvad-rpc/src/event_loop.rs deleted file mode 100644 index ea93de2493..0000000000 --- a/mullvad-rpc/src/event_loop.rs +++ /dev/null @@ -1,14 +0,0 @@ -use tokio::runtime::{Builder, Runtime}; - -/// Creates a new tokio runtime to be exclusively used for HTTP requests. -// FIXME: Remove this once the daemon has migrated. -pub fn create_runtime() -> Result<Runtime, crate::Error> { - let runtime = Builder::new() - .threaded_scheduler() - .core_threads(2) - .enable_all() - .thread_name("mullvad-rpc-event-loop") - .build(); - - runtime.map_err(crate::Error::TokioRuntimeError) -} diff --git a/mullvad-rpc/src/lib.rs b/mullvad-rpc/src/lib.rs index 796cb56553..5e2e6f80fa 100644 --- a/mullvad-rpc/src/lib.rs +++ b/mullvad-rpc/src/lib.rs @@ -16,7 +16,6 @@ use std::{ use talpid_types::net::wireguard; -pub mod event_loop; pub mod rest; mod cached_dns_resolver; @@ -44,29 +43,27 @@ const API_IP: IpAddr = IpAddr::V4(Ipv4Addr::new(193, 138, 218, 78)); pub struct MullvadRpcRuntime { cached_dns_resolver: CachedDnsResolver, https_connector: HttpsConnectorWithSni, - runtime: tokio::runtime::Runtime, + handle: tokio::runtime::Handle, } #[derive(err_derive::Error, Debug)] pub enum Error { #[error(display = "Failed to construct a rest client")] RestError(#[error(source)] rest::Error), - #[error(display = "Failed to spawn a tokio runtime")] - TokioRuntimeError(#[error(source)] tokio::io::Error), } impl MullvadRpcRuntime { /// Create a new `MullvadRpcRuntime`. - pub fn new() -> Result<Self, Error> { + pub fn new(handle: tokio::runtime::Handle) -> Result<Self, Error> { Ok(MullvadRpcRuntime { cached_dns_resolver: CachedDnsResolver::new(API_HOST.to_owned(), None, API_IP), - runtime: event_loop::create_runtime()?, https_connector: HttpsConnectorWithSni::new(), + handle, }) } /// Create a new `MullvadRpcRuntime` using the specified cache directory. - pub fn with_cache_dir(cache_dir: &Path) -> Result<Self, Error> { + pub fn with_cache_dir(handle: tokio::runtime::Handle, cache_dir: &Path) -> Result<Self, Error> { let cache_file = cache_dir.join(API_IP_CACHE_FILENAME); let cached_dns_resolver = CachedDnsResolver::new(API_HOST.to_owned(), Some(cache_file), API_IP); @@ -75,8 +72,8 @@ impl MullvadRpcRuntime { Ok(MullvadRpcRuntime { cached_dns_resolver, - runtime: event_loop::create_runtime()?, https_connector, + handle, }) } @@ -85,9 +82,9 @@ impl MullvadRpcRuntime { let mut https_connector = self.https_connector.clone(); https_connector.set_sni_hostname(sni_hostname); - let service = rest::RequestService::new(https_connector, self.runtime.handle().clone()); + let service = rest::RequestService::new(https_connector, self.handle.clone()); let handle = service.handle(); - self.runtime.spawn(service.into_future()); + self.handle.spawn(service.into_future()); handle } @@ -106,17 +103,8 @@ impl MullvadRpcRuntime { self.new_request_service(None) } - pub fn runtime(&mut self) -> &mut tokio::runtime::Runtime { - &mut self.runtime - } -} - -impl Drop for MullvadRpcRuntime { - fn drop(&mut self) { - if let Ok(runtime) = event_loop::create_runtime() { - let old_runtime = std::mem::replace(&mut self.runtime, runtime); - old_runtime.shutdown_timeout(std::time::Duration::from_secs(1)); - } + pub fn handle(&mut self) -> &mut tokio::runtime::Handle { + &mut self.handle } } diff --git a/mullvad-rpc/src/rest.rs b/mullvad-rpc/src/rest.rs index 205f350ea5..0e8994b1c3 100644 --- a/mullvad-rpc/src/rest.rs +++ b/mullvad-rpc/src/rest.rs @@ -176,12 +176,10 @@ pub struct RequestServiceHandle { impl RequestServiceHandle { /// Resets the corresponding RequestService, dropping all in-flight requests. - pub fn reset(&self) { + pub async fn reset(&self) { let mut tx = self.tx.clone(); - self.handle.block_on(async move { - let _ = tx.send(RequestCommand::Reset).await; - }); + let _ = tx.send(RequestCommand::Reset).await; } /// Submits a `RestRequest` for exectuion to the request service. @@ -216,13 +214,6 @@ impl RequestServiceHandle { pub fn spawn<T: Send + 'static>(&self, future: impl Future<Output = T> + Send + 'static) { let _ = self.handle.spawn(future); } - - pub fn block_on<T: Send + 'static>( - &self, - future: impl Future<Output = T> + Send + 'static, - ) -> T { - self.handle.block_on(future) - } } #[derive(Debug)] diff --git a/talpid-core/Cargo.toml b/talpid-core/Cargo.toml index f135e52b26..5222c5d26d 100644 --- a/talpid-core/Cargo.toml +++ b/talpid-core/Cargo.toml @@ -24,12 +24,10 @@ parking_lot = "0.9" regex = "1.1.0" shell-escape = "0.1" talpid-types = { path = "../talpid-types" } -tokio-core = "0.1" -tokio-executor = "0.1" uuid = { version = "0.7", features = ["v4"] } zeroize = "1" chrono = "0.4" -tokio02 = { package = "tokio", version = "0.2", features = [ "io-util", "process", "rt-core", "rt-threaded", "stream"] } +tokio = { version = "0.2", features = [ "io-util", "process", "rt-core", "rt-threaded", "stream"] } rand = "0.7" @@ -42,7 +40,6 @@ prost = "0.6" [target.'cfg(unix)'.dependencies] nix = "0.17" -tokio-io = "0.1" [target.'cfg(target_os = "android")'.dependencies] diff --git a/talpid-core/src/future_retry.rs b/talpid-core/src/future_retry.rs index bf992117bd..cc1fb1ed20 100644 --- a/talpid-core/src/future_retry.rs +++ b/talpid-core/src/future_retry.rs @@ -32,10 +32,10 @@ pub async fn retry_future_with_backoff< async fn sleep(mut delay: Duration) { while delay > MAX_SINGLE_DELAY { delay -= MAX_SINGLE_DELAY; - tokio02::time::delay_for(MAX_SINGLE_DELAY).await; + tokio::time::delay_for(MAX_SINGLE_DELAY).await; } - tokio02::time::delay_for(delay).await; + tokio::time::delay_for(delay).await; } /// Provides an exponential back-off timer to delay the next retry of a failed operation. diff --git a/talpid-core/src/offline/android.rs b/talpid-core/src/offline/android.rs index 3863415cfb..fefe2556cf 100644 --- a/talpid-core/src/offline/android.rs +++ b/talpid-core/src/offline/android.rs @@ -1,5 +1,5 @@ use crate::tunnel_state_machine::TunnelCommand; -use futures01::sync::mpsc::UnboundedSender; +use futures::channel::mpsc::UnboundedSender; use jnix::{ jni::{ self, @@ -96,7 +96,7 @@ impl MonitorHandle { }) } - pub fn is_offline(&self) -> bool { + pub async fn is_offline(&self) -> bool { match self.get_is_connected() { Ok(is_connected) => !is_connected, Err(error) => { @@ -205,7 +205,7 @@ unsafe fn get_sender_from_address(address: jlong) -> Box<Weak<UnboundedSender<Tu Box::from_raw(address as *mut Weak<UnboundedSender<TunnelCommand>>) } -pub fn spawn_monitor( +pub async fn spawn_monitor( sender: Weak<UnboundedSender<TunnelCommand>>, android_context: AndroidContext, ) -> Result<MonitorHandle, Error> { diff --git a/talpid-core/src/offline/linux.rs b/talpid-core/src/offline/linux.rs index 0b6526f0e5..5e4a4fa7e2 100644 --- a/talpid-core/src/offline/linux.rs +++ b/talpid-core/src/offline/linux.rs @@ -1,6 +1,8 @@ use crate::tunnel_state_machine::TunnelCommand; -use futures::{StreamExt, TryStreamExt}; -use futures01::sync::mpsc::UnboundedSender; +use futures::{ + channel::{mpsc::UnboundedSender, oneshot}, + FutureExt, StreamExt, TryStreamExt, +}; use netlink_packet_route::{ constants::{ARPHRD_LOOPBACK, ARPHRD_NONE, IFF_LOWER_UP, IFF_UP}, rtnl::link::nlas::{Info as LinkInfo, InfoKind, Nla as LinkNla}, @@ -15,8 +17,6 @@ use std::{collections::BTreeSet, io, sync::Weak}; pub type Result<T> = std::result::Result<T, Error>; -const EVENT_LOOP_THREAD_NAME: &str = "mullvad-offline-detection-event-loop"; - #[derive(err_derive::Error, Debug)] #[error(no_from)] pub enum Error { @@ -47,12 +47,12 @@ pub enum Error { pub struct MonitorHandle { handle: rtnetlink::Handle, - runtime: tokio02::runtime::Runtime, + _stop_connection_tx: oneshot::Sender<()>, } impl MonitorHandle { - pub fn is_offline(&mut self) -> bool { - match self.runtime.block_on(check_offline_state(&self.handle)) { + pub async fn is_offline(&mut self) -> bool { + match check_offline_state(&self.handle).await { Ok(is_offline) => is_offline, Err(err) => { log::error!( @@ -65,41 +65,36 @@ impl MonitorHandle { } } -pub fn spawn_monitor(sender: Weak<UnboundedSender<TunnelCommand>>) -> Result<MonitorHandle> { - let mut runtime = tokio02::runtime::Builder::new() - .threaded_scheduler() - .core_threads(1) - .enable_all() - .thread_name(EVENT_LOOP_THREAD_NAME) - .build() - .map_err(Error::EventLoopError)?; - - let (connection, handle, mut messages) = runtime.block_on(async move { - let (mut connection, handle, messages) = - rtnetlink::new_connection().map_err(Error::NetlinkConnectionError)?; +pub async fn spawn_monitor(sender: Weak<UnboundedSender<TunnelCommand>>) -> Result<MonitorHandle> { + let (mut connection, handle, mut messages) = + rtnetlink::new_connection().map_err(Error::NetlinkConnectionError)?; - let mgroup_flags = RTMGRP_IPV4_IFADDR | RTMGRP_IPV6_IFADDR | RTMGRP_LINK | RTMGRP_NOTIFY; - let addr = SocketAddr::new(0, mgroup_flags); + let mgroup_flags = RTMGRP_IPV4_IFADDR | RTMGRP_IPV6_IFADDR | RTMGRP_LINK | RTMGRP_NOTIFY; + let addr = SocketAddr::new(0, mgroup_flags); - connection - .socket_mut() - .bind(&addr) - .map_err(Error::BindError)?; + connection + .socket_mut() + .bind(&addr) + .map_err(Error::BindError)?; - Ok((connection, handle, messages)) - })?; + let (stop_connection_tx, stop_rx) = oneshot::channel(); - // Connection will be closed once the runtime is dropped - let _ = runtime.spawn(connection); - let mut is_offline = runtime.block_on(check_offline_state(&handle))?; + // Connection will be closed once the channel is dropped + tokio::spawn(async { + futures::select! { + _ = connection.fuse() => (), + _ = stop_rx.fuse() => (), + } + }); + let mut is_offline = check_offline_state(&handle).await?; let monitor_handle = MonitorHandle { handle: handle.clone(), - runtime, + _stop_connection_tx: stop_connection_tx, }; - let _ = monitor_handle.runtime.spawn(async move { + tokio::spawn(async move { while let Some(_new_message) = messages.next().await { match sender.upgrade() { Some(sender) => { diff --git a/talpid-core/src/offline/macos.rs b/talpid-core/src/offline/macos.rs index 602da6cda9..2569fa06c6 100644 --- a/talpid-core/src/offline/macos.rs +++ b/talpid-core/src/offline/macos.rs @@ -1,5 +1,5 @@ use crate::tunnel_state_machine::TunnelCommand; -use futures01::sync::mpsc::UnboundedSender; +use futures::channel::mpsc::UnboundedSender; use std::{ net::{Ipv4Addr, SocketAddr}, sync::{ @@ -44,7 +44,7 @@ pub struct MonitorHandle; impl MonitorHandle { /// Host is considered to be offline if the IPv4 internet is considered to be unreachable by the /// given reachability flags *or* there are no active physical interfaces. - pub fn is_offline(&self) -> bool { + pub async fn is_offline(&self) -> bool { let reachability = SCNetworkReachability::from(ipv4_internet()); let store = SCDynamicStoreBuilder::new("talpid-offline-check").build(); reachability @@ -54,7 +54,9 @@ impl MonitorHandle { } } -pub fn spawn_monitor(sender: Weak<UnboundedSender<TunnelCommand>>) -> Result<MonitorHandle, Error> { +pub async fn spawn_monitor( + sender: Weak<UnboundedSender<TunnelCommand>>, +) -> Result<MonitorHandle, Error> { let (result_tx, result_rx) = mpsc::channel(); thread::spawn(move || { let mut reachability_ref = SCNetworkReachability::from(ipv4_internet()); diff --git a/talpid-core/src/offline/mod.rs b/talpid-core/src/offline/mod.rs index 5cda6290a3..9a6ce1dae5 100644 --- a/talpid-core/src/offline/mod.rs +++ b/talpid-core/src/offline/mod.rs @@ -1,5 +1,5 @@ use crate::tunnel_state_machine::TunnelCommand; -use futures01::sync::mpsc::UnboundedSender; +use futures::channel::mpsc::UnboundedSender; use std::sync::Weak; #[cfg(target_os = "android")] use talpid_types::android::AndroidContext; @@ -25,18 +25,21 @@ pub use self::imp::Error; pub struct MonitorHandle(imp::MonitorHandle); impl MonitorHandle { - pub fn is_offline(&mut self) -> bool { - self.0.is_offline() + pub async fn is_offline(&mut self) -> bool { + self.0.is_offline().await } } -pub fn spawn_monitor( +pub async fn spawn_monitor( sender: Weak<UnboundedSender<TunnelCommand>>, #[cfg(target_os = "android")] android_context: AndroidContext, ) -> Result<MonitorHandle, Error> { - Ok(MonitorHandle(imp::spawn_monitor( - sender, - #[cfg(target_os = "android")] - android_context, - )?)) + Ok(MonitorHandle( + imp::spawn_monitor( + sender, + #[cfg(target_os = "android")] + android_context, + ) + .await?, + )) } diff --git a/talpid-core/src/offline/windows.rs b/talpid-core/src/offline/windows.rs index c7a86e4073..d9e5c7782d 100644 --- a/talpid-core/src/offline/windows.rs +++ b/talpid-core/src/offline/windows.rs @@ -1,5 +1,5 @@ use crate::{logging::windows::log_sink, tunnel_state_machine::TunnelCommand, winnet}; -use futures01::sync::mpsc::UnboundedSender; +use futures::channel::mpsc::UnboundedSender; use parking_lot::Mutex; use std::{ ffi::c_void, @@ -203,7 +203,7 @@ impl BroadcastListener { state.apply_change(StateChange::NetworkConnectivity(connectivity)); } - pub fn is_offline(&self) -> bool { + pub async fn is_offline(&self) -> bool { let state = self._system_state.lock(); state.is_offline_currently().unwrap_or(false) } @@ -264,7 +264,9 @@ impl SystemState { pub type MonitorHandle = BroadcastListener; -pub fn spawn_monitor(sender: Weak<UnboundedSender<TunnelCommand>>) -> Result<MonitorHandle, Error> { +pub async fn spawn_monitor( + sender: Weak<UnboundedSender<TunnelCommand>>, +) -> Result<MonitorHandle, Error> { BroadcastListener::start(sender) } diff --git a/talpid-core/src/routing/linux.rs b/talpid-core/src/routing/linux.rs index ba5ffb66a6..566e1c027b 100644 --- a/talpid-core/src/routing/linux.rs +++ b/talpid-core/src/routing/linux.rs @@ -124,7 +124,7 @@ impl RouteManagerImpl { .bind(&addr) .map_err(Error::BindError)?; - tokio02::spawn(connection); + tokio::spawn(connection); let iface_map = Self::initialize_link_map(&handle).await?; let split_table_id = Self::initialize_exclusions_table().await?; @@ -903,7 +903,7 @@ mod test { /// Tests if dropping inside a tokio runtime panics #[test] fn test_drop_in_executor() { - let mut runtime = tokio02::runtime::Runtime::new().expect("Failed to initialize runtime"); + let mut runtime = tokio::runtime::Runtime::new().expect("Failed to initialize runtime"); runtime.block_on(async { let manager = RouteManagerImpl::new(HashSet::new()) .await @@ -915,7 +915,7 @@ mod test { /// Tests if dropping outside a runtime panics #[test] fn test_drop() { - let mut runtime = tokio02::runtime::Runtime::new().expect("Failed to initialize runtime"); + let mut runtime = tokio::runtime::Runtime::new().expect("Failed to initialize runtime"); let manager = runtime.block_on(async { RouteManagerImpl::new(HashSet::new()) .await diff --git a/talpid-core/src/routing/macos.rs b/talpid-core/src/routing/macos.rs index 6a4512d144..6304dcc24f 100644 --- a/talpid-core/src/routing/macos.rs +++ b/talpid-core/src/routing/macos.rs @@ -12,7 +12,7 @@ use std::{ net::IpAddr, process::{ExitStatus, Stdio}, }; -use tokio02::{io::AsyncBufReadExt, process::Command}; +use tokio::{io::AsyncBufReadExt, process::Command}; pub type Result<T> = std::result::Result<T, Error>; @@ -310,7 +310,7 @@ async fn listen_for_default_route_changes() -> Result<impl Stream<Item = std::io let mut process = cmd.spawn().map_err(Error::FailedToMonitorRoutes)?; - let reader = tokio02::io::BufReader::new(process.stdout.take().unwrap()); + let reader = tokio::io::BufReader::new(process.stdout.take().unwrap()); let lines = reader.lines(); // route -n monitor will produce netlink messages in the following format diff --git a/talpid-core/src/routing/unix.rs b/talpid-core/src/routing/unix.rs index 38896a4392..a59ee6ed27 100644 --- a/talpid-core/src/routing/unix.rs +++ b/talpid-core/src/routing/unix.rs @@ -7,7 +7,7 @@ use futures::channel::{ mpsc::{self, UnboundedSender}, oneshot, }; -use std::collections::HashSet; +use std::{collections::HashSet, io}; use talpid_types::ErrorExt; #[cfg(target_os = "linux")] @@ -39,6 +39,9 @@ pub enum Error { /// Failed to spawn route manager future #[error(display = "Failed to spawn route manager on the provided executor")] FailedToSpawnManager, + /// Failed to spawn route manager runtime + #[error(display = "Failed to spawn route manager runtime")] + FailedToSpawnRuntime(#[error(source)] io::Error), /// Attempt to use route manager that has been dropped #[error(display = "Cannot send message to route manager since it is down")] RouteManagerDown, @@ -69,7 +72,7 @@ pub enum RouteManagerCommand { /// the route will be adjusted dynamically when the default route changes. pub struct RouteManager { manage_tx: Option<UnboundedSender<RouteManagerCommand>>, - runtime: tokio02::runtime::Runtime, + runtime: tokio::runtime::Runtime, } impl RouteManager { @@ -78,7 +81,12 @@ impl RouteManager { /// routes. pub fn new(required_routes: HashSet<RequiredRoute>) -> Result<Self, Error> { let (manage_tx, manage_rx) = mpsc::unbounded(); - let mut runtime = tokio02::runtime::Runtime::new().expect("Failed to spawn runtime"); + let mut runtime = tokio::runtime::Builder::new() + .threaded_scheduler() + .core_threads(1) + .max_threads(1) + .enable_all() + .build()?; let manager = runtime.block_on(imp::RouteManagerImpl::new(required_routes))?; runtime.handle().spawn(manager.run(manage_rx)); diff --git a/talpid-core/src/tunnel/openvpn.rs b/talpid-core/src/tunnel/openvpn.rs index 5dc14ba8d9..02ff92f4e7 100644 --- a/talpid-core/src/tunnel/openvpn.rs +++ b/talpid-core/src/tunnel/openvpn.rs @@ -21,7 +21,7 @@ use std::{ time::Duration, }; use talpid_types::net::openvpn; -use tokio02::task; +use tokio::task; #[cfg(target_os = "linux")] use which; @@ -132,7 +132,7 @@ pub struct OpenVpnMonitor<C: OpenVpnBuilder = OpenVpnCommand> { /// Keep the 'TempFile' for the proxy user-pass file in the struct, so it's removed on drop. _proxy_auth_file: Option<mktemp::TempFile>, - runtime: tokio02::runtime::Runtime, + runtime: tokio::runtime::Runtime, event_server_abort_tx: triggered::Trigger, server_join_handle: Option<task::JoinHandle<std::result::Result<(), event_server::Error>>>, } @@ -239,9 +239,10 @@ impl<C: OpenVpnBuilder + 'static> OpenVpnMonitor<C> { let (event_server_abort_tx, event_server_abort_rx) = triggered::trigger(); - let mut runtime = tokio02::runtime::Builder::new() + let mut runtime = tokio::runtime::Builder::new() .threaded_scheduler() .core_threads(1) + .max_threads(1) .enable_all() .build() .map_err(Error::RuntimeError)?; @@ -619,7 +620,7 @@ mod event_server { pin::Pin, task::{Context, Poll}, }; - use tokio02::io::{AsyncRead, AsyncWrite}; + use tokio::io::{AsyncRead, AsyncWrite}; use tonic::{ self, transport::{server::Connected, Server}, diff --git a/talpid-core/src/tunnel_state_machine/mod.rs b/talpid-core/src/tunnel_state_machine/mod.rs index b0222ade67..4d96a2fe3c 100644 --- a/talpid-core/src/tunnel_state_machine/mod.rs +++ b/talpid-core/src/tunnel_state_machine/mod.rs @@ -23,25 +23,23 @@ use crate::{ tunnel::tun_provider::TunProvider, }; -use futures01::{ - sync::{mpsc, oneshot}, - Async, Future, Poll, Stream, +use futures::{ + channel::{mpsc, oneshot}, + StreamExt, }; +use futures01::{sync::mpsc as old_mpsc, Async, Poll, Stream}; use std::{ collections::HashSet, io, path::{Path, PathBuf}, sync::{mpsc as sync_mpsc, Arc}, - thread, }; #[cfg(target_os = "android")] -use talpid_types::android::AndroidContext; +use talpid_types::{android::AndroidContext, ErrorExt}; use talpid_types::{ net::TunnelParameters, tunnel::{ErrorStateCause, ParameterGenerationError, TunnelStateTransition}, - ErrorExt, }; -use tokio_core::reactor::Core; /// Errors that can happen when setting up or using the state machine. #[derive(err_derive::Error, Debug)] @@ -77,7 +75,7 @@ pub enum Error { } /// Spawn the tunnel state machine thread, returning a channel for sending tunnel commands. -pub fn spawn( +pub async fn spawn( allow_lan: bool, block_when_disconnected: bool, tunnel_parameters_generator: impl TunnelParametersGenerator, @@ -88,15 +86,16 @@ pub fn spawn( shutdown_tx: oneshot::Sender<()>, #[cfg(target_os = "android")] android_context: AndroidContext, ) -> Result<Arc<mpsc::UnboundedSender<TunnelCommand>>, Error> { - let (command_tx, command_rx) = mpsc::unbounded(); + let (command_tx, mut command_rx) = mpsc::unbounded(); let command_tx = Arc::new(command_tx); let mut offline_monitor = offline::spawn_monitor( Arc::downgrade(&command_tx), #[cfg(target_os = "android")] android_context.clone(), ) + .await .map_err(Error::OfflineMonitorError)?; - let is_offline = offline_monitor.is_offline(); + let is_offline = offline_monitor.is_offline().await; let tun_provider = TunProvider::new( #[cfg(target_os = "android")] @@ -105,9 +104,19 @@ pub fn spawn( allow_lan, ); + // Hide internal 0.1 futures from the client + let (command_adapter_tx, command_adapter_rx) = old_mpsc::unbounded(); + tokio::spawn(async move { + while let Some(command) = command_rx.next().await { + if command_adapter_tx.unbounded_send(command).is_err() { + log::error!("Failed to forward daemon command"); + } + } + }); + let (startup_result_tx, startup_result_rx) = sync_mpsc::channel(); - thread::spawn(move || { - match create_event_loop( + std::thread::spawn(move || { + let state_machine = TunnelStateMachine::new( allow_lan, block_when_disconnected, is_offline, @@ -116,28 +125,33 @@ pub fn spawn( log_dir, resource_dir, cache_dir, - command_rx, - state_change_listener, - shutdown_tx, - ) { - Ok((mut reactor, event_loop)) => { - startup_result_tx.send(Ok(())).expect( - "Tunnel state machine won't be started because the owner thread crashed", - ); - - if let Err(e) = reactor.run(event_loop) { - log::error!( - "{}", - e.display_chain_with_msg("Tunnel state machine exited with an error") - ); - } + command_adapter_rx, + ); + let state_machine = match state_machine { + Ok(state_machine) => { + startup_result_tx.send(Ok(())).unwrap(); + state_machine + } + Err(error) => { + startup_result_tx.send(Err(error)).unwrap(); + return; } - Err(startup_error) => { - startup_result_tx - .send(Err(startup_error)) - .expect("Failed to send startup error"); + }; + + let mut iter = state_machine.wait(); + while let Some(Ok(change_event)) = iter.next() { + if let Err(error) = state_change_listener + .send(change_event) + .map_err(|_| Error::SendStateChange) + { + log::error!("{}", error); + break; } } + if shutdown_tx.send(()).is_err() { + log::error!("Can't send shutdown completion to daemon"); + } + std::mem::drop(offline_monitor); }); @@ -147,48 +161,6 @@ pub fn spawn( Ok(command_tx) } -fn create_event_loop( - allow_lan: bool, - block_when_disconnected: bool, - is_offline: bool, - tunnel_parameters_generator: impl TunnelParametersGenerator, - tun_provider: TunProvider, - log_dir: Option<PathBuf>, - resource_dir: PathBuf, - cache_dir: impl AsRef<Path>, - commands: mpsc::UnboundedReceiver<TunnelCommand>, - state_change_listener: impl Sender<TunnelStateTransition>, - shutdown_tx: oneshot::Sender<()>, -) -> Result<(Core, impl Future<Item = (), Error = Error>), Error> { - let reactor = Core::new().map_err(Error::ReactorError)?; - let state_machine = TunnelStateMachine::new( - allow_lan, - block_when_disconnected, - is_offline, - tunnel_parameters_generator, - tun_provider, - log_dir, - resource_dir, - cache_dir, - commands, - )?; - - let future = state_machine - .for_each(move |state_change_event| { - state_change_listener - .send(state_change_event) - .map_err(|_| Error::SendStateChange) - }) - .then(move |_| { - if shutdown_tx.send(()).is_err() { - log::error!("Can't send shutdown completion to daemon"); - } - Ok(()) - }); - - Ok((reactor, future)) -} - /// Representation of external commands for the tunnel state machine. pub enum TunnelCommand { /// Enable or disable LAN access in the firewall. @@ -213,7 +185,7 @@ pub enum TunnelCommand { /// by the stream. struct TunnelStateMachine { current_state: Option<TunnelStateWrapper>, - commands: mpsc::UnboundedReceiver<TunnelCommand>, + commands: old_mpsc::UnboundedReceiver<TunnelCommand>, shared_values: SharedTunnelStateValues, } @@ -227,7 +199,7 @@ impl TunnelStateMachine { log_dir: Option<PathBuf>, resource_dir: PathBuf, cache_dir: impl AsRef<Path>, - commands: mpsc::UnboundedReceiver<TunnelCommand>, + commands: old_mpsc::UnboundedReceiver<TunnelCommand>, ) -> Result<Self, Error> { let args = if block_when_disconnected { FirewallArguments { @@ -432,7 +404,7 @@ trait TunnelState: Into<TunnelStateWrapper> + Sized { /// [`EventConsequence`]: enum.EventConsequence.html fn handle_event( self, - commands: &mut mpsc::UnboundedReceiver<TunnelCommand>, + commands: &mut old_mpsc::UnboundedReceiver<TunnelCommand>, shared_values: &mut SharedTunnelStateValues, ) -> EventConsequence<Self>; } @@ -456,7 +428,7 @@ macro_rules! state_wrapper { impl $wrapper_name { fn handle_event( self, - commands: &mut mpsc::UnboundedReceiver<TunnelCommand>, + commands: &mut old_mpsc::UnboundedReceiver<TunnelCommand>, shared_values: &mut SharedTunnelStateValues, ) -> TunnelStateMachineAction { match self { diff --git a/talpid-openvpn-plugin/src/processing.rs b/talpid-openvpn-plugin/src/processing.rs index f03442e2fb..291ad41092 100644 --- a/talpid-openvpn-plugin/src/processing.rs +++ b/talpid-openvpn-plugin/src/processing.rs @@ -28,6 +28,7 @@ impl EventProcessor { let mut runtime = runtime::Builder::new() .basic_scheduler() .core_threads(1) + .max_threads(1) .enable_all() .build() .map_err(Error::CreateRuntime)?; |
