diff options
| -rw-r--r-- | Cargo.lock | 172 | ||||
| -rw-r--r-- | Cargo.toml | 2 | ||||
| -rw-r--r-- | mullvad_daemon/Cargo.toml | 14 | ||||
| -rw-r--r-- | mullvad_daemon/src/ipc_api.rs | 90 | ||||
| -rw-r--r-- | mullvad_daemon/src/main.rs | 325 | ||||
| -rw-r--r-- | mullvad_daemon/src/management_interface.rs | 301 | ||||
| -rw-r--r-- | mullvad_daemon/src/states.rs | 21 | ||||
| -rw-r--r-- | talpid_core/Cargo.toml | 10 | ||||
| -rw-r--r-- | talpid_core/src/lib.rs | 2 | ||||
| -rw-r--r-- | talpid_core/src/process/mod.rs | 7 | ||||
| -rw-r--r-- | talpid_core/src/process/monitor.rs | 113 | ||||
| -rw-r--r-- | talpid_core/src/process/openvpn.rs | 2 | ||||
| -rw-r--r-- | talpid_core/src/process/unix.rs | 33 | ||||
| -rw-r--r-- | talpid_core/src/tunnel/mod.rs | 70 | ||||
| -rw-r--r-- | talpid_core/src/tunnel/openvpn.rs | 218 | ||||
| -rw-r--r-- | talpid_ipc/Cargo.toml | 6 | ||||
| -rw-r--r-- | talpid_ipc/src/lib.rs | 31 |
17 files changed, 932 insertions, 485 deletions
diff --git a/Cargo.lock b/Cargo.lock index 9e86a409c1..c960739c60 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -18,26 +18,11 @@ dependencies = [ ] [[package]] -name = "ansi_term" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" - -[[package]] name = "assert_matches" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] -name = "atty" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)", - "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] name = "backtrace" version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -66,11 +51,6 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] -name = "bitflags" -version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" - -[[package]] name = "byteorder" version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -90,21 +70,6 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] -name = "clap" -version = "2.23.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "ansi_term 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", - "atty 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", - "bitflags 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)", - "strsim 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", - "term_size 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", - "unicode-segmentation 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", - "unicode-width 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", - "vec_map 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] name = "dbghelp-sys" version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -120,13 +85,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "duct" -version = "0.8.2" +version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "error-chain 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", "lazycell 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)", "os_pipe 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", - "shared_child 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "shared_child 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -140,14 +105,6 @@ dependencies = [ [[package]] name = "error-chain" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "backtrace 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] -name = "error-chain" version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ @@ -213,7 +170,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "jsonrpc-core" version = "7.0.0" -source = "git+https://github.com/faern/jsonrpc?branch=bind-zero#863467e499d3c8e0262d1dc211d0525352471d7a" +source = "git+https://github.com/faern/jsonrpc?branch=ws-close-handle#e5da3f069fb14f9a90278f09a1841f9149af541b" dependencies = [ "futures 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", @@ -225,19 +182,19 @@ dependencies = [ [[package]] name = "jsonrpc-macros" version = "7.0.0" -source = "git+https://github.com/faern/jsonrpc?branch=bind-zero#863467e499d3c8e0262d1dc211d0525352471d7a" +source = "git+https://github.com/faern/jsonrpc?branch=ws-close-handle#e5da3f069fb14f9a90278f09a1841f9149af541b" dependencies = [ - "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)", - "jsonrpc-pubsub 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)", + "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)", + "jsonrpc-pubsub 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)", "serde 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] name = "jsonrpc-pubsub" version = "7.0.0" -source = "git+https://github.com/faern/jsonrpc?branch=bind-zero#863467e499d3c8e0262d1dc211d0525352471d7a" +source = "git+https://github.com/faern/jsonrpc?branch=ws-close-handle#e5da3f069fb14f9a90278f09a1841f9149af541b" dependencies = [ - "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)", + "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)", "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -245,10 +202,10 @@ dependencies = [ [[package]] name = "jsonrpc-server-utils" version = "7.0.0" -source = "git+https://github.com/faern/jsonrpc?branch=bind-zero#863467e499d3c8e0262d1dc211d0525352471d7a" +source = "git+https://github.com/faern/jsonrpc?branch=ws-close-handle#e5da3f069fb14f9a90278f09a1841f9149af541b" dependencies = [ "globset 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", - "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)", + "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)", "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -257,10 +214,10 @@ dependencies = [ [[package]] name = "jsonrpc-ws-server" version = "7.0.0" -source = "git+https://github.com/faern/jsonrpc?branch=bind-zero#863467e499d3c8e0262d1dc211d0525352471d7a" +source = "git+https://github.com/faern/jsonrpc?branch=ws-close-handle#e5da3f069fb14f9a90278f09a1841f9149af541b" dependencies = [ - "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)", - "jsonrpc-server-utils 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)", + "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)", + "jsonrpc-server-utils 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)", "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "ws 0.7.1 (git+https://github.com/tomusdrw/ws-rs)", ] @@ -275,6 +232,11 @@ dependencies = [ ] [[package]] +name = "lazy_static" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] name = "lazycell" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -341,14 +303,17 @@ dependencies = [ "assert_matches 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "error-chain 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)", - "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)", - "jsonrpc-macros 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)", - "jsonrpc-pubsub 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)", - "jsonrpc-ws-server 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)", + "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)", + "jsonrpc-macros 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)", + "jsonrpc-pubsub 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)", + "jsonrpc-ws-server 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)", + "lazy_static 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", + "talpid_core 0.0.0", "talpid_ipc 0.1.0", + "uuid 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -511,7 +476,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "shared_child" -version = "0.2.1" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -535,11 +500,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] -name = "strsim" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" - -[[package]] name = "syn" version = "0.11.10" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -558,25 +518,16 @@ dependencies = [ ] [[package]] -name = "talpid_cli" -version = "0.0.0" -dependencies = [ - "clap 2.23.2 (registry+https://github.com/rust-lang/crates.io-index)", - "env_logger 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", - "error-chain 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", - "talpid_core 0.0.0", -] - -[[package]] name = "talpid_core" version = "0.0.0" dependencies = [ "assert_matches 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", - "duct 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)", + "duct 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", "error-chain 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)", - "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)", - "jsonrpc-macros 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)", + "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)", + "jsonrpc-macros 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)", + "lazy_static 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "openvpn_ffi 0.1.0", "talpid_ipc 0.1.0", @@ -589,9 +540,9 @@ dependencies = [ "assert_matches 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "error-chain 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)", - "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)", - "jsonrpc-macros 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)", - "jsonrpc-ws-server 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)", + "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)", + "jsonrpc-macros 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)", + "jsonrpc-ws-server 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)", "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -600,16 +551,6 @@ dependencies = [ ] [[package]] -name = "term_size" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)", - "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] name = "thread-id" version = "3.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -666,16 +607,6 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] -name = "unicode-segmentation" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" - -[[package]] -name = "unicode-width" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" - -[[package]] name = "unicode-xid" version = "0.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -703,9 +634,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] -name = "vec_map" -version = "0.7.0" +name = "uuid" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "rand 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)", +] [[package]] name = "void" @@ -749,23 +683,18 @@ dependencies = [ [metadata] "checksum aho-corasick 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)" = "500909c4f87a9e52355b26626d890833e9e1d53ac566db76c36faa984b889699" -"checksum ansi_term 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "23ac7c30002a5accbf7e8987d0632fa6de155b7c3d39d0067317a391e00a2ef6" "checksum assert_matches 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9e772942dccdf11b368c31e044e4fca9189f80a773d2f0808379de65894cbf57" -"checksum atty 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "d912da0db7fa85514874458ca3651fe2cddace8d0b0505571dbdcd41ab490159" "checksum backtrace 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f551bc2ddd53aea015d453ef0b635af89444afa5ed2405dd0b2062ad5d600d80" "checksum backtrace-sys 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "d192fd129132fbc97497c1f2ec2c2c5174e376b95f535199ef4fe0a293d33842" "checksum bitflags 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "aad18937a628ec6abcd26d1489012cc0e18c21798210f491af69ded9b881106d" -"checksum bitflags 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1370e9fc2a6ae53aea8b7a5110edbd08836ed87c88736dfabccade1c2b44bff4" "checksum byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c40977b0ee6b9885c9013cd41d9feffdd22deb3bb4dc3a71d901cc7a77de18c8" "checksum bytes 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "3941933da81d8717b427c2ddc2d73567cd15adb6c57514a2726d9ee598a5439a" "checksum cfg-if 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "de1e760d7b6535af4241fca8bd8adf68e2e7edacc6b29f5d399050c5e48cf88c" -"checksum clap 2.23.2 (registry+https://github.com/rust-lang/crates.io-index)" = "cbf1114886d7cde2d6448517161d7db8d681a9a1c09f7d210f0b0864e48195f6" "checksum dbghelp-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "97590ba53bcb8ac28279161ca943a924d1fd4a8fb3fa63302591647c4fc5b850" "checksum dtoa 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "80c8b71fd71146990a9742fc06dcbbde19161a267e0ad4e572c35162f4578c90" -"checksum duct 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e45aa15fe0a8a8f511e6d834626afd55e49b62e5c8802e18328a87e8a8f6065c" +"checksum duct 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e2f3154a9285e24d7c3aba0dca9a13adf2ba6160cce3490b157c8b37a0f80e85" "checksum env_logger 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e3856f1697098606fc6cb97a93de88ca3f3bc35bb878c725920e6e82ecf05e83" "checksum error-chain 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d9435d864e017c3c6afeac1654189b06cdb491cf2ff73dbf0d73b0f292f42ff8" -"checksum error-chain 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "6930e04918388a9a2e41d518c25cf679ccafe26733fb4127dbf21993f2575d46" "checksum fnv 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "6cc484842f1e2884faf56f529f960cc12ad8c71ce96cc7abba0a067c98fee344" "checksum futures 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)" = "55f0008e13fc853f79ea8fc86e931486860d4c4c156cdffb59fa5f7fa833660a" "checksum gcc 0.3.45 (registry+https://github.com/rust-lang/crates.io-index)" = "40899336fb50db0c78710f53e87afc54d8c7266fb76262fecc78ca1a7f09deae" @@ -774,12 +703,13 @@ dependencies = [ "checksum idna 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "6ac85ec3f80c8e4e99d9325521337e14ec7555c458a14e377d189659a427f375" "checksum iovec 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "29d062ee61fccdf25be172e70f34c9f6efc597e1fb8f6526e8437b2046ab26be" "checksum itoa 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "eb2f404fbc66fd9aac13e998248505e7ecb2ad8e44ab6388684c5fb11c6c251c" -"checksum jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)" = "<none>" -"checksum jsonrpc-macros 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)" = "<none>" -"checksum jsonrpc-pubsub 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)" = "<none>" -"checksum jsonrpc-server-utils 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)" = "<none>" -"checksum jsonrpc-ws-server 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)" = "<none>" +"checksum jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)" = "<none>" +"checksum jsonrpc-macros 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)" = "<none>" +"checksum jsonrpc-pubsub 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)" = "<none>" +"checksum jsonrpc-server-utils 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)" = "<none>" +"checksum jsonrpc-ws-server 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)" = "<none>" "checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" +"checksum lazy_static 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "3b37545ab726dd833ec6420aaba8231c5b320814b9029ad585555d2a03e94fbf" "checksum lazycell 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ce12306c4739d86ee97c23139f3a34ddf0387bbf181bc7929d287025a8c3ef6b" "checksum lazycell 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3b585b7a6811fb03aa10e74b278a0f00f8dd9b45dc681f148bb29fa5cb61859b" "checksum libc 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)" = "88ee81885f9f04bff991e306fea7c1c60a5f0f9e409e99f6b40e3311a3363135" @@ -806,27 +736,23 @@ dependencies = [ "checksum serde_derive_internals 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)" = "021c338d22c7e30f957a6ab7e388cb6098499dda9fd4ba1661ee074ca7a180d1" "checksum serde_json 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "48b04779552e92037212c3615370f6bd57a40ebba7f20e554ff9f55e41a69a7b" "checksum sha1 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "cc30b1e1e8c40c121ca33b86c23308a090d19974ef001b4bf6e61fd1a0fb095c" -"checksum shared_child 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "099b38928dbe4a0a01fcd8c233183072f14a7d126a34bed05880869be66e14cc" +"checksum shared_child 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "bcd5e483b3475af9bc2a35311c2f3bbf0bd98fde91410ab15a0d4ba3c3127b4e" "checksum slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b4fcaed89ab08ef143da37bc52adbcc04d4a69014f4c1208d6b51f0c47bc23" "checksum smallvec 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "4f8266519bc1d17d0b5b16f6c21295625d562841c708f6376f49028a43e9c11e" "checksum stable_deref_trait 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "15132e0e364248108c5e2c02e3ab539be8d6f5d52a01ca9bbf27ed657316f02b" -"checksum strsim 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b4d15c810519a91cf877e7e36e63fe068815c678181439f2f29e2562147c3694" "checksum syn 0.11.10 (registry+https://github.com/rust-lang/crates.io-index)" = "171b739972d9a1bfb169e8077238b51f9ebeaae4ff6e08072f7ba386a8802da2" "checksum synom 0.11.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a393066ed9010ebaed60b9eafa373d4b1baac186dd7e008555b0f702b51945b6" -"checksum term_size 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e2b6b55df3198cc93372e85dd2ed817f0e38ce8cc0f22eb32391bfad9c4bf209" "checksum thread-id 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4437c97558c70d129e40629a5b385b3fb1ffac301e63941335e4d354081ec14a" "checksum thread_local 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c85048c6260d17cf486ceae3282d9fb6b90be220bf5b28c400f5485ffc29f0c7" "checksum tokio-core 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "99e958104a67877907c1454386d5482fe8e965a55d60be834a15a44328e7dc76" "checksum tokio-io 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "48f55df1341bb92281f229a6030bc2abffde2c7a44c6d6b802b7687dd8be0775" "checksum unicode-bidi 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "d3a078ebdd62c0e71a709c3d53d2af693fe09fe93fbff8344aebe289b78f9032" "checksum unicode-normalization 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "e28fa37426fceeb5cf8f41ee273faa7c82c47dc8fba5853402841e665fcd86ff" -"checksum unicode-segmentation 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "18127285758f0e2c6cf325bb3f3d138a12fee27de4f23e146cd6a179f26c2cf3" -"checksum unicode-width 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "bf3a113775714a22dcb774d8ea3655c53a32debae63a063acc00a91cc586245f" "checksum unicode-xid 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f860d7d29cf02cb2f3f359fd35991af3d30bac52c57d265a3c461074cb4dc" "checksum unreachable 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1f2ae5ddb18e1c92664717616dd9549dde73f539f01bd7b77c2edb2446bdff91" "checksum url 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f5ba8a749fb4479b043733416c244fa9d1d3af3d7c23804944651c8a448cb87e" "checksum utf8-ranges 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "662fab6525a98beff2921d7f61a39e7d59e0b425ebc7d0d9e66d316e55124122" -"checksum vec_map 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f8cdc8b93bd0198ed872357fb2e667f7125646b1762f16d60b2c96350d361897" +"checksum uuid 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b5d0f5103675a280a926ec2f9b7bcc2ef49367df54e8c570c3311fec919f9a8b" "checksum void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" "checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" "checksum winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc" diff --git a/Cargo.toml b/Cargo.toml index ec561b5566..ebdd8443ac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,2 +1,2 @@ [workspace] -members = ["mullvad_daemon", "talpid_openvpn_plugin", "openvpn_ffi", "talpid_cli", "talpid_ipc"] +members = ["mullvad_daemon", "talpid_openvpn_plugin", "openvpn_ffi", "talpid_ipc"] diff --git a/mullvad_daemon/Cargo.toml b/mullvad_daemon/Cargo.toml index 9982a1bbaa..ce2ffafa98 100644 --- a/mullvad_daemon/Cargo.toml +++ b/mullvad_daemon/Cargo.toml @@ -10,13 +10,15 @@ serde = "1.0" serde_derive = "1.0" log = "0.3" env_logger = "0.4" -jsonrpc-core = { git = "https://github.com/faern/jsonrpc", branch = "bind-zero" } -jsonrpc-macros = { git = "https://github.com/faern/jsonrpc", branch = "bind-zero" } -jsonrpc-pubsub = { git = "https://github.com/faern/jsonrpc", branch = "bind-zero" } -jsonrpc-ws-server = { git = "https://github.com/faern/jsonrpc", branch = "bind-zero" } +jsonrpc-core = { git = "https://github.com/faern/jsonrpc", branch = "ws-close-handle" } +jsonrpc-macros = { git = "https://github.com/faern/jsonrpc", branch = "ws-close-handle" } +jsonrpc-pubsub = { git = "https://github.com/faern/jsonrpc", branch = "ws-close-handle" } +jsonrpc-ws-server = { git = "https://github.com/faern/jsonrpc", branch = "ws-close-handle" } +uuid = { version = "0.5", features = ["v4"] } +lazy_static = "0.2" -[dependencies.talpid_ipc] -path = "../talpid_ipc" +talpid_core = { path = "../talpid_core" } +talpid_ipc = { path = "../talpid_ipc" } [dev-dependencies] assert_matches = "1.0" diff --git a/mullvad_daemon/src/ipc_api.rs b/mullvad_daemon/src/ipc_api.rs deleted file mode 100644 index 84932a0b8f..0000000000 --- a/mullvad_daemon/src/ipc_api.rs +++ /dev/null @@ -1,90 +0,0 @@ -use jsonrpc_core::Error; -use jsonrpc_core::futures::BoxFuture; -use jsonrpc_macros::pubsub; -use jsonrpc_pubsub::SubscriptionId; - -use std::collections::HashMap; -use std::net::IpAddr; - -pub type AccountToken = String; -pub type CountryCode = String; - -build_rpc_trait! { - pub trait IpcApi { - type Metadata; - - /// Fetches and returns metadata about an account. Returns an error on non-existing - /// accounts. - #[rpc(name = "get_account_data")] - fn get_account_data(&self, AccountToken) -> Result<AccountData, Error>; - - /// Returns available countries. - #[rpc(name = "get_countries")] - fn get_countries(&self) -> Result<HashMap<CountryCode, String>, Error>; - - /// Set which account to connect with - #[rpc(name = "set_account")] - fn set_account(&self, AccountToken) -> Result<(), Error>; - - /// Set which country to connect to - #[rpc(name = "set_country")] - fn set_country(&self, CountryCode) -> Result<(), Error>; - - /// Set if the backend should automatically establish a tunnel on start or not. - #[rpc(name = "set_autoconnect")] - fn set_autoconnect(&self, bool) -> Result<(), Error>; - - /// Try to connect if disconnected, or do nothing if already connecting/connected. - #[rpc(name = "connect")] - fn connect(&self) -> Result<(), Error>; - - /// Disconnect the VPN tunnel if it is connecting/connected. Does nothing if already - /// disconnected. - #[rpc(name = "disconnect")] - fn disconnect(&self) -> Result<(), Error>; - - /// Returns the current security state of the Mullvad client. Changes to this state will - /// be announced to subscribers of `event`. - #[rpc(name = "get_state")] - fn get_state(&self) -> Result<SecurityState, Error>; - - /// Returns the current public IP of this computer. - #[rpc(name = "get_ip")] - fn get_ip(&self) -> Result<IpAddr, Error>; - - /// Performs a geoIP lookup and returns the current location as perceived by the public - /// internet. - #[rpc(name = "get_location")] - fn get_location(&self) -> Result<Location, Error>; - - #[pubsub(name = "event")] { - /// Subscribes to the `event` notifications. - #[rpc(name = "event_subscribe")] - fn subscribe(&self, Self::Metadata, pubsub::Subscriber<String>); - - /// Unsubscribes from the `event` notifications. - #[rpc(name = "event_unsubscribe")] - fn unsubscribe(&self, SubscriptionId) -> BoxFuture<bool, Error>; - } - } -} - -#[derive(Serialize)] -pub struct AccountData { - pub paid_until: String, -} - -#[derive(Serialize)] -pub struct Location { - pub latlong: [f64; 2], - pub country: String, - pub city: String, -} - -#[derive(Serialize)] -pub enum SecurityState { - Unsecured, - Securing, - Secured, - Unsecuring, -} diff --git a/mullvad_daemon/src/main.rs b/mullvad_daemon/src/main.rs index 95033b1564..fa1db7765c 100644 --- a/mullvad_daemon/src/main.rs +++ b/mullvad_daemon/src/main.rs @@ -8,37 +8,330 @@ extern crate serde; #[macro_use] extern crate serde_derive; -extern crate talpid_ipc; - extern crate jsonrpc_core; extern crate jsonrpc_pubsub; #[macro_use] extern crate jsonrpc_macros; extern crate jsonrpc_ws_server; +extern crate uuid; +#[macro_use] +extern crate lazy_static; + +extern crate talpid_core; +extern crate talpid_ipc; + +mod management_interface; +mod states; + +use management_interface::{ManagementInterfaceServer, TunnelCommand}; +use states::{SecurityState, TargetState}; + +use std::sync::{Arc, Mutex, mpsc}; +use std::thread; + +use talpid_core::net::RemoteAddr; +use talpid_core::tunnel::{self, TunnelEvent, TunnelMonitor}; + +error_chain!{ + errors { + /// The client is in the wrong state for the requested operation. Optimally the code should + /// be written in such a way so such states can't exist. + InvalidState { + description("Client is in an invalid state for the requested operation") + } + TunnelError(msg: &'static str) { + description("Error in the tunnel monitor") + display("Tunnel monitor error: {}", msg) + } + ManagementInterfaceError(msg: &'static str) { + description("Error in the management interface") + display("Management interface error: {}", msg) + } + } +} + +lazy_static! { + // Temporary store of hardcoded remotes. + static ref REMOTES: [RemoteAddr; 3] = [ + RemoteAddr::new("se5.mullvad.net", 1300), + RemoteAddr::new("se6.mullvad.net", 1300), + RemoteAddr::new("se7.mullvad.net", 1300), + ]; +} + +pub enum DaemonEvent { + TunnelEvent(TunnelEvent), + TunnelExit(tunnel::Result<()>), + ManagementInterfaceEvent(TunnelCommand), + ManagementInterfaceExit(talpid_ipc::Result<()>), +} + +impl From<TunnelEvent> for DaemonEvent { + fn from(tunnel_event: TunnelEvent) -> Self { + DaemonEvent::TunnelEvent(tunnel_event) + } +} + +impl From<TunnelCommand> for DaemonEvent { + fn from(tunnel_command: TunnelCommand) -> Self { + DaemonEvent::ManagementInterfaceEvent(tunnel_command) + } +} + +/// Represents the internal state of the actual tunnel. +// TODO(linus): Put the tunnel::CloseHandle into this state, so it can't exist when not running. +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub enum TunnelState { + /// No tunnel is running. + NotRunning, + /// The tunnel has been started, but it is not established/functional. + Down, + /// The tunnel is up and working. + Up, +} + +impl TunnelState { + pub fn as_security_state(&self) -> SecurityState { + match *self { + TunnelState::Up => SecurityState::Secured, + _ => SecurityState::Unsecured, + } + } +} + + +struct Daemon { + state: TunnelState, + last_broadcasted_state: SecurityState, + target_state: TargetState, + rx: mpsc::Receiver<DaemonEvent>, + tx: mpsc::Sender<DaemonEvent>, + tunnel_close_handle: Option<tunnel::CloseHandle>, + management_interface_broadcaster: management_interface::EventBroadcaster, + + // Just for testing. A cyclic iterator iterating over the hardcoded remotes, + // picking a new one for each retry. + remote_iter: std::iter::Cycle<std::iter::Cloned<std::slice::Iter<'static, RemoteAddr>>>, +} + +impl Daemon { + pub fn new() -> Result<Self> { + let (tx, rx) = mpsc::channel(); + let management_interface_broadcaster = Self::start_management_interface(tx.clone())?; + Ok( + Daemon { + state: TunnelState::NotRunning, + last_broadcasted_state: SecurityState::Unsecured, + target_state: TargetState::Unsecured, + rx, + tx, + tunnel_close_handle: None, + management_interface_broadcaster, + remote_iter: REMOTES.iter().cloned().cycle(), + }, + ) + } + + // Starts the management interface and spawns a thread that will process it. + // Returns a handle that allows notifying all subscribers on events. + fn start_management_interface(event_tx: mpsc::Sender<DaemonEvent>) + -> Result<management_interface::EventBroadcaster> { + let server = Self::start_management_interface_server(event_tx.clone())?; + let event_broadcaster = server.event_broadcaster(); + Self::spawn_management_interface_wait_thread(server, event_tx); + Ok(event_broadcaster) + } + + fn start_management_interface_server(event_tx: mpsc::Sender<DaemonEvent>) + -> Result<ManagementInterfaceServer> { + let server = + ManagementInterfaceServer::start(event_tx.clone()) + .chain_err(|| ErrorKind::ManagementInterfaceError("Failed to start server"),)?; + info!( + "Mullvad management interface listening on {}", + server.address() + ); + Ok(server) + } + + fn spawn_management_interface_wait_thread(server: ManagementInterfaceServer, + exit_tx: mpsc::Sender<DaemonEvent>) { + thread::spawn( + move || { + let result = server.wait(); + debug!("Mullvad management interface shut down"); + let _ = exit_tx.send(DaemonEvent::ManagementInterfaceExit(result)); + }, + ); + } + + /// Consume the `Daemon` and run the main event loop. Blocks until an error happens. + pub fn run(mut self) -> Result<()> { + while let Ok(event) = self.rx.recv() { + self.handle_event(event)?; + } + Ok(()) + } + + fn handle_event(&mut self, event: DaemonEvent) -> Result<()> { + use DaemonEvent::*; + match event { + TunnelEvent(event) => Ok(self.handle_tunnel_event(event)), + TunnelExit(result) => self.handle_tunnel_exit(result), + ManagementInterfaceEvent(event) => self.handle_management_interface_event(event), + ManagementInterfaceExit(result) => self.handle_management_interface_exit(result), + } + } + + fn handle_tunnel_event(&mut self, tunnel_event: TunnelEvent) { + info!("Tunnel event: {:?}", tunnel_event); + let new_state = match tunnel_event { + TunnelEvent::Up => TunnelState::Up, + TunnelEvent::Down => TunnelState::Down, + }; + self.set_state(new_state); + } + + fn handle_tunnel_exit(&mut self, result: tunnel::Result<()>) -> Result<()> { + self.tunnel_close_handle = None; + if let Err(e) = result { + log_error("Tunnel exited in an unexpected way", e); + } + self.set_state(TunnelState::NotRunning); + self.apply_target_state() + } + + fn handle_management_interface_event(&mut self, event: TunnelCommand) -> Result<()> { + match event { + TunnelCommand::SetTargetState(state) => self.set_target_state(state)?, + TunnelCommand::GetState(tx) => { + if let Err(_) = tx.send(self.last_broadcasted_state) { + warn!("Unable to send current state to management interface client",); + } + } + } + Ok(()) + } + + fn handle_management_interface_exit(&self, result: talpid_ipc::Result<()>) -> Result<()> { + let error = ErrorKind::ManagementInterfaceError("Server exited unexpectedly"); + match result { + Ok(()) => Err(error.into()), + e => e.chain_err(|| error), + } + } + + /// Update the state of the client. If it changed, notify the subscribers. + fn set_state(&mut self, new_state: TunnelState) { + if new_state != self.state { + self.state = new_state; + let new_security_state = self.state.as_security_state(); + if self.last_broadcasted_state != new_security_state { + self.last_broadcasted_state = new_security_state; + self.management_interface_broadcaster.notify_new_state(new_security_state); + } + } + } + + /// Set the target state of the client. If it changed trigger the operations needed to progress + /// towards that state. + fn set_target_state(&mut self, new_state: TargetState) -> Result<()> { + if new_state != self.target_state { + self.target_state = new_state; + self.apply_target_state() + } else { + Ok(()) + } + } -pub mod ipc_api; -pub mod mock_ipc; + fn apply_target_state(&mut self) -> Result<()> { + match (self.target_state, self.state) { + (TargetState::Secured, TunnelState::NotRunning) => { + debug!("Triggering tunnel start"); + self.start_tunnel() + } + (TargetState::Unsecured, TunnelState::Down) | + (TargetState::Unsecured, TunnelState::Up) => { + if let Some(close_handle) = self.tunnel_close_handle.take() { + debug!("Triggering tunnel stop"); + // This close operation will block until the tunnel is dead. + close_handle + .close() + .chain_err(|| ErrorKind::TunnelError("Unable to kill tunnel")) + } else { + Ok(()) + } + } + (target_state, state) => { + trace!( + "apply_target_state does nothing on TargetState::{:?} TunnelState::{:?}", + target_state, + state + ); + Ok(()) + } + } + } + + fn start_tunnel(&mut self) -> Result<()> { + ensure!( + self.state == TunnelState::NotRunning, + ErrorKind::InvalidState + ); + let remote = self.remote_iter.next().unwrap(); + let tunnel_monitor = self.spawn_tunnel_monitor(remote)?; + self.tunnel_close_handle = Some(tunnel_monitor.close_handle()); + self.spawn_tunnel_monitor_wait_thread(tunnel_monitor); + + self.set_state(TunnelState::Down); + Ok(()) + } + + fn spawn_tunnel_monitor(&self, remote: RemoteAddr) -> Result<TunnelMonitor> { + // Must wrap the channel in a Mutex because TunnelMonitor forces the closure to be Sync + let event_tx = Arc::new(Mutex::new(self.tx.clone())); + let on_tunnel_event = move |event| { + let _ = event_tx.lock().unwrap().send(DaemonEvent::TunnelEvent(event)); + }; + TunnelMonitor::new(remote, on_tunnel_event) + .chain_err(|| ErrorKind::TunnelError("Unable to start tunnel monitor")) + } + + fn spawn_tunnel_monitor_wait_thread(&self, tunnel_monitor: TunnelMonitor) { + let error_tx = self.tx.clone(); + thread::spawn( + move || { + let result = tunnel_monitor.wait(); + let _ = error_tx.send(DaemonEvent::TunnelExit(result)); + trace!("Tunnel monitor thread exit"); + }, + ); + } +} + + +fn log_error<E>(msg: &str, error: E) + where E: error_chain::ChainedError +{ + error!("{}: {}", msg, error); + for e in error.iter().skip(1) { + error!("Caused by {}", e); + } +} -error_chain!{} quick_main!(run); fn run() -> Result<()> { init_logger()?; - let server = start_ipc()?; - info!("Mullvad daemon listening on {}", server.address()); - main_loop(server) + let daemon = Daemon::new().chain_err(|| "Unable to initialize daemon")?; + daemon.run()?; + + debug!("Mullvad daemon is quitting"); + Ok(()) } fn init_logger() -> Result<()> { env_logger::init().chain_err(|| "Failed to bootstrap logging system") } - -fn start_ipc() -> Result<mock_ipc::IpcServer> { - mock_ipc::IpcServer::start().chain_err(|| "Failed to start IPC server") -} - -fn main_loop(server: mock_ipc::IpcServer) -> Result<()> { - server.wait().chain_err(|| "Error while waiting for server to process") -} diff --git a/mullvad_daemon/src/management_interface.rs b/mullvad_daemon/src/management_interface.rs new file mode 100644 index 0000000000..7e66205862 --- /dev/null +++ b/mullvad_daemon/src/management_interface.rs @@ -0,0 +1,301 @@ +use jsonrpc_core::{Error, ErrorCode, Metadata}; +use jsonrpc_core::futures::{BoxFuture, Future, future, sync}; +use jsonrpc_macros::pubsub; +use jsonrpc_pubsub::{PubSubHandler, PubSubMetadata, Session, SubscriptionId}; +use jsonrpc_ws_server; + +use states::{SecurityState, TargetState}; + +use std::collections::HashMap; +use std::collections::hash_map::Entry; +use std::net::{IpAddr, Ipv4Addr}; +use std::sync::{Arc, Mutex, RwLock, mpsc}; + +use talpid_ipc; +use uuid; + + +pub type AccountToken = String; +pub type CountryCode = String; + +#[derive(Serialize)] +pub struct AccountData { + pub paid_until: String, +} + +#[derive(Serialize)] +pub struct Location { + pub latlong: [f64; 2], + pub country: String, + pub city: String, +} + + +build_rpc_trait! { + pub trait ManagementInterfaceApi { + type Metadata; + + /// Fetches and returns metadata about an account. Returns an error on non-existing + /// accounts. + #[rpc(name = "get_account_data")] + fn get_account_data(&self, AccountToken) -> Result<AccountData, Error>; + + /// Returns available countries. + #[rpc(name = "get_countries")] + fn get_countries(&self) -> Result<HashMap<CountryCode, String>, Error>; + + /// Set which account to connect with + #[rpc(name = "set_account")] + fn set_account(&self, AccountToken) -> Result<(), Error>; + + /// Set which country to connect to + #[rpc(name = "set_country")] + fn set_country(&self, CountryCode) -> Result<(), Error>; + + /// Set if the client should automatically establish a tunnel on start or not. + #[rpc(name = "set_autoconnect")] + fn set_autoconnect(&self, bool) -> Result<(), Error>; + + /// Try to connect if disconnected, or do nothing if already connecting/connected. + #[rpc(name = "connect")] + fn connect(&self) -> Result<(), Error>; + + /// Disconnect the VPN tunnel if it is connecting/connected. Does nothing if already + /// disconnected. + #[rpc(name = "disconnect")] + fn disconnect(&self) -> Result<(), Error>; + + /// Returns the current security state of the Mullvad client. Changes to this state will + /// be announced to subscribers of `event`. + #[rpc(async, name = "get_state")] + fn get_state(&self) -> BoxFuture<SecurityState, Error>; + + /// Returns the current public IP of this computer. + #[rpc(name = "get_ip")] + fn get_ip(&self) -> Result<IpAddr, Error>; + + /// Performs a geoIP lookup and returns the current location as perceived by the public + /// internet. + #[rpc(name = "get_location")] + fn get_location(&self) -> Result<Location, Error>; + + #[pubsub(name = "new_state")] { + /// Subscribes to the `new_state` event notifications. + #[rpc(name = "new_state_subscribe")] + fn new_state_subscribe(&self, Self::Metadata, pubsub::Subscriber<SecurityState>); + + /// Unsubscribes from the `new_state` event notifications. + #[rpc(name = "new_state_unsubscribe")] + fn new_state_unsubscribe(&self, SubscriptionId) -> BoxFuture<(), Error>; + } + } +} + + +/// Enum representing commands coming in on the management interface. +#[derive(Debug)] +pub enum TunnelCommand { + /// Change target state. + SetTargetState(TargetState), + /// Request the current state. + GetState(sync::oneshot::Sender<SecurityState>), +} + +type ActiveSubscriptions = Arc<RwLock<HashMap<SubscriptionId, pubsub::Sink<SecurityState>>>>; + +pub struct ManagementInterfaceServer { + server: talpid_ipc::IpcServer, + active_subscriptions: ActiveSubscriptions, +} + +impl ManagementInterfaceServer { + pub fn start(tunnel_tx: mpsc::Sender<::DaemonEvent>) -> talpid_ipc::Result<Self> { + let rpc = ManagementInterface::new(tunnel_tx); + let active_subscriptions = rpc.active_subscriptions.clone(); + + let mut io = PubSubHandler::default(); + io.extend_with(rpc.to_delegate()); + let server = talpid_ipc::IpcServer::start_with_metadata(io.into(), meta_extractor)?; + Ok( + ManagementInterfaceServer { + server, + active_subscriptions, + }, + ) + } + + pub fn address(&self) -> &str { + self.server.address() + } + + pub fn event_broadcaster(&self) -> EventBroadcaster { + EventBroadcaster { active_subscriptions: self.active_subscriptions.clone() } + } + + /// Consumes the server and waits for it to finish. Returns an error if the server exited + /// due to an error. + pub fn wait(self) -> talpid_ipc::Result<()> { + self.server.wait() + } +} + + +/// A handle that allows broadcasting messages to all subscribers of the management interface. +pub struct EventBroadcaster { + active_subscriptions: ActiveSubscriptions, +} + +impl EventBroadcaster { + /// Sends an event to all subscribers of the management interface. + pub fn notify_new_state(&self, event: SecurityState) { + let active_subscriptions = self.active_subscriptions.read().unwrap(); + for sink in active_subscriptions.values() { + let _ = sink.notify(Ok(event)).wait(); + } + } +} + +struct ManagementInterface { + active_subscriptions: ActiveSubscriptions, + tx: Mutex<mpsc::Sender<::DaemonEvent>>, +} + +impl ManagementInterface { + pub fn new(tx: mpsc::Sender<::DaemonEvent>) -> Self { + ManagementInterface { + active_subscriptions: Default::default(), + tx: Mutex::new(tx), + } + } +} + +impl ManagementInterfaceApi for ManagementInterface { + type Metadata = Meta; + + fn get_account_data(&self, _account_token: AccountToken) -> Result<AccountData, Error> { + trace!("get_account_data"); + Ok(AccountData { paid_until: "2018-12-31T16:00:00.000Z".to_owned() },) + } + + fn get_countries(&self) -> Result<HashMap<CountryCode, String>, Error> { + trace!("get_countries"); + Ok(HashMap::new()) + } + + fn set_account(&self, _account_token: AccountToken) -> Result<(), Error> { + trace!("set_account"); + Ok(()) + } + + fn set_country(&self, _country_code: CountryCode) -> Result<(), Error> { + trace!("set_country"); + Ok(()) + } + + fn set_autoconnect(&self, _autoconnect: bool) -> Result<(), Error> { + trace!("set_autoconnect"); + Ok(()) + } + + fn connect(&self) -> Result<(), Error> { + trace!("connect"); + self.tx + .lock() + .unwrap() + .send(TunnelCommand::SetTargetState(TargetState::Secured).into()) + .map_err(|_| Error::internal_error()) + } + + fn disconnect(&self) -> Result<(), Error> { + trace!("disconnect"); + self.tx + .lock() + .unwrap() + .send(TunnelCommand::SetTargetState(TargetState::Unsecured).into()) + .map_err(|_| Error::internal_error()) + } + + fn get_state(&self) -> BoxFuture<SecurityState, Error> { + trace!("get_state"); + let (state_tx, state_rx) = sync::oneshot::channel(); + match self.tx.lock().unwrap().send(TunnelCommand::GetState(state_tx).into()) { + Ok(()) => state_rx.map_err(|_| Error::internal_error()).boxed(), + Err(_) => future::err(Error::internal_error()).boxed(), + } + } + + fn get_ip(&self) -> Result<IpAddr, Error> { + trace!("get_ip"); + Ok(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4))) + } + + fn get_location(&self) -> Result<Location, Error> { + trace!("get_location"); + Ok( + Location { + latlong: [1.0, 2.0], + country: "narnia".to_owned(), + city: "Le city".to_owned(), + }, + ) + } + + fn new_state_subscribe(&self, + _meta: Self::Metadata, + subscriber: pubsub::Subscriber<SecurityState>) { + trace!("new_state_subscribe"); + let mut active_subscriptions = self.active_subscriptions.write().unwrap(); + loop { + let id = SubscriptionId::String(uuid::Uuid::new_v4().to_string()); + if let Entry::Vacant(entry) = active_subscriptions.entry(id.clone()) { + if let Ok(sink) = subscriber.assign_id(id.clone()) { + debug!("Accepting new subscription with id {:?}", id); + entry.insert(sink); + } + break; + } + } + } + + fn new_state_unsubscribe(&self, id: SubscriptionId) -> BoxFuture<(), Error> { + trace!("new_state_unsubscribe"); + let was_removed = self.active_subscriptions.write().unwrap().remove(&id).is_some(); + let result = if was_removed { + debug!("Unsubscribing id {:?}", id); + future::ok(()) + } else { + future::err( + Error { + code: ErrorCode::InvalidParams, + message: "Invalid subscription".to_owned(), + data: None, + }, + ) + }; + result.boxed() + } +} + + +/// The metadata type. There is one instance associated with each connection. In this pubsub +/// scenario they are created by `meta_extractor` by the server on each new incoming +/// connection. +#[derive(Clone, Debug, Default)] +pub struct Meta { + session: Option<Arc<Session>>, +} + +/// Make the `Meta` type possible to use as jsonrpc metadata type. +impl Metadata for Meta {} + +/// Make the `Meta` type possible to use as a pubsub metadata type. +impl PubSubMetadata for Meta { + fn session(&self) -> Option<Arc<Session>> { + self.session.clone() + } +} + +/// Metadata extractor function for `Meta`. +fn meta_extractor(context: &jsonrpc_ws_server::RequestContext) -> Meta { + Meta { session: Some(Arc::new(Session::new(context.sender()))) } +} diff --git a/mullvad_daemon/src/states.rs b/mullvad_daemon/src/states.rs new file mode 100644 index 0000000000..10e543c58a --- /dev/null +++ b/mullvad_daemon/src/states.rs @@ -0,0 +1,21 @@ +/// Security state of the computer. +/// TODO(linus): There is a difference between lockdown(firewall) and tunnel functionality. The +/// firewall can be set to prevent any leaks but the tunnel is not connected. Then we are secured, +/// but disconnected. The frontend should probably reflect these states in some way. I think it +/// be reasonable to have three states, since unsecured but tunnel is up is probably an invalid +/// state. +#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum SecurityState { + Unsecured, + Secured, +} + +/// Represents the state the client strives towards. +/// When in `Secured`, the client should keep the computer from leaking and try to +/// establish a VPN tunnel if it is not up. +#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] +pub enum TargetState { + Unsecured, + Secured, +} diff --git a/talpid_core/Cargo.toml b/talpid_core/Cargo.toml index 18c998cf93..573cc0b328 100644 --- a/talpid_core/Cargo.toml +++ b/talpid_core/Cargo.toml @@ -5,11 +5,15 @@ authors = ["Linus Färnstrand <linus@mullvad.net>", "Erik Larkö <erik@mullvad.n description = "Core backend functionality of the Mullvad VPN client" [dependencies] -duct = "0.8" +duct = "0.9.1" error-chain = "0.10" log = "0.3" -jsonrpc-core = { git = "https://github.com/faern/jsonrpc", branch = "bind-zero" } -jsonrpc-macros = { git = "https://github.com/faern/jsonrpc", branch = "bind-zero" } +lazy_static = "0.2" +jsonrpc-core = { git = "https://github.com/faern/jsonrpc", branch = "ws-close-handle" } +jsonrpc-macros = { git = "https://github.com/faern/jsonrpc", branch = "ws-close-handle" } + +[target.'cfg(unix)'.dependencies] +libc = "0.2.20" [dependencies.talpid_ipc] path = "../talpid_ipc" diff --git a/talpid_core/src/lib.rs b/talpid_core/src/lib.rs index 8a053bffa8..5496d038ce 100644 --- a/talpid_core/src/lib.rs +++ b/talpid_core/src/lib.rs @@ -9,6 +9,8 @@ extern crate assert_matches; extern crate duct; #[macro_use] +extern crate lazy_static; +#[macro_use] extern crate log; #[macro_use] diff --git a/talpid_core/src/process/mod.rs b/talpid_core/src/process/mod.rs index a7d76ea9f4..88bdc12ea4 100644 --- a/talpid_core/src/process/mod.rs +++ b/talpid_core/src/process/mod.rs @@ -1,5 +1,6 @@ -/// A module for monitoring child processes and get notified of events on them. -pub mod monitor; - /// A module for all OpenVPN related process management. pub mod openvpn; + +/// Unix specific process management features. +#[cfg(unix)] +pub mod unix; diff --git a/talpid_core/src/process/monitor.rs b/talpid_core/src/process/monitor.rs deleted file mode 100644 index 1568d80615..0000000000 --- a/talpid_core/src/process/monitor.rs +++ /dev/null @@ -1,113 +0,0 @@ -use duct; - -use std::io; -use std::process; -use std::sync::Arc; -use std::thread; - -/// A child process monitor. Takes care of starting and monitoring a child process and calls the -/// listener on child exit. If the child is still running when a `ChildMonitor` instance goes out -/// of scope it will kill the child and wait for it to exit properly. -pub struct ChildMonitor { - child: Arc<duct::Handle>, - thread: Option<thread::JoinHandle<()>>, -} - -impl ChildMonitor { - /// Starts the child process and begins to monitor it. `on_exit` will be called as soon as the - /// child process exits. - pub fn start<L>(expression: &duct::Expression, mut on_exit: L) -> io::Result<Self> - where L: FnMut(io::Result<&process::Output>) + Send + 'static - { - let child = Arc::new(expression.start()?); - let child_clone = child.clone(); - let thread = Some(thread::spawn(move || on_exit(child_clone.wait()))); - Ok(ChildMonitor { child, thread }) - } - - /// Wait for the child to exit. Blocking the current thread. The `on_exit` callback is - /// guaranteed to fire before this method returns. - pub fn wait(&mut self) -> io::Result<&process::Output> { - if let Some(thread) = self.thread.take() { - if let Err(e) = thread.join() { - error!("Panic in the on_exit callback in ChildMonitor: {:?}", e); - } - } - self.child.wait() - } - - /// Send a kill signal to the child. No need to call `wait` after to free the PID. The monitor - /// will wait for the process for you. - pub fn kill(&self) -> io::Result<()> { - self.child.kill() - } -} - -impl Drop for ChildMonitor { - fn drop(&mut self) { - let _ = self.kill(); - let _ = self.child.wait(); - } -} - - -#[cfg(test)] -mod child_monitor_tests { - use super::*; - use duct::{Expression, cmd}; - - use std::sync::mpsc; - use std::time::Duration; - - fn echo_cmd(s: &str) -> Expression { - cmd("echo", &[s]).stdout_capture().unchecked() - } - - fn invalid_cmd() -> Expression { - cmd("this command does not exist", &[""]).unchecked() - } - - fn sleep_cmd(secs: u32) -> Expression { - if cfg!(windows) { - cmd("ping", &["127.0.0.1", "-n", &(secs + 1).to_string()]).unchecked() - } else { - cmd("sleep", &[secs.to_string()]).unchecked() - } - } - - fn spawn(cmd: &Expression) -> (ChildMonitor, mpsc::Receiver<io::Result<process::Output>>) { - let (tx, rx) = mpsc::channel(); - let child = - ChildMonitor::start(cmd, move |res| tx.send(res.map(|out| out.clone())).unwrap()) - .expect("Unable to start process"); - (child, rx) - } - - #[test] - fn echo() { - let (mut child, rx) = spawn(&echo_cmd("foobar")); - let wait_output = child.wait().unwrap(); - let callback_output = rx.try_recv().unwrap().unwrap(); - - assert!(callback_output.status.success()); - assert_eq!("foobar\n".as_bytes(), &callback_output.stdout[..]); - assert_eq!(wait_output.status, callback_output.status); - assert_eq!(wait_output.stdout, callback_output.stdout); - } - - #[test] - fn invalid_command() { - assert!(ChildMonitor::start(&invalid_cmd(), |_| {}).is_err()); - } - - #[test] - fn callback_after_kill() { - let (child, rx) = spawn(&sleep_cmd(100000)); - // Make sure on_exit is not triggered within the first second. It should not be called - // until we kill the process. - assert!(rx.recv_timeout(Duration::from_secs(1)).is_err()); - - child.kill().unwrap(); - assert!(!rx.recv_timeout(Duration::from_secs(10)).unwrap().unwrap().status.success()); - } -} diff --git a/talpid_core/src/process/openvpn.rs b/talpid_core/src/process/openvpn.rs index df1e4c7682..97c88a99c0 100644 --- a/talpid_core/src/process/openvpn.rs +++ b/talpid_core/src/process/openvpn.rs @@ -54,7 +54,7 @@ impl OpenVpnCommand { /// Build a runnable expression from the current state of the command. pub fn build(&self) -> duct::Expression { debug!("Building expression: {}", &self); - duct::cmd(&self.openvpn_bin, self.get_arguments()) + duct::cmd(&self.openvpn_bin, self.get_arguments()).unchecked() } /// Returns all arguments that the subprocess would be spawned with. diff --git a/talpid_core/src/process/unix.rs b/talpid_core/src/process/unix.rs new file mode 100644 index 0000000000..64599a6930 --- /dev/null +++ b/talpid_core/src/process/unix.rs @@ -0,0 +1,33 @@ +extern crate libc; + +use duct; +use duct::unix::HandleExt; + +use std::io; +use std::sync::{Arc, mpsc}; +use std::thread; +use std::time::Duration; + +/// Kills a process by first sending it the `SIGTERM` signal and then wait up to `timeout`. If the +/// process has not died after the timeout has expired it is killed. +pub fn nice_kill(handle: Arc<duct::Handle>, timeout: Duration) -> io::Result<()> { + trace!("Sending SIGTERM to child process"); + handle.send_signal(libc::SIGTERM)?; + + if wait_timeout(handle.clone(), timeout) { + debug!("Child process exited from SIGTERM"); + Ok(()) + } else { + debug!("Child process did not exit from SIGTERM, sending SIGKILL"); + handle.kill() + } +} + +/// Wait for a process to die for a maximum of `timeout`. Returns true if the process died within +/// the timeout. Warning, if the process does not exit in the given time, this function will leave +/// a thread running until it does exit. +fn wait_timeout(handle: Arc<duct::Handle>, timeout: Duration) -> bool { + let (stop, stopped) = mpsc::channel(); + thread::spawn(move || { let _ = stop.send(handle.wait().is_ok()); }); + stopped.recv_timeout(timeout).unwrap_or(false) +} diff --git a/talpid_core/src/tunnel/mod.rs b/talpid_core/src/tunnel/mod.rs index c453c52056..aead09b13b 100644 --- a/talpid_core/src/tunnel/mod.rs +++ b/talpid_core/src/tunnel/mod.rs @@ -1,11 +1,12 @@ use net; use openvpn_ffi::OpenVpnPluginEvent; use process::openvpn::OpenVpnCommand; +use std::io; /// A module for all OpenVPN related tunnel management. pub mod openvpn; -use self::openvpn::{OpenVpnEvent, OpenVpnMonitor}; +use self::openvpn::{OpenVpnCloseHandle, OpenVpnMonitor}; mod errors { error_chain!{ @@ -14,10 +15,6 @@ mod errors { TunnelMonitoringError { description("Error while setting up or processing events from the VPN tunnel") } - /// An error indicating that there was an error when trying to start up a VPN tunnel. - TunnelStartError { - description("Error while trying to start the tunnel") - } } } } @@ -25,26 +22,18 @@ pub use self::errors::*; /// Possible events from the VPN tunnel and the child process managing it. +#[derive(Debug)] pub enum TunnelEvent { /// Sent when the tunnel comes up and is ready for traffic. Up, /// Sent when the tunnel goes down. Down, - /// Sent when the process managing the tunnel exits. - Shutdown, } impl TunnelEvent { - /// Converts an `OpenVpnEvent` to a `TunnelEvent`. + /// Converts an `OpenVpnPluginEvent` to a `TunnelEvent`. /// Returns `None` if there is no corresponding `TunnelEvent`. - pub fn from_openvpn_event(event: &OpenVpnEvent) -> Option<TunnelEvent> { - match *event { - OpenVpnEvent::PluginEvent(ref event, _) => Self::from_openvpn_plugin_event(event), - OpenVpnEvent::Shutdown(_) => Some(TunnelEvent::Shutdown), - } - } - - fn from_openvpn_plugin_event(event: &OpenVpnPluginEvent) -> Option<TunnelEvent> { + fn from_openvpn_event(event: &OpenVpnPluginEvent) -> Option<TunnelEvent> { match *event { OpenVpnPluginEvent::Up => Some(TunnelEvent::Up), OpenVpnPluginEvent::RoutePredown => Some(TunnelEvent::Down), @@ -60,28 +49,49 @@ pub struct TunnelMonitor { } impl TunnelMonitor { - /// Creates a new `TunnelMonitor` with the given event callback. - pub fn new<L>(on_event: L) -> Result<Self> + /// Creates a new `TunnelMonitor` that connects to the given remote and notifies `on_event` + /// on tunnel state changes. + pub fn new<L>(remote: net::RemoteAddr, on_event: L) -> Result<Self> where L: Fn(TunnelEvent) + Send + Sync + 'static { - let on_openvpn_event = move |openvpn_event| { - // FIXME: This comment must be here to make rustfmt 0.8.3 not screw up. - match TunnelEvent::from_openvpn_event(&openvpn_event) { - Some(tunnel_event) => on_event(tunnel_event), - None => debug!("Ignoring OpenVpnEvent {:?}", openvpn_event), - } + let on_openvpn_event = move |event, _env| match TunnelEvent::from_openvpn_event(&event) { + Some(tunnel_event) => on_event(tunnel_event), + None => debug!("Ignoring OpenVpnEvent {:?}", event), }; - let monitor = openvpn::OpenVpnMonitor::new(on_openvpn_event, get_plugin_path()) + let cmd = Self::create_openvpn_cmd(remote); + let monitor = openvpn::OpenVpnMonitor::new(cmd, on_openvpn_event, get_plugin_path()) .chain_err(|| ErrorKind::TunnelMonitoringError)?; Ok(TunnelMonitor { monitor }) } - /// Tries to start a VPN tunnel towards the given address. Will fail if there is a tunnel - /// running already. - pub fn start(&self, remote: net::RemoteAddr) -> Result<()> { + fn create_openvpn_cmd(remote: net::RemoteAddr) -> OpenVpnCommand { let mut cmd = OpenVpnCommand::new("openvpn"); - cmd.config(get_config_path()).remotes(remote).unwrap(); - self.monitor.start(cmd).chain_err(|| ErrorKind::TunnelStartError) + cmd.config(get_config_path()) + .remotes(remote) + .unwrap(); + cmd + } + + /// Creates a handle to this monitor, allowing the tunnel to be closed while some other thread + /// is blocked in `wait`. + pub fn close_handle(&self) -> CloseHandle { + CloseHandle(self.monitor.close_handle()) + } + + /// Consumes the monitor and block until the tunnel exits or there is an error. + pub fn wait(self) -> Result<()> { + self.monitor.wait().chain_err(|| ErrorKind::TunnelMonitoringError) + } +} + + +/// A handle to a `TunnelMonitor` +pub struct CloseHandle(OpenVpnCloseHandle); + +impl CloseHandle { + /// Closes the underlying tunnel, making the `TunnelMonitor::wait` method return. + pub fn close(self) -> io::Result<()> { + self.0.close() } } diff --git a/talpid_core/src/tunnel/openvpn.rs b/talpid_core/src/tunnel/openvpn.rs index 9d366ebb4a..08c2c14a32 100644 --- a/talpid_core/src/tunnel/openvpn.rs +++ b/talpid_core/src/tunnel/openvpn.rs @@ -1,30 +1,29 @@ +use duct; use jsonrpc_core::{Error, IoHandler}; -use openvpn_ffi; -use process::monitor::ChildMonitor; +use openvpn_ffi::{OpenVpnEnv, OpenVpnPluginEvent}; use process::openvpn::OpenVpnCommand; -use std::io; -use std::path::{Path, PathBuf}; -use std::process; +use std::io; +use std::path::Path; use std::result::Result as StdResult; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, mpsc}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::thread; +use std::time::Duration; use talpid_ipc; mod errors { error_chain!{ errors { - /// The `OpenVpnMonitor` is in an invalid state for the requested operation. - InvalidState { - description("Invalid state. OpenVPN is already running") + /// Unable to start, wait for or kill the OpenVPN process. + ChildProcessError(msg: &'static str) { + description("Unable to start, wait for or kill the OpenVPN process") + display("OpenVPN process error: {}", msg) } - /// Unable to start or kill the OpenVPN process. - ChildProcessError { - description("Unable to start or kill the OpenVPN process") - } - /// Unable to start or manage the IPC server listening for events from OpenVPN - IpcServerError { - description("Unable to start or manage the IPC server") + /// Unable to start or manage the IPC server listening for events from OpenVPN. + EventDispatcherError { + description("Unable to start or manage the event dispatcher IPC server") } } } @@ -32,89 +31,145 @@ mod errors { pub use self::errors::*; -/// Possible events from OpenVPN -#[derive(Debug)] -pub enum OpenVpnEvent { - /// An event from the plugin loaded into OpenVPN. - PluginEvent(openvpn_ffi::OpenVpnPluginEvent, openvpn_ffi::OpenVpnEnv), - /// The OpenVPN process exited. Containing the result of waiting for the process. - Shutdown(io::Result<process::ExitStatus>), +lazy_static!{ + static ref OPENVPN_DIE_TIMEOUT: Duration = Duration::from_secs(2); } -/// Struct for monitoring OpenVPN processes. + +/// Struct for monitoring an OpenVPN process. pub struct OpenVpnMonitor { - on_event: Arc<Fn(OpenVpnEvent) + Send + Sync + 'static>, - plugin_path: PathBuf, - child: Arc<Mutex<Option<ChildMonitor>>>, - event_dispatcher: OpenVpnEventDispatcher, + child: Arc<duct::Handle>, + event_dispatcher: Option<OpenVpnEventDispatcher>, + closed: Arc<AtomicBool>, } impl OpenVpnMonitor { /// Creates a new `OpenVpnMonitor` with the given listener and using the plugin at the given /// path. - pub fn new<L, P>(on_event: L, plugin_path: P) -> Result<Self> - where L: Fn(OpenVpnEvent) + Send + Sync + 'static, + pub fn new<L, P>(mut cmd: OpenVpnCommand, on_event: L, plugin_path: P) -> Result<Self> + where L: Fn(OpenVpnPluginEvent, OpenVpnEnv) + Send + Sync + 'static, P: AsRef<Path> { - let on_event = Arc::new(on_event); - let event_dispatcher = Self::start_event_dispatcher(on_event.clone())?; + let event_dispatcher = OpenVpnEventDispatcher::start(on_event) + .chain_err(|| ErrorKind::EventDispatcherError)?; + + cmd.plugin(plugin_path, vec![event_dispatcher.address().to_owned()]); + let child = cmd.build() + .start() + .chain_err(|| ErrorKind::ChildProcessError("Failed to start"))?; + Ok( OpenVpnMonitor { - on_event, - plugin_path: plugin_path.as_ref().to_owned(), - child: Arc::new(Mutex::new(None)), - event_dispatcher, + child: Arc::new(child), + event_dispatcher: Some(event_dispatcher), + closed: Arc::new(AtomicBool::new(false)), }, ) } - fn start_event_dispatcher(on_event: Arc<Fn(OpenVpnEvent) + Send + Sync + 'static>) - -> Result<OpenVpnEventDispatcher> { - let on_plugin_event = move |event, env| (*on_event)(OpenVpnEvent::PluginEvent(event, env)); - OpenVpnEventDispatcher::start(on_plugin_event).chain_err(|| ErrorKind::IpcServerError) + /// Creates a handle to this monitor, allowing the tunnel to be closed while some other thread + /// is blocked in `wait`. + pub fn close_handle(&self) -> OpenVpnCloseHandle { + OpenVpnCloseHandle { + child: self.child.clone(), + closed: self.closed.clone(), + } } - /// Tries to start a new OpenVPN process if one is not already running. - /// If this `OpenVpnMonitor is already monitoring a running process it will return an - /// `InvalidState` error. - pub fn start(&self, cmd: OpenVpnCommand) -> Result<()> { - let mut child_lock = self.child.lock().unwrap(); - if child_lock.is_some() { - bail!(ErrorKind::InvalidState); + /// Consumes the monitor and blocks until OpenVPN exits or there is an error in either waiting + /// for the process or in the event dispatcher. + pub fn wait(mut self) -> Result<()> { + match self.wait_result() { + WaitResult::Child(Ok(exit_status)) => { + if exit_status.success() || self.closed.load(Ordering::SeqCst) { + debug!( + "OpenVPN exited, as expected, with exit status: {}", + exit_status + ); + Ok(()) + } else { + error!("OpenVPN died unexpectedly with status: {}", exit_status); + Err(ErrorKind::ChildProcessError("Died unexpectedly").into()) + } + } + WaitResult::Child(Err(e)) => { + error!("OpenVPN process wait error: {}", e); + Err(e).chain_err(|| ErrorKind::ChildProcessError("Error when waiting")) + } + WaitResult::EventDispatcher(result) => { + error!("OpenVpnEventDispatcher exited unexpectedly: {:?}", result); + match result { + Ok(()) => Err(ErrorKind::EventDispatcherError.into()), + Err(e) => Err(e).chain_err(|| ErrorKind::EventDispatcherError), + } + } } - *child_lock = Some(self.start_child_monitor(cmd)?); - Ok(()) } - fn start_child_monitor(&self, mut cmd: OpenVpnCommand) -> Result<ChildMonitor> { - self.set_plugin(&mut cmd); + /// Waits for both the child process and the event dispatcher in parallel. After both have + /// returned this returns the earliest result. + fn wait_result(&mut self) -> WaitResult { + let child_wait_handle = self.child.clone(); + let child_close_handle = self.close_handle(); + let event_dispatcher = self.event_dispatcher.take().unwrap(); + let dispatcher_handle = event_dispatcher.close_handle(); - let child = self.child.clone(); - let on_event = self.on_event.clone(); + let (child_tx, rx) = mpsc::channel(); + let dispatcher_tx = child_tx.clone(); - let on_exit = move |exit_status: io::Result<&process::Output>| { - *child.lock().unwrap() = None; - (*on_event)(OpenVpnEvent::Shutdown(exit_status.map(|output| output.status)),) - }; - ChildMonitor::start(&cmd.build(), on_exit).chain_err(|| ErrorKind::ChildProcessError) - } + thread::spawn( + move || { + let result = child_wait_handle.wait().map(|output| output.status); + child_tx.send(WaitResult::Child(result)).unwrap(); + dispatcher_handle.close(); + }, + ); + thread::spawn( + move || { + let result = event_dispatcher.wait(); + dispatcher_tx.send(WaitResult::EventDispatcher(result)).unwrap(); + let _ = child_close_handle.close(); + }, + ); - fn set_plugin(&self, cmd: &mut OpenVpnCommand) { - let event_dispatcher_address = self.event_dispatcher.address().to_string(); - cmd.plugin(&self.plugin_path, vec![event_dispatcher_address]); + let result = rx.recv().unwrap(); + let _ = rx.recv().unwrap(); + result } +} + +/// A handle to an `OpenVpnMonitor` for closing it. +pub struct OpenVpnCloseHandle { + child: Arc<duct::Handle>, + closed: Arc<AtomicBool>, +} - /// Tries to kill the OpenVPN process if it is running. If it is already dead, this does - /// nothing. - pub fn kill(&self) -> Result<()> { - if let Some(ref child) = *self.child.lock().unwrap() { - child.kill().chain_err(|| ErrorKind::ChildProcessError)?; +impl OpenVpnCloseHandle { + /// Kills the underlying OpenVPN process, making the `OpenVpnMonitor::wait` method return. + pub fn close(self) -> io::Result<()> { + if !self.closed.swap(true, Ordering::SeqCst) { + self.kill_openvpn() + } else { + Ok(()) } - Ok(()) } -} + #[cfg(unix)] + fn kill_openvpn(self) -> io::Result<()> { + ::process::unix::nice_kill(self.child, *OPENVPN_DIE_TIMEOUT) + } + #[cfg(not(unix))] + fn kill_openvpn(self) -> io::Result<()> { + self.child.kill() + } +} + +/// Internal enum to differentiate between if the child process or the event dispatcher died first. +enum WaitResult { + Child(io::Result<::std::process::ExitStatus>), + EventDispatcher(talpid_ipc::Result<()>), +} /// IPC server for listening to events coming from plugin loaded into OpenVPN. @@ -125,8 +180,7 @@ pub struct OpenVpnEventDispatcher { impl OpenVpnEventDispatcher { /// Construct and start the IPC server with the given event listener callback. pub fn start<L>(on_event: L) -> talpid_ipc::Result<Self> - where L: Fn(openvpn_ffi::OpenVpnPluginEvent, openvpn_ffi::OpenVpnEnv), - L: Send + Sync + 'static + where L: Fn(OpenVpnPluginEvent, OpenVpnEnv) + Send + Sync + 'static { let rpc = OpenVpnEventApiImpl { on_event }; let mut io = IoHandler::new(); @@ -140,7 +194,14 @@ impl OpenVpnEventDispatcher { self.server.address() } - /// Consumes the server and waits for it to finish. + /// Creates a handle to this event dispatcher, allowing the listening server to be closed while + /// some other thread is blocked in `wait`. + pub fn close_handle(&self) -> talpid_ipc::CloseHandle { + self.server.close_handle() + } + + /// Consumes the server and waits for it to finish. Returns an error if the server exited + /// due to an error. pub fn wait(self) -> talpid_ipc::Result<()> { self.server.wait() } @@ -153,8 +214,8 @@ mod api { pub trait OpenVpnEventApi { #[rpc(name = "openvpn_event")] fn openvpn_event(&self, - openvpn_ffi::OpenVpnPluginEvent, - openvpn_ffi::OpenVpnEnv) + OpenVpnPluginEvent, + OpenVpnEnv) -> StdResult<(), Error>; } } @@ -162,18 +223,15 @@ mod api { use self::api::*; struct OpenVpnEventApiImpl<L> - where L: Fn(openvpn_ffi::OpenVpnPluginEvent, openvpn_ffi::OpenVpnEnv) + Send + Sync + 'static + where L: Fn(OpenVpnPluginEvent, OpenVpnEnv) + Send + Sync + 'static { on_event: L, } impl<L> OpenVpnEventApi for OpenVpnEventApiImpl<L> - where L: Fn(openvpn_ffi::OpenVpnPluginEvent, openvpn_ffi::OpenVpnEnv) + Send + Sync + 'static + where L: Fn(OpenVpnPluginEvent, OpenVpnEnv) + Send + Sync + 'static { - fn openvpn_event(&self, - event: openvpn_ffi::OpenVpnPluginEvent, - env: openvpn_ffi::OpenVpnEnv) - -> StdResult<(), Error> { + fn openvpn_event(&self, event: OpenVpnPluginEvent, env: OpenVpnEnv) -> StdResult<(), Error> { debug!("OpenVPN event {:?}", event); (self.on_event)(event, env); Ok(()) diff --git a/talpid_ipc/Cargo.toml b/talpid_ipc/Cargo.toml index d1a2bb13e2..630cb1ced1 100644 --- a/talpid_ipc/Cargo.toml +++ b/talpid_ipc/Cargo.toml @@ -9,12 +9,12 @@ error-chain = "0.10" serde = "1.0" serde_json = "1.0" log = "0.3" -jsonrpc-core = { git = "https://github.com/faern/jsonrpc", branch = "bind-zero" } -jsonrpc-ws-server = { git = "https://github.com/faern/jsonrpc", branch = "bind-zero" } +jsonrpc-core = { git = "https://github.com/faern/jsonrpc", branch = "ws-close-handle" } +jsonrpc-ws-server = { git = "https://github.com/faern/jsonrpc", branch = "ws-close-handle" } ws = { git = "https://github.com/tomusdrw/ws-rs" } url = "1.4" [dev-dependencies] assert_matches = "1.0" env_logger = "0.4" -jsonrpc-macros = { git = "https://github.com/faern/jsonrpc", branch = "bind-zero" } +jsonrpc-macros = { git = "https://github.com/faern/jsonrpc", branch = "ws-close-handle" } diff --git a/talpid_ipc/src/lib.rs b/talpid_ipc/src/lib.rs index f6d4acea19..3ef9bf4733 100644 --- a/talpid_ipc/src/lib.rs +++ b/talpid_ipc/src/lib.rs @@ -26,18 +26,6 @@ pub type IpcServerId = String; error_chain!{ errors { - ReadFailure { - description("Could not read IPC message") - } - ParseFailure { - description("Unable to serialize/deserialize message") - } - CouldNotStartServer { - description("Failed to start the IPC server") - } - SendError { - description("Unable to send message") - } IpcServerError { description("Error in IPC server") } @@ -74,17 +62,28 @@ impl IpcServer { .chain_err(|| ErrorKind::IpcServerError) } + /// Returns the localhost address this `IpcServer` is listening on. pub fn address(&self) -> &str { &self.address } - /// Consumes the server, stops it and waits for it to finish. - pub fn stop(self) { - self.server.close(); + /// Creates a handle bound to this `IpcServer` that can be used to shut it down. + pub fn close_handle(&self) -> CloseHandle { + CloseHandle(self.server.close_handle()) } - /// Consumes the server and waits for it to finish. + /// Consumes the server and waits for it to finish. Get an `CloseHandle` before calling this + /// if you want to be able to shut the server down. pub fn wait(self) -> Result<()> { self.server.wait().chain_err(|| ErrorKind::IpcServerError) } } + +#[derive(Clone)] +pub struct CloseHandle(jsonrpc_ws_server::CloseHandle); + +impl CloseHandle { + pub fn close(self) { + self.0.close(); + } +} |
