diff options
| author | Andrej Mihajlov <and@mullvad.net> | 2018-06-20 15:34:40 +0200 |
|---|---|---|
| committer | Andrej Mihajlov <and@mullvad.net> | 2018-07-03 13:37:54 +0200 |
| commit | 73840e98952dd3f7c005fcec44c971455da179eb (patch) | |
| tree | 15a887410858007a802450ad2870d8bf495dd16d | |
| parent | 67e82627564f8e4a2d8da4bcf5f0fd00867876bc (diff) | |
| download | mullvadvpn-73840e98952dd3f7c005fcec44c971455da179eb.tar.xz mullvadvpn-73840e98952dd3f7c005fcec44c971455da179eb.zip | |
Refactor IpcFacade to DaemonRpc and JsonRpcWs to JsonRpcTransport
31 files changed, 1247 insertions, 1057 deletions
diff --git a/app/app.android.js b/app/app.android.js index dbbb0c1c26..8158922673 100644 --- a/app/app.android.js +++ b/app/app.android.js @@ -29,7 +29,7 @@ DeviceEventEmitter.addListener('com.mullvad.daemon-connection', async (_event, a await backend.autologin(); await backend.fetchRelaySettings(); await backend.fetchSecurityState(); - await backend.connect(); + await backend.connectTunnel(); } catch (e) { if (e instanceof NoAccountError) { log.debug('No previously configured account set, showing window'); diff --git a/app/app.js b/app/app.js index 70d1052560..5ebc1137a1 100644 --- a/app/app.js +++ b/app/app.js @@ -10,39 +10,55 @@ import { log } from './lib/platform'; import makeRoutes from './routes'; import configureStore from './redux/store'; import { Backend, NoAccountError } from './lib/backend'; +import { DaemonRpc } from './lib/daemon-rpc'; import { setShutdownHandler } from './shutdown-handler'; import type { ConnectionState } from './redux/connection/reducers'; import type { TrayIconType } from './tray-icon-controller'; +import type { RpcCredentialsProvider, RpcCredentials } from './lib/backend'; const initialState = null; const memoryHistory = createMemoryHistory(); const store = configureStore(initialState, memoryHistory); -const backend = new Backend(store); -ipcRenderer.on('daemon-connection', async (_event, args) => { - backend.setCredentials(args.credentials); - backend.sync(); +class CredentialsProvider implements RpcCredentialsProvider { + async request(): Promise<RpcCredentials> { + return new Promise((resolve, _reject) => { + ipcRenderer.once('daemon-connection', async (_event, credentials: RpcCredentials) => { + log.debug('Got credentials: ', credentials); + resolve(credentials); + }); + ipcRenderer.send('daemon-connection'); + }); + } +} + +const rpc = new DaemonRpc(); +const credentialsProvider = new CredentialsProvider(); +const backend = new Backend(store, rpc, credentialsProvider); + +(async function() { + backend.connect(); + try { await backend.autologin(); - await backend.fetchRelaySettings(); - await backend.fetchSecurityState(); - await backend.connect(); - } catch (e) { - if (e instanceof NoAccountError) { + } catch (error) { + if (error instanceof NoAccountError) { log.debug('No previously configured account set, showing window'); ipcRenderer.send('show-window'); + } else { + log.error(`Failed to autologin: ${error.message}`); } } -}); -ipcRenderer.send('daemon-connection'); + backend.connectTunnel(); +})(); setShutdownHandler(async () => { log.info('Executing a shutdown handler'); try { - await backend.disconnect(); + await backend.disconnectTunnel(); log.info('Disconnected the tunnel'); } catch (e) { log.error(`Failed to shutdown tunnel: ${e.message}`); diff --git a/app/components/Account.js b/app/components/Account.js index fc72ff5b30..f8ea3dc19b 100644 --- a/app/components/Account.js +++ b/app/components/Account.js @@ -8,7 +8,7 @@ import styles from './AccountStyles'; import Img from './Img'; import { formatAccount } from '../lib/formatters'; -import type { AccountToken } from '../lib/ipc-facade'; +import type { AccountToken } from '../lib/daemon-rpc'; export type AccountProps = { accountToken: AccountToken, diff --git a/app/components/Login.js b/app/components/Login.js index dd297c8e9c..d15681c58b 100644 --- a/app/components/Login.js +++ b/app/components/Login.js @@ -11,7 +11,7 @@ import styles from './LoginStyles'; import { colors } from '../config'; import type { AccountReduxState } from '../redux/account/reducers'; -import type { AccountToken } from '../lib/ipc-facade'; +import type { AccountToken } from '../lib/daemon-rpc'; export type Props = { account: AccountReduxState, diff --git a/app/components/SelectLocation.js b/app/components/SelectLocation.js index 5864ed55a3..14d0a04f9c 100644 --- a/app/components/SelectLocation.js +++ b/app/components/SelectLocation.js @@ -14,7 +14,7 @@ import type { RelayLocationRedux, RelayLocationCityRedux, } from '../redux/settings/reducers'; -import type { RelayLocation } from '../lib/ipc-facade'; +import type { RelayLocation } from '../lib/daemon-rpc'; export type SelectLocationProps = { settings: SettingsReduxState, diff --git a/app/containers/SelectLocationPage.js b/app/containers/SelectLocationPage.js index b2ae476912..29b47c14a3 100644 --- a/app/containers/SelectLocationPage.js +++ b/app/containers/SelectLocationPage.js @@ -26,7 +26,7 @@ const mapDispatchToProps = (dispatch: ReduxDispatch, props: SharedRouteProps) => await backend.updateRelaySettings(relayUpdate); await backend.fetchRelaySettings(); - await backend.connect(); + await backend.connectTunnel(); pushHistory('/connect'); } catch (e) { diff --git a/app/lib/backend.js b/app/lib/backend.js index 39521d8edb..4fb9eb3cfa 100644 --- a/app/lib/backend.js +++ b/app/lib/backend.js @@ -1,16 +1,23 @@ // @flow -import { log } from '../lib/platform'; -import { IpcFacade, RealIpc } from './ipc-facade'; -import { JsonRpcError, TimeOutError } from './jsonrpc-ws-ipc'; +import { bindActionCreators } from 'redux'; +import { push } from 'react-router-redux'; +import { + RemoteError as JsonRpcTransportRemoteError, + TimeOutError as JsonRpcTransportTimeOutError, +} from './jsonrpc-transport'; import accountActions from '../redux/account/actions'; import connectionActions from '../redux/connection/actions'; import settingsActions from '../redux/settings/actions'; -import { push } from 'react-router-redux'; +import { log } from '../lib/platform'; -import type { RpcCredentials } from './rpc-address-file'; +import type { RpcCredentials as OriginalRpcCredentials } from './rpc-address-file'; +import type { + DaemonRpcProtocol, + ConnectionObserver as DaemonConnectionObserver, +} from './daemon-rpc'; import type { ReduxStore } from '../redux/store'; -import type { AccountToken, BackendState, RelaySettingsUpdate } from './ipc-facade'; +import type { AccountToken, BackendState, RelaySettingsUpdate } from './daemon-rpc'; import type { ConnectionState } from '../redux/connection/reducers'; export class NoCreditError extends Error { @@ -75,71 +82,80 @@ export class UnknownError extends Error { } } +export class CredentialsRequestError extends Error { + _reason: Error; + + constructor(reason: Error) { + super('Failed to request the RPC credentials'); + this._reason = reason; + } + + get reason(): Error { + return this._reason; + } +} + +export type RpcCredentials = OriginalRpcCredentials; +export interface RpcCredentialsProvider { + request(): Promise<RpcCredentials>; +} + /** * Backend implementation */ export class Backend { - _ipc: IpcFacade; - _credentials: ?RpcCredentials; - _authenticationPromise: ?Promise<void>; _store: ReduxStore; + _daemonRpc: DaemonRpcProtocol; + _credentialsProvider: RpcCredentialsProvider; + _reconnectBackoff = new ReconnectionBackoff(); - constructor(store: ReduxStore, credentials?: RpcCredentials, ipc: ?IpcFacade) { - this._store = store; - this._credentials = credentials; + _credentials: ?RpcCredentials; + _authenticationPromise: ?Promise<void>; - if (ipc) { - this._ipc = ipc; + _openConnectionObserver: ?DaemonConnectionObserver; + _closeConnectionObserver: ?DaemonConnectionObserver; - // force to re-authenticate when connection closed - this._ipc.setCloseConnectionHandler(() => { - this._authenticationPromise = null; - }); - - this._registerIpcListeners(); - this._startReachability(); - } - } + constructor( + store: ReduxStore, + rpc: DaemonRpcProtocol, + credentialsProvider: RpcCredentialsProvider, + ) { + this._store = store; + this._daemonRpc = rpc; + this._credentialsProvider = credentialsProvider; - setCredentials(credentials: RpcCredentials) { - log.debug('Got connection info to backend', credentials.connectionString); - this._credentials = credentials; + this._openConnectionObserver = rpc.addOpenConnectionObserver(() => { + this._onOpenConnection(); + }); - if (this._ipc) { - this._credentials = credentials; - } else { - this._ipc = new RealIpc(credentials.connectionString); + this._closeConnectionObserver = rpc.addCloseConnectionObserver((error) => { + this._onCloseConnection(error); + }); - // force to re-authenticate when connection closed - this._ipc.setCloseConnectionHandler(() => { - this._authenticationPromise = null; - }); - } - this._registerIpcListeners(); + this._setupReachability(); } - async sync() { - log.info('Syncing with the backend...'); + dispose() { + const openConnectionObserver = this._openConnectionObserver; + const closeConnectionObserver = this._closeConnectionObserver; - try { - await this._fetchRelayLocations(); - } catch (e) { - log.error('Failed to fetch the relay locations: ', e.message); + if (openConnectionObserver) { + openConnectionObserver.unsubscribe(); + this._openConnectionObserver = null; } - try { - await this._fetchLocation(); - } catch (e) { - log.error('Failed to fetch the location: ', e.message); + if (closeConnectionObserver) { + closeConnectionObserver.unsubscribe(); + this._closeConnectionObserver = null; } + } - try { - await this._fetchAllowLan(); - } catch (e) { - log.error('Failed to fetch the LAN sharing policy: ', e.message); - } + connect() { + this._connectToDaemon(); + } - await this._fetchAccountHistory(); + disconnect() { + this._daemonRpc.disconnect(); } async login(accountToken: AccountToken) { @@ -150,11 +166,11 @@ export class Backend { try { await this._ensureAuthenticated(); - const accountData = await this._ipc.getAccountData(accountToken); + const accountData = await this._daemonRpc.getAccountData(accountToken); log.debug('Account exists', accountData); - await this._ipc.setAccount(accountToken); + await this._daemonRpc.setAccount(accountToken); log.info('Log in complete'); @@ -166,10 +182,10 @@ export class Backend { setTimeout(() => { this._store.dispatch(push('/connect')); log.debug('Autoconnecting...'); - this.connect(); + this.connectTunnel(); }, 1000); - await this._fetchAccountHistory(); + await this.fetchAccountHistory(); } catch (e) { log.error('Failed to log in,', e.message); @@ -178,27 +194,6 @@ export class Backend { } } - _rpcErrorToBackendError(e) { - if (e instanceof JsonRpcError) { - switch (e.code) { - case -200: // Account doesn't exist - return new InvalidAccountError(); - case -32603: // Internal error - // We treat all internal backend errors as the user cannot reach - // api.mullvad.net. This is not always true of course, but it is - // true so often that we choose to disregard the other edge cases - // for now. - return new CommunicationError(); - } - } else if (e instanceof TimeOutError) { - return new CommunicationError(); - } else if (e instanceof NoDaemonError) { - return e; - } - - return new UnknownError(e.message); - } - async autologin() { try { log.debug('Attempting to log in automatically'); @@ -207,7 +202,7 @@ export class Backend { this._store.dispatch(accountActions.startLogin()); - const accountToken = await this._ipc.getAccount(); + const accountToken = await this._daemonRpc.getAccount(); if (!accountToken) { throw new NoAccountError(); } @@ -215,7 +210,7 @@ export class Backend { log.debug('The backend had an account number stored: ', accountToken); this._store.dispatch(accountActions.startLogin(accountToken)); - const accountData = await this._ipc.getAccountData(accountToken); + const accountData = await this._daemonRpc.getAccountData(accountToken); log.debug('The stored account number still exists', accountData); this._store.dispatch(accountActions.loginSuccessful(accountData.expiry)); @@ -234,12 +229,12 @@ export class Backend { // @TODO: What does it mean for a logout to be successful or failed? try { await this._ensureAuthenticated(); - await this._ipc.setAccount(null); + await this._daemonRpc.setAccount(null); this._store.dispatch(accountActions.loggedOut()); // disconnect user during logout - await this.disconnect(); + await this.disconnectTunnel(); this._store.dispatch(push('/')); } catch (e) { @@ -247,9 +242,9 @@ export class Backend { } } - async connect() { + async connectTunnel() { try { - const currentState = await this._ipc.getState(); + const currentState = await this._daemonRpc.getState(); if (currentState.state === 'secured') { log.debug('Refusing to connect as connection is already secured'); this._store.dispatch(connectionActions.connected()); @@ -259,18 +254,18 @@ export class Backend { this._store.dispatch(connectionActions.connecting()); await this._ensureAuthenticated(); - await this._ipc.connect(); + await this._daemonRpc.connectTunnel(); } catch (e) { log.error('Failed to connect: ', e.message); this._store.dispatch(connectionActions.disconnected()); } } - async disconnect() { + async disconnectTunnel() { // @TODO: Failure modes try { await this._ensureAuthenticated(); - await this._ipc.disconnect(); + await this._daemonRpc.disconnectTunnel(); } catch (e) { log.error('Failed to disconnect: ', e.message); } @@ -279,7 +274,7 @@ export class Backend { async updateRelaySettings(relaySettings: RelaySettingsUpdate) { try { await this._ensureAuthenticated(); - await this._ipc.updateRelaySettings(relaySettings); + await this._daemonRpc.updateRelaySettings(relaySettings); } catch (e) { log.error('Failed to update relay settings: ', e.message); } @@ -288,7 +283,7 @@ export class Backend { async fetchRelaySettings() { await this._ensureAuthenticated(); - const relaySettings = await this._ipc.getRelaySettings(); + const relaySettings = await this._daemonRpc.getRelaySettings(); log.debug('Got relay settings from backend', JSON.stringify(relaySettings)); if (relaySettings.normal) { @@ -339,13 +334,13 @@ export class Backend { } async updateAccountExpiry() { - const ipc = this._ipc; + const ipc = this._daemonRpc; const store = this._store; try { await this._ensureAuthenticated(); - const accountToken = await this._ipc.getAccount(); + const accountToken = await this._daemonRpc.getAccount(); if (!accountToken) { throw new NoAccountError(); } @@ -357,31 +352,22 @@ export class Backend { } } - async removeAccountFromHistory(accountToken: AccountToken) { - try { - await this._ensureAuthenticated(); - await this._ipc.removeAccountFromHistory(accountToken); - await this._fetchAccountHistory(); - } catch (e) { - log.error('Failed to remove account token from history', e.message); - } + async removeAccountFromHistory(accountToken: AccountToken): Promise<void> { + await this._ensureAuthenticated(); + await this._daemonRpc.removeAccountFromHistory(accountToken); + await this.fetchAccountHistory(); } - async _fetchAccountHistory() { - try { - await this._ensureAuthenticated(); - const accountHistory = await this._ipc.getAccountHistory(); - this._store.dispatch(accountActions.updateAccountHistory(accountHistory)); - } catch (e) { - log.info('Failed to fetch account history,', e.message); - throw e; - } + async fetchAccountHistory(): Promise<void> { + await this._ensureAuthenticated(); + const accountHistory = await this._daemonRpc.getAccountHistory(); + this._store.dispatch(accountActions.updateAccountHistory(accountHistory)); } - async _fetchRelayLocations() { + async fetchRelayLocations() { await this._ensureAuthenticated(); - const locations = await this._ipc.getRelayLocations(); + const locations = await this._daemonRpc.getRelayLocations(); log.info('Got relay locations'); @@ -401,10 +387,10 @@ export class Backend { this._store.dispatch(settingsActions.updateRelayLocations(storedLocations)); } - async _fetchLocation() { + async fetchLocation() { await this._ensureAuthenticated(); - const location = await this._ipc.getLocation(); + const location = await this._daemonRpc.getLocation(); log.info('Got location from daemon'); @@ -421,65 +407,148 @@ export class Backend { } async setAllowLan(allowLan: boolean) { - try { - await this._ensureAuthenticated(); - await this._ipc.setAllowLan(allowLan); + await this._ensureAuthenticated(); + await this._daemonRpc.setAllowLan(allowLan); - this._store.dispatch(settingsActions.updateAllowLan(allowLan)); - } catch (e) { - log.error('Failed to change the LAN sharing policy: ', e.message); - } + this._store.dispatch(settingsActions.updateAllowLan(allowLan)); } - async _fetchAllowLan() { + async fetchAllowLan() { await this._ensureAuthenticated(); - const allowLan = await this._ipc.getAllowLan(); + const allowLan = await this._daemonRpc.getAllowLan(); this._store.dispatch(settingsActions.updateAllowLan(allowLan)); } async fetchSecurityState() { await this._ensureAuthenticated(); - const securityState = await this._ipc.getState(); + const securityState = await this._daemonRpc.getState(); const connectionState = this._securityStateToConnectionState(securityState); this._dispatchConnectionState(connectionState); } + async _requestCredentials(): Promise<RpcCredentials> { + try { + return await this._credentialsProvider.request(); + } catch (providerError) { + throw new CredentialsRequestError(providerError); + } + } + + async _connectToDaemon(): Promise<void> { + let credentials; + try { + credentials = await this._requestCredentials(); + } catch (error) { + log.error(`Cannot request the RPC credentials: ${error.message}`); + return; + } + + this._credentials = credentials; + this._daemonRpc.connect(credentials.connectionString); + } + + async _onOpenConnection() { + this._reconnectBackoff.reset(); + + // make sure to re-subscribe to state notifications when connection is re-established. + try { + await this._subscribeStateListener(); + } catch (error) { + log.error(`Cannot subscribe for RPC notifications: ${error.message}`); + } + + this._fetchInitialState(); + } + + async _onCloseConnection(error: ?Error) { + // force to re-authenticate when connection closed + this._authenticationPromise = null; + + if (error) { + log.debug(`Lost connection to daemon: ${error.message}`); + + const recover = async () => { + try { + await this.connect(); + } catch (error) { + log.error(`Failed to reconnect: ${error.message}`); + } + }; + + this._reconnectBackoff.attempt(() => { + recover(); + }); + } + } + /** * Start reachability monitoring for online/offline detection * This is currently done via HTML5 APIs but will be replaced later * with proper backend integration. */ - _startReachability() { + _setupReachability() { + const { online, offline } = bindActionCreators(connectionActions, this._store.dispatch); + window.addEventListener('online', () => { - this._store.dispatch(connectionActions.online()); + online(); }); window.addEventListener('offline', () => { - // force disconnect since there is no real connection anyway. - this.disconnect(); - this._store.dispatch(connectionActions.offline()); + offline(); }); - // update online status in background - setTimeout(() => { - const action = navigator.onLine ? connectionActions.online() : connectionActions.offline(); - - this._store.dispatch(action); - }, 0); + if (navigator.onLine) { + online(); + } else { + offline(); + } } - async _registerIpcListeners() { + async _subscribeStateListener() { await this._ensureAuthenticated(); - this._ipc.registerStateListener((newState) => { - const connectionState = this._securityStateToConnectionState(newState); - log.debug(`Got new state from backend {state: ${newState.state}, \ - target_state: ${newState.target_state}}, translated to '${connectionState}'`); - this._dispatchConnectionState(connectionState); - this.sync(); + await this._daemonRpc.subscribeStateListener((newState, error) => { + if (error) { + log.error(`Received an error when processing the incoming state change: ${error.message}`); + } + + if (newState) { + const connectionState = this._securityStateToConnectionState(newState); + + log.debug( + `Got new state from backend {state: ${newState.state}, target_state: ${ + newState.target_state + }}, translated to '${connectionState}'`, + ); + + this._dispatchConnectionState(connectionState); + this._refreshStateOnChange(); + } }); } + async _fetchInitialState() { + try { + await Promise.all([ + this.fetchSecurityState(), + this.fetchRelaySettings(), + this.fetchRelayLocations(), + this.fetchAllowLan(), + this.fetchLocation(), + ]); + } catch (error) { + log.error(`Cannot prefetch data: ${error.message}`); + } + } + + async _refreshStateOnChange() { + try { + await this.fetchLocation(); + } catch (error) { + log.error(`Failed to fetch the location: ${error.message}`); + } + } + _securityStateToConnectionState(backendState: BackendState): ConnectionState { if (backendState.state === 'unsecured' && backendState.target_state === 'secured') { return 'connecting'; @@ -492,19 +561,44 @@ export class Backend { } _dispatchConnectionState(connectionState: ConnectionState) { + const { connecting, connected, disconnected } = bindActionCreators( + connectionActions, + this._store.dispatch, + ); switch (connectionState) { case 'connecting': - this._store.dispatch(connectionActions.connecting()); + connecting(); break; case 'connected': - this._store.dispatch(connectionActions.connected()); + connected(); break; case 'disconnected': - this._store.dispatch(connectionActions.disconnected()); + disconnected(); break; } } + _rpcErrorToBackendError(e) { + if (e instanceof JsonRpcTransportRemoteError) { + switch (e.code) { + case -200: // Account doesn't exist + return new InvalidAccountError(); + case -32603: // Internal error + // We treat all internal backend errors as the user cannot reach + // api.mullvad.net. This is not always true of course, but it is + // true so often that we choose to disregard the other edge cases + // for now. + return new CommunicationError(); + } + } else if (e instanceof JsonRpcTransportTimeOutError) { + return new CommunicationError(); + } else if (e instanceof NoDaemonError) { + return e; + } + + return new UnknownError(e.message); + } + _ensureAuthenticated(): Promise<void> { const credentials = this._credentials; if (credentials) { @@ -519,11 +613,41 @@ export class Backend { async _authenticate(sharedSecret: string) { try { - await this._ipc.authenticate(sharedSecret); + await this._daemonRpc.authenticate(sharedSecret); log.info('Authenticated with backend'); } catch (e) { - log.error('Failed to authenticate with backend: ', e.message); + log.error(`Failed to authenticate with backend: ${e.message}`); throw e; } } } + +/* + * Used to calculate the time to wait before reconnecting + * the websocket. + * + * It uses a linear backoff function that goes from 500ms + * to 3000ms + */ +class ReconnectionBackoff { + _attempt: number; + + constructor() { + this._attempt = 0; + } + + attempt(handler: () => void) { + setTimeout(handler, this._getIncreasedBackoff()); + } + + reset() { + this._attempt = 0; + } + + _getIncreasedBackoff() { + if (this._attempt < 6) { + this._attempt++; + } + return this._attempt * 500; + } +} diff --git a/app/lib/daemon-rpc.js b/app/lib/daemon-rpc.js new file mode 100644 index 0000000000..2ef3d06e13 --- /dev/null +++ b/app/lib/daemon-rpc.js @@ -0,0 +1,387 @@ +// @flow + +import JsonRpcTransport from './jsonrpc-transport'; +import { + object, + maybe, + string, + number, + boolean, + enumeration, + arrayOf, + oneOf, +} from 'validated/schema'; +import { validate } from 'validated/object'; + +import type { Node as SchemaNode } from 'validated/schema'; + +export type AccountData = { expiry: string }; +export type AccountToken = string; +export type Ip = string; +export type Location = { + ip: Ip, + country: string, + city: ?string, + latitude: number, + longitude: number, + mullvad_exit_ip: boolean, +}; +const LocationSchema = object({ + ip: string, + country: string, + city: maybe(string), + latitude: number, + longitude: number, + mullvad_exit_ip: boolean, +}); + +export type SecurityState = 'secured' | 'unsecured'; +export type BackendState = { + state: SecurityState, + target_state: SecurityState, +}; + +export type RelayProtocol = 'tcp' | 'udp'; +export type RelayLocation = {| city: [string, string] |} | {| country: string |}; + +type OpenVpnParameters = { + port: 'any' | { only: number }, + protocol: 'any' | { only: RelayProtocol }, +}; + +type TunnelOptions<TOpenVpnParameters> = { + openvpn: TOpenVpnParameters, +}; + +type RelaySettingsNormal<TTunnelOptions> = { + location: + | 'any' + | { + only: RelayLocation, + }, + tunnel: + | 'any' + | { + only: TTunnelOptions, + }, +}; + +// types describing the structure of RelaySettings +export type RelaySettingsCustom = { + host: string, + tunnel: { + openvpn: { + port: number, + protocol: RelayProtocol, + }, + }, +}; +export type RelaySettings = + | {| + normal: RelaySettingsNormal<TunnelOptions<OpenVpnParameters>>, + |} + | {| + custom_tunnel_endpoint: RelaySettingsCustom, + |}; + +// types describing the partial update of RelaySettings +export type RelaySettingsNormalUpdate = $Shape< + RelaySettingsNormal<TunnelOptions<$Shape<OpenVpnParameters>>>, +>; +export type RelaySettingsUpdate = + | {| + normal: RelaySettingsNormalUpdate, + |} + | {| + custom_tunnel_endpoint: RelaySettingsCustom, + |}; + +const constraint = <T>(constraintValue: SchemaNode<T>) => { + return oneOf( + string, // any + object({ + only: constraintValue, + }), + ); +}; + +const RelaySettingsSchema = oneOf( + object({ + normal: object({ + location: constraint( + oneOf( + object({ + city: arrayOf(string), + }), + object({ + country: string, + }), + ), + ), + tunnel: constraint( + object({ + openvpn: object({ + port: constraint(number), + protocol: constraint(enumeration('udp', 'tcp')), + }), + }), + ), + }), + }), + object({ + custom_tunnel_endpoint: object({ + host: string, + tunnel: object({ + openvpn: object({ + port: number, + protocol: enumeration('udp', 'tcp'), + }), + }), + }), + }), +); + +export type RelayList = { + countries: Array<RelayListCountry>, +}; + +export type RelayListCountry = { + name: string, + code: string, + cities: Array<RelayListCity>, +}; + +export type RelayListCity = { + name: string, + code: string, + latitude: number, + longitude: number, + has_active_relays: boolean, +}; + +const RelayListSchema = object({ + countries: arrayOf( + object({ + name: string, + code: string, + cities: arrayOf( + object({ + name: string, + code: string, + latitude: number, + longitude: number, + has_active_relays: boolean, + }), + ), + }), + ), +}); + +const AccountDataSchema = object({ + expiry: string, +}); + +const allSecurityStates: Array<SecurityState> = ['secured', 'unsecured']; +const BackendStateSchema = object({ + state: enumeration(...allSecurityStates), + target_state: enumeration(...allSecurityStates), +}); + +export interface DaemonRpcProtocol { + connect(string): void; + disconnect(): void; + getAccountData(AccountToken): Promise<AccountData>; + getRelayLocations(): Promise<RelayList>; + getAccount(): Promise<?AccountToken>; + setAccount(accountToken: ?AccountToken): Promise<void>; + updateRelaySettings(RelaySettingsUpdate): Promise<void>; + getRelaySettings(): Promise<RelaySettings>; + setAllowLan(boolean): Promise<void>; + getAllowLan(): Promise<boolean>; + connectTunnel(): Promise<void>; + disconnectTunnel(): Promise<void>; + getLocation(): Promise<Location>; + getState(): Promise<BackendState>; + subscribeStateListener((state: ?BackendState, error: ?Error) => void): Promise<void>; + addOpenConnectionObserver(() => void): ConnectionObserver; + addCloseConnectionObserver((error: ?Error) => void): ConnectionObserver; + authenticate(sharedSecret: string): Promise<void>; + getAccountHistory(): Promise<Array<AccountToken>>; + removeAccountFromHistory(accountToken: AccountToken): Promise<void>; +} + +export class ResponseParseError extends Error { + _validationError: ?Error; + + constructor(message: string, validationError: ?Error) { + super(message); + this._validationError = validationError; + } + + get validationError(): ?Error { + return this._validationError; + } +} + +export type ConnectionObserver = { + unsubscribe: () => void, +}; + +export class DaemonRpc implements DaemonRpcProtocol { + _transport = new JsonRpcTransport(); + + async authenticate(sharedSecret: string): Promise<void> { + await this._transport.send('auth', sharedSecret); + } + + connect(connectionString: string) { + this._transport.connect(connectionString); + } + + disconnect() { + this._transport.disconnect(); + } + + addOpenConnectionObserver(handler: () => void): ConnectionObserver { + this._transport.on('open', handler); + return { + unsubscribe: () => { + this._transport.off('open', handler); + }, + }; + } + + addCloseConnectionObserver(handler: (error: ?Error) => void): ConnectionObserver { + this._transport.on('close', handler); + return { + unsubscribe: () => { + this._transport.off('close', handler); + }, + }; + } + + async getAccountData(accountToken: AccountToken): Promise<AccountData> { + // send the IPC with 30s timeout since the backend will wait + // for a HTTP request before replying + const response = await this._transport.send('get_account_data', accountToken, 30000); + try { + return validate(AccountDataSchema, response); + } catch (error) { + throw new ResponseParseError('Invalid response from get_account_data', error); + } + } + + async getRelayLocations(): Promise<RelayList> { + const response = await this._transport.send('get_relay_locations'); + try { + return validate(RelayListSchema, response); + } catch (error) { + throw new ResponseParseError('Invalid response from get_relay_locations', error); + } + } + + async getAccount(): Promise<?AccountToken> { + const response = await this._transport.send('get_account'); + if (response === null || typeof response === 'string') { + return response; + } else { + throw new ResponseParseError('Invalid response from get_account', null); + } + } + + async setAccount(accountToken: ?AccountToken): Promise<void> { + await this._transport.send('set_account', accountToken); + } + + async updateRelaySettings(relaySettings: RelaySettingsUpdate): Promise<void> { + await this._transport.send('update_relay_settings', [relaySettings]); + } + + async getRelaySettings(): Promise<RelaySettings> { + const response = await this._transport.send('get_relay_settings'); + try { + const validatedObject = validate(RelaySettingsSchema, response); + + /* $FlowFixMe: + There is no way to express the constraints with string literals, i.e: + + RelaySettingsSchema constraint: + oneOf(string, object) + + RelaySettings constraint: + 'any' | object + + These two are incompatible so we simply enforce the type for now. + */ + return ((validatedObject: any): RelaySettings); + } catch (e) { + throw new ResponseParseError('Invalid response from get_relay_settings', e); + } + } + + async setAllowLan(allowLan: boolean): Promise<void> { + await this._transport.send('set_allow_lan', [allowLan]); + } + + async getAllowLan(): Promise<boolean> { + const response = await this._transport.send('get_allow_lan'); + if (typeof response === 'boolean') { + return response; + } else { + throw new ResponseParseError('Invalid response from get_allow_lan', null); + } + } + + async connectTunnel(): Promise<void> { + await this._transport.send('connect'); + } + + async disconnectTunnel(): Promise<void> { + await this._transport.send('disconnect'); + } + + async getLocation(): Promise<Location> { + // send the IPC with 30s timeout since the backend will wait + // for a HTTP request before replying + + const response = await this._transport.send('get_current_location', [], 30000); + try { + return validate(LocationSchema, response); + } catch (error) { + throw new ResponseParseError('Invalid response from get_current_location', error); + } + } + + async getState(): Promise<BackendState> { + const response = await this._transport.send('get_state'); + try { + return validate(BackendStateSchema, response); + } catch (error) { + throw new ResponseParseError('Invalid response from get_state', error); + } + } + + subscribeStateListener(listener: (state: ?BackendState, error: ?Error) => void): Promise<void> { + return this._transport.subscribe('new_state', (payload) => { + try { + const newState = validate(BackendStateSchema, payload); + listener(newState, null); + } catch (error) { + listener(null, new ResponseParseError('Invalid payload from new_state', error)); + } + }); + } + + async getAccountHistory(): Promise<Array<AccountToken>> { + const response = await this._transport.send('get_account_history'); + try { + return validate(arrayOf(string), response); + } catch (error) { + throw new ResponseParseError('Invalid response from get_account_history', null); + } + } + + async removeAccountFromHistory(accountToken: AccountToken): Promise<void> { + await this._transport.send('remove_account_from_history', accountToken); + } +} diff --git a/app/lib/ipc-facade.js b/app/lib/ipc-facade.js deleted file mode 100644 index 103ef4d931..0000000000 --- a/app/lib/ipc-facade.js +++ /dev/null @@ -1,356 +0,0 @@ -// @flow - -import JsonRpcWs, { InvalidReply } from './jsonrpc-ws-ipc'; -import { - object, - maybe, - string, - number, - boolean, - enumeration, - arrayOf, - oneOf, -} from 'validated/schema'; -import { validate } from 'validated/object'; - -import type { Node as SchemaNode } from 'validated/schema'; - -export type AccountData = { expiry: string }; -export type AccountToken = string; -export type Ip = string; -export type Location = { - ip: Ip, - country: string, - city: ?string, - latitude: number, - longitude: number, - mullvad_exit_ip: boolean, -}; -const LocationSchema = object({ - ip: string, - country: string, - city: maybe(string), - latitude: number, - longitude: number, - mullvad_exit_ip: boolean, -}); - -export type SecurityState = 'secured' | 'unsecured'; -export type BackendState = { - state: SecurityState, - target_state: SecurityState, -}; - -export type RelayProtocol = 'tcp' | 'udp'; -export type RelayLocation = {| city: [string, string] |} | {| country: string |}; - -type OpenVpnParameters = { - port: 'any' | { only: number }, - protocol: 'any' | { only: RelayProtocol }, -}; - -type TunnelOptions<TOpenVpnParameters> = { - openvpn: TOpenVpnParameters, -}; - -type RelaySettingsNormal<TTunnelOptions> = { - location: - | 'any' - | { - only: RelayLocation, - }, - tunnel: - | 'any' - | { - only: TTunnelOptions, - }, -}; - -// types describing the structure of RelaySettings -export type RelaySettingsCustom = { - host: string, - tunnel: { - openvpn: { - port: number, - protocol: RelayProtocol, - }, - }, -}; -export type RelaySettings = - | {| - normal: RelaySettingsNormal<TunnelOptions<OpenVpnParameters>>, - |} - | {| - custom_tunnel_endpoint: RelaySettingsCustom, - |}; - -// types describing the partial update of RelaySettings -export type RelaySettingsNormalUpdate = $Shape< - RelaySettingsNormal<TunnelOptions<$Shape<OpenVpnParameters>>>, ->; -export type RelaySettingsUpdate = - | {| - normal: RelaySettingsNormalUpdate, - |} - | {| - custom_tunnel_endpoint: RelaySettingsCustom, - |}; - -const constraint = <T>(constraintValue: SchemaNode<T>) => - oneOf( - string, - object({ - only: constraintValue, - }), - ); - -const RelaySettingsSchema = oneOf( - object({ - normal: object({ - location: constraint( - oneOf( - object({ - city: arrayOf(string), - }), - object({ - country: string, - }), - ), - ), - tunnel: constraint( - object({ - openvpn: object({ - port: constraint(number), - protocol: constraint(enumeration('udp', 'tcp')), - }), - }), - ), - }), - }), - object({ - custom_tunnel_endpoint: object({ - host: string, - tunnel: object({ - openvpn: object({ - port: number, - protocol: enumeration('udp', 'tcp'), - }), - }), - }), - }), -); - -export type RelayList = { - countries: Array<RelayListCountry>, -}; - -export type RelayListCountry = { - name: string, - code: string, - cities: Array<RelayListCity>, -}; - -export type RelayListCity = { - name: string, - code: string, - latitude: number, - longitude: number, - has_active_relays: boolean, -}; - -const RelayListSchema = object({ - countries: arrayOf( - object({ - name: string, - code: string, - cities: arrayOf( - object({ - name: string, - code: string, - latitude: number, - longitude: number, - has_active_relays: boolean, - }), - ), - }), - ), -}); - -export interface IpcFacade { - setConnectionString(string): void; - getAccountData(AccountToken): Promise<AccountData>; - getRelayLocations(): Promise<RelayList>; - getAccount(): Promise<?AccountToken>; - setAccount(accountToken: ?AccountToken): Promise<void>; - updateRelaySettings(RelaySettingsUpdate): Promise<void>; - getRelaySettings(): Promise<RelaySettings>; - setAllowLan(boolean): Promise<void>; - getAllowLan(): Promise<boolean>; - connect(): Promise<void>; - disconnect(): Promise<void>; - getLocation(): Promise<Location>; - getState(): Promise<BackendState>; - registerStateListener((BackendState) => void): void; - setCloseConnectionHandler(() => void): void; - authenticate(sharedSecret: string): Promise<void>; - getAccountHistory(): Promise<Array<AccountToken>>; - removeAccountFromHistory(accountToken: AccountToken): Promise<void>; -} - -export class RealIpc implements IpcFacade { - _ipc: JsonRpcWs; - - constructor(connectionString: string) { - this._ipc = new JsonRpcWs(connectionString); - } - - setConnectionString(str: string) { - this._ipc.setConnectionString(str); - } - - getAccountData(accountToken: AccountToken): Promise<AccountData> { - // send the IPC with 30s timeout since the backend will wait - // for a HTTP request before replying - - return this._ipc.send('get_account_data', accountToken, 30000).then((raw) => { - if (typeof raw === 'object' && raw && raw.expiry) { - return raw; - } else { - throw new InvalidReply(raw, 'Expected an object with expiry'); - } - }); - } - - async getRelayLocations(): Promise<RelayList> { - const raw = await this._ipc.send('get_relay_locations'); - try { - const validated: any = validate(RelayListSchema, raw); - return (validated: RelayList); - } catch (e) { - throw new InvalidReply(raw, e); - } - } - - getAccount(): Promise<?AccountToken> { - return this._ipc.send('get_account').then((raw) => { - if (raw === undefined || raw === null || typeof raw === 'string') { - return raw; - } else { - throw new InvalidReply(raw); - } - }); - } - - setAccount(accountToken: ?AccountToken): Promise<void> { - return this._ipc.send('set_account', accountToken).then(this._ignoreResponse); - } - - _ignoreResponse(_response: mixed): void { - return; - } - - updateRelaySettings(relaySettings: RelaySettingsUpdate): Promise<void> { - return this._ipc.send('update_relay_settings', [relaySettings]).then(this._ignoreResponse); - } - - getRelaySettings(): Promise<RelaySettings> { - return this._ipc.send('get_relay_settings').then((raw) => { - try { - const validated: any = validate(RelaySettingsSchema, raw); - return (validated: RelaySettings); - } catch (e) { - throw new InvalidReply(raw, e); - } - }); - } - - setAllowLan(allowLan: boolean): Promise<void> { - return this._ipc.send('set_allow_lan', [allowLan]).then(this._ignoreResponse); - } - - async getAllowLan(): Promise<boolean> { - const raw = await this._ipc.send('get_allow_lan'); - if (typeof raw === 'boolean') { - return raw; - } else { - throw new InvalidReply(raw, 'Expected a boolean'); - } - } - - connect(): Promise<void> { - return this._ipc.send('connect').then(this._ignoreResponse); - } - - disconnect(): Promise<void> { - return this._ipc.send('disconnect').then(this._ignoreResponse); - } - - getLocation(): Promise<Location> { - // send the IPC with 30s timeout since the backend will wait - // for a HTTP request before replying - - return this._ipc.send('get_current_location', [], 30000).then((raw) => { - try { - const validated: any = validate(LocationSchema, raw); - return (validated: Location); - } catch (e) { - throw new InvalidReply(raw, e); - } - }); - } - - getState(): Promise<BackendState> { - return this._ipc.send('get_state').then((raw) => { - return this._parseBackendState(raw); - }); - } - - _parseBackendState(raw: mixed): BackendState { - if (raw && raw.state && raw.target_state) { - const uncheckedRaw: any = raw; - - const states: Array<SecurityState> = ['secured', 'unsecured']; - const correctState = states.includes(uncheckedRaw.state); - const correctTargetState = states.includes(uncheckedRaw.target_state); - - if (!correctState || !correctTargetState) { - throw new InvalidReply(raw); - } - - return (uncheckedRaw: BackendState); - } else { - throw new InvalidReply(raw); - } - } - - registerStateListener(listener: (BackendState) => void) { - this._ipc.on('new_state', (rawEvent) => { - const parsedEvent: BackendState = this._parseBackendState(rawEvent); - - listener(parsedEvent); - }); - } - - setCloseConnectionHandler(handler: () => void) { - this._ipc.setCloseConnectionHandler(handler); - } - - authenticate(sharedSecret: string): Promise<void> { - return this._ipc.send('auth', sharedSecret).then(this._ignoreResponse); - } - - getAccountHistory(): Promise<Array<AccountToken>> { - return this._ipc.send('get_account_history').then((raw) => { - if (Array.isArray(raw) && raw.every((i) => typeof i === 'string')) { - const checked: any = raw; - return (checked: Array<AccountToken>); - } else { - throw new InvalidReply(raw, 'Expected an array of strings'); - } - }); - } - - removeAccountFromHistory(accountToken: AccountToken): Promise<void> { - return this._ipc.send('remove_account_from_history', accountToken).then(this._ignoreResponse); - } -} diff --git a/app/lib/jsonrpc-transport.js b/app/lib/jsonrpc-transport.js new file mode 100644 index 0000000000..2fa98df988 --- /dev/null +++ b/app/lib/jsonrpc-transport.js @@ -0,0 +1,326 @@ +// @flow + +import { EventEmitter } from 'events'; +import jsonrpc from 'jsonrpc-lite'; +import uuid from 'uuid'; +import { log } from '../lib/platform'; + +export type UnansweredRequest = { + resolve: (mixed) => void, + reject: (mixed) => void, + timerId: TimeoutID, + message: Object, +}; + +export type JsonRpcErrorResponse = { + type: 'error', + payload: { + id: string, + error: { + code: number, + message: string, + }, + }, +}; +export type JsonRpcNotification = { + type: 'notification', + payload: { + method: string, + params: { + subscription: string, + result: mixed, + }, + }, +}; +export type JsonRpcSuccess = { + type: 'success', + payload: { + id: string, + result: mixed, + }, +}; +export type JsonRpcMessage = JsonRpcErrorResponse | JsonRpcNotification | JsonRpcSuccess; + +export class RemoteError extends Error { + _code: number; + _details: string; + + constructor(code: number, details: string) { + super(`Remote JSON-RPC error ${code}: ${details}`); + this._code = code; + this._details = details; + } + + get code(): number { + return this._code; + } + + get details(): string { + return this._details; + } +} + +export class TimeOutError extends Error { + _jsonRpcMessage: Object; + + constructor(jsonRpcMessage: Object) { + super('Request timed out'); + + this._jsonRpcMessage = jsonRpcMessage; + } + + get jsonRpcMessage(): Object { + return this._jsonRpcMessage; + } +} + +export class SubscriptionError extends Error { + _reply: mixed; + + constructor(message: string, reply: mixed) { + const replyString = JSON.stringify(reply); + + super(`${message}: ${replyString}`); + + this._reply = reply; + } + + get reply(): mixed { + return this._reply; + } +} + +export class ConnectionError extends Error { + _code: number; + + constructor(code: number) { + super(ConnectionError.reason(code)); + this._code = code; + } + + get code(): number { + return this._code; + } + + static reason(code: number): string { + switch (code) { + case 1006: + return 'Abnormal closure'; + case 1011: + return 'Internal error'; + case 1012: + return 'Service restart'; + case 1014: + return 'Bad gateway'; + default: + return `Unknown (${code})`; + } + } +} + +const DEFAULT_TIMEOUT_MILLIS = 5000; + +export default class JsonRpcTransport extends EventEmitter { + _unansweredRequests: Map<string, UnansweredRequest> = new Map(); + _subscriptions: Map<string | number, (mixed) => void> = new Map(); + _websocket: ?WebSocket; + _websocketFactory: (string) => WebSocket; + + constructor(websocketFactory: ?(string) => WebSocket) { + super(); + this._websocketFactory = + websocketFactory || ((connectionString) => new WebSocket(connectionString)); + } + + /// Connect websocket + connect(connectionString: string) { + this.disconnect(); + + log.info('Connecting to websocket', connectionString); + + const websocket = this._websocketFactory(connectionString); + + websocket.onopen = () => { + log.info('Websocket is connected'); + this.emit('open'); + }; + + websocket.onmessage = (event) => { + const data = event.data; + if (typeof data === 'string') { + this._onMessage(data); + } else { + log.error('Got invalid reply from the server', event); + } + }; + + websocket.onclose = (event) => { + log.info(`The websocket connection closed with code: ${event.code}`); + + // Remove all subscriptions since they are connection based + this._subscriptions.clear(); + + // 1000 is a code used for normal connection closure. + const connectionError = event.code === 1000 ? null : new ConnectionError(event.code); + + this.emit('close', connectionError); + }; + + this._websocket = websocket; + } + + disconnect() { + if (this._websocket) { + this._websocket.close(); + this._websocket = null; + } + } + + async subscribe(event: string, listener: (mixed) => void): Promise<*> { + log.silly(`Adding a listener to ${event}`); + + try { + const subscriptionId = await this.send(`${event}_subscribe`); + if (typeof subscriptionId === 'string' || typeof subscriptionId === 'number') { + this._subscriptions.set(subscriptionId, listener); + } else { + throw new SubscriptionError( + 'The subscription id was not a string or a number', + subscriptionId, + ); + } + } catch (e) { + log.error(`Failed adding listener to ${event}: ${e.message}`); + throw e; + } + } + + async send( + action: string, + data: mixed, + timeout: number = DEFAULT_TIMEOUT_MILLIS, + ): Promise<mixed> { + let socket: WebSocket; + try { + socket = await this._getWebSocket(); + } catch (error) { + throw error; + } + + return new Promise((resolve, reject) => { + const id = uuid.v4(); + const payload = this._prepareParams(data); + const timerId = setTimeout(() => this._onTimeout(id), timeout); + const message = jsonrpc.request(id, action, payload); + this._unansweredRequests.set(id, { + resolve, + reject, + timerId, + message, + }); + + try { + log.silly('Sending message', id, action); + socket.send(JSON.stringify(message)); + } catch (error) { + log.error(`Failed sending RPC message "${action}": ${error.message}`); + throw error; + } + }); + } + + _prepareParams(data: mixed): Array<mixed> | Object { + // JSONRPC only accepts arrays and objects as params, but + // this isn't very nice to use, so this method wraps other + // types in an array. The choice of array is based on try-and-error + + if (data === undefined) { + return []; + } else if (data === null) { + return [null]; + } else if (Array.isArray(data) || typeof data === 'object') { + return data; + } else { + return [data]; + } + } + + _getWebSocket(): Promise<WebSocket> { + if (this._websocket && this._websocket.readyState === 1) { + return Promise.resolve(this._websocket); + } else { + return new Promise((resolve, reject) => { + log.debug('Waiting for websocket to connect'); + + this.once('open', () => { + const ws = this._websocket; + if (ws) { + resolve(ws); + } else { + reject(new Error('Internal error')); + } + }); + }); + } + } + + _onTimeout(requestId) { + const request = this._unansweredRequests.get(requestId); + + this._unansweredRequests.delete(requestId); + + if (request) { + log.warn(`Request ${requestId} timed out: `, request.message); + request.reject(new TimeOutError(request.message)); + } else { + log.warn(`Request ${requestId} timed out but it seems to already have been answered`); + } + } + + _onMessage(message: string) { + const result = jsonrpc.parse(message); + const messages = Array.isArray(result) ? result : [result]; + + for (const message of messages) { + if (message.type === 'notification') { + this._onNotification(message); + } else { + this._onReply(message); + } + } + } + + _onNotification(message: JsonRpcNotification) { + const subscriptionId = message.payload.params.subscription; + const listener = this._subscriptions.get(subscriptionId); + + if (listener) { + log.silly('Got notification', message.payload.method, message.payload.params.result); + listener(message.payload.params.result); + } else { + log.warn('Got notification for', message.payload.method, 'but no one is listening for it'); + } + } + + _onReply(message: JsonRpcErrorResponse | JsonRpcSuccess) { + const id = message.payload.id; + const request = this._unansweredRequests.get(id); + this._unansweredRequests.delete(id); + + if (request) { + log.silly('Got answer to', id, message.type); + + clearTimeout(request.timerId); + + if (message.type === 'error') { + const error = message.payload.error; + request.reject(new RemoteError(error.code, error.message)); + } else { + const reply = message.payload.result; + request.resolve(reply); + } + } else { + log.warn(`Got reply to ${id} but no one was waiting for it`); + } + } +} diff --git a/app/lib/jsonrpc-ws-ipc.js b/app/lib/jsonrpc-ws-ipc.js deleted file mode 100644 index 84dec96c77..0000000000 --- a/app/lib/jsonrpc-ws-ipc.js +++ /dev/null @@ -1,312 +0,0 @@ -// @flow - -import jsonrpc from 'jsonrpc-lite'; -import uuid from 'uuid'; -import { log } from '../lib/platform'; - -export type UnansweredRequest = { - resolve: (mixed) => void, - reject: (mixed) => void, - timerId: TimeoutID, - message: Object, -}; - -export type JsonRpcErrorResponse = { - type: 'error', - payload: { - id: string, - error: { - code: number, - message: string, - }, - }, -}; -export type JsonRpcNotification = { - type: 'notification', - payload: { - method: string, - params: { - subscription: string, - result: mixed, - }, - }, -}; -export type JsonRpcSuccess = { - type: 'success', - payload: { - id: string, - result: mixed, - }, -}; -export type JsonRpcMessage = JsonRpcErrorResponse | JsonRpcNotification | JsonRpcSuccess; - -export class JsonRpcError extends Error { - _code: number; - _details: string; - - constructor(code: number, details: string) { - super(`Remote JSON-RPC error ${code}: ${details}`); - this._code = code; - this._details = details; - } - - get code(): number { - return this._code; - } - - get details(): string { - return this._details; - } -} - -export class TimeOutError extends Error { - jsonRpcMessage: Object; - - constructor(jsonRpcMessage: Object) { - super('Request timed out'); - this.name = 'TimeOutError'; - this.jsonRpcMessage = jsonRpcMessage; - } -} - -export class InvalidReply extends Error { - reply: mixed; - - constructor(reply: mixed, msg: ?string) { - super(msg); - this.name = 'InvalidReply'; - this.reply = reply; - - if (msg) { - this.message = msg + ' - '; - } - this.message += JSON.stringify(reply); - } -} - -const DEFAULT_TIMEOUT_MILLIS = 5000; - -export default class Ipc { - _connectionString: ?string; - _onConnect: Array<{ resolve: () => void }>; - _unansweredRequests: Map<string, UnansweredRequest>; - _subscriptions: Map<string | number, (mixed) => void>; - _websocket: WebSocket; - _backoff: ReconnectionBackoff; - _websocketFactory: (string) => WebSocket; - _closeConnectionHandler: ?() => void; - - constructor(connectionString: string, websocketFactory: ?(string) => WebSocket) { - this._connectionString = connectionString; - this._onConnect = []; - this._unansweredRequests = new Map(); - this._subscriptions = new Map(); - this._websocketFactory = - websocketFactory || ((connectionString) => new WebSocket(connectionString)); - - this._backoff = new ReconnectionBackoff(); - this._reconnect(); - } - - setConnectionString(str: string) { - this._connectionString = str; - } - - setCloseConnectionHandler(handler: ?() => void) { - this._closeConnectionHandler = handler; - } - - async on(event: string, listener: (mixed) => void): Promise<*> { - log.silly(`Adding a listener to ${event}`); - try { - const subscriptionId = await this.send(`${event}_subscribe`); - if (typeof subscriptionId === 'string' || typeof subscriptionId === 'number') { - this._subscriptions.set(subscriptionId, listener); - } else { - throw new InvalidReply(subscriptionId, 'The subscription id was not a string or a number'); - } - } catch (e) { - log.error(`Failed adding listener to ${event}: ${e.message}`); - } - } - - send(action: string, data: mixed, timeout: number = DEFAULT_TIMEOUT_MILLIS): Promise<mixed> { - return new Promise(async (resolve, reject) => { - const id = uuid.v4(); - - const params = this._prepareParams(data); - const timerId = setTimeout(() => this._onTimeout(id), timeout); - const jsonrpcMessage = jsonrpc.request(id, action, params); - this._unansweredRequests.set(id, { - resolve: resolve, - reject: reject, - timerId: timerId, - message: jsonrpcMessage, - }); - - try { - const ws = await this._getWebSocket(); - log.silly('Sending message', id, action); - ws.send(jsonrpcMessage); - } catch (e) { - log.error(`Failed sending RPC message "${action}": ${e.message}`); - reject(e); - } - }); - } - - _prepareParams(data: mixed): Array<mixed> | Object { - // JSONRPC only accepts arrays and objects as params, but - // this isn't very nice to use, so this method wraps other - // types in an array. The choice of array is based on try-and-error - - if (data === undefined) { - return []; - } else if (data === null) { - return [null]; - } else if (Array.isArray(data) || typeof data === 'object') { - return data; - } else { - return [data]; - } - } - - _getWebSocket(): Promise<WebSocket> { - return new Promise((resolve) => { - if (this._websocket && this._websocket.readyState === 1) { - resolve(this._websocket); - } else { - log.debug('Waiting for websocket to connect'); - this._onConnect.push({ - resolve: () => resolve(this._websocket), - }); - } - }); - } - - _onTimeout(requestId) { - const request = this._unansweredRequests.get(requestId); - this._unansweredRequests.delete(requestId); - - if (!request) { - log.warn(requestId, 'timed out but it seems to already have been answered'); - return; - } - - log.warn(request.message, 'timed out'); - request.reject(new TimeOutError(request.message)); - } - - _onMessage(message: string) { - const json = JSON.parse(message); - const c = jsonrpc.parseObject(json); - - if (c.type === 'notification') { - this._onNotification(c); - } else { - this._onReply(c); - } - } - - _onNotification(message: JsonRpcNotification) { - const subscriptionId = message.payload.params.subscription; - const listener = this._subscriptions.get(subscriptionId); - - if (listener) { - log.silly('Got notification', message.payload.method, message.payload.params.result); - listener(message.payload.params.result); - } else { - log.warn('Got notification for', message.payload.method, 'but no one is listening for it'); - } - } - - _onReply(message: JsonRpcErrorResponse | JsonRpcSuccess) { - const id = message.payload.id; - const request = this._unansweredRequests.get(id); - this._unansweredRequests.delete(id); - - if (!request) { - log.warn('Got reply to', id, 'but no one was waiting for it'); - return; - } - - log.silly('Got answer to', id, message.type); - - clearTimeout(request.timerId); - - if (message.type === 'error') { - const error = message.payload.error; - request.reject(new JsonRpcError(error.code, error.message)); - } else { - const reply = message.payload.result; - request.resolve(reply); - } - } - - _reconnect() { - const connectionString = this._connectionString; - if (!connectionString) return; - - log.info('Connecting to websocket', connectionString); - this._websocket = this._websocketFactory(connectionString); - - this._websocket.onopen = () => { - log.info('Websocket is connected'); - this._backoff.successfullyConnected(); - - while (this._onConnect.length > 0) { - this._onConnect.pop().resolve(); - } - }; - - this._websocket.onmessage = (evt) => { - const data = evt.data; - if (typeof data === 'string') { - this._onMessage(data); - } else { - log.error('Got invalid reply from the server', evt); - } - }; - - this._websocket.onclose = () => { - if (this._closeConnectionHandler) { - this._closeConnectionHandler(); - } - - const delay = this._backoff.getIncreasedBackoff(); - log.warn( - 'The websocket connetion closed, attempting to reconnect it in', - delay, - 'milliseconds', - ); - setTimeout(() => this._reconnect(), delay); - }; - } -} - -/* - * Used to calculate the time to wait before reconnecting - * the websocket. - * - * It uses a linear backoff function that goes from 500ms - * to 3000ms - */ -class ReconnectionBackoff { - _attempt: number; - - constructor() { - this._attempt = 0; - } - - successfullyConnected() { - this._attempt = 0; - } - - getIncreasedBackoff() { - if (this._attempt < 6) { - this._attempt++; - } - - return this._attempt * 500; - } -} diff --git a/app/lib/relay-settings-builder.js b/app/lib/relay-settings-builder.js index 0386d1ef4f..5f6c279e3f 100644 --- a/app/lib/relay-settings-builder.js +++ b/app/lib/relay-settings-builder.js @@ -6,7 +6,7 @@ import type { RelaySettingsUpdate, RelaySettingsNormalUpdate, RelaySettingsCustom, -} from './ipc-facade'; +} from './daemon-rpc'; type LocationBuilder<Self> = { country: (country: string) => Self, diff --git a/app/lib/rpc-address-file.js b/app/lib/rpc-address-file.js index 5185ddc749..6eec716591 100644 --- a/app/lib/rpc-address-file.js +++ b/app/lib/rpc-address-file.js @@ -29,7 +29,7 @@ export class RpcAddressFile { return this._filePath; } - poll(): Promise<void> { + waitUntilExists(): Promise<void> { let promise = this._pollPromise; if (!promise) { @@ -52,20 +52,7 @@ export class RpcAddressFile { return promise; } - isTrusted() { - const filePath = this._filePath; - switch (process.platform) { - case 'win32': - return isOwnedByLocalSystem(filePath); - case 'darwin': - case 'linux': - return isOwnedAndOnlyWritableByRoot(filePath); - default: - throw new Error(`Unknown platform: ${process.platform}`); - } - } - - async waitUntilExists(): Promise<RpcCredentials> { + async parse(): Promise<RpcCredentials> { const data = await fsReadFileAsync(this._filePath, 'utf8'); const [connectionString, sharedSecret] = data.split('\n', 2); @@ -78,6 +65,19 @@ export class RpcAddressFile { throw new Error('Cannot parse the RPC address file'); } } + + isTrusted() { + const filePath = this._filePath; + switch (process.platform) { + case 'win32': + return isOwnedByLocalSystem(filePath); + case 'darwin': + case 'linux': + return isOwnedAndOnlyWritableByRoot(filePath); + default: + throw new Error(`Unknown platform: ${process.platform}`); + } + } } function getRpcAddressFilePath() { diff --git a/app/main.js b/app/main.js index 3a10f6808c..ee685e819c 100644 --- a/app/main.js +++ b/app/main.js @@ -163,7 +163,7 @@ const ApplicationMain = { ipcMain.on('daemon-connection', async (event) => { const addressFile = new RpcAddressFile(); - log.debug(`Reading the RPC connection info from "${addressFile.filePath}"`); + log.debug(`Waiting for RPC address file: "${addressFile.filePath}"`); try { await addressFile.waitUntilExists(); @@ -192,7 +192,7 @@ const ApplicationMain = { log.debug('Read RPC connection info', credentials.connectionString); - event.sender.send('daemon-connection', { credentials }); + event.sender.send('daemon-connection', credentials); } catch (error) { log.error(`Cannot parse the RPC address file: ${error.message}`); return; diff --git a/app/redux/account/actions.js b/app/redux/account/actions.js index c778b111fd..df25c1e823 100644 --- a/app/redux/account/actions.js +++ b/app/redux/account/actions.js @@ -1,6 +1,6 @@ // @flow -import type { AccountToken } from '../../lib/ipc-facade'; +import type { AccountToken } from '../../lib/daemon-rpc'; import type { Backend } from '../../lib/backend'; type StartLoginAction = { diff --git a/app/redux/account/reducers.js b/app/redux/account/reducers.js index 094864c9a8..dbd854a6c0 100644 --- a/app/redux/account/reducers.js +++ b/app/redux/account/reducers.js @@ -1,7 +1,7 @@ // @flow import type { ReduxAction } from '../store'; -import type { AccountToken } from '../../lib/ipc-facade'; +import type { AccountToken } from '../../lib/daemon-rpc'; export type LoginState = 'none' | 'logging in' | 'failed' | 'ok'; export type AccountReduxState = { diff --git a/app/redux/connection/actions.js b/app/redux/connection/actions.js index b4f1df09be..e16a50e47f 100644 --- a/app/redux/connection/actions.js +++ b/app/redux/connection/actions.js @@ -4,10 +4,10 @@ import { Clipboard } from 'reactxp'; import type { Backend } from '../../lib/backend'; import type { ReduxThunk } from '../store'; -import type { Ip } from '../../lib/ipc-facade'; +import type { Ip } from '../../lib/daemon-rpc'; -const connect = (backend: Backend): ReduxThunk => () => backend.connect(); -const disconnect = (backend: Backend) => () => backend.disconnect(); +const connect = (backend: Backend): ReduxThunk => () => backend.connectTunnel(); +const disconnect = (backend: Backend) => () => backend.disconnectTunnel(); const copyIPAddress = (): ReduxThunk => { return (_, getState) => { const ip = getState().connection.ip; diff --git a/app/redux/connection/reducers.js b/app/redux/connection/reducers.js index a50fbf0757..c75d0f2a8f 100644 --- a/app/redux/connection/reducers.js +++ b/app/redux/connection/reducers.js @@ -1,7 +1,7 @@ // @flow import type { ReduxAction } from '../store'; -import type { Ip } from '../../lib/ipc-facade'; +import type { Ip } from '../../lib/daemon-rpc'; export type ConnectionState = 'disconnected' | 'connecting' | 'connected'; export type ConnectionReduxState = { diff --git a/app/redux/settings/reducers.js b/app/redux/settings/reducers.js index f559c1f725..4a25bf9502 100644 --- a/app/redux/settings/reducers.js +++ b/app/redux/settings/reducers.js @@ -1,7 +1,7 @@ // @flow import type { ReduxAction } from '../store'; -import type { RelayProtocol, RelayLocation } from '../../lib/ipc-facade'; +import type { RelayProtocol, RelayLocation } from '../../lib/daemon-rpc'; export type RelaySettingsRedux = | {| diff --git a/package.json b/package.json index 3c2a6182de..75e9382085 100644 --- a/package.json +++ b/package.json @@ -61,6 +61,7 @@ "eslint-plugin-react": "^7.9.1", "flow-bin": "^0.66.0", "flow-typed": "^2.4.0", + "mock-socket": "^7.1.0", "npm-run-all": "^4.0.1", "prettier": "1.13.3", "redux-mock-store": "^1.3.0", diff --git a/test/auth.spec.js b/test/auth.spec.js index e6b2c25486..4f919dff87 100644 --- a/test/auth.spec.js +++ b/test/auth.spec.js @@ -8,30 +8,34 @@ describe('authentication', () => { it('authenticates before ipc call if unauthenticated', (done) => { const { store, mockIpc } = setupIpcAndStore(); + const credentials = { + connectionString: 'ws://localhost:1234/', + sharedSecret: '1234', + }; + const chain = new IpcChain(mockIpc); - chain.onSuccessOrFailure(done); chain.expect('authenticate').withInputValidation((secret) => { expect(secret).to.equal(credentials.sharedSecret); }); chain.expect('connect'); + chain.end(done); - const credentials = { - sharedSecret: '', - connectionString: '', - }; - const backend = new Backend(store, credentials, mockIpc); - backend.connect(); + const backend = new Backend(store, mockIpc); + backend.connect(credentials); + + backend.connectTunnel(); }); it('reauthenticates on reconnect', async () => { const { mockIpc, backend } = setupBackendAndStore(); mockIpc.authenticate = spy(mockIpc.authenticate); + await mockIpc.connectTunnel(); mockIpc.killWebSocket(); expect(mockIpc.authenticate).to.not.have.been.called(); - await backend.connect(); + await backend.connectTunnel(); expect(mockIpc.authenticate).to.have.been.called.once; }); }); diff --git a/test/autologin.spec.js b/test/autologin.spec.js index 60c5cc5bde..505ad285c0 100644 --- a/test/autologin.spec.js +++ b/test/autologin.spec.js @@ -16,7 +16,7 @@ describe('autologin', () => { expect(num).to.equal(randomAccountToken); }); - chain.onSuccessOrFailure(done); + chain.end(done); backend.autologin(); }); diff --git a/test/connect.spec.js b/test/connect.spec.js index f1621dbeb6..409a04c0fb 100644 --- a/test/connect.spec.js +++ b/test/connect.spec.js @@ -7,7 +7,7 @@ describe('connect', () => { it("should set the connection state to 'disconnected' on failed attempts", (done) => { const { store, mockIpc, backend } = setupBackendAndStore(); - mockIpc.connect = () => new Promise((_, reject) => reject('Some error')); + mockIpc.connectTunnel = () => new Promise((_, reject) => reject('Some error')); store.dispatch(connectionActions.connected()); @@ -23,7 +23,7 @@ describe('connect', () => { it('should update the state with the server address', () => { const { store, backend } = setupBackendAndStore(); - return backend.connect().then(() => { + return backend.connectTunnel().then(() => { const state = store.getState().connection; expect(state.status).to.equal('connecting'); }); diff --git a/test/helpers/IpcChain.js b/test/helpers/IpcChain.js index 1505621cc0..547851fdcf 100644 --- a/test/helpers/IpcChain.js +++ b/test/helpers/IpcChain.js @@ -1,7 +1,5 @@ // @flow -import { check, failFast } from './ipc-helpers'; - export class IpcChain { _expectedCalls: Array<string>; _recordedCalls: Array<string>; @@ -39,12 +37,11 @@ export class IpcChain { const inputValidation = step.inputValidation; if (inputValidation) { - const failedInputValidation = failFast(() => { + try { inputValidation(...args); - }, this._done); - - if (failedInputValidation) { + } catch (error) { this._abort(); + this._done(error); return; } } @@ -65,16 +62,19 @@ export class IpcChain { } _ensureChainCalledCorrectly() { - check(() => { + try { expect(this._expectedCalls).to.deep.equal(this._recordedCalls); - }, this._done); + this._done(); + } catch (error) { + this._done(error); + } } _registerCall(ipcCall: string) { this._recordedCalls.push(ipcCall); } - onSuccessOrFailure(done: (*) => void) { + end(done: (?Error) => void) { this._done = done; } } diff --git a/test/helpers/ipc-helpers.js b/test/helpers/ipc-helpers.js index 4466c06eb2..5bfe1d0267 100644 --- a/test/helpers/ipc-helpers.js +++ b/test/helpers/ipc-helpers.js @@ -12,7 +12,6 @@ type Check = () => void; export function setupIpcAndStore() { const memoryHistory = createMemoryHistory(); const store = configureStore(null, memoryHistory); - const mockIpc = newMockIpc(); return { store, mockIpc }; @@ -20,12 +19,7 @@ export function setupIpcAndStore() { export function setupBackendAndStore() { const { store, mockIpc } = setupIpcAndStore(); - - const credentials = { - sharedSecret: '', - connectionString: '', - }; - const backend = new Backend(store, credentials, mockIpc); + const backend = new Backend(store, mockIpc); return { store, mockIpc, backend }; } @@ -33,11 +27,7 @@ export function setupBackendAndStore() { export function setupBackendAndMockStore() { const store = mockStore(_initialState()); const mockIpc = newMockIpc(); - const credentials = { - sharedSecret: '', - connectionString: '', - }; - const backend = new Backend(store, credentials, mockIpc); + const backend = new Backend(store, mockIpc); return { store, mockIpc, backend }; } diff --git a/test/ipc.spec.js b/test/ipc.spec.js deleted file mode 100644 index 1c8b19b5a6..0000000000 --- a/test/ipc.spec.js +++ /dev/null @@ -1,134 +0,0 @@ -// @flow - -import Ipc from '../app/lib/jsonrpc-ws-ipc'; -import jsonrpc from 'jsonrpc-lite'; -import type { JsonRpcMessage } from '../app/lib/jsonrpc-ws-ipc'; - -describe('The IPC server', () => { - it('should send as soon as the websocket connects', () => { - const { ws, ipc } = setupIpc(); - ws.close(); - - let sent = false; - const p = ipc.send('hello').then(() => { - expect(sent).to.be.true; - }); - - ws.on('hello', (msg) => { - sent = true; - - ws.replyOk(msg.id); - }); - ws.acceptConnection(); - - return p; - }); - - it('should reject failed jsonrpc requests', () => { - const { ws, ipc } = setupIpc(); - ws.on('WHAT_IS_THIS', (msg) => { - ws.replyFail(msg.id, 'Method not found', -32601); - }); - - return ipc.send('WHAT_IS_THIS').catch((e) => { - expect(e.code).to.equal(-32601); - expect(e.message).to.contain('Method not found'); - }); - }); - - it('should route reply to correct promise', () => { - const { ws, ipc } = setupIpc(); - - ws.on('a message', (msg) => ws.replyOk(msg.id, 'a reply')); - - const decoy = ipc - .send('a decoy', [], 1) - .then(() => { - throw new Error('Should not be called'); - }) - .catch((e) => { - if (e.name !== 'TimeOutError') { - throw e; - } - }); - const message = ipc.send('a message', [], 1).then((reply) => expect(reply).to.equal('a reply')); - - return Promise.all([message, decoy]); - }); - - it('should timeout if no response is returned', () => { - const { ipc } = setupIpc(); - - return ipc.send('a message', [], 1).catch((e) => { - expect(e.name).to.equal('TimeOutError'); - expect(e.message).to.contain('timed out'); - }); - }); - - it('should route notifications', (done) => { - const { ws, ipc } = setupIpc(); - - const eventListener = (event) => { - try { - expect(event).to.equal('an event!'); - done(); - } catch (ex) { - done(ex); - } - }; - - ws.on('event_subscribe', (msg) => ws.replyOk(msg.id, 1)); - ipc - .on('event', eventListener) - .then(() => { - ws.reply(jsonrpc.notification('event', { subscription: 1, result: 'an event!' })); - }) - .catch((e) => done(e)); - }); -}); - -function mockWebsocket() { - const ws: any = { - listeners: {}, - readyState: 1, - }; - - ws.on = (event, listener) => (ws.listeners[event] = listener); - ws.send = (data) => { - const listener = ws.listeners[data.method]; - if (listener) { - listener(data); - } - }; - - ws.factory = () => ws; - - ws.acceptConnection = () => { - ws.readyState = 1; - ws.onopen(); - }; - ws.close = () => { - ws.readyState = 3; - ws.onclose(); - }; - - ws.reply = (msg: JsonRpcMessage) => { - ws.onmessage({ data: JSON.stringify(msg) }); - }; - ws.replyOk = (id: string, msg) => { - ws.reply(jsonrpc.success(id, msg || '')); - }; - ws.replyFail = (id: string, msg: string, code: number) => { - ws.reply(jsonrpc.error(id, new jsonrpc.JsonRpcError(msg, code))); - }; - - return ws; -} - -function setupIpc() { - const ws = mockWebsocket(); - return { - ws: ws, - ipc: new Ipc('1.2.3.4', ws.factory), - }; -} diff --git a/test/jsonrpc-transport.spec.js b/test/jsonrpc-transport.spec.js new file mode 100644 index 0000000000..124bba8f00 --- /dev/null +++ b/test/jsonrpc-transport.spec.js @@ -0,0 +1,144 @@ +// @flow + +import JsonRpcTransport, { + TimeOutError as JsonRpcTransportTimeOutError, +} from '../app/lib/jsonrpc-transport'; +import jsonrpc from 'jsonrpc-lite'; +import { Server, WebSocket as MockWebSocket } from 'mock-socket'; + +describe('JSON RPC transport', () => { + const WEBSOCKET_URL = 'ws://localhost:8080'; + let server: Server, transport: JsonRpcTransport; + + beforeEach(() => { + server = new Server(WEBSOCKET_URL); + transport = new JsonRpcTransport((s) => new MockWebSocket(s)); + }); + + afterEach(() => { + server.close(); + }); + + it('should send as soon as the websocket connects', (done) => { + server.on('message', (msg) => { + const { payload } = jsonrpc.parse(msg); + + if (payload.method === 'hello') { + server.send(JSON.stringify(jsonrpc.success(payload.id, 'ok'))); + } + }); + + transport + .send('hello') + .then(() => { + done(); + }) + .catch((error) => { + done(error); + }); + + transport.connect(WEBSOCKET_URL); + }); + + it('should reject failed jsonrpc requests', (done) => { + server.on('message', (msg) => { + const { payload } = jsonrpc.parse(msg); + + if (payload.method === 'invalid-method') { + server.send( + JSON.stringify( + jsonrpc.error(payload.id, new jsonrpc.JsonRpcError('Method not found', -32601)), + ), + ); + } + }); + + transport.send('invalid-method').catch((error) => { + try { + expect(error.code).to.equal(-32601); + expect(error.message).to.contain('Method not found'); + done(); + } catch (error) { + done(error); + } + }); + + transport.connect(WEBSOCKET_URL); + }); + + it('should route reply to correct promise', () => { + server.on('message', (msg) => { + const { payload } = jsonrpc.parse(msg); + + if (payload.method === 'a message') { + server.send(JSON.stringify(jsonrpc.success(payload.id, 'a reply'))); + } + }); + + const decoy = transport + .send('a decoy', [], 100) + .then(() => { + throw new Error('Should not be called'); + }) + .catch((error) => { + expect(error).to.be.an.instanceof(JsonRpcTransportTimeOutError); + }); + + const message = transport.send('a message', [], 100).then((reply) => { + expect(reply).to.equal('a reply'); + }); + + transport.connect(WEBSOCKET_URL); + + return Promise.all([message, decoy]); + }); + + it('should timeout if no response is returned', (done) => { + transport + .send('timeout-message', {}, 1) + .then(() => { + done(new Error('Should not be called')); + }) + .catch((error) => { + try { + expect(error).to.be.an.instanceof(JsonRpcTransportTimeOutError); + expect(error.message).to.contain('Request timed out'); + done(); + } catch (error) { + done(error); + } + }); + + transport.connect(WEBSOCKET_URL); + }); + + it('should route notifications', (done) => { + server.on('message', (msg) => { + const { payload } = jsonrpc.parse(msg); + + if (payload.method === 'event_subscribe') { + server.send(JSON.stringify(jsonrpc.success(payload.id, 1))); + } + }); + + transport + .subscribe('event', (event) => { + try { + expect(event).to.equal('an event!'); + done(); + } catch (error) { + done(error); + } + }) + .then(() => { + server.send( + JSON.stringify(jsonrpc.notification('event', { subscription: 1, result: 'an event!' })), + ); + }) + .catch((error) => { + done(error); + }); + + transport.connect(WEBSOCKET_URL); + }); +}); diff --git a/test/login.spec.js b/test/login.spec.js index 68db14494b..a2390b5241 100644 --- a/test/login.spec.js +++ b/test/login.spec.js @@ -24,7 +24,7 @@ describe('Logging in', () => { expect(an).to.equal('123'); }); - chain.onSuccessOrFailure(done); + chain.end(done); store.dispatch(accountActions.login(backend, '123')); }); diff --git a/test/logout.spec.js b/test/logout.spec.js index 65bc3aba09..c973c6a81a 100644 --- a/test/logout.spec.js +++ b/test/logout.spec.js @@ -19,7 +19,7 @@ describe('logging out', () => { expect(num).to.be.null; }); chain.expect('disconnect'); - chain.onSuccessOrFailure(done); + chain.end(done); backend.logout(); }); diff --git a/test/mocks/ipc.js b/test/mocks/ipc.js index 590313ec13..b610244462 100644 --- a/test/mocks/ipc.js +++ b/test/mocks/ipc.js @@ -1,39 +1,39 @@ // @flow -import type { IpcFacade, AccountToken, AccountData, BackendState } from '../../app/lib/ipc-facade'; +import type { + DaemonRpcProtocol, + AccountToken, + AccountData, + BackendState, +} from '../../app/lib/daemon-rpc'; interface MockIpc { sendNewState: (BackendState) => void; killWebSocket: () => void; -getAccountData: (AccountToken) => Promise<AccountData>; - -connect: () => Promise<void>; + -connectTunnel: () => Promise<void>; -getAccount: () => Promise<?AccountToken>; -authenticate: (string) => Promise<void>; } export function newMockIpc() { const stateListeners = []; - const connectionCloseListeners = []; + let connectionOpenListener: ?() => void; + let connectionCloseListener: ?(error: ?Error) => void; - const mockIpc: IpcFacade & MockIpc = { + const mockIpc: DaemonRpcProtocol & MockIpc = { setConnectionString: (_str: string) => {}, - getAccountData: (accountToken) => Promise.resolve({ accountToken: accountToken, expiry: '', }), - getRelayLocations: () => Promise.resolve({ countries: [], }), - getAccount: () => Promise.resolve('1111'), - setAccount: () => Promise.resolve(), - updateRelaySettings: () => Promise.resolve(), - getRelaySettings: () => Promise.resolve({ custom_tunnel_endpoint: { @@ -46,17 +46,16 @@ export function newMockIpc() { }, }, }), - setAllowLan: (_allowLan: boolean) => Promise.resolve(), - getAllowLan: () => Promise.resolve(true), - - connect: () => Promise.resolve(), - - disconnect: () => Promise.resolve(), - - shutdown: () => Promise.resolve(), - + connect: () => { + if (connectionOpenListener) { + connectionOpenListener(); + } + }, + disconnect: () => {}, + connectTunnel: () => Promise.resolve(), + disconnectTunnel: () => Promise.resolve(), getLocation: () => Promise.resolve({ ip: '', @@ -66,36 +65,33 @@ export function newMockIpc() { longitude: 0.0, mullvad_exit_ip: false, }), - getState: () => Promise.resolve({ state: 'unsecured', target_state: 'unsecured', }), - - registerStateListener: (listener: (BackendState) => void) => { + subscribeStateListener: (listener: (state: ?BackendState, error: ?Error) => void) => { stateListeners.push(listener); + return Promise.resolve(); }, - sendNewState: (state: BackendState) => { for (const listener of stateListeners) { listener(state); } }, - - setCloseConnectionHandler: (listener: () => void) => { - connectionCloseListeners.push(listener); + addOpenConnectionObserver: (listener: () => void) => { + connectionOpenListener = listener; + }, + addCloseConnectionObserver: (listener: (error: ?Error) => void) => { + connectionCloseListener = listener; }, - authenticate: (_secret: string) => Promise.resolve(), - getAccountHistory: () => Promise.resolve([]), - removeAccountFromHistory: (_accountToken) => Promise.resolve(), killWebSocket: () => { - for (const listener of connectionCloseListeners) { - listener(); + if (connectionCloseListener) { + connectionCloseListener(); } }, }; @@ -1822,7 +1822,7 @@ commander@2: version "2.14.1" resolved "https://registry.yarnpkg.com/commander/-/commander-2.14.1.tgz#2235123e37af8ca3c65df45b026dbd357b01b9aa" -commander@2.15.1, commander@^2.15.1: +commander@2.15.1, commander@^2.15.1, commander@^2.9.0: version "2.15.1" resolved "https://registry.yarnpkg.com/commander/-/commander-2.15.1.tgz#df46e867d0fc2aec66a34662b406a9ccafff5b0f" @@ -1830,7 +1830,7 @@ commander@^2.11.0: version "2.12.2" resolved "https://registry.yarnpkg.com/commander/-/commander-2.12.2.tgz#0f5946c427ed9ec0d91a46bb9def53e54650e555" -commander@^2.2.0, commander@^2.9.0: +commander@^2.2.0: version "2.9.0" resolved "https://registry.yarnpkg.com/commander/-/commander-2.9.0.tgz#9c99094176e12240cb22d6c5146098400fe0f7d4" dependencies: @@ -5265,6 +5265,10 @@ mocha@^5.2.0: mkdirp "0.5.1" supports-color "5.4.0" +mock-socket@^7.1.0: + version "7.1.0" + resolved "https://registry.yarnpkg.com/mock-socket/-/mock-socket-7.1.0.tgz#482ecccafb0f0e86b8905ba2aa57c1fe73ba262d" + moment@^2.20.1: version "2.20.1" resolved "https://registry.yarnpkg.com/moment/-/moment-2.20.1.tgz#d6eb1a46cbcc14a2b2f9434112c1ff8907f313fd" @@ -7911,8 +7915,8 @@ validate-npm-package-license@^3.0.1: spdx-expression-parse "^3.0.0" validated@^1.1.0: - version "1.1.1" - resolved "https://registry.yarnpkg.com/validated/-/validated-1.1.1.tgz#176daaf2537d51cee708160238fd774a32b4a20a" + version "1.2.0" + resolved "https://registry.yarnpkg.com/validated/-/validated-1.2.0.tgz#44ca3791cd5b2fc24433f7d1ae3a8896dfe1aec3" dependencies: commander "^2.9.0" custom-error-instance "^2.1.1" |
