summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorDavid Lönnhager <david.l@mullvad.net>2022-03-01 15:31:52 +0100
committerDavid Lönnhager <david.l@mullvad.net>2022-03-01 15:31:52 +0100
commit8e601e3145c558ab80a5e2f4310977f26e402183 (patch)
treeeeed77609088bbc08330066192428bf5bfd83a6c
parent710b361a8c209dc670f2d459eacd2d6ad3661156 (diff)
parent63b6585cd02693dedc4ce41598407131cbb9d93f (diff)
downloadmullvadvpn-8e601e3145c558ab80a5e2f4310977f26e402183.tar.xz
mullvadvpn-8e601e3145c558ab80a5e2f4310977f26e402183.zip
Merge branch 'proxy-api-reqs'
-rw-r--r--.github/workflows/android-app.yml1
-rw-r--r--.gitignore1
-rw-r--r--CHANGELOG.md2
-rw-r--r--Cargo.lock464
-rw-r--r--android/app/build.gradle.kts1
-rw-r--r--android/app/src/main/kotlin/net/mullvad/mullvadvpn/service/DaemonInstance.kt2
-rwxr-xr-xbuild-apk.sh2
-rwxr-xr-xbuild.sh2
-rw-r--r--gui/tasks/distribution.js1
-rw-r--r--mullvad-daemon/src/api.rs54
-rw-r--r--mullvad-daemon/src/lib.rs158
-rw-r--r--mullvad-daemon/src/relays/mod.rs52
-rw-r--r--mullvad-daemon/src/relays/updater.rs3
-rw-r--r--mullvad-jni/src/lib.rs2
-rw-r--r--mullvad-problem-report/src/lib.rs87
-rw-r--r--mullvad-rpc/Cargo.toml2
-rw-r--r--mullvad-rpc/src/abortable_stream.rs10
-rw-r--r--mullvad-rpc/src/address_cache.rs299
-rw-r--r--mullvad-rpc/src/bin/address_cache.rs43
-rw-r--r--mullvad-rpc/src/bin/relay_list.rs16
-rw-r--r--mullvad-rpc/src/https_client_with_sni.rs189
-rw-r--r--mullvad-rpc/src/lib.rs92
-rw-r--r--mullvad-rpc/src/proxy.rs204
-rw-r--r--mullvad-rpc/src/rest.rs261
-rw-r--r--mullvad-rpc/src/tls_stream.rs16
-rw-r--r--mullvad-setup/src/main.rs16
-rw-r--r--mullvad-types/src/location.rs121
27 files changed, 1476 insertions, 625 deletions
diff --git a/.github/workflows/android-app.yml b/.github/workflows/android-app.yml
index d340431ddc..3bc2192146 100644
--- a/.github/workflows/android-app.yml
+++ b/.github/workflows/android-app.yml
@@ -95,7 +95,6 @@ jobs:
source env.sh $TARGET
cargo build --target $TARGET --verbose --package mullvad-jni
cargo run --bin relay_list > dist-assets/relays.json
- cargo run --bin address_cache > dist-assets/api-ip-address.txt
$NDK_TOOLCHAIN_STRIP_TOOL --strip-debug --strip-unneeded -o "$STRIPPED_LIB_PATH" "$UNSTRIPPED_LIB_PATH"
- name: Configure Android SDK
diff --git a/.gitignore b/.gitignore
index e41edf6b7c..b75c65b33e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -11,7 +11,6 @@
.DS_Store
*.log
/dist-assets/relays.json
-/dist-assets/api-ip-address.txt
/dist-assets/mullvad
/dist-assets/mullvad.exe
/dist-assets/mullvad-daemon
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 23cff810b4..1fe3d7173e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -24,6 +24,8 @@ Line wrap the file at 100 chars. Th
## [Unreleased]
### Added
+- Obfuscate traffic to the Mullvad API using bridges if it cannot be reached directly.
+
#### Windows
- Detect mounting and dismounting of volumes, such as VeraCrypt volumes or USB drives,
and exclude paths from the tunnel correctly when these occur. This sometimes only works
diff --git a/Cargo.lock b/Cargo.lock
index 510a331371..c6d4a3c483 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -18,6 +18,42 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
+name = "aead"
+version = "0.4.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0b613b8e1e3cf911a086f53f03bf286f52fd7a7258e4fa606f0ef220d39d8877"
+dependencies = [
+ "generic-array",
+]
+
+[[package]]
+name = "aes"
+version = "0.7.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9e8b47f52ea9bae42228d07ec09eb676433d7c4ed1ebdf0f1d1c29ed446f1ab8"
+dependencies = [
+ "cfg-if 1.0.0",
+ "cipher",
+ "cpufeatures",
+ "ctr",
+ "opaque-debug",
+]
+
+[[package]]
+name = "aes-gcm"
+version = "0.9.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "df5f85a83a7d8b0442b6aa7b504b8212c1733da07b98aae43d4bc21b2cb3cdf6"
+dependencies = [
+ "aead",
+ "aes",
+ "cipher",
+ "ctr",
+ "ghash",
+ "subtle",
+]
+
+[[package]]
name = "aho-corasick"
version = "0.7.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -130,6 +166,12 @@ dependencies = [
]
[[package]]
+name = "base16ct"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "349a06037c7bf932dd7e7d1f653678b2038b9ad46a74102f1fc7bd7872678cce"
+
+[[package]]
name = "base64"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -142,12 +184,27 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
[[package]]
+name = "block-buffer"
+version = "0.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f1d36a02058e76b040de25a4464ba1c80935655595b661505c8b39b664828b95"
+dependencies = [
+ "generic-array",
+]
+
+[[package]]
name = "bumpalo"
version = "3.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9df67f7bf9ef8498769f994239c45613ef0c5899415fb58e9add412d2c1a538"
[[package]]
+name = "byte_string"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "11aade7a05aa8c3a351cedc44c3fc45806430543382fcc4743a9b757a2a0b4ed"
+
+[[package]]
name = "byteorder"
version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -184,6 +241,31 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
+name = "chacha20"
+version = "0.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "01b72a433d0cf2aef113ba70f62634c56fddb0f244e6377185c56a7cadbd8f91"
+dependencies = [
+ "cfg-if 1.0.0",
+ "cipher",
+ "cpufeatures",
+ "zeroize",
+]
+
+[[package]]
+name = "chacha20poly1305"
+version = "0.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3b84ed6d1d5f7aa9bdde921a5090e0ca4d934d250ea3b402a5fab3a994e28a2a"
+dependencies = [
+ "aead",
+ "chacha20",
+ "cipher",
+ "poly1305",
+ "zeroize",
+]
+
+[[package]]
name = "chrono"
version = "0.4.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -198,6 +280,15 @@ dependencies = [
]
[[package]]
+name = "cipher"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7ee52072ec15386f770805afd189a01c8841be8696bed250fa2f13c4c0d6dfb7"
+dependencies = [
+ "generic-array",
+]
+
+[[package]]
name = "clap"
version = "2.33.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -262,6 +353,12 @@ dependencies = [
]
[[package]]
+name = "const-oid"
+version = "0.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e4c78c047431fee22c1a7bb92e00ad095a02a983affe4d8a72e2a2c62c1b94f3"
+
+[[package]]
name = "convert_case"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -284,6 +381,45 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea221b5284a47e40033bf9b66f35f984ec0ea2931eb03505246cd27a963f981b"
[[package]]
+name = "cpufeatures"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "95059428f66df56b63431fdb4e1947ed2190586af5c5a8a8b71122bdf5a7f469"
+dependencies = [
+ "libc",
+]
+
+[[package]]
+name = "crypto-bigint"
+version = "0.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "03c6a1d5fa1de37e071642dfa44ec552ca5b299adb128fab16138e24b548fd21"
+dependencies = [
+ "generic-array",
+ "rand_core 0.6.3",
+ "subtle",
+ "zeroize",
+]
+
+[[package]]
+name = "crypto-common"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "683d6b536309245c849479fba3da410962a43ed8e51c26b729208ec0ac2798d0"
+dependencies = [
+ "generic-array",
+]
+
+[[package]]
+name = "ctr"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "049bb91fb4aaf0e3c7efa6cd5ef877dbbbd15b39dad06d9948de4ec8a75761ea"
+dependencies = [
+ "cipher",
+]
+
+[[package]]
name = "ctrlc"
version = "3.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -300,7 +436,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b9fdf9972b2bd6af2d913799d9ebc165ea4d2e65878e329d9c6b372c4491b61"
dependencies = [
"byteorder",
- "digest",
+ "digest 0.9.0",
"rand_core 0.5.1",
"subtle",
"zeroize",
@@ -359,6 +495,15 @@ dependencies = [
]
[[package]]
+name = "der"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6919815d73839e7ad218de758883aae3a257ba6759ce7a9992501efbb53d705c"
+dependencies = [
+ "const-oid",
+]
+
+[[package]]
name = "derive-try-from-primitive"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -417,6 +562,18 @@ dependencies = [
]
[[package]]
+name = "digest"
+version = "0.10.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b697d66081d42af4fba142d56918a3cb21dc8eb63372c6b85d14f44fb9c5979b"
+dependencies = [
+ "block-buffer",
+ "crypto-common",
+ "generic-array",
+ "subtle",
+]
+
+[[package]]
name = "dirs-next"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -450,12 +607,48 @@ dependencies = [
]
[[package]]
+name = "ecdsa"
+version = "0.13.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d0d69ae62e0ce582d56380743515fefaf1a8c70cec685d9677636d7e30ae9dc9"
+dependencies = [
+ "der",
+ "elliptic-curve",
+ "signature",
+]
+
+[[package]]
+name = "ed25519"
+version = "1.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "74e1069e39f1454367eb2de793ed062fac4c35c2934b76a81d90dd9abcd28816"
+dependencies = [
+ "signature",
+]
+
+[[package]]
name = "either"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"
[[package]]
+name = "elliptic-curve"
+version = "0.11.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "25b477563c2bfed38a3b7a60964c49e058b2510ad3f12ba3483fd8f62c2306d6"
+dependencies = [
+ "base16ct",
+ "crypto-bigint",
+ "der",
+ "generic-array",
+ "rand_core 0.6.3",
+ "sec1",
+ "subtle",
+ "zeroize",
+]
+
+[[package]]
name = "endian-type"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -766,6 +959,16 @@ dependencies = [
]
[[package]]
+name = "ghash"
+version = "0.4.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1583cc1656d7839fd3732b80cf4f38850336cdb9b8ded1cd399ca62958de3c99"
+dependencies = [
+ "opaque-debug",
+ "polyval",
+]
+
+[[package]]
name = "gimli"
version = "0.25.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -821,6 +1024,24 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]]
+name = "hkdf"
+version = "0.12.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "158bc31e00a68e380286904cc598715f861f2b0ccf7aa6fe20c6d0c49ca5d0f6"
+dependencies = [
+ "hmac",
+]
+
+[[package]]
+name = "hmac"
+version = "0.12.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ddca131f3e7f2ce2df364b57949a9d47915cfbd35e46cfee355ccebbf794d6a2"
+dependencies = [
+ "digest 0.10.1",
+]
+
+[[package]]
name = "hostname"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -849,7 +1070,7 @@ checksum = "1323096b05d41827dadeaee54c9981958c0f94e670bc94ed80037d1a7b8b186b"
dependencies = [
"bytes",
"fnv",
- "itoa",
+ "itoa 0.4.8",
]
[[package]]
@@ -896,7 +1117,7 @@ dependencies = [
"http-body",
"httparse",
"httpdate",
- "itoa",
+ "itoa 0.4.8",
"pin-project-lite",
"socket2",
"tokio",
@@ -1055,6 +1276,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4"
[[package]]
+name = "itoa"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35"
+
+[[package]]
name = "jni"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1199,6 +1426,25 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f"
[[package]]
+name = "md-5"
+version = "0.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e6a38fc55c8bbc10058782919516f88826e70320db6d206aebc49611d24216ae"
+dependencies = [
+ "digest 0.10.1",
+ "md5-asm",
+]
+
+[[package]]
+name = "md5-asm"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "73b9a6f25ec11ea27e22d7fc8beafda909da44ece95f63e94f1eeb23d19bb5c7"
+dependencies = [
+ "cc",
+]
+
+[[package]]
name = "memchr"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1477,6 +1723,7 @@ dependencies = [
"rustls-pemfile",
"serde",
"serde_json",
+ "shadowsocks",
"talpid-types",
"tokio",
"tokio-rustls",
@@ -1752,6 +1999,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "692fcb63b64b1758029e0a96ee63e049ce8c5948587f2f7208df04625e5f6b56"
[[package]]
+name = "opaque-debug"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
+
+[[package]]
name = "openssl-probe"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1788,6 +2041,28 @@ dependencies = [
]
[[package]]
+name = "p256"
+version = "0.10.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "19736d80675fbe9fe33426268150b951a3fb8f5cfca2a23a17c85ef3adb24e3b"
+dependencies = [
+ "ecdsa",
+ "elliptic-curve",
+ "sec1",
+]
+
+[[package]]
+name = "p384"
+version = "0.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "755d8266e41f57bd8562ed9b6e93cdcf73ead050e1e8c3a27ea3871b6643a20c"
+dependencies = [
+ "ecdsa",
+ "elliptic-curve",
+ "sec1",
+]
+
+[[package]]
name = "parity-tokio-ipc"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1977,6 +2252,29 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c9b1041b4387893b91ee6746cddfc28516aff326a3519fb2adf820932c5e6cb"
[[package]]
+name = "poly1305"
+version = "0.7.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "048aeb476be11a4b6ca432ca569e375810de9294ae78f4774e78ea98a9246ede"
+dependencies = [
+ "cpufeatures",
+ "opaque-debug",
+ "universal-hash",
+]
+
+[[package]]
+name = "polyval"
+version = "0.5.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8419d2b623c7c0896ff2d5d96e2cb4ede590fed28fcc34934f4c33c036e620a1"
+dependencies = [
+ "cfg-if 1.0.0",
+ "cpufeatures",
+ "opaque-debug",
+ "universal-hash",
+]
+
+[[package]]
name = "ppv-lite86"
version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2277,6 +2575,24 @@ dependencies = [
]
[[package]]
+name = "ring-compat"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "80f9cf4a178de62d388e6502dae2101b37c1becf65227bf1210e6cf12dc633a3"
+dependencies = [
+ "aead",
+ "digest 0.9.0",
+ "ecdsa",
+ "ed25519",
+ "generic-array",
+ "opaque-debug",
+ "p256",
+ "p384",
+ "ring",
+ "zeroize",
+]
+
+[[package]]
name = "rs-release"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2393,6 +2709,18 @@ dependencies = [
]
[[package]]
+name = "sec1"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "08da66b8b0965a5555b6bd6639e68ccba85e1e2506f5fbb089e93f8a04e1a2d1"
+dependencies = [
+ "der",
+ "generic-array",
+ "subtle",
+ "zeroize",
+]
+
+[[package]]
name = "security-framework"
version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2434,6 +2762,16 @@ dependencies = [
]
[[package]]
+name = "sendfd"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9fa25200c6de90f8da82d63f8806bd2ea1261018620dd4881626d6b146e13bd7"
+dependencies = [
+ "libc",
+ "tokio",
+]
+
+[[package]]
name = "serde"
version = "1.0.130"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2471,12 +2809,93 @@ version = "1.0.68"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f690853975602e1bfe1ccbf50504d67174e3bcf340f23b5ea9992e0587a52d8"
dependencies = [
- "itoa",
+ "itoa 0.4.8",
+ "ryu",
+ "serde",
+]
+
+[[package]]
+name = "serde_urlencoded"
+version = "0.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd"
+dependencies = [
+ "form_urlencoded",
+ "itoa 1.0.1",
"ryu",
"serde",
]
[[package]]
+name = "sha1"
+version = "0.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "04cc229fb94bcb689ffc39bd4ded842f6ff76885efede7c6d1ffb62582878bea"
+dependencies = [
+ "cfg-if 1.0.0",
+ "cpufeatures",
+ "digest 0.10.1",
+ "sha1-asm",
+]
+
+[[package]]
+name = "sha1-asm"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "563d4f7100bc3fce234e5f37bbf63dc2752558964505ba6ac3f7204bdc59eaac"
+dependencies = [
+ "cc",
+]
+
+[[package]]
+name = "shadowsocks"
+version = "1.13.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "356770455d9fb911a6b559ceedaa7f2d34e47d2d8ae606041dffba390abb7522"
+dependencies = [
+ "async-trait",
+ "base64",
+ "byte_string",
+ "bytes",
+ "cfg-if 1.0.0",
+ "futures",
+ "libc",
+ "log",
+ "nix 0.23.1",
+ "once_cell",
+ "pin-project",
+ "sendfd",
+ "serde",
+ "serde_json",
+ "serde_urlencoded",
+ "shadowsocks-crypto",
+ "socket2",
+ "thiserror",
+ "tokio",
+ "tokio-tfo",
+ "url",
+ "winapi 0.3.9",
+]
+
+[[package]]
+name = "shadowsocks-crypto"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "748be76f2786bcf817d86c08c6cc2245e436e3ae0b064b581e3d4fc81700360c"
+dependencies = [
+ "aes",
+ "aes-gcm",
+ "cfg-if 1.0.0",
+ "chacha20",
+ "chacha20poly1305",
+ "hkdf",
+ "md-5",
+ "rand 0.8.4",
+ "ring-compat",
+ "sha1",
+]
+
+[[package]]
name = "shared_child"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2502,6 +2921,15 @@ dependencies = [
]
[[package]]
+name = "signature"
+version = "1.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "02658e48d89f2bec991f9a78e69cfa4c316f8d6a6c4ec12fae1aeb263d486788"
+dependencies = [
+ "rand_core 0.6.3",
+]
+
+[[package]]
name = "simple-signal"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2869,6 +3297,7 @@ dependencies = [
"mio 0.7.13",
"num_cpus",
"once_cell",
+ "parking_lot 0.11.2",
"pin-project-lite",
"signal-hook-registry",
"tokio-macros",
@@ -2919,6 +3348,23 @@ dependencies = [
]
[[package]]
+name = "tokio-tfo"
+version = "0.1.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4279aec5ded232170bf39130dd0e0deaed2c9f31cd3b5db1f2021056bcf5f94a"
+dependencies = [
+ "cfg-if 1.0.0",
+ "futures",
+ "libc",
+ "log",
+ "once_cell",
+ "pin-project",
+ "socket2",
+ "tokio",
+ "winapi 0.3.9",
+]
+
+[[package]]
name = "tokio-util"
version = "0.6.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -3246,6 +3692,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3"
[[package]]
+name = "universal-hash"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9f214e8f697e925001e66ec2c6e37a4ef93f0f78c2eed7814394e10c62025b05"
+dependencies = [
+ "generic-array",
+ "subtle",
+]
+
+[[package]]
name = "unreachable"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/android/app/build.gradle.kts b/android/app/build.gradle.kts
index 7fa36a9986..dd7d97d1ec 100644
--- a/android/app/build.gradle.kts
+++ b/android/app/build.gradle.kts
@@ -128,7 +128,6 @@ tasks.withType<KotlinCompile>().all {
tasks.register("copyExtraAssets", Copy::class) {
from("$repoRootPath/dist-assets")
include("relays.json")
- include("api-ip-address.txt")
into(extraAssetsDirectory)
}
diff --git a/android/app/src/main/kotlin/net/mullvad/mullvadvpn/service/DaemonInstance.kt b/android/app/src/main/kotlin/net/mullvad/mullvadvpn/service/DaemonInstance.kt
index 23b127addf..8b900038fc 100644
--- a/android/app/src/main/kotlin/net/mullvad/mullvadvpn/service/DaemonInstance.kt
+++ b/android/app/src/main/kotlin/net/mullvad/mullvadvpn/service/DaemonInstance.kt
@@ -11,7 +11,6 @@ import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.channels.sendBlocking
import net.mullvad.mullvadvpn.util.Intermittent
-private const val API_IP_ADDRESS_FILE = "api-ip-address.txt"
private const val RELAYS_FILE = "relays.json"
class DaemonInstance(val vpnService: MullvadVpnService) {
@@ -88,7 +87,6 @@ class DaemonInstance(val vpnService: MullvadVpnService) {
lastUpdatedTime() > File(vpnService.filesDir, RELAYS_FILE).lastModified()
FileResourceExtractor(vpnService).apply {
- extract(API_IP_ADDRESS_FILE, false)
extract(RELAYS_FILE, shouldOverwriteRelayList)
}
}
diff --git a/build-apk.sh b/build-apk.sh
index c5a1fea9d1..8cc4e6ce76 100755
--- a/build-apk.sh
+++ b/build-apk.sh
@@ -128,8 +128,6 @@ done
echo "Updating relays.json..."
cargo run --bin relay_list $CARGO_ARGS > dist-assets/relays.json
-echo "Updating api-ip-address..."
-cargo run --bin address_cache $CARGO_ARGS > dist-assets/api-ip-address.txt
cd "$SCRIPT_DIR/android"
$GRADLE_CMD --console plain "$GRADLE_TASK"
diff --git a/build.sh b/build.sh
index 3c01b83735..22f154b90a 100755
--- a/build.sh
+++ b/build.sh
@@ -361,8 +361,6 @@ fi
log_info "Updating relays.json..."
cargo run --bin relay_list "${CARGO_ARGS[@]}" > dist-assets/relays.json
-log_info "Updating api-ip-address..."
-cargo run --bin address_cache "${CARGO_ARGS[@]}" > dist-assets/api-ip-address.txt
log_header "Installing JavaScript dependencies"
diff --git a/gui/tasks/distribution.js b/gui/tasks/distribution.js
index f0eb41de67..dcdde80841 100644
--- a/gui/tasks/distribution.js
+++ b/gui/tasks/distribution.js
@@ -20,7 +20,6 @@ const config = {
extraResources: [
{ from: distAssets('ca.crt'), to: '.' },
{ from: distAssets('relays.json'), to: '.' },
- { from: distAssets('api-ip-address.txt'), to: '.' },
{ from: root('CHANGELOG.md'), to: '.' },
],
diff --git a/mullvad-daemon/src/api.rs b/mullvad-daemon/src/api.rs
new file mode 100644
index 0000000000..2ed689b418
--- /dev/null
+++ b/mullvad-daemon/src/api.rs
@@ -0,0 +1,54 @@
+use crate::DaemonEventSender;
+use futures::{channel::oneshot, stream, Stream, StreamExt};
+use mullvad_rpc::proxy::ApiConnectionMode;
+use talpid_core::mpsc::Sender;
+use talpid_types::ErrorExt;
+
+pub(crate) struct ApiConnectionModeRequest {
+ pub response_tx: oneshot::Sender<ApiConnectionMode>,
+ pub retry_attempt: u32,
+}
+
+/// Returns a stream that returns the next API bridge to try.
+/// `initial_config` refers to the first config returned by the stream. The daemon is not notified
+/// of this.
+pub(crate) fn create_api_config_provider(
+ daemon_sender: DaemonEventSender<ApiConnectionModeRequest>,
+ initial_config: ApiConnectionMode,
+) -> impl Stream<Item = ApiConnectionMode> + Unpin {
+ struct Context {
+ attempt: u32,
+ daemon_sender: DaemonEventSender<ApiConnectionModeRequest>,
+ }
+
+ let ctx = Context {
+ attempt: 1,
+ daemon_sender,
+ };
+
+ Box::pin(
+ stream::once(async move { initial_config }).chain(stream::unfold(
+ ctx,
+ |mut ctx| async move {
+ ctx.attempt = ctx.attempt.wrapping_add(1);
+ let (response_tx, response_rx) = oneshot::channel();
+
+ let _ = ctx.daemon_sender.send(ApiConnectionModeRequest {
+ response_tx,
+ retry_attempt: ctx.attempt,
+ });
+
+ let new_config = response_rx.await.unwrap_or_else(|error| {
+ log::error!(
+ "{}",
+ error.display_chain_with_msg("Failed to receive API proxy config")
+ );
+ // Fall back on unbridged connection
+ ApiConnectionMode::Direct
+ });
+
+ Some((new_config, ctx))
+ },
+ )),
+ )
+}
diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs
index a62979b63f..42d988f41f 100644
--- a/mullvad-daemon/src/lib.rs
+++ b/mullvad-daemon/src/lib.rs
@@ -6,6 +6,7 @@ extern crate serde;
mod account;
pub mod account_history;
+mod api;
pub mod exception_logging;
#[cfg(target_os = "macos")]
pub mod exclusion_gid;
@@ -29,11 +30,14 @@ use futures::{
future::{abortable, AbortHandle, Future},
StreamExt,
};
-use mullvad_rpc::availability::ApiAvailabilityHandle;
+use mullvad_rpc::{
+ availability::ApiAvailabilityHandle,
+ proxy::{ApiConnectionMode, ProxyConfig},
+};
use mullvad_types::{
account::{AccountData, AccountToken, VoucherSubmission},
endpoint::MullvadEndpoint,
- location::GeoIpLocation,
+ location::{Coordinates, GeoIpLocation},
relay_constraints::{
BridgeSettings, BridgeState, Constraint, InternalBridgeConstraints, RelaySettings,
RelaySettingsUpdate,
@@ -54,7 +58,7 @@ use std::{collections::HashSet, ffi::OsString};
use std::{
marker::PhantomData,
mem,
- net::{IpAddr, Ipv4Addr},
+ net::{IpAddr, Ipv4Addr, SocketAddr},
path::PathBuf,
pin::Pin,
sync::{mpsc as sync_mpsc, Arc, Weak},
@@ -70,8 +74,8 @@ use talpid_core::{
use talpid_types::android::AndroidContext;
use talpid_types::{
net::{
- openvpn, AllowedEndpoint, Endpoint, TransportProtocol, TunnelEndpoint, TunnelParameters,
- TunnelType,
+ openvpn::{self, ProxySettings},
+ AllowedEndpoint, Endpoint, TransportProtocol, TunnelEndpoint, TunnelParameters, TunnelType,
},
tunnel::{ErrorStateCause, ParameterGenerationError, TunnelStateTransition},
ErrorExt,
@@ -327,6 +331,8 @@ pub(crate) enum InternalDaemonEvent {
NewAccountEvent(AccountToken, oneshot::Sender<Result<String, Error>>),
/// The background job fetching new `AppVersionInfo`s got a new info object.
NewAppVersionInfo(AppVersionInfo),
+ /// Request from REST client to use a different API endpoint.
+ GenerateApiConnectionMode(api::ApiConnectionModeRequest),
/// The split tunnel paths or state were updated.
#[cfg(target_os = "windows")]
ExcludedPathsEvent(ExcludedPathsUpdate, oneshot::Sender<Result<(), Error>>),
@@ -356,6 +362,12 @@ impl From<AppVersionInfo> for InternalDaemonEvent {
}
}
+impl From<api::ApiConnectionModeRequest> for InternalDaemonEvent {
+ fn from(request: api::ApiConnectionModeRequest) -> Self {
+ InternalDaemonEvent::GenerateApiConnectionMode(request)
+ }
+}
+
#[derive(Clone, Debug, Eq, PartialEq)]
enum DaemonExecutionState {
Running,
@@ -546,6 +558,7 @@ pub struct Daemon<L: EventListener> {
app_version_info: Option<AppVersionInfo>,
shutdown_tasks: Vec<Pin<Box<dyn Future<Output = ()>>>>,
tunnel_state_machine_handle: tunnel_state_machine::JoinHandle,
+ cache_dir: PathBuf,
#[cfg(target_os = "windows")]
volume_update_tx: mpsc::UnboundedSender<()>,
}
@@ -569,6 +582,8 @@ where
exclusion_gid::set_exclusion_gid().map_err(Error::GroupIdError)?
};
+ mullvad_rpc::proxy::ApiConnectionMode::try_delete_cache(&cache_dir).await;
+
let runtime = tokio::runtime::Handle::current();
let (internal_event_tx, internal_event_rx) = command_channel.destructure();
@@ -606,8 +621,7 @@ where
vec![]
};
- let mut rpc_runtime = mullvad_rpc::MullvadRpcRuntime::with_cache(
- Some(&resource_dir),
+ let rpc_runtime = mullvad_rpc::MullvadRpcRuntime::with_cache(
&cache_dir,
true,
#[cfg(target_os = "android")]
@@ -620,7 +634,7 @@ where
api_availability.suspend();
let initial_api_endpoint =
- Self::get_allowed_endpoint(rpc_runtime.address_cache.peek_address());
+ Self::get_allowed_endpoint(rpc_runtime.address_cache.get_address().await);
let (offline_state_tx, offline_state_rx) = mpsc::unbounded();
#[cfg(target_os = "windows")]
@@ -650,25 +664,37 @@ where
.await
.map_err(Error::TunnelError)?;
- let address_change_runtime = runtime.clone();
- let tunnel_cmd_weak_tx = Arc::downgrade(&tunnel_command_tx);
- rpc_runtime.set_address_change_listener(move |address| {
- let (result_tx, result_rx) = oneshot::channel();
- let tx = tunnel_cmd_weak_tx.clone();
- address_change_runtime.block_on(async move {
- if let Some(tx) = tx.upgrade() {
- let _ = tx.unbounded_send(TunnelCommand::AllowEndpoint(
- Self::get_allowed_endpoint(address),
+ let api_endpoint_tunnel_tx = Arc::downgrade(&tunnel_command_tx);
+ let api_endpoint_handler = move |address: SocketAddr| {
+ let tunnel_tx = api_endpoint_tunnel_tx.clone();
+ async move {
+ let (result_tx, result_rx) = oneshot::channel();
+ if let Some(tunnel_tx) = tunnel_tx.upgrade() {
+ let _ = tunnel_tx.unbounded_send(TunnelCommand::AllowEndpoint(
+ Self::get_allowed_endpoint(address.clone()),
result_tx,
));
- result_rx.await.map_err(|_| ())
+ if result_rx.await.is_ok() {
+ log::debug!("API endpoint: {}", address);
+ true
+ } else {
+ log::error!("Failed to update allowed endpoint");
+ false
+ }
} else {
- Err(())
+ log::error!("Tunnel state machine is down");
+ false
}
- })
- });
+ }
+ };
- let rpc_handle = rpc_runtime.mullvad_rest_handle();
+ let proxy_provider = api::create_api_config_provider(
+ internal_event_tx.to_specialized_sender(),
+ ApiConnectionMode::Direct,
+ );
+ let rpc_handle = rpc_runtime
+ .mullvad_rest_handle(proxy_provider, api_endpoint_handler)
+ .await;
Self::forward_offline_state(api_availability.clone(), offline_state_rx).await;
@@ -741,6 +767,7 @@ where
app_version_info,
shutdown_tasks: vec![],
tunnel_state_machine_handle,
+ cache_dir,
#[cfg(target_os = "windows")]
volume_update_tx,
};
@@ -903,6 +930,9 @@ where
NewAppVersionInfo(app_version_info) => {
self.handle_new_app_version_info(app_version_info)
}
+ GenerateApiConnectionMode(request) => {
+ self.handle_generate_api_connection_mode(request).await
+ }
#[cfg(windows)]
ExcludedPathsEvent(update, tx) => self.handle_new_excluded_paths(update, tx).await,
}
@@ -1073,7 +1103,7 @@ where
BridgeState::On => {
let (bridge_settings, bridge_relay) = self
.relay_selector
- .get_proxy_settings(&bridge_constraints, location)
+ .get_proxy_settings(&bridge_constraints, Some(location))
.ok_or(Error::NoBridgeAvailable)?;
self.last_generated_bridge_relay = Some(bridge_relay);
Some(bridge_settings)
@@ -1082,7 +1112,7 @@ where
if let Some((bridge_settings, bridge_relay)) =
self.relay_selector.get_auto_proxy_settings(
&bridge_constraints,
- location,
+ Some(location),
retry_attempt,
)
{
@@ -1349,6 +1379,78 @@ where
self.event_listener.notify_app_version(app_version_info);
}
+ /// Returns the next API connection mode to use for reaching the API.
+ ///
+ /// When `mullvad-rpc` fails to contact the API, it requests a new connection mode
+ /// from this function, which will be used for future requests. The API can be
+ /// connected to either directly (i.e., [`ApiConnectionMode::Direct`]) or from
+ /// a bridge ([`ApiConnectionMode::Proxied`]).
+ ///
+ /// * Every 3rd attempt returns [`ApiConnectionMode::Direct`] (i.e., no bridge).
+ /// * For any other attempt, this function returns a configuration for the bridge that is
+ /// closest to the selected relay location[^note] and matches all bridge constraints.
+ /// * When no matching bridge is found, e.g. if the selected hosting providers don't match any
+ /// bridge, [`ApiConnectionMode::Direct`] is returned.
+ ///
+ /// [^note]: The "selected relay location" is the location of the last relay that
+ /// the daemon connected to, or, if no relay was connected to, the "midpoint" of
+ /// all relays that match the selected relay location constraint.
+ async fn handle_generate_api_connection_mode(
+ &mut self,
+ request: api::ApiConnectionModeRequest,
+ ) {
+ let location = self
+ .last_generated_entry_relay
+ .as_ref()
+ .or(self.last_generated_relay.as_ref())
+ .and_then(|relay| relay.location.as_ref().map(Coordinates::from))
+ .or_else(|| {
+ if let RelaySettings::Normal(settings) = self.settings.get_relay_settings() {
+ self.relay_selector.get_relay_midpoint(&settings)
+ } else {
+ None
+ }
+ });
+ let bridge = if request.retry_attempt % 3 > 0 {
+ let constraints = match &self.settings.bridge_settings {
+ BridgeSettings::Normal(settings) => InternalBridgeConstraints {
+ location: settings.location.clone(),
+ providers: settings.providers.clone(),
+ transport_protocol: Constraint::Only(TransportProtocol::Tcp),
+ },
+ _ => InternalBridgeConstraints {
+ location: Constraint::Any,
+ providers: Constraint::Any,
+ transport_protocol: Constraint::Only(TransportProtocol::Tcp),
+ },
+ };
+ self.relay_selector
+ .get_proxy_settings(&constraints, location)
+ } else {
+ None
+ };
+ let config = match bridge {
+ Some((settings, _relay)) => match settings {
+ ProxySettings::Shadowsocks(ss_settings) => {
+ ApiConnectionMode::Proxied(ProxyConfig::Shadowsocks(ss_settings))
+ }
+ _ => {
+ log::error!("Received unexpected proxy settings type");
+ ApiConnectionMode::Direct
+ }
+ },
+ None => ApiConnectionMode::Direct,
+ };
+
+ if let Err(error) = config.save(&self.cache_dir).await {
+ log::debug!(
+ "{}",
+ error.display_chain_with_msg("Failed to save API endpoint")
+ );
+ }
+ let _ = request.response_tx.send(config);
+ }
+
#[cfg(windows)]
async fn handle_new_excluded_paths(
&mut self,
@@ -1407,7 +1509,7 @@ where
match &self.tunnel_state {
Disconnected => {
- let location = self.get_geo_location();
+ let location = self.get_geo_location().await;
tokio::spawn(async {
Self::oneshot_send(tx, location.await.ok(), "current location");
});
@@ -1420,7 +1522,7 @@ where
}
Connected { location, .. } => {
let relay_location = location.clone();
- let location_future = self.get_geo_location();
+ let location_future = self.get_geo_location().await;
tokio::spawn(async {
let location = location_future.await;
Self::oneshot_send(
@@ -1441,8 +1543,8 @@ where
}
}
- fn get_geo_location(&mut self) -> impl Future<Output = Result<GeoIpLocation, ()>> {
- let rpc_service = self.rpc_runtime.rest_handle();
+ async fn get_geo_location(&mut self) -> impl Future<Output = Result<GeoIpLocation, ()>> {
+ let rpc_service = self.rpc_runtime.rest_handle().await;
async {
geoip::send_location_request(rpc_service)
.await
diff --git a/mullvad-daemon/src/relays/mod.rs b/mullvad-daemon/src/relays/mod.rs
index 4f96a6ddcd..332ca5fea3 100644
--- a/mullvad-daemon/src/relays/mod.rs
+++ b/mullvad-daemon/src/relays/mod.rs
@@ -6,7 +6,7 @@ use ipnetwork::IpNetwork;
use mullvad_rpc::{availability::ApiAvailabilityHandle, rest::MullvadRestHandle};
use mullvad_types::{
endpoint::{MullvadEndpoint, MullvadWireguardEndpoint},
- location::Location,
+ location::{Coordinates, Location},
relay_constraints::{
BridgeState, Constraint, InternalBridgeConstraints, LocationConstraint, Match,
OpenVpnConstraints, Providers, RelayConstraints, Set, TransportPort, WireguardConstraints,
@@ -302,6 +302,34 @@ impl RelaySelector {
}
}
+ /// Returns the average location of relays that match the given constraints.
+ /// This returns none if the location is `any` or if no relays match the constraints.
+ pub fn get_relay_midpoint(&self, relay_constraints: &RelayConstraints) -> Option<Coordinates> {
+ if relay_constraints.location.is_any() {
+ return None;
+ }
+
+ let matcher = RelayMatcher::from(relay_constraints.clone());
+ let mut matching_locations: Vec<Location> = self
+ .parsed_relays
+ .lock()
+ .relays()
+ .iter()
+ .filter(|relay| relay.active)
+ .filter_map(|relay| {
+ matcher
+ .filter_matching_relay(relay)
+ .and_then(|relay| relay.location)
+ })
+ .collect();
+ matching_locations.dedup_by(|a, b| a.has_same_city(b));
+
+ if matching_locations.is_empty() {
+ return None;
+ }
+ Some(Coordinates::midpoint(&matching_locations))
+ }
+
/// Returns an OpenVpn endpoint, should only ever be used when the user has specified the tunnel
/// protocol as only OpenVPN.
fn get_openvpn_endpoint(
@@ -635,10 +663,10 @@ impl RelaySelector {
entry_endpoint.exit_peer = Some(exit_peer.clone());
}
- pub fn get_auto_proxy_settings(
+ pub fn get_auto_proxy_settings<T: Into<Coordinates>>(
&mut self,
bridge_constraints: &InternalBridgeConstraints,
- location: &Location,
+ location: Option<T>,
retry_attempt: u32,
) -> Option<(ProxySettings, Relay)> {
if !self.should_use_bridge(retry_attempt) {
@@ -663,10 +691,10 @@ impl RelaySelector {
(retry_attempt % 4) < 2
}
- pub fn get_proxy_settings(
+ pub fn get_proxy_settings<T: Into<Coordinates>>(
&mut self,
constraints: &InternalBridgeConstraints,
- location: &Location,
+ location: Option<T>,
) -> Option<(ProxySettings, Relay)> {
let mut matching_relays: Vec<Relay> = self
.parsed_relays
@@ -681,10 +709,16 @@ impl RelaySelector {
return None;
}
- matching_relays.sort_by_cached_key(|relay| {
- (relay.location.as_ref().unwrap().distance_from(&location) * 1000.0) as i64
- });
- matching_relays.get(0).and_then(|relay| {
+ let relay = if let Some(location) = location {
+ let location = location.into();
+ matching_relays.sort_by_cached_key(|relay| {
+ (relay.location.as_ref().unwrap().distance_from(&location) * 1000.0) as i64
+ });
+ matching_relays.get(0)
+ } else {
+ self.pick_random_relay(&matching_relays)
+ };
+ relay.and_then(|relay| {
self.pick_random_bridge(&relay)
.map(|bridge| (bridge, relay.clone()))
})
diff --git a/mullvad-daemon/src/relays/updater.rs b/mullvad-daemon/src/relays/updater.rs
index 0c06beb43f..381e5bd54c 100644
--- a/mullvad-daemon/src/relays/updater.rs
+++ b/mullvad-daemon/src/relays/updater.rs
@@ -58,7 +58,6 @@ impl RelayListUpdater {
api_availability: ApiAvailabilityHandle,
) -> RelayListUpdaterHandle {
let (tx, cmd_rx) = mpsc::channel(1);
- let service = rpc_handle.service();
let rpc_client = RelayListProxy::new(rpc_handle);
let updater = RelayListUpdater {
rpc_client,
@@ -69,7 +68,7 @@ impl RelayListUpdater {
api_availability,
};
- service.spawn(updater.run(cmd_rx));
+ tokio::spawn(updater.run(cmd_rx));
RelayListUpdaterHandle { tx }
}
diff --git a/mullvad-jni/src/lib.rs b/mullvad-jni/src/lib.rs
index 72c75e7360..646988e11b 100644
--- a/mullvad-jni/src/lib.rs
+++ b/mullvad-jni/src/lib.rs
@@ -940,7 +940,7 @@ pub extern "system" fn Java_net_mullvad_mullvadvpn_service_MullvadDaemon_updateR
fn log_request_error(request: &str, error: &daemon_interface::Error) {
match error {
- daemon_interface::Error::RpcError(RestError::Aborted(_)) => {
+ daemon_interface::Error::RpcError(RestError::Aborted) => {
log::debug!("Request to {} cancelled", request);
}
error => {
diff --git a/mullvad-problem-report/src/lib.rs b/mullvad-problem-report/src/lib.rs
index 48b80dd1ff..4b9def6adb 100644
--- a/mullvad-problem-report/src/lib.rs
+++ b/mullvad-problem-report/src/lib.rs
@@ -1,6 +1,7 @@
#![deny(rust_2018_idioms)]
use lazy_static::lazy_static;
+use mullvad_rpc::proxy::ApiConnectionMode;
use regex::Regex;
use std::{
borrow::Cow,
@@ -270,50 +271,70 @@ pub fn send_problem_report(
}
})?,
);
- let metadata =
- ProblemReport::parse_metadata(&report_content).unwrap_or_else(|| metadata::collect());
let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(2)
.enable_all()
.build()
.map_err(Error::CreateRuntime)?;
+ runtime.block_on(send_problem_report_inner(
+ user_email,
+ user_message,
+ &report_content,
+ cache_dir,
+ ))
+}
- let mut rpc_manager = runtime
- .block_on(mullvad_rpc::MullvadRpcRuntime::with_cache(
- None,
- cache_dir,
- false,
- #[cfg(target_os = "android")]
- None,
- ))
- .map_err(Error::CreateRpcClientError)?;
- let rpc_client = mullvad_rpc::ProblemReportProxy::new(rpc_manager.mullvad_rest_handle());
+async fn send_problem_report_inner(
+ user_email: &str,
+ user_message: &str,
+ report_content: &str,
+ cache_dir: &Path,
+) -> Result<(), Error> {
+ let metadata =
+ ProblemReport::parse_metadata(&report_content).unwrap_or_else(|| metadata::collect());
+ let rpc_runtime = mullvad_rpc::MullvadRpcRuntime::with_cache(
+ cache_dir,
+ false,
+ #[cfg(target_os = "android")]
+ None,
+ )
+ .await
+ .map_err(Error::CreateRpcClientError)?;
- runtime.block_on(async move {
- for _attempt in 0..MAX_SEND_ATTEMPTS {
- match rpc_client
- .problem_report(user_email, user_message, &report_content, &metadata)
- .await
- {
- Ok(()) => {
- return Ok(());
- }
- Err(error) => {
- if !error.is_network_error() {
- return Err(Error::SendProblemReportError(error));
- }
- log::error!(
- "{}",
- error.display_chain_with_msg(
- "Failed to send problem report due to network error"
- )
- );
+ let rpc_client = mullvad_rpc::ProblemReportProxy::new(
+ rpc_runtime
+ .mullvad_rest_handle(
+ ApiConnectionMode::try_from_cache(cache_dir)
+ .await
+ .into_repeat(),
+ |_| async { true },
+ )
+ .await,
+ );
+
+ for _attempt in 0..MAX_SEND_ATTEMPTS {
+ match rpc_client
+ .problem_report(user_email, user_message, &report_content, &metadata)
+ .await
+ {
+ Ok(()) => {
+ return Ok(());
+ }
+ Err(error) => {
+ if !error.is_network_error() {
+ return Err(Error::SendProblemReportError(error));
}
+ log::error!(
+ "{}",
+ error.display_chain_with_msg(
+ "Failed to send problem report due to network error"
+ )
+ );
}
}
- Err(Error::SendFailedTooManyTimes)
- })
+ }
+ Err(Error::SendFailedTooManyTimes)
}
fn write_problem_report(path: &Path, problem_report: &ProblemReport) -> io::Result<()> {
diff --git a/mullvad-rpc/Cargo.toml b/mullvad-rpc/Cargo.toml
index 6fdf3e3400..4b18512c75 100644
--- a/mullvad-rpc/Cargo.toml
+++ b/mullvad-rpc/Cargo.toml
@@ -34,5 +34,7 @@ lazy_static = "1.1.0"
mullvad-types = { path = "../mullvad-types" }
talpid-types = { path = "../talpid-types" }
+shadowsocks = { version = "1.12", default-features = false, features = ["stream-cipher"] }
+
[target.'cfg(target_os="macos")'.dependencies]
tokio-stream = { version = "0.1", features = ["io-util"] }
diff --git a/mullvad-rpc/src/abortable_stream.rs b/mullvad-rpc/src/abortable_stream.rs
index 160e329dfb..af217c5768 100644
--- a/mullvad-rpc/src/abortable_stream.rs
+++ b/mullvad-rpc/src/abortable_stream.rs
@@ -12,6 +12,10 @@ use std::{
};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
+#[derive(err_derive::Error, Debug)]
+#[error(display = "Stream is closed")]
+pub struct Aborted(());
+
#[derive(Clone, Debug)]
pub struct AbortableStreamHandle {
tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
@@ -71,7 +75,7 @@ where
if let Poll::Ready(_) = Pin::new(&mut self.shutdown_rx).poll(cx) {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::ConnectionReset,
- "stream is closed",
+ Aborted(()),
)));
}
Pin::new(&mut self.stream).poll_write(cx, buf)
@@ -81,7 +85,7 @@ where
if let Poll::Ready(_) = Pin::new(&mut self.shutdown_rx).poll(cx) {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::ConnectionReset,
- "stream is closed",
+ Aborted(()),
)));
}
Pin::new(&mut self.stream).poll_flush(cx)
@@ -104,7 +108,7 @@ where
if let Poll::Ready(_) = Pin::new(&mut self.shutdown_rx).poll(cx) {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::ConnectionReset,
- "stream is closed",
+ Aborted(()),
)));
}
Pin::new(&mut self.stream).poll_read(cx, buf)
diff --git a/mullvad-rpc/src/address_cache.rs b/mullvad-rpc/src/address_cache.rs
index f1048b7ede..3b6fcba074 100644
--- a/mullvad-rpc/src/address_cache.rs
+++ b/mullvad-rpc/src/address_cache.rs
@@ -1,16 +1,9 @@
use super::API;
-use rand::seq::SliceRandom;
-use std::{
- io,
- net::SocketAddr,
- ops::{Deref, DerefMut},
- path::Path,
- sync::{Arc, Mutex},
-};
-use talpid_types::ErrorExt;
+use std::{io, net::SocketAddr, path::Path, sync::Arc};
use tokio::{
fs,
- io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
+ io::{AsyncReadExt, AsyncWriteExt},
+ sync::Mutex,
};
#[derive(err_derive::Error, Debug)]
@@ -22,212 +15,78 @@ pub enum Error {
#[error(display = "Failed to read the address cache file")]
ReadAddressCache(#[error(source)] io::Error),
+ #[error(display = "Failed to parse the address cache file")]
+ ParseAddressCache,
+
#[error(display = "Failed to update the address cache file")]
WriteAddressCache(#[error(source)] io::Error),
#[error(display = "The address cache is empty")]
EmptyAddressCache,
-
- #[error(display = "The address change listener returned an error")]
- ChangeListenerError,
}
-pub type CurrentAddressChangeListener =
- dyn Fn(SocketAddr) -> Result<(), ()> + Send + Sync + 'static;
-
#[derive(Clone)]
pub struct AddressCache {
inner: Arc<Mutex<AddressCacheInner>>,
write_path: Option<Arc<Path>>,
- change_listener: Arc<Box<CurrentAddressChangeListener>>,
}
impl AddressCache {
- /// Initialize cache using the given list, and write changes to `write_path`.
- pub fn new(addresses: Vec<SocketAddr>, write_path: Option<Box<Path>>) -> Result<Self, Error> {
- let mut cache = AddressCacheInner::from_addresses(addresses)?;
- cache.shuffle_tail();
- log::trace!("API address cache: {:?}", cache.addresses);
- log::debug!("Using API address: {:?}", Self::get_address_inner(&cache));
-
- let address_cache = Self {
- inner: Arc::new(Mutex::new(cache)),
- write_path: write_path.map(|cache| Arc::from(cache)),
- change_listener: Arc::new(Box::new(|_| Ok(()))),
- };
- Ok(address_cache)
+ /// Initialize cache using the hardcoded address, and write changes to `write_path`.
+ pub fn new(write_path: Option<Box<Path>>) -> Result<Self, Error> {
+ Self::new_inner(API.addr, write_path)
}
/// Initialize cache using `read_path`, and write changes to `write_path`.
pub async fn from_file(read_path: &Path, write_path: Option<Box<Path>>) -> Result<Self, Error> {
log::debug!("Loading API addresses from {}", read_path.display());
- Self::new(read_address_file(read_path).await?, write_path)
+ Self::new_inner(read_address_file(read_path).await?, write_path)
}
- pub fn set_change_listener(&mut self, change_listener: Arc<Box<CurrentAddressChangeListener>>) {
- self.change_listener = change_listener;
- }
+ fn new_inner(address: SocketAddr, write_path: Option<Box<Path>>) -> Result<Self, Error> {
+ let cache = AddressCacheInner::from_address(address);
+ log::debug!("Using API address: {}", cache.address);
- /// Returns the currently selected address.
- pub fn get_address(&self) -> SocketAddr {
- let mut inner = self.inner.lock().unwrap();
- inner.tried_current = true;
- Self::get_address_inner(&inner)
- }
-
- /// Returns the current address without registering it as "tried"
- /// in [Self::has_tried_current_address].
- pub fn peek_address(&self) -> SocketAddr {
- let inner = self.inner.lock().unwrap();
- Self::get_address_inner(&inner)
+ let address_cache = Self {
+ inner: Arc::new(Mutex::new(cache)),
+ write_path: write_path.map(|cache| Arc::from(cache)),
+ };
+ Ok(address_cache)
}
- fn get_address_inner(inner: &AddressCacheInner) -> SocketAddr {
- if inner.addresses.is_empty() {
- return API.addr;
+ /// Returns the address if the hostname equals `API.host`. Otherwise, returns `None`.
+ pub async fn resolve_hostname(&self, hostname: &str) -> Option<SocketAddr> {
+ if hostname.eq_ignore_ascii_case(&API.host) {
+ Some(self.get_address().await)
+ } else {
+ None
}
- *inner
- .addresses
- .get(inner.choice % inner.addresses.len())
- .unwrap_or(&API.addr)
}
- pub fn has_tried_current_address(&self) -> bool {
- let inner = self.inner.lock().unwrap();
- inner.tried_current
- }
-
- pub async fn select_new_address(&self) {
- {
- let mut inner = self.inner.lock().unwrap();
- let mut transaction = AddressCacheTransaction::new(&mut inner);
-
- transaction.choice = transaction.current.choice.wrapping_add(1);
- if transaction.choice == transaction.current.choice {
- return;
- }
- transaction.tried_current = false;
-
- tokio::task::block_in_place(move || {
- if (*self.change_listener)(Self::get_address_inner(&transaction)).is_err() {
- log::error!("Failed to select a new API endpoint");
- return;
- }
- transaction.commit();
- });
- }
-
- if let Err(error) = self.save_to_disk().await {
- log::error!("{}", error.display_chain());
- }
+ /// Returns the currently selected address.
+ pub async fn get_address(&self) -> SocketAddr {
+ self.inner.lock().await.address
}
- /// Forgets the currently selected address and randomizes
- /// the entire list.
- pub async fn randomize(&self) -> Result<(), Error> {
- {
- let mut inner = self.inner.lock().unwrap();
-
- let mut transaction = AddressCacheTransaction::new(&mut inner);
- transaction.shuffle();
- transaction.choice = 0;
-
- let current_address = Self::get_address_inner(&transaction.current);
- let new_address = Self::get_address_inner(&transaction);
-
- tokio::task::block_in_place(move || {
- if new_address != current_address {
- transaction.tried_current = false;
- if (*self.change_listener)(new_address).is_err() {
- return Err(Error::ChangeListenerError);
- }
- }
-
- transaction.commit();
- Ok(())
- })?;
- }
- self.save_to_disk().await.map_err(Error::WriteAddressCache)
- }
-
- pub async fn set_addresses(&self, mut addresses: Vec<SocketAddr>) -> io::Result<()> {
- let should_update = {
- let mut inner = self.inner.lock().unwrap();
- let mut transaction = AddressCacheTransaction::new(&mut inner);
-
- addresses.sort();
-
- let mut current_sorted = transaction.addresses.clone();
- current_sorted.sort();
-
- if addresses != current_sorted {
- let current_address = Self::get_address_inner(&transaction);
-
- transaction.addresses = addresses.clone();
- transaction.shuffle();
-
- // Prefer a likely-working address
- let choice = transaction
- .addresses
- .iter()
- .position(|&addr| addr == current_address);
- if let Some(choice) = choice {
- transaction.choice = choice;
- transaction.commit();
- } else {
- transaction.choice = 0;
- transaction.tried_current = false;
-
- tokio::task::block_in_place(move || {
- if (*self.change_listener)(Self::get_address_inner(&transaction)).is_err() {
- log::error!("Failed to select a new API endpoint");
- return Err(io::Error::new(
- io::ErrorKind::Other,
- "callback returned an error",
- ));
- }
- transaction.commit();
- Ok(())
- })?;
- }
-
- true
- } else {
- false
- }
- };
- if should_update {
- log::trace!("API address cache: {:?}", addresses);
- self.save_to_disk().await?;
+ pub async fn set_address(&self, address: SocketAddr) -> io::Result<()> {
+ let mut inner = self.inner.lock().await;
+ if address != inner.address {
+ self.save_to_disk(&address).await?;
+ inner.address = address;
}
Ok(())
}
- async fn save_to_disk(&self) -> io::Result<()> {
+ async fn save_to_disk(&self, address: &SocketAddr) -> io::Result<()> {
let write_path = match self.write_path.as_ref() {
Some(write_path) => write_path,
None => return Ok(()),
};
- let (mut addresses, choice) = {
- let inner = self.inner.lock().unwrap();
- (inner.addresses.clone(), inner.choice)
- };
-
- // Place the current choice on top
- if !addresses.is_empty() {
- let addresses_len = addresses.len();
- addresses.swap(0, choice % addresses_len);
- }
-
let temp_path = write_path.with_file_name("api-cache.temp");
let mut file = fs::File::create(&temp_path).await?;
- let mut contents = addresses
- .iter()
- .map(ToString::to_string)
- .collect::<Vec<String>>()
- .join("\n");
+ let mut contents = address.to_string();
contents += "\n";
file.write_all(contents.as_bytes()).await?;
file.sync_data().await?;
@@ -236,96 +95,24 @@ impl AddressCache {
}
}
-impl crate::rest::AddressProvider for AddressCache {
- fn get_address(&self) -> String {
- self.get_address().to_string()
- }
-
- fn clone_box(&self) -> Box<dyn crate::rest::AddressProvider> {
- Box::new(self.clone())
- }
-}
-
#[derive(Clone, PartialEq, Eq)]
struct AddressCacheInner {
- addresses: Vec<SocketAddr>,
- choice: usize,
- tried_current: bool,
+ address: SocketAddr,
}
impl AddressCacheInner {
- fn from_addresses(addresses: Vec<SocketAddr>) -> Result<Self, Error> {
- if addresses.is_empty() {
- return Err(Error::EmptyAddressCache);
- }
- Ok(Self {
- addresses,
- choice: 0,
- tried_current: false,
- })
- }
-
- fn shuffle(&mut self) {
- let mut rng = rand::thread_rng();
- (&mut self.addresses[..]).shuffle(&mut rng);
- }
-
- /// Shuffle all but the first element
- fn shuffle_tail(&mut self) {
- let mut rng = rand::thread_rng();
- (&mut self.addresses[1..]).shuffle(&mut rng);
- }
-}
-
-struct AddressCacheTransaction<'a> {
- current: &'a mut AddressCacheInner,
- working_cache: AddressCacheInner,
-}
-
-impl<'a> AddressCacheTransaction<'a> {
- fn new(cache: &'a mut AddressCacheInner) -> Self {
- Self {
- working_cache: cache.clone(),
- current: cache,
- }
- }
-
- fn commit(self) {
- *self.current = self.working_cache;
+ fn from_address(address: SocketAddr) -> Self {
+ Self { address }
}
}
-impl<'a> Deref for AddressCacheTransaction<'a> {
- type Target = AddressCacheInner;
-
- fn deref(&self) -> &Self::Target {
- &self.working_cache
- }
-}
-
-impl<'a> DerefMut for AddressCacheTransaction<'a> {
- fn deref_mut(&mut self) -> &mut Self::Target {
- &mut self.working_cache
- }
-}
-
-async fn read_address_file(path: &Path) -> Result<Vec<SocketAddr>, Error> {
- let file = fs::File::open(path)
+async fn read_address_file(path: &Path) -> Result<SocketAddr, Error> {
+ let mut file = fs::File::open(path)
.await
.map_err(|error| Error::OpenAddressCache(error))?;
- let mut lines = BufReader::new(file).lines();
- let mut addresses = vec![];
- while let Some(line) = lines
- .next_line()
+ let mut address = String::new();
+ file.read_to_string(&mut address)
.await
- .map_err(|error| Error::ReadAddressCache(error))?
- {
- match line.trim().parse() {
- Ok(address) => addresses.push(address),
- Err(err) => {
- log::error!("Failed to parse cached address line: {}", err);
- }
- }
- }
- Ok(addresses)
+ .map_err(Error::ReadAddressCache)?;
+ address.trim().parse().map_err(|_| Error::ParseAddressCache)
}
diff --git a/mullvad-rpc/src/bin/address_cache.rs b/mullvad-rpc/src/bin/address_cache.rs
deleted file mode 100644
index 066ce5c910..0000000000
--- a/mullvad-rpc/src/bin/address_cache.rs
+++ /dev/null
@@ -1,43 +0,0 @@
-//! Fetches and prints a list of IP addresses and ports where the Mullvad API
-//! can be reached.
-//! Used by the installer artifact packer to bundle the latest available list
-//! of API IPs.
-
-use mullvad_rpc::{rest::Error as RestError, ApiProxy, MullvadRpcRuntime};
-use std::process;
-use talpid_types::ErrorExt;
-
-#[tokio::main]
-async fn main() {
- let mut runtime =
- MullvadRpcRuntime::new(tokio::runtime::Handle::current()).expect("Failed to load runtime");
-
- let api_proxy = ApiProxy::new(runtime.mullvad_rest_handle());
- let request = api_proxy.get_api_addrs().await;
-
- let api_list = match request {
- Ok(api_list) => api_list,
- Err(RestError::TimeoutError(_)) => {
- eprintln!("Request timed out");
- process::exit(2);
- }
- Err(e @ RestError::DeserializeError(_)) => {
- eprintln!(
- "{}",
- e.display_chain_with_msg("Failed to deserialize API address list")
- );
- process::exit(3);
- }
- Err(e) => {
- eprintln!(
- "{}",
- e.display_chain_with_msg("Failed to fetch API address list")
- );
- process::exit(1);
- }
- };
-
- for address in api_list {
- println!("{}", address);
- }
-}
diff --git a/mullvad-rpc/src/bin/relay_list.rs b/mullvad-rpc/src/bin/relay_list.rs
index db7fc29854..66118d3ade 100644
--- a/mullvad-rpc/src/bin/relay_list.rs
+++ b/mullvad-rpc/src/bin/relay_list.rs
@@ -2,18 +2,24 @@
//! Used by the installer artifact packer to bundle the latest available
//! relay list at the time of creating the installer.
-use mullvad_rpc::{rest::Error as RestError, MullvadRpcRuntime, RelayListProxy};
+use mullvad_rpc::{
+ proxy::ApiConnectionMode, rest::Error as RestError, MullvadRpcRuntime, RelayListProxy,
+};
use std::process;
use talpid_types::ErrorExt;
#[tokio::main]
async fn main() {
- let mut runtime =
+ let runtime =
MullvadRpcRuntime::new(tokio::runtime::Handle::current()).expect("Failed to load runtime");
- let relay_list_request = RelayListProxy::new(runtime.mullvad_rest_handle())
- .relay_list(None)
- .await;
+ let relay_list_request = RelayListProxy::new(
+ runtime
+ .mullvad_rest_handle(ApiConnectionMode::Direct.into_repeat(), |_| async { true })
+ .await,
+ )
+ .relay_list(None)
+ .await;
let relay_list = match relay_list_request {
Ok(relay_list) => relay_list,
diff --git a/mullvad-rpc/src/https_client_with_sni.rs b/mullvad-rpc/src/https_client_with_sni.rs
index 4d8108f89b..238b191491 100644
--- a/mullvad-rpc/src/https_client_with_sni.rs
+++ b/mullvad-rpc/src/https_client_with_sni.rs
@@ -1,8 +1,10 @@
use crate::{
abortable_stream::{AbortableStream, AbortableStreamHandle},
+ proxy::{ApiConnection, ApiConnectionMode, ProxyConfig},
tls_stream::TlsStream,
+ AddressCache,
};
-use futures::{channel::mpsc, StreamExt};
+use futures::{channel::mpsc, future, StreamExt};
#[cfg(target_os = "android")]
use futures::{channel::oneshot, sink::SinkExt};
use http::uri::Scheme;
@@ -11,6 +13,13 @@ use hyper::{
service::Service,
Uri,
};
+use shadowsocks::{
+ config::ServerType,
+ context::{Context as SsContext, SharedContext},
+ crypto::v1::CipherKind,
+ relay::tcprelay::ProxyClientStream,
+ ServerConfig,
+};
#[cfg(target_os = "android")]
use std::os::unix::io::{AsRawFd, RawFd};
use std::{
@@ -24,22 +33,80 @@ use std::{
task::{Context, Poll},
time::Duration,
};
+use talpid_types::ErrorExt;
#[cfg(target_os = "android")]
use tokio::net::TcpSocket;
-use tokio::{net::TcpStream, runtime::Handle, time::timeout};
+use tokio::{net::TcpStream, time::timeout};
const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
#[derive(Clone)]
pub struct HttpsConnectorWithSniHandle {
- tx: mpsc::UnboundedSender<()>,
+ tx: mpsc::UnboundedSender<HttpsConnectorRequest>,
}
impl HttpsConnectorWithSniHandle {
/// Stop all streams produced by this connector
pub fn reset(&self) {
- let _ = self.tx.unbounded_send(());
+ let _ = self.tx.unbounded_send(HttpsConnectorRequest::Reset);
+ }
+
+ /// Change the proxy settings for the connector
+ pub fn set_connection_mode(&self, proxy: ApiConnectionMode) {
+ let _ = self
+ .tx
+ .unbounded_send(HttpsConnectorRequest::SetConnectionMode(proxy));
+ }
+}
+
+enum HttpsConnectorRequest {
+ Reset,
+ SetConnectionMode(ApiConnectionMode),
+}
+
+#[derive(Clone)]
+enum InnerConnectionMode {
+ /// Connect directly to the target.
+ Direct,
+ /// Connect to the destination via a proxy.
+ Proxied(ParsedShadowsocksConfig),
+}
+
+#[derive(Clone)]
+struct ParsedShadowsocksConfig {
+ peer: SocketAddr,
+ password: String,
+ cipher: CipherKind,
+}
+
+impl From<ParsedShadowsocksConfig> for ServerConfig {
+ fn from(config: ParsedShadowsocksConfig) -> Self {
+ ServerConfig::new(config.peer, config.password, config.cipher)
+ }
+}
+
+#[derive(err_derive::Error, Debug)]
+enum ProxyConfigError {
+ #[error(display = "Unrecognized cipher selected: {}", _0)]
+ InvalidCipher(String),
+}
+
+impl TryFrom<ApiConnectionMode> for InnerConnectionMode {
+ type Error = ProxyConfigError;
+
+ fn try_from(config: ApiConnectionMode) -> Result<Self, Self::Error> {
+ Ok(match config {
+ ApiConnectionMode::Direct => InnerConnectionMode::Direct,
+ ApiConnectionMode::Proxied(ProxyConfig::Shadowsocks(config)) => {
+ InnerConnectionMode::Proxied(ParsedShadowsocksConfig {
+ peer: config.peer,
+ password: config.password,
+ cipher: CipherKind::from_str(&config.cipher)
+ .map_err(|_| ProxyConfigError::InvalidCipher(config.cipher))?,
+ })
+ }
+ })
}
}
@@ -48,12 +115,16 @@ impl HttpsConnectorWithSniHandle {
pub struct HttpsConnectorWithSni {
inner: Arc<Mutex<HttpsConnectorWithSniInner>>,
sni_hostname: Option<String>,
+ address_cache: AddressCache,
+ abort_notify: Arc<tokio::sync::Notify>,
+ proxy_context: SharedContext,
#[cfg(target_os = "android")]
socket_bypass_tx: Option<mpsc::Sender<SocketBypassRequest>>,
}
struct HttpsConnectorWithSniInner {
stream_handles: Vec<AbortableStreamHandle>,
+ proxy_config: InnerConnectionMode,
}
#[cfg(target_os = "android")]
@@ -61,26 +132,47 @@ pub type SocketBypassRequest = (RawFd, oneshot::Sender<()>);
impl HttpsConnectorWithSni {
pub fn new(
- handle: Handle,
sni_hostname: Option<String>,
+ address_cache: AddressCache,
#[cfg(target_os = "android")] socket_bypass_tx: Option<mpsc::Sender<SocketBypassRequest>>,
) -> (Self, HttpsConnectorWithSniHandle) {
- let (tx, mut rx): (_, mpsc::UnboundedReceiver<()>) = mpsc::unbounded();
+ let (tx, mut rx) = mpsc::unbounded();
+ let abort_notify = Arc::new(tokio::sync::Notify::new());
let inner = Arc::new(Mutex::new(HttpsConnectorWithSniInner {
stream_handles: vec![],
+ proxy_config: InnerConnectionMode::Direct,
}));
let inner_copy = inner.clone();
- handle.spawn(async move {
+ let notify = abort_notify.clone();
+ tokio::spawn(async move {
// Handle requests by `HttpsConnectorWithSniHandle`s
- while let Some(()) = rx.next().await {
+ while let Some(request) = rx.next().await {
let handles = {
let mut inner = inner_copy.lock().unwrap();
+
+ if let HttpsConnectorRequest::SetConnectionMode(config) = request {
+ match InnerConnectionMode::try_from(config) {
+ Ok(config) => {
+ inner.proxy_config = config;
+ }
+ Err(error) => {
+ log::error!(
+ "{}",
+ error.display_chain_with_msg(
+ "Failed to parse new API proxy config"
+ )
+ );
+ }
+ }
+ }
+
std::mem::take(&mut inner.stream_handles)
};
for handle in handles {
handle.close();
}
+ notify.notify_waiters();
}
});
@@ -88,6 +180,9 @@ impl HttpsConnectorWithSni {
HttpsConnectorWithSni {
inner,
sni_hostname,
+ address_cache,
+ abort_notify,
+ proxy_context: SsContext::new_shared(ServerType::Local),
#[cfg(target_os = "android")]
socket_bypass_tx,
},
@@ -125,17 +220,24 @@ impl HttpsConnectorWithSni {
.map_err(|err| io::Error::new(io::ErrorKind::TimedOut, err))?
}
- async fn resolve_address(uri: &Uri) -> io::Result<SocketAddr> {
+ async fn resolve_address(address_cache: AddressCache, uri: Uri) -> io::Result<SocketAddr> {
let hostname = uri.host().ok_or(io::Error::new(
io::ErrorKind::InvalidInput,
"invalid url, missing host",
))?;
let port = uri.port_u16().unwrap_or(443);
-
if let Some(addr) = hostname.parse::<IpAddr>().ok() {
return Ok(SocketAddr::new(addr, port));
}
+ // Preferentially, use cached address.
+ //
+ if let Some(addr) = address_cache.resolve_hostname(hostname).await {
+ return Ok(SocketAddr::new(addr.ip(), port));
+ }
+
+ // Use getaddrinfo as a fallback
+ //
let mut addrs = GaiResolver::new()
.call(
Name::from_str(&hostname)
@@ -157,7 +259,7 @@ impl fmt::Debug for HttpsConnectorWithSni {
}
impl Service<Uri> for HttpsConnectorWithSni {
- type Response = TlsStream<AbortableStream<TcpStream>>;
+ type Response = AbortableStream<ApiConnection>;
type Error = io::Error;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
@@ -175,8 +277,11 @@ impl Service<Uri> for HttpsConnectorWithSni {
io::Error::new(io::ErrorKind::InvalidInput, "invalid url, missing host")
});
let inner = self.inner.clone();
+ let abort_notify = self.abort_notify.clone();
+ let proxy_context = self.proxy_context.clone();
#[cfg(target_os = "android")]
let socket_bypass_tx = self.socket_bypass_tx.clone();
+ let address_cache = self.address_cache.clone();
let fut = async move {
if uri.scheme() != Some(&Scheme::HTTPS) {
@@ -187,16 +292,62 @@ impl Service<Uri> for HttpsConnectorWithSni {
}
let hostname = sni_hostname?;
- let addr = Self::resolve_address(&uri).await?;
+ let addr = Self::resolve_address(address_cache, uri).await?;
- let tokio_connection = Self::open_socket(
- addr,
+ // Loop until we have established a connection. This starts over if a new endpoint
+ // is selected while connecting.
+ let stream = loop {
+ let config = { inner.lock().unwrap().proxy_config.clone() };
+ let hostname_copy = hostname.clone();
+ let addr_copy = addr.clone();
+ let context = proxy_context.clone();
#[cfg(target_os = "android")]
- socket_bypass_tx,
- )
- .await?;
+ let socket_bypass_tx_copy = socket_bypass_tx.clone();
+
+ let stream_fut: Pin<
+ Box<dyn Future<Output = Result<ApiConnection, io::Error>> + Send>,
+ > = Box::pin(async move {
+ match config {
+ InnerConnectionMode::Direct => {
+ let socket = Self::open_socket(
+ addr_copy,
+ #[cfg(target_os = "android")]
+ socket_bypass_tx_copy,
+ )
+ .await?;
+ let tls_stream =
+ TlsStream::connect_https(socket, &hostname_copy).await?;
+ Ok(ApiConnection::Direct(tls_stream))
+ }
+ InnerConnectionMode::Proxied(proxy_config) => {
+ let socket = Self::open_socket(
+ proxy_config.peer,
+ #[cfg(target_os = "android")]
+ socket_bypass_tx_copy,
+ )
+ .await?;
+ let proxy = ProxyClientStream::from_stream(
+ context,
+ socket,
+ &ServerConfig::from(proxy_config),
+ addr,
+ );
+ let tls_stream =
+ TlsStream::connect_https(proxy, &hostname_copy).await?;
+ Ok(ApiConnection::Proxied(tls_stream))
+ }
+ }
+ });
+
+ // Wait for connection. Abort and retry if we switched to a different server.
+ if let future::Either::Left((stream, _)) =
+ future::select(stream_fut, Box::pin(abort_notify.notified())).await
+ {
+ break stream?;
+ }
+ };
- let (tcp_stream, socket_handle) = AbortableStream::new(tokio_connection);
+ let (stream, socket_handle) = AbortableStream::new(stream);
{
let mut inner = inner.lock().unwrap();
@@ -204,7 +355,7 @@ impl Service<Uri> for HttpsConnectorWithSni {
inner.stream_handles.push(socket_handle);
}
- Ok(TlsStream::connect_https(tcp_stream, &hostname).await?)
+ Ok(stream)
};
Box::pin(fut)
diff --git a/mullvad-rpc/src/lib.rs b/mullvad-rpc/src/lib.rs
index faf20e00c9..cef6a73cc0 100644
--- a/mullvad-rpc/src/lib.rs
+++ b/mullvad-rpc/src/lib.rs
@@ -3,17 +3,18 @@
use chrono::{offset::Utc, DateTime};
#[cfg(target_os = "android")]
use futures::channel::mpsc;
+use futures::Stream;
use hyper::Method;
use mullvad_types::{
account::{AccountToken, VoucherSubmission},
version::AppVersion,
};
+use proxy::ApiConnectionMode;
use std::{
collections::BTreeMap,
future::Future,
net::{IpAddr, Ipv4Addr, SocketAddr},
path::Path,
- sync::Arc,
};
use talpid_types::{net::wireguard, ErrorExt};
@@ -23,13 +24,14 @@ pub mod rest;
mod abortable_stream;
mod https_client_with_sni;
+pub mod proxy;
mod tls_stream;
#[cfg(target_os = "android")]
pub use crate::https_client_with_sni::SocketBypassRequest;
mod address_cache;
mod relay_list;
-pub use address_cache::{AddressCache, CurrentAddressChangeListener};
+pub use address_cache::AddressCache;
pub use hyper::StatusCode;
pub use relay_list::RelayListProxy;
@@ -150,7 +152,7 @@ impl MullvadRpcRuntime {
) -> Result<Self, Error> {
Ok(MullvadRpcRuntime {
handle,
- address_cache: AddressCache::new(vec![API.addr], None)?,
+ address_cache: AddressCache::new(None)?,
api_availability: ApiAvailability::new(availability::State::default()),
#[cfg(target_os = "android")]
socket_bypass_tx,
@@ -158,10 +160,8 @@ impl MullvadRpcRuntime {
}
/// Create a new `MullvadRpcRuntime` using the specified directories.
- /// Try to use the cache directory first, and fall back on the resource directory
- /// if it fails.
+ /// Try to use the cache directory first, and fall back on the bundled address otherwise.
pub async fn with_cache(
- resource_dir: Option<&Path>,
cache_dir: &Path,
write_changes: bool,
#[cfg(target_os = "android")] socket_bypass_tx: Option<mpsc::Sender<SocketBypassRequest>>,
@@ -185,26 +185,15 @@ impl MullvadRpcRuntime {
let address_cache = match AddressCache::from_file(&cache_file, write_file.clone()).await {
Ok(cache) => cache,
Err(error) => {
- let cache_exists = cache_file.exists();
- if cache_exists {
+ if cache_file.exists() {
log::error!(
"{}",
error.display_chain_with_msg(
- "Failed to load cached API addresses. Falling back on bundled list"
+ "Failed to load cached API addresses. Falling back on bundled address"
)
);
}
-
- // Initialize the cache directory cache using the resource directory
- match resource_dir {
- Some(resource_dir) => {
- let read_file = resource_dir.join(API_IP_CACHE_FILENAME);
- let cache = AddressCache::from_file(&read_file, write_file).await?;
- cache.randomize().await?;
- cache
- }
- None => return Err(Error::AddressCacheError(error)),
- }
+ AddressCache::new(write_file)?
}
};
@@ -217,45 +206,49 @@ impl MullvadRpcRuntime {
})
}
- pub fn set_address_change_listener(
- &mut self,
- address_change_listener: impl Fn(SocketAddr) -> Result<(), ()> + Send + Sync + 'static,
- ) {
- self.address_cache
- .set_change_listener(Arc::new(Box::new(address_change_listener)));
- }
-
/// Creates a new request service and returns a handle to it.
- fn new_request_service(
- &mut self,
+ async fn new_request_service<
+ T: Stream<Item = ApiConnectionMode> + Unpin + Send + 'static,
+ AcceptedNewEndpoint: Future<Output = bool> + Send + 'static,
+ >(
+ &self,
sni_hostname: Option<String>,
+ proxy_provider: T,
+ new_address_callback: impl (Fn(SocketAddr) -> AcceptedNewEndpoint) + Send + Sync + 'static,
#[cfg(target_os = "android")] socket_bypass_tx: Option<mpsc::Sender<SocketBypassRequest>>,
) -> rest::RequestServiceHandle {
- let service = rest::RequestService::new(
- self.handle.clone(),
+ let service_handle = rest::RequestService::new(
sni_hostname,
self.api_availability.handle(),
self.address_cache.clone(),
+ proxy_provider,
+ new_address_callback,
#[cfg(target_os = "android")]
socket_bypass_tx,
- );
- let handle = service.handle();
- self.handle.spawn(service.into_future());
- handle
+ )
+ .await;
+ service_handle
}
/// Returns a request factory initialized to create requests for the master API
- pub fn mullvad_rest_handle(&mut self) -> rest::MullvadRestHandle {
- let service = self.new_request_service(
- Some(API.host.clone()),
- #[cfg(target_os = "android")]
- self.socket_bypass_tx.clone(),
- );
- let factory = rest::RequestFactory::new(
- API.host.clone(),
- Box::new(self.address_cache.clone()),
- Some("app".to_owned()),
- );
+ pub async fn mullvad_rest_handle<
+ T: Stream<Item = ApiConnectionMode> + Unpin + Send + 'static,
+ AcceptedNewEndpoint: Future<Output = bool> + Send + 'static,
+ >(
+ &self,
+ proxy_provider: T,
+ new_address_callback: impl (Fn(SocketAddr) -> AcceptedNewEndpoint) + Send + Sync + 'static,
+ ) -> rest::MullvadRestHandle {
+ let service = self
+ .new_request_service(
+ Some(API.host.clone()),
+ proxy_provider,
+ new_address_callback,
+ #[cfg(target_os = "android")]
+ self.socket_bypass_tx.clone(),
+ )
+ .await;
+ let factory = rest::RequestFactory::new(API.host.clone(), Some("app".to_owned()));
rest::MullvadRestHandle::new(
service,
@@ -266,12 +259,15 @@ impl MullvadRpcRuntime {
}
/// Returns a new request service handle
- pub fn rest_handle(&mut self) -> rest::RequestServiceHandle {
+ pub async fn rest_handle(&mut self) -> rest::RequestServiceHandle {
self.new_request_service(
None,
+ ApiConnectionMode::Direct.into_repeat(),
+ |_| async { true },
#[cfg(target_os = "android")]
None,
)
+ .await
}
pub fn handle(&mut self) -> &mut tokio::runtime::Handle {
diff --git a/mullvad-rpc/src/proxy.rs b/mullvad-rpc/src/proxy.rs
new file mode 100644
index 0000000000..009a1960dc
--- /dev/null
+++ b/mullvad-rpc/src/proxy.rs
@@ -0,0 +1,204 @@
+use crate::tls_stream::TlsStream;
+use futures::Stream;
+use hyper::client::connect::{Connected, Connection};
+use rand::{distributions::Alphanumeric, Rng};
+use serde::{Deserialize, Serialize};
+use shadowsocks::relay::tcprelay::ProxyClientStream;
+use std::{
+ fmt, io,
+ net::SocketAddr,
+ path::Path,
+ pin::Pin,
+ task::{self, Poll},
+};
+use talpid_types::{net::openvpn::ShadowsocksProxySettings, ErrorExt};
+use tokio::{
+ fs,
+ io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf},
+ net::TcpStream,
+};
+
+const CURRENT_CONFIG_FILENAME: &str = "api-endpoint.json";
+
+#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
+pub enum ApiConnectionMode {
+ /// Connect directly to the target.
+ Direct,
+ /// Connect to the destination via a proxy.
+ Proxied(ProxyConfig),
+}
+
+impl fmt::Display for ApiConnectionMode {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
+ match self {
+ ApiConnectionMode::Direct => write!(f, "unproxied"),
+ ApiConnectionMode::Proxied(settings) => settings.fmt(f),
+ }
+ }
+}
+
+#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
+pub enum ProxyConfig {
+ Shadowsocks(ShadowsocksProxySettings),
+}
+
+impl fmt::Display for ProxyConfig {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
+ match self {
+ // TODO: Do not hardcode TCP
+ ProxyConfig::Shadowsocks(ss) => write!(f, "Shadowsocks {}/TCP", ss.peer),
+ }
+ }
+}
+
+impl ApiConnectionMode {
+ /// Reads the proxy config from `CURRENT_CONFIG_FILENAME`.
+ /// This returns `ApiConnectionMode::Direct` if reading from disk fails for any reason.
+ pub async fn try_from_cache(cache_dir: &Path) -> Self {
+ Self::from_cache(cache_dir).await.unwrap_or_else(|error| {
+ log::error!(
+ "{}",
+ error.display_chain_with_msg("Failed to read API endpoint cache")
+ );
+ ApiConnectionMode::Direct
+ })
+ }
+
+ /// Reads the proxy config from `CURRENT_CONFIG_FILENAME`.
+ /// If the file does not exist, this returns `Ok(ApiConnectionMode::Direct)`.
+ async fn from_cache(cache_dir: &Path) -> io::Result<Self> {
+ let path = cache_dir.join(CURRENT_CONFIG_FILENAME);
+ match fs::read_to_string(path).await {
+ Ok(s) => serde_json::from_str(&s).map_err(|error| {
+ log::error!(
+ "{}",
+ error.display_chain_with_msg(&format!(
+ "Failed to deserialize \"{}\"",
+ CURRENT_CONFIG_FILENAME
+ ))
+ );
+ io::Error::new(io::ErrorKind::Other, "deserialization failed")
+ }),
+ Err(error) => {
+ if error.kind() == io::ErrorKind::NotFound {
+ Ok(ApiConnectionMode::Direct)
+ } else {
+ Err(error)
+ }
+ }
+ }
+ }
+
+ /// Stores this config to `CURRENT_CONFIG_FILENAME`.
+ /// The content is saved to a temporary file first, which ensures that
+ /// consumers of the file never end up with partial content.
+ pub async fn save(&self, cache_dir: &Path) -> io::Result<()> {
+ let path = cache_dir.join(CURRENT_CONFIG_FILENAME);
+ let mut temp_ext = String::from("temp");
+ temp_ext.push_str(
+ &rand::thread_rng()
+ .sample_iter(&Alphanumeric)
+ .take(5)
+ .map(char::from)
+ .collect::<String>(),
+ );
+ let temp_path = path.with_extension(temp_ext);
+
+ {
+ let mut file = fs::File::create(&temp_path).await?;
+ let json = serde_json::to_string_pretty(self)
+ .map_err(|_| io::Error::new(io::ErrorKind::Other, "serialization failed"))?;
+ file.write_all(json.as_bytes()).await?;
+ file.write_all(b"\n").await?;
+ file.sync_data().await?;
+ }
+
+ fs::rename(&temp_path, path).await
+ }
+
+ /// Attempts to remove `CURRENT_CONFIG_FILENAME`, if it exists.
+ pub async fn try_delete_cache(cache_dir: &Path) {
+ let path = cache_dir.join(CURRENT_CONFIG_FILENAME);
+ if let Err(err) = fs::remove_file(path).await {
+ if err.kind() != std::io::ErrorKind::NotFound {
+ log::error!(
+ "{}",
+ err.display_chain_with_msg("Failed to remove old API config")
+ );
+ }
+ }
+ }
+
+ /// Returns the remote address, or `None` for `ApiConnectionMode::Direct`.
+ pub fn get_endpoint(&self) -> Option<SocketAddr> {
+ match self {
+ ApiConnectionMode::Proxied(ProxyConfig::Shadowsocks(ss)) => Some(ss.peer),
+ ApiConnectionMode::Direct => None,
+ }
+ }
+
+ pub fn is_proxy(&self) -> bool {
+ *self != ApiConnectionMode::Direct
+ }
+
+ /// Convenience function that returns a stream that repeats
+ /// this config forever.
+ pub fn into_repeat(self) -> impl Stream<Item = ApiConnectionMode> {
+ futures::stream::repeat(self)
+ }
+}
+
+/// Stream that is either a regular TLS stream or TLS via shadowsocks
+pub enum ApiConnection {
+ Direct(TlsStream<TcpStream>),
+ Proxied(TlsStream<ProxyClientStream<TcpStream>>),
+}
+
+impl AsyncRead for ApiConnection {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ match Pin::get_mut(self) {
+ ApiConnection::Direct(s) => Pin::new(s).poll_read(cx, buf),
+ ApiConnection::Proxied(s) => Pin::new(s).poll_read(cx, buf),
+ }
+ }
+}
+
+impl AsyncWrite for ApiConnection {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ match Pin::get_mut(self) {
+ ApiConnection::Direct(s) => Pin::new(s).poll_write(cx, buf),
+ ApiConnection::Proxied(s) => Pin::new(s).poll_write(cx, buf),
+ }
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
+ match Pin::get_mut(self) {
+ ApiConnection::Direct(s) => Pin::new(s).poll_flush(cx),
+ ApiConnection::Proxied(s) => Pin::new(s).poll_flush(cx),
+ }
+ }
+
+ fn poll_shutdown(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
+ match Pin::get_mut(self) {
+ ApiConnection::Direct(s) => Pin::new(s).poll_shutdown(cx),
+ ApiConnection::Proxied(s) => Pin::new(s).poll_shutdown(cx),
+ }
+ }
+}
+
+impl Connection for ApiConnection {
+ fn connected(&self) -> Connected {
+ match self {
+ ApiConnection::Direct(s) => s.connected(),
+ ApiConnection::Proxied(s) => s.connected(),
+ }
+ }
+}
diff --git a/mullvad-rpc/src/rest.rs b/mullvad-rpc/src/rest.rs
index 41b637f84d..8bb5efc72b 100644
--- a/mullvad-rpc/src/rest.rs
+++ b/mullvad-rpc/src/rest.rs
@@ -4,13 +4,13 @@ use crate::{
address_cache::AddressCache,
availability::ApiAvailabilityHandle,
https_client_with_sni::{HttpsConnectorWithSni, HttpsConnectorWithSniHandle},
+ proxy::ApiConnectionMode,
};
use futures::{
channel::{mpsc, oneshot},
- future::{abortable, AbortHandle, Aborted},
sink::SinkExt,
stream::StreamExt,
- TryFutureExt,
+ Stream, TryFutureExt,
};
use hyper::{
client::Client,
@@ -18,15 +18,12 @@ use hyper::{
Method, Uri,
};
use std::{
- collections::BTreeMap,
future::Future,
- mem,
- net::{IpAddr, SocketAddr},
+ net::SocketAddr,
str::FromStr,
time::{Duration, Instant},
};
use talpid_types::ErrorExt;
-use tokio::runtime::Handle;
pub use hyper::StatusCode;
@@ -45,7 +42,7 @@ const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
#[derive(err_derive::Error, Debug)]
pub enum Error {
#[error(display = "Request cancelled")]
- Aborted(Aborted),
+ Aborted,
#[error(display = "Hyper error")]
HyperError(#[error(source)] hyper::Error),
@@ -84,114 +81,124 @@ impl Error {
_ => false,
}
}
+
+ /// Returns a new instance for which `abortable_stream::Aborted` is mapped to `Self::Aborted`.
+ fn map_aborted(self) -> Self {
+ if let Error::HyperError(error) = &self {
+ use std::error::Error;
+ let mut source = error.source();
+ while let Some(error) = source {
+ let io_error: Option<&std::io::Error> = error.downcast_ref();
+ if let Some(io_error) = io_error {
+ let abort_error: Option<&crate::abortable_stream::Aborted> =
+ io_error.get_ref().and_then(|inner| inner.downcast_ref());
+ if abort_error.is_some() {
+ return Self::Aborted;
+ }
+ }
+ source = error.source();
+ }
+ }
+ self
+ }
}
/// A service that executes HTTP requests, allowing for on-demand termination of all in-flight
/// requests
-pub(crate) struct RequestService {
+pub(crate) struct RequestService<
+ T: Stream<Item = ApiConnectionMode>,
+ F: Fn(SocketAddr) -> AcceptedNewEndpoint,
+ AcceptedNewEndpoint: Future<Output = bool>,
+> {
command_tx: mpsc::Sender<RequestCommand>,
command_rx: mpsc::Receiver<RequestCommand>,
connector_handle: HttpsConnectorWithSniHandle,
client: hyper::Client<HttpsConnectorWithSni, hyper::Body>,
- handle: Handle,
- next_id: u64,
- in_flight_requests: BTreeMap<u64, AbortHandle>,
- api_availability: ApiAvailabilityHandle,
+ proxy_config_provider: T,
+ new_address_callback: F,
address_cache: AddressCache,
+ api_availability: ApiAvailabilityHandle,
}
-impl RequestService {
+impl<
+ T: Stream<Item = ApiConnectionMode> + Unpin + Send + 'static,
+ F: (Fn(SocketAddr) -> AcceptedNewEndpoint) + Send + Sync + 'static,
+ AcceptedNewEndpoint: Future<Output = bool> + Send + 'static,
+ > RequestService<T, F, AcceptedNewEndpoint>
+{
/// Constructs a new request service.
- pub fn new(
- handle: Handle,
+ pub async fn new(
sni_hostname: Option<String>,
api_availability: ApiAvailabilityHandle,
address_cache: AddressCache,
+ mut proxy_config_provider: T,
+ new_address_callback: F,
#[cfg(target_os = "android")] socket_bypass_tx: Option<mpsc::Sender<SocketBypassRequest>>,
- ) -> RequestService {
+ ) -> RequestServiceHandle {
let (connector, connector_handle) = HttpsConnectorWithSni::new(
- handle.clone(),
sni_hostname,
+ address_cache.clone(),
#[cfg(target_os = "android")]
socket_bypass_tx.clone(),
);
+ proxy_config_provider
+ .next()
+ .await
+ .map(|config| connector_handle.set_connection_mode(config));
+
let (command_tx, command_rx) = mpsc::channel(1);
let client = Client::builder().build(connector);
- Self {
+ let service = Self {
command_tx,
command_rx,
connector_handle,
client,
- handle,
- in_flight_requests: BTreeMap::new(),
- next_id: 0,
- api_availability,
+ proxy_config_provider,
+ new_address_callback,
address_cache,
- }
+ api_availability,
+ };
+ let handle = service.handle();
+ tokio::spawn(service.into_future());
+ handle
}
- /// Constructs a handle
- pub fn handle(&self) -> RequestServiceHandle {
+ fn handle(&self) -> RequestServiceHandle {
RequestServiceHandle {
tx: self.command_tx.clone(),
- handle: self.handle.clone(),
}
}
- fn process_command(&mut self, command: RequestCommand) {
+ async fn process_command(&mut self, command: RequestCommand) {
match command {
RequestCommand::NewRequest(request, completion_tx) => {
- let id = self.id();
let mut tx = self.command_tx.clone();
let timeout = request.timeout();
let hyper_request = request.into_request();
- let host_addr = get_request_socket_addr(&hyper_request);
let api_availability = self.api_availability.clone();
let suspend_fut = api_availability.wait_for_unsuspend();
let request_fut = self.client.request(hyper_request).map_err(Error::from);
- let (request_future, abort_handle) = abortable(async move {
+ let request_future = async move {
let _ = suspend_fut.await;
request_fut.await
- });
- let address_cache = self.address_cache.clone();
- let handle = self.handle.clone();
+ };
let future = async move {
- let response =
- tokio::time::timeout(timeout, request_future.map_err(Error::Aborted))
- .await
- .map_err(Error::TimeoutError);
+ let response = tokio::time::timeout(timeout, request_future)
+ .await
+ .map_err(Error::TimeoutError);
- let response = flatten_result(flatten_result(response));
- if let Some(host_addr) = host_addr {
- if let Err(err) = &response {
- if err.is_network_error() {
- log::error!(
- "{}",
- err.display_chain_with_msg("HTTP request failed")
- );
- if !api_availability.get_state().is_offline() {
- let current_address = address_cache.peek_address();
- if current_address == host_addr
- && address_cache.has_tried_current_address()
- {
- handle.spawn(async move {
- address_cache.select_new_address().await;
- let new_address = address_cache.peek_address();
- log::error!(
- "Request failed using address {}. Trying next API address: {}",
- current_address,
- new_address,
- );
- });
- }
- }
- }
+ let response = flatten_result(response).map_err(|error| error.map_aborted());
+
+ if let Err(err) = &response {
+ if err.is_network_error() && !api_availability.get_state().is_offline() {
+ log::error!("{}", err.display_chain_with_msg("HTTP request failed"));
+ let _ = tx.send(RequestCommand::NextApiConfig).await;
}
}
@@ -200,72 +207,46 @@ impl RequestService {
"Failed to send response to caller, caller channel is shut down"
);
}
- let _ = tx.send(RequestCommand::RequestFinished(id)).await;
};
-
- self.in_flight_requests.insert(id, abort_handle);
- self.handle.spawn(future);
+ tokio::spawn(future);
}
- RequestCommand::RequestFinished(id) => {
- self.in_flight_requests.remove(&id);
+ RequestCommand::Reset => {
+ self.connector_handle.reset();
}
- RequestCommand::Reset(tx) => {
- self.reset();
- let _ = tx.send(());
+ RequestCommand::NextApiConfig => {
+ if let Some(new_config) = self.proxy_config_provider.next().await {
+ let endpoint = match new_config.get_endpoint() {
+ Some(endpoint) => endpoint,
+ None => self.address_cache.get_address().await,
+ };
+ // Switch to new connection mode unless rejected by address change callback
+ if (self.new_address_callback)(endpoint).await {
+ self.connector_handle.set_connection_mode(new_config);
+ }
+ }
}
}
}
- fn reset(&mut self) {
- let old_requests = mem::take(&mut self.in_flight_requests);
- for (_, abort_handle) in old_requests {
- abort_handle.abort();
- }
-
- self.connector_handle.reset();
- }
-
- fn id(&mut self) -> u64 {
- let id = self.next_id;
- self.next_id = id.wrapping_add(1);
- id
- }
-
- pub async fn into_future(mut self) {
+ async fn into_future(mut self) {
while let Some(command) = self.command_rx.next().await {
- self.process_command(command);
+ self.process_command(command).await;
}
- self.reset();
+ self.connector_handle.reset();
}
}
-fn get_request_socket_addr(request: &Request) -> Option<SocketAddr> {
- let uri = request.uri();
- let port = uri
- .port_u16()
- // Assuming HTTPS always
- .unwrap_or(443);
-
- let host_addr = uri.host().and_then(|host| host.parse::<IpAddr>().ok())?;
-
- Some(SocketAddr::new(host_addr, port))
-}
-
#[derive(Clone)]
/// A handle to interact with a spawned `RequestService`.
pub struct RequestServiceHandle {
tx: mpsc::Sender<RequestCommand>,
- handle: Handle,
}
impl RequestServiceHandle {
/// Resets the corresponding RequestService, dropping all in-flight requests.
pub async fn reset(&self) {
let mut tx = self.tx.clone();
- let (done_tx, done_rx) = oneshot::channel();
-
- let _ = tx.send(RequestCommand::Reset(done_tx)).await;
- let _ = done_rx.await;
+ let _ = tx.send(RequestCommand::Reset).await;
}
/// Submits a `RestRequest` for exectuion to the request service.
@@ -278,11 +259,6 @@ impl RequestServiceHandle {
completion_rx.await.map_err(|_| Error::ReceiveError)?
}
-
- /// Spawns a future on the RPC runtime.
- pub fn spawn<T: Send + 'static>(&self, future: impl Future<Output = T> + Send + 'static) {
- let _ = self.handle.spawn(future);
- }
}
#[derive(Debug)]
@@ -291,8 +267,8 @@ pub(crate) enum RequestCommand {
RestRequest,
oneshot::Sender<std::result::Result<Response, Error>>,
),
- RequestFinished(u64),
- Reset(oneshot::Sender<()>),
+ Reset,
+ NextApiConfig,
}
/// A REST request that is sent to the RequestService to be executed.
@@ -392,20 +368,14 @@ pub struct ErrorResponse {
#[derive(Clone)]
pub struct RequestFactory {
hostname: String,
- address_provider: Box<dyn AddressProvider>,
path_prefix: Option<String>,
pub timeout: Duration,
}
impl RequestFactory {
- pub fn new(
- hostname: String,
- address_provider: Box<dyn AddressProvider>,
- path_prefix: Option<String>,
- ) -> Self {
+ pub fn new(hostname: String, path_prefix: Option<String>) -> Self {
Self {
hostname,
- address_provider,
path_prefix,
timeout: DEFAULT_TIMEOUT,
}
@@ -466,9 +436,8 @@ impl RequestFactory {
}
fn get_uri(&self, path: &str) -> Result<Uri> {
- let host = self.address_provider.get_address();
let prefix = self.path_prefix.as_ref().map(AsRef::as_ref).unwrap_or("");
- let uri = format!("https://{}/{}{}", host, prefix, path);
+ let uri = format!("https://{}/{}{}", self.hostname, prefix, path);
hyper::Uri::from_str(&uri).map_err(Error::UriError)
}
@@ -478,29 +447,6 @@ impl RequestFactory {
}
}
-pub trait AddressProvider: Send + Sync {
- /// Must return a string that represents either a host or a host with port
- fn get_address(&self) -> String;
- fn clone_box(&self) -> Box<dyn AddressProvider>;
-}
-
-impl Clone for Box<dyn AddressProvider> {
- fn clone(&self) -> Self {
- self.clone_box()
- }
-}
-
-impl AddressProvider for IpAddr {
- /// Must return a string that represents either a host or a host with port
- fn get_address(&self) -> String {
- self.to_string()
- }
-
- fn clone_box(&self) -> Box<dyn AddressProvider> {
- Box::new(*self)
- }
-}
-
pub fn get_request<T: serde::de::DeserializeOwned>(
factory: &RequestFactory,
service: RequestServiceHandle,
@@ -632,7 +578,7 @@ impl MullvadRestHandle {
let handle = self.clone();
let availability = self.availability.clone();
- self.service.spawn(async move {
+ tokio::spawn(async move {
// always start the fetch after 15 minutes
let api_proxy = crate::ApiProxy::new(handle);
let mut next_check = Instant::now() + API_IP_CHECK_DELAY;
@@ -652,14 +598,29 @@ impl MullvadRestHandle {
}
match api_proxy.clone().get_api_addrs().await {
Ok(new_addrs) => {
- log::debug!("Fetched new API addresses {:?}, will fetch again in {} hours", new_addrs, API_IP_CHECK_INTERVAL.as_secs() / ( 60 * 60 ));
- if let Err(err) = address_cache.set_addresses(new_addrs).await {
- log::error!("Failed to save newly updated API addresses: {}", err);
+ if let Some(addr) = new_addrs.get(0) {
+ log::debug!(
+ "Fetched new API address {:?}. Fetching again in {} hours",
+ addr,
+ API_IP_CHECK_INTERVAL.as_secs() / (60 * 60)
+ );
+ if let Err(err) = address_cache.set_address(*addr).await {
+ log::error!(
+ "Failed to save newly updated API address: {}",
+ err
+ );
+ }
+ } else {
+ log::error!("API returned no API addresses");
}
next_check = next_regular_check();
}
Err(err) => {
- log::error!("Failed to fetch new API addresses: {}, will retry again in {} seconds", err, API_IP_CHECK_ERROR_INTERVAL.as_secs());
+ log::error!(
+ "Failed to fetch new API addresses: {}. Retrying in {} seconds",
+ err,
+ API_IP_CHECK_ERROR_INTERVAL.as_secs()
+ );
next_check = next_error_check();
}
}
diff --git a/mullvad-rpc/src/tls_stream.rs b/mullvad-rpc/src/tls_stream.rs
index 232bb39b92..cad0268ac3 100644
--- a/mullvad-rpc/src/tls_stream.rs
+++ b/mullvad-rpc/src/tls_stream.rs
@@ -16,7 +16,7 @@ use tokio_rustls::{
const LE_ROOT_CERT: &[u8] = include_bytes!("../le_root_cert.pem");
pub struct TlsStream<S: AsyncRead + AsyncWrite + Unpin> {
- stream: Pin<Box<tokio_rustls::client::TlsStream<S>>>,
+ stream: tokio_rustls::client::TlsStream<S>,
}
impl<S> TlsStream<S>
@@ -49,11 +49,9 @@ where
}
};
- let tls_stream = connector.connect(host, stream).await?;
+ let stream = connector.connect(host, stream).await?;
- Ok(TlsStream {
- stream: Box::pin(tls_stream),
- })
+ Ok(TlsStream { stream })
}
}
@@ -79,7 +77,7 @@ where
cx: &mut task::Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
- self.stream.as_mut().poll_read(cx, buf)
+ Pin::new(&mut self.stream).poll_read(cx, buf)
}
}
@@ -92,15 +90,15 @@ where
cx: &mut task::Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
- self.stream.as_mut().poll_write(cx, buf)
+ Pin::new(&mut self.stream).poll_write(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
- self.stream.as_mut().poll_flush(cx)
+ Pin::new(&mut self.stream).poll_flush(cx)
}
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
- self.stream.as_mut().poll_shutdown(cx)
+ Pin::new(&mut self.stream).poll_shutdown(cx)
}
}
diff --git a/mullvad-setup/src/main.rs b/mullvad-setup/src/main.rs
index 947f9bd9ca..e65b1278f8 100644
--- a/mullvad-setup/src/main.rs
+++ b/mullvad-setup/src/main.rs
@@ -1,6 +1,6 @@
use clap::{crate_authors, crate_description, crate_name, App};
use mullvad_management_interface::new_rpc_client;
-use mullvad_rpc::MullvadRpcRuntime;
+use mullvad_rpc::{proxy::ApiConnectionMode, MullvadRpcRuntime};
use mullvad_types::version::ParsedAppVersion;
use std::{path::PathBuf, process, time::Duration};
use talpid_core::{
@@ -165,11 +165,19 @@ async fn remove_wireguard_key() -> Result<(), Error> {
if let Some(token) = settings.get_account_token() {
if let Some(wg_data) = settings.get_wireguard() {
- let mut rpc_runtime = MullvadRpcRuntime::with_cache(None, &cache_path, false)
+ let rpc_runtime = MullvadRpcRuntime::with_cache(&cache_path, false)
.await
.map_err(Error::RpcInitializationError)?;
- let mut key_proxy =
- mullvad_rpc::WireguardKeyProxy::new(rpc_runtime.mullvad_rest_handle());
+ let mut key_proxy = mullvad_rpc::WireguardKeyProxy::new(
+ rpc_runtime
+ .mullvad_rest_handle(
+ ApiConnectionMode::try_from_cache(&cache_path)
+ .await
+ .into_repeat(),
+ |_| async { true },
+ )
+ .await,
+ );
retry_future_n(
move || {
key_proxy.remove_wireguard_key(token.clone(), wg_data.private_key.public_key())
diff --git a/mullvad-types/src/location.rs b/mullvad-types/src/location.rs
index b9c460fa87..5a962dafb1 100644
--- a/mullvad-types/src/location.rs
+++ b/mullvad-types/src/location.rs
@@ -21,7 +21,7 @@ pub struct Location {
const RAIDUS_OF_EARTH: f64 = 6372.8;
impl Location {
- pub fn distance_from(&self, other: &Location) -> f64 {
+ pub fn distance_from(&self, other: &Coordinates) -> f64 {
haversine_dist_deg(
self.latitude,
self.longitude,
@@ -29,6 +29,76 @@ impl Location {
other.longitude,
)
}
+
+ pub fn has_same_city(&self, other: &Self) -> bool {
+ self.country_code == other.country_code && self.city_code == other.city_code
+ }
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub struct Coordinates {
+ pub latitude: f64,
+ pub longitude: f64,
+}
+
+impl From<&Location> for Coordinates {
+ fn from(location: &Location) -> Self {
+ Self {
+ latitude: location.latitude,
+ longitude: location.longitude,
+ }
+ }
+}
+
+impl From<Location> for Coordinates {
+ fn from(location: Location) -> Self {
+ Coordinates::from(&location)
+ }
+}
+
+impl Coordinates {
+ /// Computes the approximate midpoint of a set of locations.
+ ///
+ /// This works by calculating the mean Cartesian coordinates, and converting them
+ /// back to spherical coordinates. This is approximate, because the semi-minor (polar)
+ /// axis is assumed to equal the semi-major (equatorial) axis.
+ ///
+ /// https://en.wikipedia.org/wiki/Spherical_coordinate_system#Cartesian_coordinates
+ pub fn midpoint(locations: &[Location]) -> Self {
+ Self::midpoint_inner(locations.iter().map(Coordinates::from))
+ }
+
+ fn midpoint_inner(locations: impl std::iter::Iterator<Item = Coordinates>) -> Self {
+ let mut x = 0f64;
+ let mut y = 0f64;
+ let mut z = 0f64;
+
+ let mut count = 0;
+
+ for location in locations {
+ let cos_lat = location.latitude.to_radians().cos();
+ let sin_lat = location.latitude.to_radians().sin();
+ let cos_lon = location.longitude.to_radians().cos();
+ let sin_lon = location.longitude.to_radians().sin();
+ x += cos_lat * cos_lon;
+ y += cos_lat * sin_lon;
+ z += sin_lat;
+ count += 1;
+ }
+ let inv_total_weight = 1f64 / (count as f64);
+ x *= inv_total_weight;
+ y *= inv_total_weight;
+ z *= inv_total_weight;
+
+ let longitude = y.atan2(x);
+ let hypotenuse = (x * x + y * y).sqrt();
+ let latitude = z.atan2(hypotenuse);
+
+ Coordinates {
+ latitude: latitude.to_degrees(),
+ longitude: longitude.to_degrees(),
+ }
+ }
}
/// Takes input as latitude and longitude degrees.
@@ -110,6 +180,16 @@ impl From<AmIMullvad> for GeoIpLocation {
#[cfg(test)]
mod tests {
+ use super::Coordinates;
+
+ impl Coordinates {
+ fn equal(&self, other: Coordinates) -> bool {
+ const EPS: f64 = 0.1;
+ (self.latitude - other.latitude).abs() < EPS
+ && (self.longitude - other.longitude).abs() < EPS
+ }
+ }
+
#[test]
fn test_haversine_dist_deg() {
use super::haversine_dist_deg;
@@ -129,4 +209,43 @@ mod tests {
111.22634257109495
);
}
+
+ #[test]
+ fn test_midpoint() {
+ assert!(Coordinates::midpoint_inner(
+ [
+ Coordinates {
+ latitude: 0.0,
+ longitude: 90.0,
+ },
+ Coordinates {
+ latitude: 90.0,
+ longitude: 0.0,
+ },
+ ]
+ .into_iter()
+ )
+ .equal(Coordinates {
+ latitude: 45.0,
+ longitude: 90.0,
+ }));
+
+ assert!(Coordinates::midpoint_inner(
+ [
+ Coordinates {
+ latitude: -20.0,
+ longitude: 90.0,
+ },
+ Coordinates {
+ latitude: -20.0,
+ longitude: -90.0,
+ },
+ ]
+ .into_iter()
+ )
+ .equal(Coordinates {
+ latitude: -90.0,
+ longitude: 0.0,
+ }));
+ }
}