diff options
| author | David Lönnhager <david.l@mullvad.net> | 2022-03-01 15:31:52 +0100 |
|---|---|---|
| committer | David Lönnhager <david.l@mullvad.net> | 2022-03-01 15:31:52 +0100 |
| commit | 8e601e3145c558ab80a5e2f4310977f26e402183 (patch) | |
| tree | eeed77609088bbc08330066192428bf5bfd83a6c | |
| parent | 710b361a8c209dc670f2d459eacd2d6ad3661156 (diff) | |
| parent | 63b6585cd02693dedc4ce41598407131cbb9d93f (diff) | |
| download | mullvadvpn-8e601e3145c558ab80a5e2f4310977f26e402183.tar.xz mullvadvpn-8e601e3145c558ab80a5e2f4310977f26e402183.zip | |
Merge branch 'proxy-api-reqs'
27 files changed, 1476 insertions, 625 deletions
diff --git a/.github/workflows/android-app.yml b/.github/workflows/android-app.yml index d340431ddc..3bc2192146 100644 --- a/.github/workflows/android-app.yml +++ b/.github/workflows/android-app.yml @@ -95,7 +95,6 @@ jobs: source env.sh $TARGET cargo build --target $TARGET --verbose --package mullvad-jni cargo run --bin relay_list > dist-assets/relays.json - cargo run --bin address_cache > dist-assets/api-ip-address.txt $NDK_TOOLCHAIN_STRIP_TOOL --strip-debug --strip-unneeded -o "$STRIPPED_LIB_PATH" "$UNSTRIPPED_LIB_PATH" - name: Configure Android SDK diff --git a/.gitignore b/.gitignore index e41edf6b7c..b75c65b33e 100644 --- a/.gitignore +++ b/.gitignore @@ -11,7 +11,6 @@ .DS_Store *.log /dist-assets/relays.json -/dist-assets/api-ip-address.txt /dist-assets/mullvad /dist-assets/mullvad.exe /dist-assets/mullvad-daemon diff --git a/CHANGELOG.md b/CHANGELOG.md index 23cff810b4..1fe3d7173e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,8 @@ Line wrap the file at 100 chars. Th ## [Unreleased] ### Added +- Obfuscate traffic to the Mullvad API using bridges if it cannot be reached directly. + #### Windows - Detect mounting and dismounting of volumes, such as VeraCrypt volumes or USB drives, and exclude paths from the tunnel correctly when these occur. This sometimes only works diff --git a/Cargo.lock b/Cargo.lock index 510a331371..c6d4a3c483 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -18,6 +18,42 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" [[package]] +name = "aead" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b613b8e1e3cf911a086f53f03bf286f52fd7a7258e4fa606f0ef220d39d8877" +dependencies = [ + "generic-array", +] + +[[package]] +name = "aes" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e8b47f52ea9bae42228d07ec09eb676433d7c4ed1ebdf0f1d1c29ed446f1ab8" +dependencies = [ + "cfg-if 1.0.0", + "cipher", + "cpufeatures", + "ctr", + "opaque-debug", +] + +[[package]] +name = "aes-gcm" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df5f85a83a7d8b0442b6aa7b504b8212c1733da07b98aae43d4bc21b2cb3cdf6" +dependencies = [ + "aead", + "aes", + "cipher", + "ctr", + "ghash", + "subtle", +] + +[[package]] name = "aho-corasick" version = "0.7.18" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -130,6 +166,12 @@ dependencies = [ ] [[package]] +name = "base16ct" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "349a06037c7bf932dd7e7d1f653678b2038b9ad46a74102f1fc7bd7872678cce" + +[[package]] name = "base64" version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -142,12 +184,27 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" [[package]] +name = "block-buffer" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1d36a02058e76b040de25a4464ba1c80935655595b661505c8b39b664828b95" +dependencies = [ + "generic-array", +] + +[[package]] name = "bumpalo" version = "3.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9df67f7bf9ef8498769f994239c45613ef0c5899415fb58e9add412d2c1a538" [[package]] +name = "byte_string" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11aade7a05aa8c3a351cedc44c3fc45806430543382fcc4743a9b757a2a0b4ed" + +[[package]] name = "byteorder" version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -184,6 +241,31 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] +name = "chacha20" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01b72a433d0cf2aef113ba70f62634c56fddb0f244e6377185c56a7cadbd8f91" +dependencies = [ + "cfg-if 1.0.0", + "cipher", + "cpufeatures", + "zeroize", +] + +[[package]] +name = "chacha20poly1305" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b84ed6d1d5f7aa9bdde921a5090e0ca4d934d250ea3b402a5fab3a994e28a2a" +dependencies = [ + "aead", + "chacha20", + "cipher", + "poly1305", + "zeroize", +] + +[[package]] name = "chrono" version = "0.4.19" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -198,6 +280,15 @@ dependencies = [ ] [[package]] +name = "cipher" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ee52072ec15386f770805afd189a01c8841be8696bed250fa2f13c4c0d6dfb7" +dependencies = [ + "generic-array", +] + +[[package]] name = "clap" version = "2.33.3" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -262,6 +353,12 @@ dependencies = [ ] [[package]] +name = "const-oid" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4c78c047431fee22c1a7bb92e00ad095a02a983affe4d8a72e2a2c62c1b94f3" + +[[package]] name = "convert_case" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -284,6 +381,45 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ea221b5284a47e40033bf9b66f35f984ec0ea2931eb03505246cd27a963f981b" [[package]] +name = "cpufeatures" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95059428f66df56b63431fdb4e1947ed2190586af5c5a8a8b71122bdf5a7f469" +dependencies = [ + "libc", +] + +[[package]] +name = "crypto-bigint" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03c6a1d5fa1de37e071642dfa44ec552ca5b299adb128fab16138e24b548fd21" +dependencies = [ + "generic-array", + "rand_core 0.6.3", + "subtle", + "zeroize", +] + +[[package]] +name = "crypto-common" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "683d6b536309245c849479fba3da410962a43ed8e51c26b729208ec0ac2798d0" +dependencies = [ + "generic-array", +] + +[[package]] +name = "ctr" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "049bb91fb4aaf0e3c7efa6cd5ef877dbbbd15b39dad06d9948de4ec8a75761ea" +dependencies = [ + "cipher", +] + +[[package]] name = "ctrlc" version = "3.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -300,7 +436,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b9fdf9972b2bd6af2d913799d9ebc165ea4d2e65878e329d9c6b372c4491b61" dependencies = [ "byteorder", - "digest", + "digest 0.9.0", "rand_core 0.5.1", "subtle", "zeroize", @@ -359,6 +495,15 @@ dependencies = [ ] [[package]] +name = "der" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6919815d73839e7ad218de758883aae3a257ba6759ce7a9992501efbb53d705c" +dependencies = [ + "const-oid", +] + +[[package]] name = "derive-try-from-primitive" version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -417,6 +562,18 @@ dependencies = [ ] [[package]] +name = "digest" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b697d66081d42af4fba142d56918a3cb21dc8eb63372c6b85d14f44fb9c5979b" +dependencies = [ + "block-buffer", + "crypto-common", + "generic-array", + "subtle", +] + +[[package]] name = "dirs-next" version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -450,12 +607,48 @@ dependencies = [ ] [[package]] +name = "ecdsa" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0d69ae62e0ce582d56380743515fefaf1a8c70cec685d9677636d7e30ae9dc9" +dependencies = [ + "der", + "elliptic-curve", + "signature", +] + +[[package]] +name = "ed25519" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74e1069e39f1454367eb2de793ed062fac4c35c2934b76a81d90dd9abcd28816" +dependencies = [ + "signature", +] + +[[package]] name = "either" version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" [[package]] +name = "elliptic-curve" +version = "0.11.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25b477563c2bfed38a3b7a60964c49e058b2510ad3f12ba3483fd8f62c2306d6" +dependencies = [ + "base16ct", + "crypto-bigint", + "der", + "generic-array", + "rand_core 0.6.3", + "sec1", + "subtle", + "zeroize", +] + +[[package]] name = "endian-type" version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -766,6 +959,16 @@ dependencies = [ ] [[package]] +name = "ghash" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1583cc1656d7839fd3732b80cf4f38850336cdb9b8ded1cd399ca62958de3c99" +dependencies = [ + "opaque-debug", + "polyval", +] + +[[package]] name = "gimli" version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -821,6 +1024,24 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" [[package]] +name = "hkdf" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "158bc31e00a68e380286904cc598715f861f2b0ccf7aa6fe20c6d0c49ca5d0f6" +dependencies = [ + "hmac", +] + +[[package]] +name = "hmac" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddca131f3e7f2ce2df364b57949a9d47915cfbd35e46cfee355ccebbf794d6a2" +dependencies = [ + "digest 0.10.1", +] + +[[package]] name = "hostname" version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -849,7 +1070,7 @@ checksum = "1323096b05d41827dadeaee54c9981958c0f94e670bc94ed80037d1a7b8b186b" dependencies = [ "bytes", "fnv", - "itoa", + "itoa 0.4.8", ] [[package]] @@ -896,7 +1117,7 @@ dependencies = [ "http-body", "httparse", "httpdate", - "itoa", + "itoa 0.4.8", "pin-project-lite", "socket2", "tokio", @@ -1055,6 +1276,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" [[package]] +name = "itoa" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35" + +[[package]] name = "jni" version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1199,6 +1426,25 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f" [[package]] +name = "md-5" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6a38fc55c8bbc10058782919516f88826e70320db6d206aebc49611d24216ae" +dependencies = [ + "digest 0.10.1", + "md5-asm", +] + +[[package]] +name = "md5-asm" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73b9a6f25ec11ea27e22d7fc8beafda909da44ece95f63e94f1eeb23d19bb5c7" +dependencies = [ + "cc", +] + +[[package]] name = "memchr" version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1477,6 +1723,7 @@ dependencies = [ "rustls-pemfile", "serde", "serde_json", + "shadowsocks", "talpid-types", "tokio", "tokio-rustls", @@ -1752,6 +1999,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "692fcb63b64b1758029e0a96ee63e049ce8c5948587f2f7208df04625e5f6b56" [[package]] +name = "opaque-debug" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" + +[[package]] name = "openssl-probe" version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1788,6 +2041,28 @@ dependencies = [ ] [[package]] +name = "p256" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19736d80675fbe9fe33426268150b951a3fb8f5cfca2a23a17c85ef3adb24e3b" +dependencies = [ + "ecdsa", + "elliptic-curve", + "sec1", +] + +[[package]] +name = "p384" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "755d8266e41f57bd8562ed9b6e93cdcf73ead050e1e8c3a27ea3871b6643a20c" +dependencies = [ + "ecdsa", + "elliptic-curve", + "sec1", +] + +[[package]] name = "parity-tokio-ipc" version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1977,6 +2252,29 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c9b1041b4387893b91ee6746cddfc28516aff326a3519fb2adf820932c5e6cb" [[package]] +name = "poly1305" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "048aeb476be11a4b6ca432ca569e375810de9294ae78f4774e78ea98a9246ede" +dependencies = [ + "cpufeatures", + "opaque-debug", + "universal-hash", +] + +[[package]] +name = "polyval" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8419d2b623c7c0896ff2d5d96e2cb4ede590fed28fcc34934f4c33c036e620a1" +dependencies = [ + "cfg-if 1.0.0", + "cpufeatures", + "opaque-debug", + "universal-hash", +] + +[[package]] name = "ppv-lite86" version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2277,6 +2575,24 @@ dependencies = [ ] [[package]] +name = "ring-compat" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80f9cf4a178de62d388e6502dae2101b37c1becf65227bf1210e6cf12dc633a3" +dependencies = [ + "aead", + "digest 0.9.0", + "ecdsa", + "ed25519", + "generic-array", + "opaque-debug", + "p256", + "p384", + "ring", + "zeroize", +] + +[[package]] name = "rs-release" version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2393,6 +2709,18 @@ dependencies = [ ] [[package]] +name = "sec1" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08da66b8b0965a5555b6bd6639e68ccba85e1e2506f5fbb089e93f8a04e1a2d1" +dependencies = [ + "der", + "generic-array", + "subtle", + "zeroize", +] + +[[package]] name = "security-framework" version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2434,6 +2762,16 @@ dependencies = [ ] [[package]] +name = "sendfd" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fa25200c6de90f8da82d63f8806bd2ea1261018620dd4881626d6b146e13bd7" +dependencies = [ + "libc", + "tokio", +] + +[[package]] name = "serde" version = "1.0.130" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2471,12 +2809,93 @@ version = "1.0.68" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0f690853975602e1bfe1ccbf50504d67174e3bcf340f23b5ea9992e0587a52d8" dependencies = [ - "itoa", + "itoa 0.4.8", + "ryu", + "serde", +] + +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa 1.0.1", "ryu", "serde", ] [[package]] +name = "sha1" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04cc229fb94bcb689ffc39bd4ded842f6ff76885efede7c6d1ffb62582878bea" +dependencies = [ + "cfg-if 1.0.0", + "cpufeatures", + "digest 0.10.1", + "sha1-asm", +] + +[[package]] +name = "sha1-asm" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "563d4f7100bc3fce234e5f37bbf63dc2752558964505ba6ac3f7204bdc59eaac" +dependencies = [ + "cc", +] + +[[package]] +name = "shadowsocks" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "356770455d9fb911a6b559ceedaa7f2d34e47d2d8ae606041dffba390abb7522" +dependencies = [ + "async-trait", + "base64", + "byte_string", + "bytes", + "cfg-if 1.0.0", + "futures", + "libc", + "log", + "nix 0.23.1", + "once_cell", + "pin-project", + "sendfd", + "serde", + "serde_json", + "serde_urlencoded", + "shadowsocks-crypto", + "socket2", + "thiserror", + "tokio", + "tokio-tfo", + "url", + "winapi 0.3.9", +] + +[[package]] +name = "shadowsocks-crypto" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "748be76f2786bcf817d86c08c6cc2245e436e3ae0b064b581e3d4fc81700360c" +dependencies = [ + "aes", + "aes-gcm", + "cfg-if 1.0.0", + "chacha20", + "chacha20poly1305", + "hkdf", + "md-5", + "rand 0.8.4", + "ring-compat", + "sha1", +] + +[[package]] name = "shared_child" version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2502,6 +2921,15 @@ dependencies = [ ] [[package]] +name = "signature" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02658e48d89f2bec991f9a78e69cfa4c316f8d6a6c4ec12fae1aeb263d486788" +dependencies = [ + "rand_core 0.6.3", +] + +[[package]] name = "simple-signal" version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2869,6 +3297,7 @@ dependencies = [ "mio 0.7.13", "num_cpus", "once_cell", + "parking_lot 0.11.2", "pin-project-lite", "signal-hook-registry", "tokio-macros", @@ -2919,6 +3348,23 @@ dependencies = [ ] [[package]] +name = "tokio-tfo" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4279aec5ded232170bf39130dd0e0deaed2c9f31cd3b5db1f2021056bcf5f94a" +dependencies = [ + "cfg-if 1.0.0", + "futures", + "libc", + "log", + "once_cell", + "pin-project", + "socket2", + "tokio", + "winapi 0.3.9", +] + +[[package]] name = "tokio-util" version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -3246,6 +3692,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3" [[package]] +name = "universal-hash" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f214e8f697e925001e66ec2c6e37a4ef93f0f78c2eed7814394e10c62025b05" +dependencies = [ + "generic-array", + "subtle", +] + +[[package]] name = "unreachable" version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" diff --git a/android/app/build.gradle.kts b/android/app/build.gradle.kts index 7fa36a9986..dd7d97d1ec 100644 --- a/android/app/build.gradle.kts +++ b/android/app/build.gradle.kts @@ -128,7 +128,6 @@ tasks.withType<KotlinCompile>().all { tasks.register("copyExtraAssets", Copy::class) { from("$repoRootPath/dist-assets") include("relays.json") - include("api-ip-address.txt") into(extraAssetsDirectory) } diff --git a/android/app/src/main/kotlin/net/mullvad/mullvadvpn/service/DaemonInstance.kt b/android/app/src/main/kotlin/net/mullvad/mullvadvpn/service/DaemonInstance.kt index 23b127addf..8b900038fc 100644 --- a/android/app/src/main/kotlin/net/mullvad/mullvadvpn/service/DaemonInstance.kt +++ b/android/app/src/main/kotlin/net/mullvad/mullvadvpn/service/DaemonInstance.kt @@ -11,7 +11,6 @@ import kotlinx.coroutines.channels.actor import kotlinx.coroutines.channels.sendBlocking import net.mullvad.mullvadvpn.util.Intermittent -private const val API_IP_ADDRESS_FILE = "api-ip-address.txt" private const val RELAYS_FILE = "relays.json" class DaemonInstance(val vpnService: MullvadVpnService) { @@ -88,7 +87,6 @@ class DaemonInstance(val vpnService: MullvadVpnService) { lastUpdatedTime() > File(vpnService.filesDir, RELAYS_FILE).lastModified() FileResourceExtractor(vpnService).apply { - extract(API_IP_ADDRESS_FILE, false) extract(RELAYS_FILE, shouldOverwriteRelayList) } } diff --git a/build-apk.sh b/build-apk.sh index c5a1fea9d1..8cc4e6ce76 100755 --- a/build-apk.sh +++ b/build-apk.sh @@ -128,8 +128,6 @@ done echo "Updating relays.json..." cargo run --bin relay_list $CARGO_ARGS > dist-assets/relays.json -echo "Updating api-ip-address..." -cargo run --bin address_cache $CARGO_ARGS > dist-assets/api-ip-address.txt cd "$SCRIPT_DIR/android" $GRADLE_CMD --console plain "$GRADLE_TASK" @@ -361,8 +361,6 @@ fi log_info "Updating relays.json..." cargo run --bin relay_list "${CARGO_ARGS[@]}" > dist-assets/relays.json -log_info "Updating api-ip-address..." -cargo run --bin address_cache "${CARGO_ARGS[@]}" > dist-assets/api-ip-address.txt log_header "Installing JavaScript dependencies" diff --git a/gui/tasks/distribution.js b/gui/tasks/distribution.js index f0eb41de67..dcdde80841 100644 --- a/gui/tasks/distribution.js +++ b/gui/tasks/distribution.js @@ -20,7 +20,6 @@ const config = { extraResources: [ { from: distAssets('ca.crt'), to: '.' }, { from: distAssets('relays.json'), to: '.' }, - { from: distAssets('api-ip-address.txt'), to: '.' }, { from: root('CHANGELOG.md'), to: '.' }, ], diff --git a/mullvad-daemon/src/api.rs b/mullvad-daemon/src/api.rs new file mode 100644 index 0000000000..2ed689b418 --- /dev/null +++ b/mullvad-daemon/src/api.rs @@ -0,0 +1,54 @@ +use crate::DaemonEventSender; +use futures::{channel::oneshot, stream, Stream, StreamExt}; +use mullvad_rpc::proxy::ApiConnectionMode; +use talpid_core::mpsc::Sender; +use talpid_types::ErrorExt; + +pub(crate) struct ApiConnectionModeRequest { + pub response_tx: oneshot::Sender<ApiConnectionMode>, + pub retry_attempt: u32, +} + +/// Returns a stream that returns the next API bridge to try. +/// `initial_config` refers to the first config returned by the stream. The daemon is not notified +/// of this. +pub(crate) fn create_api_config_provider( + daemon_sender: DaemonEventSender<ApiConnectionModeRequest>, + initial_config: ApiConnectionMode, +) -> impl Stream<Item = ApiConnectionMode> + Unpin { + struct Context { + attempt: u32, + daemon_sender: DaemonEventSender<ApiConnectionModeRequest>, + } + + let ctx = Context { + attempt: 1, + daemon_sender, + }; + + Box::pin( + stream::once(async move { initial_config }).chain(stream::unfold( + ctx, + |mut ctx| async move { + ctx.attempt = ctx.attempt.wrapping_add(1); + let (response_tx, response_rx) = oneshot::channel(); + + let _ = ctx.daemon_sender.send(ApiConnectionModeRequest { + response_tx, + retry_attempt: ctx.attempt, + }); + + let new_config = response_rx.await.unwrap_or_else(|error| { + log::error!( + "{}", + error.display_chain_with_msg("Failed to receive API proxy config") + ); + // Fall back on unbridged connection + ApiConnectionMode::Direct + }); + + Some((new_config, ctx)) + }, + )), + ) +} diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs index a62979b63f..42d988f41f 100644 --- a/mullvad-daemon/src/lib.rs +++ b/mullvad-daemon/src/lib.rs @@ -6,6 +6,7 @@ extern crate serde; mod account; pub mod account_history; +mod api; pub mod exception_logging; #[cfg(target_os = "macos")] pub mod exclusion_gid; @@ -29,11 +30,14 @@ use futures::{ future::{abortable, AbortHandle, Future}, StreamExt, }; -use mullvad_rpc::availability::ApiAvailabilityHandle; +use mullvad_rpc::{ + availability::ApiAvailabilityHandle, + proxy::{ApiConnectionMode, ProxyConfig}, +}; use mullvad_types::{ account::{AccountData, AccountToken, VoucherSubmission}, endpoint::MullvadEndpoint, - location::GeoIpLocation, + location::{Coordinates, GeoIpLocation}, relay_constraints::{ BridgeSettings, BridgeState, Constraint, InternalBridgeConstraints, RelaySettings, RelaySettingsUpdate, @@ -54,7 +58,7 @@ use std::{collections::HashSet, ffi::OsString}; use std::{ marker::PhantomData, mem, - net::{IpAddr, Ipv4Addr}, + net::{IpAddr, Ipv4Addr, SocketAddr}, path::PathBuf, pin::Pin, sync::{mpsc as sync_mpsc, Arc, Weak}, @@ -70,8 +74,8 @@ use talpid_core::{ use talpid_types::android::AndroidContext; use talpid_types::{ net::{ - openvpn, AllowedEndpoint, Endpoint, TransportProtocol, TunnelEndpoint, TunnelParameters, - TunnelType, + openvpn::{self, ProxySettings}, + AllowedEndpoint, Endpoint, TransportProtocol, TunnelEndpoint, TunnelParameters, TunnelType, }, tunnel::{ErrorStateCause, ParameterGenerationError, TunnelStateTransition}, ErrorExt, @@ -327,6 +331,8 @@ pub(crate) enum InternalDaemonEvent { NewAccountEvent(AccountToken, oneshot::Sender<Result<String, Error>>), /// The background job fetching new `AppVersionInfo`s got a new info object. NewAppVersionInfo(AppVersionInfo), + /// Request from REST client to use a different API endpoint. + GenerateApiConnectionMode(api::ApiConnectionModeRequest), /// The split tunnel paths or state were updated. #[cfg(target_os = "windows")] ExcludedPathsEvent(ExcludedPathsUpdate, oneshot::Sender<Result<(), Error>>), @@ -356,6 +362,12 @@ impl From<AppVersionInfo> for InternalDaemonEvent { } } +impl From<api::ApiConnectionModeRequest> for InternalDaemonEvent { + fn from(request: api::ApiConnectionModeRequest) -> Self { + InternalDaemonEvent::GenerateApiConnectionMode(request) + } +} + #[derive(Clone, Debug, Eq, PartialEq)] enum DaemonExecutionState { Running, @@ -546,6 +558,7 @@ pub struct Daemon<L: EventListener> { app_version_info: Option<AppVersionInfo>, shutdown_tasks: Vec<Pin<Box<dyn Future<Output = ()>>>>, tunnel_state_machine_handle: tunnel_state_machine::JoinHandle, + cache_dir: PathBuf, #[cfg(target_os = "windows")] volume_update_tx: mpsc::UnboundedSender<()>, } @@ -569,6 +582,8 @@ where exclusion_gid::set_exclusion_gid().map_err(Error::GroupIdError)? }; + mullvad_rpc::proxy::ApiConnectionMode::try_delete_cache(&cache_dir).await; + let runtime = tokio::runtime::Handle::current(); let (internal_event_tx, internal_event_rx) = command_channel.destructure(); @@ -606,8 +621,7 @@ where vec![] }; - let mut rpc_runtime = mullvad_rpc::MullvadRpcRuntime::with_cache( - Some(&resource_dir), + let rpc_runtime = mullvad_rpc::MullvadRpcRuntime::with_cache( &cache_dir, true, #[cfg(target_os = "android")] @@ -620,7 +634,7 @@ where api_availability.suspend(); let initial_api_endpoint = - Self::get_allowed_endpoint(rpc_runtime.address_cache.peek_address()); + Self::get_allowed_endpoint(rpc_runtime.address_cache.get_address().await); let (offline_state_tx, offline_state_rx) = mpsc::unbounded(); #[cfg(target_os = "windows")] @@ -650,25 +664,37 @@ where .await .map_err(Error::TunnelError)?; - let address_change_runtime = runtime.clone(); - let tunnel_cmd_weak_tx = Arc::downgrade(&tunnel_command_tx); - rpc_runtime.set_address_change_listener(move |address| { - let (result_tx, result_rx) = oneshot::channel(); - let tx = tunnel_cmd_weak_tx.clone(); - address_change_runtime.block_on(async move { - if let Some(tx) = tx.upgrade() { - let _ = tx.unbounded_send(TunnelCommand::AllowEndpoint( - Self::get_allowed_endpoint(address), + let api_endpoint_tunnel_tx = Arc::downgrade(&tunnel_command_tx); + let api_endpoint_handler = move |address: SocketAddr| { + let tunnel_tx = api_endpoint_tunnel_tx.clone(); + async move { + let (result_tx, result_rx) = oneshot::channel(); + if let Some(tunnel_tx) = tunnel_tx.upgrade() { + let _ = tunnel_tx.unbounded_send(TunnelCommand::AllowEndpoint( + Self::get_allowed_endpoint(address.clone()), result_tx, )); - result_rx.await.map_err(|_| ()) + if result_rx.await.is_ok() { + log::debug!("API endpoint: {}", address); + true + } else { + log::error!("Failed to update allowed endpoint"); + false + } } else { - Err(()) + log::error!("Tunnel state machine is down"); + false } - }) - }); + } + }; - let rpc_handle = rpc_runtime.mullvad_rest_handle(); + let proxy_provider = api::create_api_config_provider( + internal_event_tx.to_specialized_sender(), + ApiConnectionMode::Direct, + ); + let rpc_handle = rpc_runtime + .mullvad_rest_handle(proxy_provider, api_endpoint_handler) + .await; Self::forward_offline_state(api_availability.clone(), offline_state_rx).await; @@ -741,6 +767,7 @@ where app_version_info, shutdown_tasks: vec![], tunnel_state_machine_handle, + cache_dir, #[cfg(target_os = "windows")] volume_update_tx, }; @@ -903,6 +930,9 @@ where NewAppVersionInfo(app_version_info) => { self.handle_new_app_version_info(app_version_info) } + GenerateApiConnectionMode(request) => { + self.handle_generate_api_connection_mode(request).await + } #[cfg(windows)] ExcludedPathsEvent(update, tx) => self.handle_new_excluded_paths(update, tx).await, } @@ -1073,7 +1103,7 @@ where BridgeState::On => { let (bridge_settings, bridge_relay) = self .relay_selector - .get_proxy_settings(&bridge_constraints, location) + .get_proxy_settings(&bridge_constraints, Some(location)) .ok_or(Error::NoBridgeAvailable)?; self.last_generated_bridge_relay = Some(bridge_relay); Some(bridge_settings) @@ -1082,7 +1112,7 @@ where if let Some((bridge_settings, bridge_relay)) = self.relay_selector.get_auto_proxy_settings( &bridge_constraints, - location, + Some(location), retry_attempt, ) { @@ -1349,6 +1379,78 @@ where self.event_listener.notify_app_version(app_version_info); } + /// Returns the next API connection mode to use for reaching the API. + /// + /// When `mullvad-rpc` fails to contact the API, it requests a new connection mode + /// from this function, which will be used for future requests. The API can be + /// connected to either directly (i.e., [`ApiConnectionMode::Direct`]) or from + /// a bridge ([`ApiConnectionMode::Proxied`]). + /// + /// * Every 3rd attempt returns [`ApiConnectionMode::Direct`] (i.e., no bridge). + /// * For any other attempt, this function returns a configuration for the bridge that is + /// closest to the selected relay location[^note] and matches all bridge constraints. + /// * When no matching bridge is found, e.g. if the selected hosting providers don't match any + /// bridge, [`ApiConnectionMode::Direct`] is returned. + /// + /// [^note]: The "selected relay location" is the location of the last relay that + /// the daemon connected to, or, if no relay was connected to, the "midpoint" of + /// all relays that match the selected relay location constraint. + async fn handle_generate_api_connection_mode( + &mut self, + request: api::ApiConnectionModeRequest, + ) { + let location = self + .last_generated_entry_relay + .as_ref() + .or(self.last_generated_relay.as_ref()) + .and_then(|relay| relay.location.as_ref().map(Coordinates::from)) + .or_else(|| { + if let RelaySettings::Normal(settings) = self.settings.get_relay_settings() { + self.relay_selector.get_relay_midpoint(&settings) + } else { + None + } + }); + let bridge = if request.retry_attempt % 3 > 0 { + let constraints = match &self.settings.bridge_settings { + BridgeSettings::Normal(settings) => InternalBridgeConstraints { + location: settings.location.clone(), + providers: settings.providers.clone(), + transport_protocol: Constraint::Only(TransportProtocol::Tcp), + }, + _ => InternalBridgeConstraints { + location: Constraint::Any, + providers: Constraint::Any, + transport_protocol: Constraint::Only(TransportProtocol::Tcp), + }, + }; + self.relay_selector + .get_proxy_settings(&constraints, location) + } else { + None + }; + let config = match bridge { + Some((settings, _relay)) => match settings { + ProxySettings::Shadowsocks(ss_settings) => { + ApiConnectionMode::Proxied(ProxyConfig::Shadowsocks(ss_settings)) + } + _ => { + log::error!("Received unexpected proxy settings type"); + ApiConnectionMode::Direct + } + }, + None => ApiConnectionMode::Direct, + }; + + if let Err(error) = config.save(&self.cache_dir).await { + log::debug!( + "{}", + error.display_chain_with_msg("Failed to save API endpoint") + ); + } + let _ = request.response_tx.send(config); + } + #[cfg(windows)] async fn handle_new_excluded_paths( &mut self, @@ -1407,7 +1509,7 @@ where match &self.tunnel_state { Disconnected => { - let location = self.get_geo_location(); + let location = self.get_geo_location().await; tokio::spawn(async { Self::oneshot_send(tx, location.await.ok(), "current location"); }); @@ -1420,7 +1522,7 @@ where } Connected { location, .. } => { let relay_location = location.clone(); - let location_future = self.get_geo_location(); + let location_future = self.get_geo_location().await; tokio::spawn(async { let location = location_future.await; Self::oneshot_send( @@ -1441,8 +1543,8 @@ where } } - fn get_geo_location(&mut self) -> impl Future<Output = Result<GeoIpLocation, ()>> { - let rpc_service = self.rpc_runtime.rest_handle(); + async fn get_geo_location(&mut self) -> impl Future<Output = Result<GeoIpLocation, ()>> { + let rpc_service = self.rpc_runtime.rest_handle().await; async { geoip::send_location_request(rpc_service) .await diff --git a/mullvad-daemon/src/relays/mod.rs b/mullvad-daemon/src/relays/mod.rs index 4f96a6ddcd..332ca5fea3 100644 --- a/mullvad-daemon/src/relays/mod.rs +++ b/mullvad-daemon/src/relays/mod.rs @@ -6,7 +6,7 @@ use ipnetwork::IpNetwork; use mullvad_rpc::{availability::ApiAvailabilityHandle, rest::MullvadRestHandle}; use mullvad_types::{ endpoint::{MullvadEndpoint, MullvadWireguardEndpoint}, - location::Location, + location::{Coordinates, Location}, relay_constraints::{ BridgeState, Constraint, InternalBridgeConstraints, LocationConstraint, Match, OpenVpnConstraints, Providers, RelayConstraints, Set, TransportPort, WireguardConstraints, @@ -302,6 +302,34 @@ impl RelaySelector { } } + /// Returns the average location of relays that match the given constraints. + /// This returns none if the location is `any` or if no relays match the constraints. + pub fn get_relay_midpoint(&self, relay_constraints: &RelayConstraints) -> Option<Coordinates> { + if relay_constraints.location.is_any() { + return None; + } + + let matcher = RelayMatcher::from(relay_constraints.clone()); + let mut matching_locations: Vec<Location> = self + .parsed_relays + .lock() + .relays() + .iter() + .filter(|relay| relay.active) + .filter_map(|relay| { + matcher + .filter_matching_relay(relay) + .and_then(|relay| relay.location) + }) + .collect(); + matching_locations.dedup_by(|a, b| a.has_same_city(b)); + + if matching_locations.is_empty() { + return None; + } + Some(Coordinates::midpoint(&matching_locations)) + } + /// Returns an OpenVpn endpoint, should only ever be used when the user has specified the tunnel /// protocol as only OpenVPN. fn get_openvpn_endpoint( @@ -635,10 +663,10 @@ impl RelaySelector { entry_endpoint.exit_peer = Some(exit_peer.clone()); } - pub fn get_auto_proxy_settings( + pub fn get_auto_proxy_settings<T: Into<Coordinates>>( &mut self, bridge_constraints: &InternalBridgeConstraints, - location: &Location, + location: Option<T>, retry_attempt: u32, ) -> Option<(ProxySettings, Relay)> { if !self.should_use_bridge(retry_attempt) { @@ -663,10 +691,10 @@ impl RelaySelector { (retry_attempt % 4) < 2 } - pub fn get_proxy_settings( + pub fn get_proxy_settings<T: Into<Coordinates>>( &mut self, constraints: &InternalBridgeConstraints, - location: &Location, + location: Option<T>, ) -> Option<(ProxySettings, Relay)> { let mut matching_relays: Vec<Relay> = self .parsed_relays @@ -681,10 +709,16 @@ impl RelaySelector { return None; } - matching_relays.sort_by_cached_key(|relay| { - (relay.location.as_ref().unwrap().distance_from(&location) * 1000.0) as i64 - }); - matching_relays.get(0).and_then(|relay| { + let relay = if let Some(location) = location { + let location = location.into(); + matching_relays.sort_by_cached_key(|relay| { + (relay.location.as_ref().unwrap().distance_from(&location) * 1000.0) as i64 + }); + matching_relays.get(0) + } else { + self.pick_random_relay(&matching_relays) + }; + relay.and_then(|relay| { self.pick_random_bridge(&relay) .map(|bridge| (bridge, relay.clone())) }) diff --git a/mullvad-daemon/src/relays/updater.rs b/mullvad-daemon/src/relays/updater.rs index 0c06beb43f..381e5bd54c 100644 --- a/mullvad-daemon/src/relays/updater.rs +++ b/mullvad-daemon/src/relays/updater.rs @@ -58,7 +58,6 @@ impl RelayListUpdater { api_availability: ApiAvailabilityHandle, ) -> RelayListUpdaterHandle { let (tx, cmd_rx) = mpsc::channel(1); - let service = rpc_handle.service(); let rpc_client = RelayListProxy::new(rpc_handle); let updater = RelayListUpdater { rpc_client, @@ -69,7 +68,7 @@ impl RelayListUpdater { api_availability, }; - service.spawn(updater.run(cmd_rx)); + tokio::spawn(updater.run(cmd_rx)); RelayListUpdaterHandle { tx } } diff --git a/mullvad-jni/src/lib.rs b/mullvad-jni/src/lib.rs index 72c75e7360..646988e11b 100644 --- a/mullvad-jni/src/lib.rs +++ b/mullvad-jni/src/lib.rs @@ -940,7 +940,7 @@ pub extern "system" fn Java_net_mullvad_mullvadvpn_service_MullvadDaemon_updateR fn log_request_error(request: &str, error: &daemon_interface::Error) { match error { - daemon_interface::Error::RpcError(RestError::Aborted(_)) => { + daemon_interface::Error::RpcError(RestError::Aborted) => { log::debug!("Request to {} cancelled", request); } error => { diff --git a/mullvad-problem-report/src/lib.rs b/mullvad-problem-report/src/lib.rs index 48b80dd1ff..4b9def6adb 100644 --- a/mullvad-problem-report/src/lib.rs +++ b/mullvad-problem-report/src/lib.rs @@ -1,6 +1,7 @@ #![deny(rust_2018_idioms)] use lazy_static::lazy_static; +use mullvad_rpc::proxy::ApiConnectionMode; use regex::Regex; use std::{ borrow::Cow, @@ -270,50 +271,70 @@ pub fn send_problem_report( } })?, ); - let metadata = - ProblemReport::parse_metadata(&report_content).unwrap_or_else(|| metadata::collect()); let runtime = tokio::runtime::Builder::new_multi_thread() .worker_threads(2) .enable_all() .build() .map_err(Error::CreateRuntime)?; + runtime.block_on(send_problem_report_inner( + user_email, + user_message, + &report_content, + cache_dir, + )) +} - let mut rpc_manager = runtime - .block_on(mullvad_rpc::MullvadRpcRuntime::with_cache( - None, - cache_dir, - false, - #[cfg(target_os = "android")] - None, - )) - .map_err(Error::CreateRpcClientError)?; - let rpc_client = mullvad_rpc::ProblemReportProxy::new(rpc_manager.mullvad_rest_handle()); +async fn send_problem_report_inner( + user_email: &str, + user_message: &str, + report_content: &str, + cache_dir: &Path, +) -> Result<(), Error> { + let metadata = + ProblemReport::parse_metadata(&report_content).unwrap_or_else(|| metadata::collect()); + let rpc_runtime = mullvad_rpc::MullvadRpcRuntime::with_cache( + cache_dir, + false, + #[cfg(target_os = "android")] + None, + ) + .await + .map_err(Error::CreateRpcClientError)?; - runtime.block_on(async move { - for _attempt in 0..MAX_SEND_ATTEMPTS { - match rpc_client - .problem_report(user_email, user_message, &report_content, &metadata) - .await - { - Ok(()) => { - return Ok(()); - } - Err(error) => { - if !error.is_network_error() { - return Err(Error::SendProblemReportError(error)); - } - log::error!( - "{}", - error.display_chain_with_msg( - "Failed to send problem report due to network error" - ) - ); + let rpc_client = mullvad_rpc::ProblemReportProxy::new( + rpc_runtime + .mullvad_rest_handle( + ApiConnectionMode::try_from_cache(cache_dir) + .await + .into_repeat(), + |_| async { true }, + ) + .await, + ); + + for _attempt in 0..MAX_SEND_ATTEMPTS { + match rpc_client + .problem_report(user_email, user_message, &report_content, &metadata) + .await + { + Ok(()) => { + return Ok(()); + } + Err(error) => { + if !error.is_network_error() { + return Err(Error::SendProblemReportError(error)); } + log::error!( + "{}", + error.display_chain_with_msg( + "Failed to send problem report due to network error" + ) + ); } } - Err(Error::SendFailedTooManyTimes) - }) + } + Err(Error::SendFailedTooManyTimes) } fn write_problem_report(path: &Path, problem_report: &ProblemReport) -> io::Result<()> { diff --git a/mullvad-rpc/Cargo.toml b/mullvad-rpc/Cargo.toml index 6fdf3e3400..4b18512c75 100644 --- a/mullvad-rpc/Cargo.toml +++ b/mullvad-rpc/Cargo.toml @@ -34,5 +34,7 @@ lazy_static = "1.1.0" mullvad-types = { path = "../mullvad-types" } talpid-types = { path = "../talpid-types" } +shadowsocks = { version = "1.12", default-features = false, features = ["stream-cipher"] } + [target.'cfg(target_os="macos")'.dependencies] tokio-stream = { version = "0.1", features = ["io-util"] } diff --git a/mullvad-rpc/src/abortable_stream.rs b/mullvad-rpc/src/abortable_stream.rs index 160e329dfb..af217c5768 100644 --- a/mullvad-rpc/src/abortable_stream.rs +++ b/mullvad-rpc/src/abortable_stream.rs @@ -12,6 +12,10 @@ use std::{ }; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +#[derive(err_derive::Error, Debug)] +#[error(display = "Stream is closed")] +pub struct Aborted(()); + #[derive(Clone, Debug)] pub struct AbortableStreamHandle { tx: Arc<Mutex<Option<oneshot::Sender<()>>>>, @@ -71,7 +75,7 @@ where if let Poll::Ready(_) = Pin::new(&mut self.shutdown_rx).poll(cx) { return Poll::Ready(Err(io::Error::new( io::ErrorKind::ConnectionReset, - "stream is closed", + Aborted(()), ))); } Pin::new(&mut self.stream).poll_write(cx, buf) @@ -81,7 +85,7 @@ where if let Poll::Ready(_) = Pin::new(&mut self.shutdown_rx).poll(cx) { return Poll::Ready(Err(io::Error::new( io::ErrorKind::ConnectionReset, - "stream is closed", + Aborted(()), ))); } Pin::new(&mut self.stream).poll_flush(cx) @@ -104,7 +108,7 @@ where if let Poll::Ready(_) = Pin::new(&mut self.shutdown_rx).poll(cx) { return Poll::Ready(Err(io::Error::new( io::ErrorKind::ConnectionReset, - "stream is closed", + Aborted(()), ))); } Pin::new(&mut self.stream).poll_read(cx, buf) diff --git a/mullvad-rpc/src/address_cache.rs b/mullvad-rpc/src/address_cache.rs index f1048b7ede..3b6fcba074 100644 --- a/mullvad-rpc/src/address_cache.rs +++ b/mullvad-rpc/src/address_cache.rs @@ -1,16 +1,9 @@ use super::API; -use rand::seq::SliceRandom; -use std::{ - io, - net::SocketAddr, - ops::{Deref, DerefMut}, - path::Path, - sync::{Arc, Mutex}, -}; -use talpid_types::ErrorExt; +use std::{io, net::SocketAddr, path::Path, sync::Arc}; use tokio::{ fs, - io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, + io::{AsyncReadExt, AsyncWriteExt}, + sync::Mutex, }; #[derive(err_derive::Error, Debug)] @@ -22,212 +15,78 @@ pub enum Error { #[error(display = "Failed to read the address cache file")] ReadAddressCache(#[error(source)] io::Error), + #[error(display = "Failed to parse the address cache file")] + ParseAddressCache, + #[error(display = "Failed to update the address cache file")] WriteAddressCache(#[error(source)] io::Error), #[error(display = "The address cache is empty")] EmptyAddressCache, - - #[error(display = "The address change listener returned an error")] - ChangeListenerError, } -pub type CurrentAddressChangeListener = - dyn Fn(SocketAddr) -> Result<(), ()> + Send + Sync + 'static; - #[derive(Clone)] pub struct AddressCache { inner: Arc<Mutex<AddressCacheInner>>, write_path: Option<Arc<Path>>, - change_listener: Arc<Box<CurrentAddressChangeListener>>, } impl AddressCache { - /// Initialize cache using the given list, and write changes to `write_path`. - pub fn new(addresses: Vec<SocketAddr>, write_path: Option<Box<Path>>) -> Result<Self, Error> { - let mut cache = AddressCacheInner::from_addresses(addresses)?; - cache.shuffle_tail(); - log::trace!("API address cache: {:?}", cache.addresses); - log::debug!("Using API address: {:?}", Self::get_address_inner(&cache)); - - let address_cache = Self { - inner: Arc::new(Mutex::new(cache)), - write_path: write_path.map(|cache| Arc::from(cache)), - change_listener: Arc::new(Box::new(|_| Ok(()))), - }; - Ok(address_cache) + /// Initialize cache using the hardcoded address, and write changes to `write_path`. + pub fn new(write_path: Option<Box<Path>>) -> Result<Self, Error> { + Self::new_inner(API.addr, write_path) } /// Initialize cache using `read_path`, and write changes to `write_path`. pub async fn from_file(read_path: &Path, write_path: Option<Box<Path>>) -> Result<Self, Error> { log::debug!("Loading API addresses from {}", read_path.display()); - Self::new(read_address_file(read_path).await?, write_path) + Self::new_inner(read_address_file(read_path).await?, write_path) } - pub fn set_change_listener(&mut self, change_listener: Arc<Box<CurrentAddressChangeListener>>) { - self.change_listener = change_listener; - } + fn new_inner(address: SocketAddr, write_path: Option<Box<Path>>) -> Result<Self, Error> { + let cache = AddressCacheInner::from_address(address); + log::debug!("Using API address: {}", cache.address); - /// Returns the currently selected address. - pub fn get_address(&self) -> SocketAddr { - let mut inner = self.inner.lock().unwrap(); - inner.tried_current = true; - Self::get_address_inner(&inner) - } - - /// Returns the current address without registering it as "tried" - /// in [Self::has_tried_current_address]. - pub fn peek_address(&self) -> SocketAddr { - let inner = self.inner.lock().unwrap(); - Self::get_address_inner(&inner) + let address_cache = Self { + inner: Arc::new(Mutex::new(cache)), + write_path: write_path.map(|cache| Arc::from(cache)), + }; + Ok(address_cache) } - fn get_address_inner(inner: &AddressCacheInner) -> SocketAddr { - if inner.addresses.is_empty() { - return API.addr; + /// Returns the address if the hostname equals `API.host`. Otherwise, returns `None`. + pub async fn resolve_hostname(&self, hostname: &str) -> Option<SocketAddr> { + if hostname.eq_ignore_ascii_case(&API.host) { + Some(self.get_address().await) + } else { + None } - *inner - .addresses - .get(inner.choice % inner.addresses.len()) - .unwrap_or(&API.addr) } - pub fn has_tried_current_address(&self) -> bool { - let inner = self.inner.lock().unwrap(); - inner.tried_current - } - - pub async fn select_new_address(&self) { - { - let mut inner = self.inner.lock().unwrap(); - let mut transaction = AddressCacheTransaction::new(&mut inner); - - transaction.choice = transaction.current.choice.wrapping_add(1); - if transaction.choice == transaction.current.choice { - return; - } - transaction.tried_current = false; - - tokio::task::block_in_place(move || { - if (*self.change_listener)(Self::get_address_inner(&transaction)).is_err() { - log::error!("Failed to select a new API endpoint"); - return; - } - transaction.commit(); - }); - } - - if let Err(error) = self.save_to_disk().await { - log::error!("{}", error.display_chain()); - } + /// Returns the currently selected address. + pub async fn get_address(&self) -> SocketAddr { + self.inner.lock().await.address } - /// Forgets the currently selected address and randomizes - /// the entire list. - pub async fn randomize(&self) -> Result<(), Error> { - { - let mut inner = self.inner.lock().unwrap(); - - let mut transaction = AddressCacheTransaction::new(&mut inner); - transaction.shuffle(); - transaction.choice = 0; - - let current_address = Self::get_address_inner(&transaction.current); - let new_address = Self::get_address_inner(&transaction); - - tokio::task::block_in_place(move || { - if new_address != current_address { - transaction.tried_current = false; - if (*self.change_listener)(new_address).is_err() { - return Err(Error::ChangeListenerError); - } - } - - transaction.commit(); - Ok(()) - })?; - } - self.save_to_disk().await.map_err(Error::WriteAddressCache) - } - - pub async fn set_addresses(&self, mut addresses: Vec<SocketAddr>) -> io::Result<()> { - let should_update = { - let mut inner = self.inner.lock().unwrap(); - let mut transaction = AddressCacheTransaction::new(&mut inner); - - addresses.sort(); - - let mut current_sorted = transaction.addresses.clone(); - current_sorted.sort(); - - if addresses != current_sorted { - let current_address = Self::get_address_inner(&transaction); - - transaction.addresses = addresses.clone(); - transaction.shuffle(); - - // Prefer a likely-working address - let choice = transaction - .addresses - .iter() - .position(|&addr| addr == current_address); - if let Some(choice) = choice { - transaction.choice = choice; - transaction.commit(); - } else { - transaction.choice = 0; - transaction.tried_current = false; - - tokio::task::block_in_place(move || { - if (*self.change_listener)(Self::get_address_inner(&transaction)).is_err() { - log::error!("Failed to select a new API endpoint"); - return Err(io::Error::new( - io::ErrorKind::Other, - "callback returned an error", - )); - } - transaction.commit(); - Ok(()) - })?; - } - - true - } else { - false - } - }; - if should_update { - log::trace!("API address cache: {:?}", addresses); - self.save_to_disk().await?; + pub async fn set_address(&self, address: SocketAddr) -> io::Result<()> { + let mut inner = self.inner.lock().await; + if address != inner.address { + self.save_to_disk(&address).await?; + inner.address = address; } Ok(()) } - async fn save_to_disk(&self) -> io::Result<()> { + async fn save_to_disk(&self, address: &SocketAddr) -> io::Result<()> { let write_path = match self.write_path.as_ref() { Some(write_path) => write_path, None => return Ok(()), }; - let (mut addresses, choice) = { - let inner = self.inner.lock().unwrap(); - (inner.addresses.clone(), inner.choice) - }; - - // Place the current choice on top - if !addresses.is_empty() { - let addresses_len = addresses.len(); - addresses.swap(0, choice % addresses_len); - } - let temp_path = write_path.with_file_name("api-cache.temp"); let mut file = fs::File::create(&temp_path).await?; - let mut contents = addresses - .iter() - .map(ToString::to_string) - .collect::<Vec<String>>() - .join("\n"); + let mut contents = address.to_string(); contents += "\n"; file.write_all(contents.as_bytes()).await?; file.sync_data().await?; @@ -236,96 +95,24 @@ impl AddressCache { } } -impl crate::rest::AddressProvider for AddressCache { - fn get_address(&self) -> String { - self.get_address().to_string() - } - - fn clone_box(&self) -> Box<dyn crate::rest::AddressProvider> { - Box::new(self.clone()) - } -} - #[derive(Clone, PartialEq, Eq)] struct AddressCacheInner { - addresses: Vec<SocketAddr>, - choice: usize, - tried_current: bool, + address: SocketAddr, } impl AddressCacheInner { - fn from_addresses(addresses: Vec<SocketAddr>) -> Result<Self, Error> { - if addresses.is_empty() { - return Err(Error::EmptyAddressCache); - } - Ok(Self { - addresses, - choice: 0, - tried_current: false, - }) - } - - fn shuffle(&mut self) { - let mut rng = rand::thread_rng(); - (&mut self.addresses[..]).shuffle(&mut rng); - } - - /// Shuffle all but the first element - fn shuffle_tail(&mut self) { - let mut rng = rand::thread_rng(); - (&mut self.addresses[1..]).shuffle(&mut rng); - } -} - -struct AddressCacheTransaction<'a> { - current: &'a mut AddressCacheInner, - working_cache: AddressCacheInner, -} - -impl<'a> AddressCacheTransaction<'a> { - fn new(cache: &'a mut AddressCacheInner) -> Self { - Self { - working_cache: cache.clone(), - current: cache, - } - } - - fn commit(self) { - *self.current = self.working_cache; + fn from_address(address: SocketAddr) -> Self { + Self { address } } } -impl<'a> Deref for AddressCacheTransaction<'a> { - type Target = AddressCacheInner; - - fn deref(&self) -> &Self::Target { - &self.working_cache - } -} - -impl<'a> DerefMut for AddressCacheTransaction<'a> { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.working_cache - } -} - -async fn read_address_file(path: &Path) -> Result<Vec<SocketAddr>, Error> { - let file = fs::File::open(path) +async fn read_address_file(path: &Path) -> Result<SocketAddr, Error> { + let mut file = fs::File::open(path) .await .map_err(|error| Error::OpenAddressCache(error))?; - let mut lines = BufReader::new(file).lines(); - let mut addresses = vec![]; - while let Some(line) = lines - .next_line() + let mut address = String::new(); + file.read_to_string(&mut address) .await - .map_err(|error| Error::ReadAddressCache(error))? - { - match line.trim().parse() { - Ok(address) => addresses.push(address), - Err(err) => { - log::error!("Failed to parse cached address line: {}", err); - } - } - } - Ok(addresses) + .map_err(Error::ReadAddressCache)?; + address.trim().parse().map_err(|_| Error::ParseAddressCache) } diff --git a/mullvad-rpc/src/bin/address_cache.rs b/mullvad-rpc/src/bin/address_cache.rs deleted file mode 100644 index 066ce5c910..0000000000 --- a/mullvad-rpc/src/bin/address_cache.rs +++ /dev/null @@ -1,43 +0,0 @@ -//! Fetches and prints a list of IP addresses and ports where the Mullvad API -//! can be reached. -//! Used by the installer artifact packer to bundle the latest available list -//! of API IPs. - -use mullvad_rpc::{rest::Error as RestError, ApiProxy, MullvadRpcRuntime}; -use std::process; -use talpid_types::ErrorExt; - -#[tokio::main] -async fn main() { - let mut runtime = - MullvadRpcRuntime::new(tokio::runtime::Handle::current()).expect("Failed to load runtime"); - - let api_proxy = ApiProxy::new(runtime.mullvad_rest_handle()); - let request = api_proxy.get_api_addrs().await; - - let api_list = match request { - Ok(api_list) => api_list, - Err(RestError::TimeoutError(_)) => { - eprintln!("Request timed out"); - process::exit(2); - } - Err(e @ RestError::DeserializeError(_)) => { - eprintln!( - "{}", - e.display_chain_with_msg("Failed to deserialize API address list") - ); - process::exit(3); - } - Err(e) => { - eprintln!( - "{}", - e.display_chain_with_msg("Failed to fetch API address list") - ); - process::exit(1); - } - }; - - for address in api_list { - println!("{}", address); - } -} diff --git a/mullvad-rpc/src/bin/relay_list.rs b/mullvad-rpc/src/bin/relay_list.rs index db7fc29854..66118d3ade 100644 --- a/mullvad-rpc/src/bin/relay_list.rs +++ b/mullvad-rpc/src/bin/relay_list.rs @@ -2,18 +2,24 @@ //! Used by the installer artifact packer to bundle the latest available //! relay list at the time of creating the installer. -use mullvad_rpc::{rest::Error as RestError, MullvadRpcRuntime, RelayListProxy}; +use mullvad_rpc::{ + proxy::ApiConnectionMode, rest::Error as RestError, MullvadRpcRuntime, RelayListProxy, +}; use std::process; use talpid_types::ErrorExt; #[tokio::main] async fn main() { - let mut runtime = + let runtime = MullvadRpcRuntime::new(tokio::runtime::Handle::current()).expect("Failed to load runtime"); - let relay_list_request = RelayListProxy::new(runtime.mullvad_rest_handle()) - .relay_list(None) - .await; + let relay_list_request = RelayListProxy::new( + runtime + .mullvad_rest_handle(ApiConnectionMode::Direct.into_repeat(), |_| async { true }) + .await, + ) + .relay_list(None) + .await; let relay_list = match relay_list_request { Ok(relay_list) => relay_list, diff --git a/mullvad-rpc/src/https_client_with_sni.rs b/mullvad-rpc/src/https_client_with_sni.rs index 4d8108f89b..238b191491 100644 --- a/mullvad-rpc/src/https_client_with_sni.rs +++ b/mullvad-rpc/src/https_client_with_sni.rs @@ -1,8 +1,10 @@ use crate::{ abortable_stream::{AbortableStream, AbortableStreamHandle}, + proxy::{ApiConnection, ApiConnectionMode, ProxyConfig}, tls_stream::TlsStream, + AddressCache, }; -use futures::{channel::mpsc, StreamExt}; +use futures::{channel::mpsc, future, StreamExt}; #[cfg(target_os = "android")] use futures::{channel::oneshot, sink::SinkExt}; use http::uri::Scheme; @@ -11,6 +13,13 @@ use hyper::{ service::Service, Uri, }; +use shadowsocks::{ + config::ServerType, + context::{Context as SsContext, SharedContext}, + crypto::v1::CipherKind, + relay::tcprelay::ProxyClientStream, + ServerConfig, +}; #[cfg(target_os = "android")] use std::os::unix::io::{AsRawFd, RawFd}; use std::{ @@ -24,22 +33,80 @@ use std::{ task::{Context, Poll}, time::Duration, }; +use talpid_types::ErrorExt; #[cfg(target_os = "android")] use tokio::net::TcpSocket; -use tokio::{net::TcpStream, runtime::Handle, time::timeout}; +use tokio::{net::TcpStream, time::timeout}; const CONNECT_TIMEOUT: Duration = Duration::from_secs(5); #[derive(Clone)] pub struct HttpsConnectorWithSniHandle { - tx: mpsc::UnboundedSender<()>, + tx: mpsc::UnboundedSender<HttpsConnectorRequest>, } impl HttpsConnectorWithSniHandle { /// Stop all streams produced by this connector pub fn reset(&self) { - let _ = self.tx.unbounded_send(()); + let _ = self.tx.unbounded_send(HttpsConnectorRequest::Reset); + } + + /// Change the proxy settings for the connector + pub fn set_connection_mode(&self, proxy: ApiConnectionMode) { + let _ = self + .tx + .unbounded_send(HttpsConnectorRequest::SetConnectionMode(proxy)); + } +} + +enum HttpsConnectorRequest { + Reset, + SetConnectionMode(ApiConnectionMode), +} + +#[derive(Clone)] +enum InnerConnectionMode { + /// Connect directly to the target. + Direct, + /// Connect to the destination via a proxy. + Proxied(ParsedShadowsocksConfig), +} + +#[derive(Clone)] +struct ParsedShadowsocksConfig { + peer: SocketAddr, + password: String, + cipher: CipherKind, +} + +impl From<ParsedShadowsocksConfig> for ServerConfig { + fn from(config: ParsedShadowsocksConfig) -> Self { + ServerConfig::new(config.peer, config.password, config.cipher) + } +} + +#[derive(err_derive::Error, Debug)] +enum ProxyConfigError { + #[error(display = "Unrecognized cipher selected: {}", _0)] + InvalidCipher(String), +} + +impl TryFrom<ApiConnectionMode> for InnerConnectionMode { + type Error = ProxyConfigError; + + fn try_from(config: ApiConnectionMode) -> Result<Self, Self::Error> { + Ok(match config { + ApiConnectionMode::Direct => InnerConnectionMode::Direct, + ApiConnectionMode::Proxied(ProxyConfig::Shadowsocks(config)) => { + InnerConnectionMode::Proxied(ParsedShadowsocksConfig { + peer: config.peer, + password: config.password, + cipher: CipherKind::from_str(&config.cipher) + .map_err(|_| ProxyConfigError::InvalidCipher(config.cipher))?, + }) + } + }) } } @@ -48,12 +115,16 @@ impl HttpsConnectorWithSniHandle { pub struct HttpsConnectorWithSni { inner: Arc<Mutex<HttpsConnectorWithSniInner>>, sni_hostname: Option<String>, + address_cache: AddressCache, + abort_notify: Arc<tokio::sync::Notify>, + proxy_context: SharedContext, #[cfg(target_os = "android")] socket_bypass_tx: Option<mpsc::Sender<SocketBypassRequest>>, } struct HttpsConnectorWithSniInner { stream_handles: Vec<AbortableStreamHandle>, + proxy_config: InnerConnectionMode, } #[cfg(target_os = "android")] @@ -61,26 +132,47 @@ pub type SocketBypassRequest = (RawFd, oneshot::Sender<()>); impl HttpsConnectorWithSni { pub fn new( - handle: Handle, sni_hostname: Option<String>, + address_cache: AddressCache, #[cfg(target_os = "android")] socket_bypass_tx: Option<mpsc::Sender<SocketBypassRequest>>, ) -> (Self, HttpsConnectorWithSniHandle) { - let (tx, mut rx): (_, mpsc::UnboundedReceiver<()>) = mpsc::unbounded(); + let (tx, mut rx) = mpsc::unbounded(); + let abort_notify = Arc::new(tokio::sync::Notify::new()); let inner = Arc::new(Mutex::new(HttpsConnectorWithSniInner { stream_handles: vec![], + proxy_config: InnerConnectionMode::Direct, })); let inner_copy = inner.clone(); - handle.spawn(async move { + let notify = abort_notify.clone(); + tokio::spawn(async move { // Handle requests by `HttpsConnectorWithSniHandle`s - while let Some(()) = rx.next().await { + while let Some(request) = rx.next().await { let handles = { let mut inner = inner_copy.lock().unwrap(); + + if let HttpsConnectorRequest::SetConnectionMode(config) = request { + match InnerConnectionMode::try_from(config) { + Ok(config) => { + inner.proxy_config = config; + } + Err(error) => { + log::error!( + "{}", + error.display_chain_with_msg( + "Failed to parse new API proxy config" + ) + ); + } + } + } + std::mem::take(&mut inner.stream_handles) }; for handle in handles { handle.close(); } + notify.notify_waiters(); } }); @@ -88,6 +180,9 @@ impl HttpsConnectorWithSni { HttpsConnectorWithSni { inner, sni_hostname, + address_cache, + abort_notify, + proxy_context: SsContext::new_shared(ServerType::Local), #[cfg(target_os = "android")] socket_bypass_tx, }, @@ -125,17 +220,24 @@ impl HttpsConnectorWithSni { .map_err(|err| io::Error::new(io::ErrorKind::TimedOut, err))? } - async fn resolve_address(uri: &Uri) -> io::Result<SocketAddr> { + async fn resolve_address(address_cache: AddressCache, uri: Uri) -> io::Result<SocketAddr> { let hostname = uri.host().ok_or(io::Error::new( io::ErrorKind::InvalidInput, "invalid url, missing host", ))?; let port = uri.port_u16().unwrap_or(443); - if let Some(addr) = hostname.parse::<IpAddr>().ok() { return Ok(SocketAddr::new(addr, port)); } + // Preferentially, use cached address. + // + if let Some(addr) = address_cache.resolve_hostname(hostname).await { + return Ok(SocketAddr::new(addr.ip(), port)); + } + + // Use getaddrinfo as a fallback + // let mut addrs = GaiResolver::new() .call( Name::from_str(&hostname) @@ -157,7 +259,7 @@ impl fmt::Debug for HttpsConnectorWithSni { } impl Service<Uri> for HttpsConnectorWithSni { - type Response = TlsStream<AbortableStream<TcpStream>>; + type Response = AbortableStream<ApiConnection>; type Error = io::Error; type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>; @@ -175,8 +277,11 @@ impl Service<Uri> for HttpsConnectorWithSni { io::Error::new(io::ErrorKind::InvalidInput, "invalid url, missing host") }); let inner = self.inner.clone(); + let abort_notify = self.abort_notify.clone(); + let proxy_context = self.proxy_context.clone(); #[cfg(target_os = "android")] let socket_bypass_tx = self.socket_bypass_tx.clone(); + let address_cache = self.address_cache.clone(); let fut = async move { if uri.scheme() != Some(&Scheme::HTTPS) { @@ -187,16 +292,62 @@ impl Service<Uri> for HttpsConnectorWithSni { } let hostname = sni_hostname?; - let addr = Self::resolve_address(&uri).await?; + let addr = Self::resolve_address(address_cache, uri).await?; - let tokio_connection = Self::open_socket( - addr, + // Loop until we have established a connection. This starts over if a new endpoint + // is selected while connecting. + let stream = loop { + let config = { inner.lock().unwrap().proxy_config.clone() }; + let hostname_copy = hostname.clone(); + let addr_copy = addr.clone(); + let context = proxy_context.clone(); #[cfg(target_os = "android")] - socket_bypass_tx, - ) - .await?; + let socket_bypass_tx_copy = socket_bypass_tx.clone(); + + let stream_fut: Pin< + Box<dyn Future<Output = Result<ApiConnection, io::Error>> + Send>, + > = Box::pin(async move { + match config { + InnerConnectionMode::Direct => { + let socket = Self::open_socket( + addr_copy, + #[cfg(target_os = "android")] + socket_bypass_tx_copy, + ) + .await?; + let tls_stream = + TlsStream::connect_https(socket, &hostname_copy).await?; + Ok(ApiConnection::Direct(tls_stream)) + } + InnerConnectionMode::Proxied(proxy_config) => { + let socket = Self::open_socket( + proxy_config.peer, + #[cfg(target_os = "android")] + socket_bypass_tx_copy, + ) + .await?; + let proxy = ProxyClientStream::from_stream( + context, + socket, + &ServerConfig::from(proxy_config), + addr, + ); + let tls_stream = + TlsStream::connect_https(proxy, &hostname_copy).await?; + Ok(ApiConnection::Proxied(tls_stream)) + } + } + }); + + // Wait for connection. Abort and retry if we switched to a different server. + if let future::Either::Left((stream, _)) = + future::select(stream_fut, Box::pin(abort_notify.notified())).await + { + break stream?; + } + }; - let (tcp_stream, socket_handle) = AbortableStream::new(tokio_connection); + let (stream, socket_handle) = AbortableStream::new(stream); { let mut inner = inner.lock().unwrap(); @@ -204,7 +355,7 @@ impl Service<Uri> for HttpsConnectorWithSni { inner.stream_handles.push(socket_handle); } - Ok(TlsStream::connect_https(tcp_stream, &hostname).await?) + Ok(stream) }; Box::pin(fut) diff --git a/mullvad-rpc/src/lib.rs b/mullvad-rpc/src/lib.rs index faf20e00c9..cef6a73cc0 100644 --- a/mullvad-rpc/src/lib.rs +++ b/mullvad-rpc/src/lib.rs @@ -3,17 +3,18 @@ use chrono::{offset::Utc, DateTime}; #[cfg(target_os = "android")] use futures::channel::mpsc; +use futures::Stream; use hyper::Method; use mullvad_types::{ account::{AccountToken, VoucherSubmission}, version::AppVersion, }; +use proxy::ApiConnectionMode; use std::{ collections::BTreeMap, future::Future, net::{IpAddr, Ipv4Addr, SocketAddr}, path::Path, - sync::Arc, }; use talpid_types::{net::wireguard, ErrorExt}; @@ -23,13 +24,14 @@ pub mod rest; mod abortable_stream; mod https_client_with_sni; +pub mod proxy; mod tls_stream; #[cfg(target_os = "android")] pub use crate::https_client_with_sni::SocketBypassRequest; mod address_cache; mod relay_list; -pub use address_cache::{AddressCache, CurrentAddressChangeListener}; +pub use address_cache::AddressCache; pub use hyper::StatusCode; pub use relay_list::RelayListProxy; @@ -150,7 +152,7 @@ impl MullvadRpcRuntime { ) -> Result<Self, Error> { Ok(MullvadRpcRuntime { handle, - address_cache: AddressCache::new(vec![API.addr], None)?, + address_cache: AddressCache::new(None)?, api_availability: ApiAvailability::new(availability::State::default()), #[cfg(target_os = "android")] socket_bypass_tx, @@ -158,10 +160,8 @@ impl MullvadRpcRuntime { } /// Create a new `MullvadRpcRuntime` using the specified directories. - /// Try to use the cache directory first, and fall back on the resource directory - /// if it fails. + /// Try to use the cache directory first, and fall back on the bundled address otherwise. pub async fn with_cache( - resource_dir: Option<&Path>, cache_dir: &Path, write_changes: bool, #[cfg(target_os = "android")] socket_bypass_tx: Option<mpsc::Sender<SocketBypassRequest>>, @@ -185,26 +185,15 @@ impl MullvadRpcRuntime { let address_cache = match AddressCache::from_file(&cache_file, write_file.clone()).await { Ok(cache) => cache, Err(error) => { - let cache_exists = cache_file.exists(); - if cache_exists { + if cache_file.exists() { log::error!( "{}", error.display_chain_with_msg( - "Failed to load cached API addresses. Falling back on bundled list" + "Failed to load cached API addresses. Falling back on bundled address" ) ); } - - // Initialize the cache directory cache using the resource directory - match resource_dir { - Some(resource_dir) => { - let read_file = resource_dir.join(API_IP_CACHE_FILENAME); - let cache = AddressCache::from_file(&read_file, write_file).await?; - cache.randomize().await?; - cache - } - None => return Err(Error::AddressCacheError(error)), - } + AddressCache::new(write_file)? } }; @@ -217,45 +206,49 @@ impl MullvadRpcRuntime { }) } - pub fn set_address_change_listener( - &mut self, - address_change_listener: impl Fn(SocketAddr) -> Result<(), ()> + Send + Sync + 'static, - ) { - self.address_cache - .set_change_listener(Arc::new(Box::new(address_change_listener))); - } - /// Creates a new request service and returns a handle to it. - fn new_request_service( - &mut self, + async fn new_request_service< + T: Stream<Item = ApiConnectionMode> + Unpin + Send + 'static, + AcceptedNewEndpoint: Future<Output = bool> + Send + 'static, + >( + &self, sni_hostname: Option<String>, + proxy_provider: T, + new_address_callback: impl (Fn(SocketAddr) -> AcceptedNewEndpoint) + Send + Sync + 'static, #[cfg(target_os = "android")] socket_bypass_tx: Option<mpsc::Sender<SocketBypassRequest>>, ) -> rest::RequestServiceHandle { - let service = rest::RequestService::new( - self.handle.clone(), + let service_handle = rest::RequestService::new( sni_hostname, self.api_availability.handle(), self.address_cache.clone(), + proxy_provider, + new_address_callback, #[cfg(target_os = "android")] socket_bypass_tx, - ); - let handle = service.handle(); - self.handle.spawn(service.into_future()); - handle + ) + .await; + service_handle } /// Returns a request factory initialized to create requests for the master API - pub fn mullvad_rest_handle(&mut self) -> rest::MullvadRestHandle { - let service = self.new_request_service( - Some(API.host.clone()), - #[cfg(target_os = "android")] - self.socket_bypass_tx.clone(), - ); - let factory = rest::RequestFactory::new( - API.host.clone(), - Box::new(self.address_cache.clone()), - Some("app".to_owned()), - ); + pub async fn mullvad_rest_handle< + T: Stream<Item = ApiConnectionMode> + Unpin + Send + 'static, + AcceptedNewEndpoint: Future<Output = bool> + Send + 'static, + >( + &self, + proxy_provider: T, + new_address_callback: impl (Fn(SocketAddr) -> AcceptedNewEndpoint) + Send + Sync + 'static, + ) -> rest::MullvadRestHandle { + let service = self + .new_request_service( + Some(API.host.clone()), + proxy_provider, + new_address_callback, + #[cfg(target_os = "android")] + self.socket_bypass_tx.clone(), + ) + .await; + let factory = rest::RequestFactory::new(API.host.clone(), Some("app".to_owned())); rest::MullvadRestHandle::new( service, @@ -266,12 +259,15 @@ impl MullvadRpcRuntime { } /// Returns a new request service handle - pub fn rest_handle(&mut self) -> rest::RequestServiceHandle { + pub async fn rest_handle(&mut self) -> rest::RequestServiceHandle { self.new_request_service( None, + ApiConnectionMode::Direct.into_repeat(), + |_| async { true }, #[cfg(target_os = "android")] None, ) + .await } pub fn handle(&mut self) -> &mut tokio::runtime::Handle { diff --git a/mullvad-rpc/src/proxy.rs b/mullvad-rpc/src/proxy.rs new file mode 100644 index 0000000000..009a1960dc --- /dev/null +++ b/mullvad-rpc/src/proxy.rs @@ -0,0 +1,204 @@ +use crate::tls_stream::TlsStream; +use futures::Stream; +use hyper::client::connect::{Connected, Connection}; +use rand::{distributions::Alphanumeric, Rng}; +use serde::{Deserialize, Serialize}; +use shadowsocks::relay::tcprelay::ProxyClientStream; +use std::{ + fmt, io, + net::SocketAddr, + path::Path, + pin::Pin, + task::{self, Poll}, +}; +use talpid_types::{net::openvpn::ShadowsocksProxySettings, ErrorExt}; +use tokio::{ + fs, + io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf}, + net::TcpStream, +}; + +const CURRENT_CONFIG_FILENAME: &str = "api-endpoint.json"; + +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +pub enum ApiConnectionMode { + /// Connect directly to the target. + Direct, + /// Connect to the destination via a proxy. + Proxied(ProxyConfig), +} + +impl fmt::Display for ApiConnectionMode { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + match self { + ApiConnectionMode::Direct => write!(f, "unproxied"), + ApiConnectionMode::Proxied(settings) => settings.fmt(f), + } + } +} + +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +pub enum ProxyConfig { + Shadowsocks(ShadowsocksProxySettings), +} + +impl fmt::Display for ProxyConfig { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + match self { + // TODO: Do not hardcode TCP + ProxyConfig::Shadowsocks(ss) => write!(f, "Shadowsocks {}/TCP", ss.peer), + } + } +} + +impl ApiConnectionMode { + /// Reads the proxy config from `CURRENT_CONFIG_FILENAME`. + /// This returns `ApiConnectionMode::Direct` if reading from disk fails for any reason. + pub async fn try_from_cache(cache_dir: &Path) -> Self { + Self::from_cache(cache_dir).await.unwrap_or_else(|error| { + log::error!( + "{}", + error.display_chain_with_msg("Failed to read API endpoint cache") + ); + ApiConnectionMode::Direct + }) + } + + /// Reads the proxy config from `CURRENT_CONFIG_FILENAME`. + /// If the file does not exist, this returns `Ok(ApiConnectionMode::Direct)`. + async fn from_cache(cache_dir: &Path) -> io::Result<Self> { + let path = cache_dir.join(CURRENT_CONFIG_FILENAME); + match fs::read_to_string(path).await { + Ok(s) => serde_json::from_str(&s).map_err(|error| { + log::error!( + "{}", + error.display_chain_with_msg(&format!( + "Failed to deserialize \"{}\"", + CURRENT_CONFIG_FILENAME + )) + ); + io::Error::new(io::ErrorKind::Other, "deserialization failed") + }), + Err(error) => { + if error.kind() == io::ErrorKind::NotFound { + Ok(ApiConnectionMode::Direct) + } else { + Err(error) + } + } + } + } + + /// Stores this config to `CURRENT_CONFIG_FILENAME`. + /// The content is saved to a temporary file first, which ensures that + /// consumers of the file never end up with partial content. + pub async fn save(&self, cache_dir: &Path) -> io::Result<()> { + let path = cache_dir.join(CURRENT_CONFIG_FILENAME); + let mut temp_ext = String::from("temp"); + temp_ext.push_str( + &rand::thread_rng() + .sample_iter(&Alphanumeric) + .take(5) + .map(char::from) + .collect::<String>(), + ); + let temp_path = path.with_extension(temp_ext); + + { + let mut file = fs::File::create(&temp_path).await?; + let json = serde_json::to_string_pretty(self) + .map_err(|_| io::Error::new(io::ErrorKind::Other, "serialization failed"))?; + file.write_all(json.as_bytes()).await?; + file.write_all(b"\n").await?; + file.sync_data().await?; + } + + fs::rename(&temp_path, path).await + } + + /// Attempts to remove `CURRENT_CONFIG_FILENAME`, if it exists. + pub async fn try_delete_cache(cache_dir: &Path) { + let path = cache_dir.join(CURRENT_CONFIG_FILENAME); + if let Err(err) = fs::remove_file(path).await { + if err.kind() != std::io::ErrorKind::NotFound { + log::error!( + "{}", + err.display_chain_with_msg("Failed to remove old API config") + ); + } + } + } + + /// Returns the remote address, or `None` for `ApiConnectionMode::Direct`. + pub fn get_endpoint(&self) -> Option<SocketAddr> { + match self { + ApiConnectionMode::Proxied(ProxyConfig::Shadowsocks(ss)) => Some(ss.peer), + ApiConnectionMode::Direct => None, + } + } + + pub fn is_proxy(&self) -> bool { + *self != ApiConnectionMode::Direct + } + + /// Convenience function that returns a stream that repeats + /// this config forever. + pub fn into_repeat(self) -> impl Stream<Item = ApiConnectionMode> { + futures::stream::repeat(self) + } +} + +/// Stream that is either a regular TLS stream or TLS via shadowsocks +pub enum ApiConnection { + Direct(TlsStream<TcpStream>), + Proxied(TlsStream<ProxyClientStream<TcpStream>>), +} + +impl AsyncRead for ApiConnection { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll<io::Result<()>> { + match Pin::get_mut(self) { + ApiConnection::Direct(s) => Pin::new(s).poll_read(cx, buf), + ApiConnection::Proxied(s) => Pin::new(s).poll_read(cx, buf), + } + } +} + +impl AsyncWrite for ApiConnection { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + match Pin::get_mut(self) { + ApiConnection::Direct(s) => Pin::new(s).poll_write(cx, buf), + ApiConnection::Proxied(s) => Pin::new(s).poll_write(cx, buf), + } + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> { + match Pin::get_mut(self) { + ApiConnection::Direct(s) => Pin::new(s).poll_flush(cx), + ApiConnection::Proxied(s) => Pin::new(s).poll_flush(cx), + } + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> { + match Pin::get_mut(self) { + ApiConnection::Direct(s) => Pin::new(s).poll_shutdown(cx), + ApiConnection::Proxied(s) => Pin::new(s).poll_shutdown(cx), + } + } +} + +impl Connection for ApiConnection { + fn connected(&self) -> Connected { + match self { + ApiConnection::Direct(s) => s.connected(), + ApiConnection::Proxied(s) => s.connected(), + } + } +} diff --git a/mullvad-rpc/src/rest.rs b/mullvad-rpc/src/rest.rs index 41b637f84d..8bb5efc72b 100644 --- a/mullvad-rpc/src/rest.rs +++ b/mullvad-rpc/src/rest.rs @@ -4,13 +4,13 @@ use crate::{ address_cache::AddressCache, availability::ApiAvailabilityHandle, https_client_with_sni::{HttpsConnectorWithSni, HttpsConnectorWithSniHandle}, + proxy::ApiConnectionMode, }; use futures::{ channel::{mpsc, oneshot}, - future::{abortable, AbortHandle, Aborted}, sink::SinkExt, stream::StreamExt, - TryFutureExt, + Stream, TryFutureExt, }; use hyper::{ client::Client, @@ -18,15 +18,12 @@ use hyper::{ Method, Uri, }; use std::{ - collections::BTreeMap, future::Future, - mem, - net::{IpAddr, SocketAddr}, + net::SocketAddr, str::FromStr, time::{Duration, Instant}, }; use talpid_types::ErrorExt; -use tokio::runtime::Handle; pub use hyper::StatusCode; @@ -45,7 +42,7 @@ const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); #[derive(err_derive::Error, Debug)] pub enum Error { #[error(display = "Request cancelled")] - Aborted(Aborted), + Aborted, #[error(display = "Hyper error")] HyperError(#[error(source)] hyper::Error), @@ -84,114 +81,124 @@ impl Error { _ => false, } } + + /// Returns a new instance for which `abortable_stream::Aborted` is mapped to `Self::Aborted`. + fn map_aborted(self) -> Self { + if let Error::HyperError(error) = &self { + use std::error::Error; + let mut source = error.source(); + while let Some(error) = source { + let io_error: Option<&std::io::Error> = error.downcast_ref(); + if let Some(io_error) = io_error { + let abort_error: Option<&crate::abortable_stream::Aborted> = + io_error.get_ref().and_then(|inner| inner.downcast_ref()); + if abort_error.is_some() { + return Self::Aborted; + } + } + source = error.source(); + } + } + self + } } /// A service that executes HTTP requests, allowing for on-demand termination of all in-flight /// requests -pub(crate) struct RequestService { +pub(crate) struct RequestService< + T: Stream<Item = ApiConnectionMode>, + F: Fn(SocketAddr) -> AcceptedNewEndpoint, + AcceptedNewEndpoint: Future<Output = bool>, +> { command_tx: mpsc::Sender<RequestCommand>, command_rx: mpsc::Receiver<RequestCommand>, connector_handle: HttpsConnectorWithSniHandle, client: hyper::Client<HttpsConnectorWithSni, hyper::Body>, - handle: Handle, - next_id: u64, - in_flight_requests: BTreeMap<u64, AbortHandle>, - api_availability: ApiAvailabilityHandle, + proxy_config_provider: T, + new_address_callback: F, address_cache: AddressCache, + api_availability: ApiAvailabilityHandle, } -impl RequestService { +impl< + T: Stream<Item = ApiConnectionMode> + Unpin + Send + 'static, + F: (Fn(SocketAddr) -> AcceptedNewEndpoint) + Send + Sync + 'static, + AcceptedNewEndpoint: Future<Output = bool> + Send + 'static, + > RequestService<T, F, AcceptedNewEndpoint> +{ /// Constructs a new request service. - pub fn new( - handle: Handle, + pub async fn new( sni_hostname: Option<String>, api_availability: ApiAvailabilityHandle, address_cache: AddressCache, + mut proxy_config_provider: T, + new_address_callback: F, #[cfg(target_os = "android")] socket_bypass_tx: Option<mpsc::Sender<SocketBypassRequest>>, - ) -> RequestService { + ) -> RequestServiceHandle { let (connector, connector_handle) = HttpsConnectorWithSni::new( - handle.clone(), sni_hostname, + address_cache.clone(), #[cfg(target_os = "android")] socket_bypass_tx.clone(), ); + proxy_config_provider + .next() + .await + .map(|config| connector_handle.set_connection_mode(config)); + let (command_tx, command_rx) = mpsc::channel(1); let client = Client::builder().build(connector); - Self { + let service = Self { command_tx, command_rx, connector_handle, client, - handle, - in_flight_requests: BTreeMap::new(), - next_id: 0, - api_availability, + proxy_config_provider, + new_address_callback, address_cache, - } + api_availability, + }; + let handle = service.handle(); + tokio::spawn(service.into_future()); + handle } - /// Constructs a handle - pub fn handle(&self) -> RequestServiceHandle { + fn handle(&self) -> RequestServiceHandle { RequestServiceHandle { tx: self.command_tx.clone(), - handle: self.handle.clone(), } } - fn process_command(&mut self, command: RequestCommand) { + async fn process_command(&mut self, command: RequestCommand) { match command { RequestCommand::NewRequest(request, completion_tx) => { - let id = self.id(); let mut tx = self.command_tx.clone(); let timeout = request.timeout(); let hyper_request = request.into_request(); - let host_addr = get_request_socket_addr(&hyper_request); let api_availability = self.api_availability.clone(); let suspend_fut = api_availability.wait_for_unsuspend(); let request_fut = self.client.request(hyper_request).map_err(Error::from); - let (request_future, abort_handle) = abortable(async move { + let request_future = async move { let _ = suspend_fut.await; request_fut.await - }); - let address_cache = self.address_cache.clone(); - let handle = self.handle.clone(); + }; let future = async move { - let response = - tokio::time::timeout(timeout, request_future.map_err(Error::Aborted)) - .await - .map_err(Error::TimeoutError); + let response = tokio::time::timeout(timeout, request_future) + .await + .map_err(Error::TimeoutError); - let response = flatten_result(flatten_result(response)); - if let Some(host_addr) = host_addr { - if let Err(err) = &response { - if err.is_network_error() { - log::error!( - "{}", - err.display_chain_with_msg("HTTP request failed") - ); - if !api_availability.get_state().is_offline() { - let current_address = address_cache.peek_address(); - if current_address == host_addr - && address_cache.has_tried_current_address() - { - handle.spawn(async move { - address_cache.select_new_address().await; - let new_address = address_cache.peek_address(); - log::error!( - "Request failed using address {}. Trying next API address: {}", - current_address, - new_address, - ); - }); - } - } - } + let response = flatten_result(response).map_err(|error| error.map_aborted()); + + if let Err(err) = &response { + if err.is_network_error() && !api_availability.get_state().is_offline() { + log::error!("{}", err.display_chain_with_msg("HTTP request failed")); + let _ = tx.send(RequestCommand::NextApiConfig).await; } } @@ -200,72 +207,46 @@ impl RequestService { "Failed to send response to caller, caller channel is shut down" ); } - let _ = tx.send(RequestCommand::RequestFinished(id)).await; }; - - self.in_flight_requests.insert(id, abort_handle); - self.handle.spawn(future); + tokio::spawn(future); } - RequestCommand::RequestFinished(id) => { - self.in_flight_requests.remove(&id); + RequestCommand::Reset => { + self.connector_handle.reset(); } - RequestCommand::Reset(tx) => { - self.reset(); - let _ = tx.send(()); + RequestCommand::NextApiConfig => { + if let Some(new_config) = self.proxy_config_provider.next().await { + let endpoint = match new_config.get_endpoint() { + Some(endpoint) => endpoint, + None => self.address_cache.get_address().await, + }; + // Switch to new connection mode unless rejected by address change callback + if (self.new_address_callback)(endpoint).await { + self.connector_handle.set_connection_mode(new_config); + } + } } } } - fn reset(&mut self) { - let old_requests = mem::take(&mut self.in_flight_requests); - for (_, abort_handle) in old_requests { - abort_handle.abort(); - } - - self.connector_handle.reset(); - } - - fn id(&mut self) -> u64 { - let id = self.next_id; - self.next_id = id.wrapping_add(1); - id - } - - pub async fn into_future(mut self) { + async fn into_future(mut self) { while let Some(command) = self.command_rx.next().await { - self.process_command(command); + self.process_command(command).await; } - self.reset(); + self.connector_handle.reset(); } } -fn get_request_socket_addr(request: &Request) -> Option<SocketAddr> { - let uri = request.uri(); - let port = uri - .port_u16() - // Assuming HTTPS always - .unwrap_or(443); - - let host_addr = uri.host().and_then(|host| host.parse::<IpAddr>().ok())?; - - Some(SocketAddr::new(host_addr, port)) -} - #[derive(Clone)] /// A handle to interact with a spawned `RequestService`. pub struct RequestServiceHandle { tx: mpsc::Sender<RequestCommand>, - handle: Handle, } impl RequestServiceHandle { /// Resets the corresponding RequestService, dropping all in-flight requests. pub async fn reset(&self) { let mut tx = self.tx.clone(); - let (done_tx, done_rx) = oneshot::channel(); - - let _ = tx.send(RequestCommand::Reset(done_tx)).await; - let _ = done_rx.await; + let _ = tx.send(RequestCommand::Reset).await; } /// Submits a `RestRequest` for exectuion to the request service. @@ -278,11 +259,6 @@ impl RequestServiceHandle { completion_rx.await.map_err(|_| Error::ReceiveError)? } - - /// Spawns a future on the RPC runtime. - pub fn spawn<T: Send + 'static>(&self, future: impl Future<Output = T> + Send + 'static) { - let _ = self.handle.spawn(future); - } } #[derive(Debug)] @@ -291,8 +267,8 @@ pub(crate) enum RequestCommand { RestRequest, oneshot::Sender<std::result::Result<Response, Error>>, ), - RequestFinished(u64), - Reset(oneshot::Sender<()>), + Reset, + NextApiConfig, } /// A REST request that is sent to the RequestService to be executed. @@ -392,20 +368,14 @@ pub struct ErrorResponse { #[derive(Clone)] pub struct RequestFactory { hostname: String, - address_provider: Box<dyn AddressProvider>, path_prefix: Option<String>, pub timeout: Duration, } impl RequestFactory { - pub fn new( - hostname: String, - address_provider: Box<dyn AddressProvider>, - path_prefix: Option<String>, - ) -> Self { + pub fn new(hostname: String, path_prefix: Option<String>) -> Self { Self { hostname, - address_provider, path_prefix, timeout: DEFAULT_TIMEOUT, } @@ -466,9 +436,8 @@ impl RequestFactory { } fn get_uri(&self, path: &str) -> Result<Uri> { - let host = self.address_provider.get_address(); let prefix = self.path_prefix.as_ref().map(AsRef::as_ref).unwrap_or(""); - let uri = format!("https://{}/{}{}", host, prefix, path); + let uri = format!("https://{}/{}{}", self.hostname, prefix, path); hyper::Uri::from_str(&uri).map_err(Error::UriError) } @@ -478,29 +447,6 @@ impl RequestFactory { } } -pub trait AddressProvider: Send + Sync { - /// Must return a string that represents either a host or a host with port - fn get_address(&self) -> String; - fn clone_box(&self) -> Box<dyn AddressProvider>; -} - -impl Clone for Box<dyn AddressProvider> { - fn clone(&self) -> Self { - self.clone_box() - } -} - -impl AddressProvider for IpAddr { - /// Must return a string that represents either a host or a host with port - fn get_address(&self) -> String { - self.to_string() - } - - fn clone_box(&self) -> Box<dyn AddressProvider> { - Box::new(*self) - } -} - pub fn get_request<T: serde::de::DeserializeOwned>( factory: &RequestFactory, service: RequestServiceHandle, @@ -632,7 +578,7 @@ impl MullvadRestHandle { let handle = self.clone(); let availability = self.availability.clone(); - self.service.spawn(async move { + tokio::spawn(async move { // always start the fetch after 15 minutes let api_proxy = crate::ApiProxy::new(handle); let mut next_check = Instant::now() + API_IP_CHECK_DELAY; @@ -652,14 +598,29 @@ impl MullvadRestHandle { } match api_proxy.clone().get_api_addrs().await { Ok(new_addrs) => { - log::debug!("Fetched new API addresses {:?}, will fetch again in {} hours", new_addrs, API_IP_CHECK_INTERVAL.as_secs() / ( 60 * 60 )); - if let Err(err) = address_cache.set_addresses(new_addrs).await { - log::error!("Failed to save newly updated API addresses: {}", err); + if let Some(addr) = new_addrs.get(0) { + log::debug!( + "Fetched new API address {:?}. Fetching again in {} hours", + addr, + API_IP_CHECK_INTERVAL.as_secs() / (60 * 60) + ); + if let Err(err) = address_cache.set_address(*addr).await { + log::error!( + "Failed to save newly updated API address: {}", + err + ); + } + } else { + log::error!("API returned no API addresses"); } next_check = next_regular_check(); } Err(err) => { - log::error!("Failed to fetch new API addresses: {}, will retry again in {} seconds", err, API_IP_CHECK_ERROR_INTERVAL.as_secs()); + log::error!( + "Failed to fetch new API addresses: {}. Retrying in {} seconds", + err, + API_IP_CHECK_ERROR_INTERVAL.as_secs() + ); next_check = next_error_check(); } } diff --git a/mullvad-rpc/src/tls_stream.rs b/mullvad-rpc/src/tls_stream.rs index 232bb39b92..cad0268ac3 100644 --- a/mullvad-rpc/src/tls_stream.rs +++ b/mullvad-rpc/src/tls_stream.rs @@ -16,7 +16,7 @@ use tokio_rustls::{ const LE_ROOT_CERT: &[u8] = include_bytes!("../le_root_cert.pem"); pub struct TlsStream<S: AsyncRead + AsyncWrite + Unpin> { - stream: Pin<Box<tokio_rustls::client::TlsStream<S>>>, + stream: tokio_rustls::client::TlsStream<S>, } impl<S> TlsStream<S> @@ -49,11 +49,9 @@ where } }; - let tls_stream = connector.connect(host, stream).await?; + let stream = connector.connect(host, stream).await?; - Ok(TlsStream { - stream: Box::pin(tls_stream), - }) + Ok(TlsStream { stream }) } } @@ -79,7 +77,7 @@ where cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>> { - self.stream.as_mut().poll_read(cx, buf) + Pin::new(&mut self.stream).poll_read(cx, buf) } } @@ -92,15 +90,15 @@ where cx: &mut task::Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>> { - self.stream.as_mut().poll_write(cx, buf) + Pin::new(&mut self.stream).poll_write(cx, buf) } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> { - self.stream.as_mut().poll_flush(cx) + Pin::new(&mut self.stream).poll_flush(cx) } fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> { - self.stream.as_mut().poll_shutdown(cx) + Pin::new(&mut self.stream).poll_shutdown(cx) } } diff --git a/mullvad-setup/src/main.rs b/mullvad-setup/src/main.rs index 947f9bd9ca..e65b1278f8 100644 --- a/mullvad-setup/src/main.rs +++ b/mullvad-setup/src/main.rs @@ -1,6 +1,6 @@ use clap::{crate_authors, crate_description, crate_name, App}; use mullvad_management_interface::new_rpc_client; -use mullvad_rpc::MullvadRpcRuntime; +use mullvad_rpc::{proxy::ApiConnectionMode, MullvadRpcRuntime}; use mullvad_types::version::ParsedAppVersion; use std::{path::PathBuf, process, time::Duration}; use talpid_core::{ @@ -165,11 +165,19 @@ async fn remove_wireguard_key() -> Result<(), Error> { if let Some(token) = settings.get_account_token() { if let Some(wg_data) = settings.get_wireguard() { - let mut rpc_runtime = MullvadRpcRuntime::with_cache(None, &cache_path, false) + let rpc_runtime = MullvadRpcRuntime::with_cache(&cache_path, false) .await .map_err(Error::RpcInitializationError)?; - let mut key_proxy = - mullvad_rpc::WireguardKeyProxy::new(rpc_runtime.mullvad_rest_handle()); + let mut key_proxy = mullvad_rpc::WireguardKeyProxy::new( + rpc_runtime + .mullvad_rest_handle( + ApiConnectionMode::try_from_cache(&cache_path) + .await + .into_repeat(), + |_| async { true }, + ) + .await, + ); retry_future_n( move || { key_proxy.remove_wireguard_key(token.clone(), wg_data.private_key.public_key()) diff --git a/mullvad-types/src/location.rs b/mullvad-types/src/location.rs index b9c460fa87..5a962dafb1 100644 --- a/mullvad-types/src/location.rs +++ b/mullvad-types/src/location.rs @@ -21,7 +21,7 @@ pub struct Location { const RAIDUS_OF_EARTH: f64 = 6372.8; impl Location { - pub fn distance_from(&self, other: &Location) -> f64 { + pub fn distance_from(&self, other: &Coordinates) -> f64 { haversine_dist_deg( self.latitude, self.longitude, @@ -29,6 +29,76 @@ impl Location { other.longitude, ) } + + pub fn has_same_city(&self, other: &Self) -> bool { + self.country_code == other.country_code && self.city_code == other.city_code + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct Coordinates { + pub latitude: f64, + pub longitude: f64, +} + +impl From<&Location> for Coordinates { + fn from(location: &Location) -> Self { + Self { + latitude: location.latitude, + longitude: location.longitude, + } + } +} + +impl From<Location> for Coordinates { + fn from(location: Location) -> Self { + Coordinates::from(&location) + } +} + +impl Coordinates { + /// Computes the approximate midpoint of a set of locations. + /// + /// This works by calculating the mean Cartesian coordinates, and converting them + /// back to spherical coordinates. This is approximate, because the semi-minor (polar) + /// axis is assumed to equal the semi-major (equatorial) axis. + /// + /// https://en.wikipedia.org/wiki/Spherical_coordinate_system#Cartesian_coordinates + pub fn midpoint(locations: &[Location]) -> Self { + Self::midpoint_inner(locations.iter().map(Coordinates::from)) + } + + fn midpoint_inner(locations: impl std::iter::Iterator<Item = Coordinates>) -> Self { + let mut x = 0f64; + let mut y = 0f64; + let mut z = 0f64; + + let mut count = 0; + + for location in locations { + let cos_lat = location.latitude.to_radians().cos(); + let sin_lat = location.latitude.to_radians().sin(); + let cos_lon = location.longitude.to_radians().cos(); + let sin_lon = location.longitude.to_radians().sin(); + x += cos_lat * cos_lon; + y += cos_lat * sin_lon; + z += sin_lat; + count += 1; + } + let inv_total_weight = 1f64 / (count as f64); + x *= inv_total_weight; + y *= inv_total_weight; + z *= inv_total_weight; + + let longitude = y.atan2(x); + let hypotenuse = (x * x + y * y).sqrt(); + let latitude = z.atan2(hypotenuse); + + Coordinates { + latitude: latitude.to_degrees(), + longitude: longitude.to_degrees(), + } + } } /// Takes input as latitude and longitude degrees. @@ -110,6 +180,16 @@ impl From<AmIMullvad> for GeoIpLocation { #[cfg(test)] mod tests { + use super::Coordinates; + + impl Coordinates { + fn equal(&self, other: Coordinates) -> bool { + const EPS: f64 = 0.1; + (self.latitude - other.latitude).abs() < EPS + && (self.longitude - other.longitude).abs() < EPS + } + } + #[test] fn test_haversine_dist_deg() { use super::haversine_dist_deg; @@ -129,4 +209,43 @@ mod tests { 111.22634257109495 ); } + + #[test] + fn test_midpoint() { + assert!(Coordinates::midpoint_inner( + [ + Coordinates { + latitude: 0.0, + longitude: 90.0, + }, + Coordinates { + latitude: 90.0, + longitude: 0.0, + }, + ] + .into_iter() + ) + .equal(Coordinates { + latitude: 45.0, + longitude: 90.0, + })); + + assert!(Coordinates::midpoint_inner( + [ + Coordinates { + latitude: -20.0, + longitude: 90.0, + }, + Coordinates { + latitude: -20.0, + longitude: -90.0, + }, + ] + .into_iter() + ) + .equal(Coordinates { + latitude: -90.0, + longitude: 0.0, + })); + } } |
