diff options
| author | Andrej Mihajlov <and@mullvad.net> | 2018-06-20 15:34:40 +0200 |
|---|---|---|
| committer | Andrej Mihajlov <and@mullvad.net> | 2018-07-03 13:37:54 +0200 |
| commit | 73840e98952dd3f7c005fcec44c971455da179eb (patch) | |
| tree | 15a887410858007a802450ad2870d8bf495dd16d /app/lib | |
| parent | 67e82627564f8e4a2d8da4bcf5f0fd00867876bc (diff) | |
| download | mullvadvpn-73840e98952dd3f7c005fcec44c971455da179eb.tar.xz mullvadvpn-73840e98952dd3f7c005fcec44c971455da179eb.zip | |
Refactor IpcFacade to DaemonRpc and JsonRpcWs to JsonRpcTransport
Diffstat (limited to 'app/lib')
| -rw-r--r-- | app/lib/backend.js | 412 | ||||
| -rw-r--r-- | app/lib/daemon-rpc.js | 387 | ||||
| -rw-r--r-- | app/lib/ipc-facade.js | 356 | ||||
| -rw-r--r-- | app/lib/jsonrpc-transport.js | 326 | ||||
| -rw-r--r-- | app/lib/jsonrpc-ws-ipc.js | 312 | ||||
| -rw-r--r-- | app/lib/relay-settings-builder.js | 2 | ||||
| -rw-r--r-- | app/lib/rpc-address-file.js | 30 |
7 files changed, 997 insertions, 828 deletions
diff --git a/app/lib/backend.js b/app/lib/backend.js index 39521d8edb..4fb9eb3cfa 100644 --- a/app/lib/backend.js +++ b/app/lib/backend.js @@ -1,16 +1,23 @@ // @flow -import { log } from '../lib/platform'; -import { IpcFacade, RealIpc } from './ipc-facade'; -import { JsonRpcError, TimeOutError } from './jsonrpc-ws-ipc'; +import { bindActionCreators } from 'redux'; +import { push } from 'react-router-redux'; +import { + RemoteError as JsonRpcTransportRemoteError, + TimeOutError as JsonRpcTransportTimeOutError, +} from './jsonrpc-transport'; import accountActions from '../redux/account/actions'; import connectionActions from '../redux/connection/actions'; import settingsActions from '../redux/settings/actions'; -import { push } from 'react-router-redux'; +import { log } from '../lib/platform'; -import type { RpcCredentials } from './rpc-address-file'; +import type { RpcCredentials as OriginalRpcCredentials } from './rpc-address-file'; +import type { + DaemonRpcProtocol, + ConnectionObserver as DaemonConnectionObserver, +} from './daemon-rpc'; import type { ReduxStore } from '../redux/store'; -import type { AccountToken, BackendState, RelaySettingsUpdate } from './ipc-facade'; +import type { AccountToken, BackendState, RelaySettingsUpdate } from './daemon-rpc'; import type { ConnectionState } from '../redux/connection/reducers'; export class NoCreditError extends Error { @@ -75,71 +82,80 @@ export class UnknownError extends Error { } } +export class CredentialsRequestError extends Error { + _reason: Error; + + constructor(reason: Error) { + super('Failed to request the RPC credentials'); + this._reason = reason; + } + + get reason(): Error { + return this._reason; + } +} + +export type RpcCredentials = OriginalRpcCredentials; +export interface RpcCredentialsProvider { + request(): Promise<RpcCredentials>; +} + /** * Backend implementation */ export class Backend { - _ipc: IpcFacade; - _credentials: ?RpcCredentials; - _authenticationPromise: ?Promise<void>; _store: ReduxStore; + _daemonRpc: DaemonRpcProtocol; + _credentialsProvider: RpcCredentialsProvider; + _reconnectBackoff = new ReconnectionBackoff(); - constructor(store: ReduxStore, credentials?: RpcCredentials, ipc: ?IpcFacade) { - this._store = store; - this._credentials = credentials; + _credentials: ?RpcCredentials; + _authenticationPromise: ?Promise<void>; - if (ipc) { - this._ipc = ipc; + _openConnectionObserver: ?DaemonConnectionObserver; + _closeConnectionObserver: ?DaemonConnectionObserver; - // force to re-authenticate when connection closed - this._ipc.setCloseConnectionHandler(() => { - this._authenticationPromise = null; - }); - - this._registerIpcListeners(); - this._startReachability(); - } - } + constructor( + store: ReduxStore, + rpc: DaemonRpcProtocol, + credentialsProvider: RpcCredentialsProvider, + ) { + this._store = store; + this._daemonRpc = rpc; + this._credentialsProvider = credentialsProvider; - setCredentials(credentials: RpcCredentials) { - log.debug('Got connection info to backend', credentials.connectionString); - this._credentials = credentials; + this._openConnectionObserver = rpc.addOpenConnectionObserver(() => { + this._onOpenConnection(); + }); - if (this._ipc) { - this._credentials = credentials; - } else { - this._ipc = new RealIpc(credentials.connectionString); + this._closeConnectionObserver = rpc.addCloseConnectionObserver((error) => { + this._onCloseConnection(error); + }); - // force to re-authenticate when connection closed - this._ipc.setCloseConnectionHandler(() => { - this._authenticationPromise = null; - }); - } - this._registerIpcListeners(); + this._setupReachability(); } - async sync() { - log.info('Syncing with the backend...'); + dispose() { + const openConnectionObserver = this._openConnectionObserver; + const closeConnectionObserver = this._closeConnectionObserver; - try { - await this._fetchRelayLocations(); - } catch (e) { - log.error('Failed to fetch the relay locations: ', e.message); + if (openConnectionObserver) { + openConnectionObserver.unsubscribe(); + this._openConnectionObserver = null; } - try { - await this._fetchLocation(); - } catch (e) { - log.error('Failed to fetch the location: ', e.message); + if (closeConnectionObserver) { + closeConnectionObserver.unsubscribe(); + this._closeConnectionObserver = null; } + } - try { - await this._fetchAllowLan(); - } catch (e) { - log.error('Failed to fetch the LAN sharing policy: ', e.message); - } + connect() { + this._connectToDaemon(); + } - await this._fetchAccountHistory(); + disconnect() { + this._daemonRpc.disconnect(); } async login(accountToken: AccountToken) { @@ -150,11 +166,11 @@ export class Backend { try { await this._ensureAuthenticated(); - const accountData = await this._ipc.getAccountData(accountToken); + const accountData = await this._daemonRpc.getAccountData(accountToken); log.debug('Account exists', accountData); - await this._ipc.setAccount(accountToken); + await this._daemonRpc.setAccount(accountToken); log.info('Log in complete'); @@ -166,10 +182,10 @@ export class Backend { setTimeout(() => { this._store.dispatch(push('/connect')); log.debug('Autoconnecting...'); - this.connect(); + this.connectTunnel(); }, 1000); - await this._fetchAccountHistory(); + await this.fetchAccountHistory(); } catch (e) { log.error('Failed to log in,', e.message); @@ -178,27 +194,6 @@ export class Backend { } } - _rpcErrorToBackendError(e) { - if (e instanceof JsonRpcError) { - switch (e.code) { - case -200: // Account doesn't exist - return new InvalidAccountError(); - case -32603: // Internal error - // We treat all internal backend errors as the user cannot reach - // api.mullvad.net. This is not always true of course, but it is - // true so often that we choose to disregard the other edge cases - // for now. - return new CommunicationError(); - } - } else if (e instanceof TimeOutError) { - return new CommunicationError(); - } else if (e instanceof NoDaemonError) { - return e; - } - - return new UnknownError(e.message); - } - async autologin() { try { log.debug('Attempting to log in automatically'); @@ -207,7 +202,7 @@ export class Backend { this._store.dispatch(accountActions.startLogin()); - const accountToken = await this._ipc.getAccount(); + const accountToken = await this._daemonRpc.getAccount(); if (!accountToken) { throw new NoAccountError(); } @@ -215,7 +210,7 @@ export class Backend { log.debug('The backend had an account number stored: ', accountToken); this._store.dispatch(accountActions.startLogin(accountToken)); - const accountData = await this._ipc.getAccountData(accountToken); + const accountData = await this._daemonRpc.getAccountData(accountToken); log.debug('The stored account number still exists', accountData); this._store.dispatch(accountActions.loginSuccessful(accountData.expiry)); @@ -234,12 +229,12 @@ export class Backend { // @TODO: What does it mean for a logout to be successful or failed? try { await this._ensureAuthenticated(); - await this._ipc.setAccount(null); + await this._daemonRpc.setAccount(null); this._store.dispatch(accountActions.loggedOut()); // disconnect user during logout - await this.disconnect(); + await this.disconnectTunnel(); this._store.dispatch(push('/')); } catch (e) { @@ -247,9 +242,9 @@ export class Backend { } } - async connect() { + async connectTunnel() { try { - const currentState = await this._ipc.getState(); + const currentState = await this._daemonRpc.getState(); if (currentState.state === 'secured') { log.debug('Refusing to connect as connection is already secured'); this._store.dispatch(connectionActions.connected()); @@ -259,18 +254,18 @@ export class Backend { this._store.dispatch(connectionActions.connecting()); await this._ensureAuthenticated(); - await this._ipc.connect(); + await this._daemonRpc.connectTunnel(); } catch (e) { log.error('Failed to connect: ', e.message); this._store.dispatch(connectionActions.disconnected()); } } - async disconnect() { + async disconnectTunnel() { // @TODO: Failure modes try { await this._ensureAuthenticated(); - await this._ipc.disconnect(); + await this._daemonRpc.disconnectTunnel(); } catch (e) { log.error('Failed to disconnect: ', e.message); } @@ -279,7 +274,7 @@ export class Backend { async updateRelaySettings(relaySettings: RelaySettingsUpdate) { try { await this._ensureAuthenticated(); - await this._ipc.updateRelaySettings(relaySettings); + await this._daemonRpc.updateRelaySettings(relaySettings); } catch (e) { log.error('Failed to update relay settings: ', e.message); } @@ -288,7 +283,7 @@ export class Backend { async fetchRelaySettings() { await this._ensureAuthenticated(); - const relaySettings = await this._ipc.getRelaySettings(); + const relaySettings = await this._daemonRpc.getRelaySettings(); log.debug('Got relay settings from backend', JSON.stringify(relaySettings)); if (relaySettings.normal) { @@ -339,13 +334,13 @@ export class Backend { } async updateAccountExpiry() { - const ipc = this._ipc; + const ipc = this._daemonRpc; const store = this._store; try { await this._ensureAuthenticated(); - const accountToken = await this._ipc.getAccount(); + const accountToken = await this._daemonRpc.getAccount(); if (!accountToken) { throw new NoAccountError(); } @@ -357,31 +352,22 @@ export class Backend { } } - async removeAccountFromHistory(accountToken: AccountToken) { - try { - await this._ensureAuthenticated(); - await this._ipc.removeAccountFromHistory(accountToken); - await this._fetchAccountHistory(); - } catch (e) { - log.error('Failed to remove account token from history', e.message); - } + async removeAccountFromHistory(accountToken: AccountToken): Promise<void> { + await this._ensureAuthenticated(); + await this._daemonRpc.removeAccountFromHistory(accountToken); + await this.fetchAccountHistory(); } - async _fetchAccountHistory() { - try { - await this._ensureAuthenticated(); - const accountHistory = await this._ipc.getAccountHistory(); - this._store.dispatch(accountActions.updateAccountHistory(accountHistory)); - } catch (e) { - log.info('Failed to fetch account history,', e.message); - throw e; - } + async fetchAccountHistory(): Promise<void> { + await this._ensureAuthenticated(); + const accountHistory = await this._daemonRpc.getAccountHistory(); + this._store.dispatch(accountActions.updateAccountHistory(accountHistory)); } - async _fetchRelayLocations() { + async fetchRelayLocations() { await this._ensureAuthenticated(); - const locations = await this._ipc.getRelayLocations(); + const locations = await this._daemonRpc.getRelayLocations(); log.info('Got relay locations'); @@ -401,10 +387,10 @@ export class Backend { this._store.dispatch(settingsActions.updateRelayLocations(storedLocations)); } - async _fetchLocation() { + async fetchLocation() { await this._ensureAuthenticated(); - const location = await this._ipc.getLocation(); + const location = await this._daemonRpc.getLocation(); log.info('Got location from daemon'); @@ -421,65 +407,148 @@ export class Backend { } async setAllowLan(allowLan: boolean) { - try { - await this._ensureAuthenticated(); - await this._ipc.setAllowLan(allowLan); + await this._ensureAuthenticated(); + await this._daemonRpc.setAllowLan(allowLan); - this._store.dispatch(settingsActions.updateAllowLan(allowLan)); - } catch (e) { - log.error('Failed to change the LAN sharing policy: ', e.message); - } + this._store.dispatch(settingsActions.updateAllowLan(allowLan)); } - async _fetchAllowLan() { + async fetchAllowLan() { await this._ensureAuthenticated(); - const allowLan = await this._ipc.getAllowLan(); + const allowLan = await this._daemonRpc.getAllowLan(); this._store.dispatch(settingsActions.updateAllowLan(allowLan)); } async fetchSecurityState() { await this._ensureAuthenticated(); - const securityState = await this._ipc.getState(); + const securityState = await this._daemonRpc.getState(); const connectionState = this._securityStateToConnectionState(securityState); this._dispatchConnectionState(connectionState); } + async _requestCredentials(): Promise<RpcCredentials> { + try { + return await this._credentialsProvider.request(); + } catch (providerError) { + throw new CredentialsRequestError(providerError); + } + } + + async _connectToDaemon(): Promise<void> { + let credentials; + try { + credentials = await this._requestCredentials(); + } catch (error) { + log.error(`Cannot request the RPC credentials: ${error.message}`); + return; + } + + this._credentials = credentials; + this._daemonRpc.connect(credentials.connectionString); + } + + async _onOpenConnection() { + this._reconnectBackoff.reset(); + + // make sure to re-subscribe to state notifications when connection is re-established. + try { + await this._subscribeStateListener(); + } catch (error) { + log.error(`Cannot subscribe for RPC notifications: ${error.message}`); + } + + this._fetchInitialState(); + } + + async _onCloseConnection(error: ?Error) { + // force to re-authenticate when connection closed + this._authenticationPromise = null; + + if (error) { + log.debug(`Lost connection to daemon: ${error.message}`); + + const recover = async () => { + try { + await this.connect(); + } catch (error) { + log.error(`Failed to reconnect: ${error.message}`); + } + }; + + this._reconnectBackoff.attempt(() => { + recover(); + }); + } + } + /** * Start reachability monitoring for online/offline detection * This is currently done via HTML5 APIs but will be replaced later * with proper backend integration. */ - _startReachability() { + _setupReachability() { + const { online, offline } = bindActionCreators(connectionActions, this._store.dispatch); + window.addEventListener('online', () => { - this._store.dispatch(connectionActions.online()); + online(); }); window.addEventListener('offline', () => { - // force disconnect since there is no real connection anyway. - this.disconnect(); - this._store.dispatch(connectionActions.offline()); + offline(); }); - // update online status in background - setTimeout(() => { - const action = navigator.onLine ? connectionActions.online() : connectionActions.offline(); - - this._store.dispatch(action); - }, 0); + if (navigator.onLine) { + online(); + } else { + offline(); + } } - async _registerIpcListeners() { + async _subscribeStateListener() { await this._ensureAuthenticated(); - this._ipc.registerStateListener((newState) => { - const connectionState = this._securityStateToConnectionState(newState); - log.debug(`Got new state from backend {state: ${newState.state}, \ - target_state: ${newState.target_state}}, translated to '${connectionState}'`); - this._dispatchConnectionState(connectionState); - this.sync(); + await this._daemonRpc.subscribeStateListener((newState, error) => { + if (error) { + log.error(`Received an error when processing the incoming state change: ${error.message}`); + } + + if (newState) { + const connectionState = this._securityStateToConnectionState(newState); + + log.debug( + `Got new state from backend {state: ${newState.state}, target_state: ${ + newState.target_state + }}, translated to '${connectionState}'`, + ); + + this._dispatchConnectionState(connectionState); + this._refreshStateOnChange(); + } }); } + async _fetchInitialState() { + try { + await Promise.all([ + this.fetchSecurityState(), + this.fetchRelaySettings(), + this.fetchRelayLocations(), + this.fetchAllowLan(), + this.fetchLocation(), + ]); + } catch (error) { + log.error(`Cannot prefetch data: ${error.message}`); + } + } + + async _refreshStateOnChange() { + try { + await this.fetchLocation(); + } catch (error) { + log.error(`Failed to fetch the location: ${error.message}`); + } + } + _securityStateToConnectionState(backendState: BackendState): ConnectionState { if (backendState.state === 'unsecured' && backendState.target_state === 'secured') { return 'connecting'; @@ -492,19 +561,44 @@ export class Backend { } _dispatchConnectionState(connectionState: ConnectionState) { + const { connecting, connected, disconnected } = bindActionCreators( + connectionActions, + this._store.dispatch, + ); switch (connectionState) { case 'connecting': - this._store.dispatch(connectionActions.connecting()); + connecting(); break; case 'connected': - this._store.dispatch(connectionActions.connected()); + connected(); break; case 'disconnected': - this._store.dispatch(connectionActions.disconnected()); + disconnected(); break; } } + _rpcErrorToBackendError(e) { + if (e instanceof JsonRpcTransportRemoteError) { + switch (e.code) { + case -200: // Account doesn't exist + return new InvalidAccountError(); + case -32603: // Internal error + // We treat all internal backend errors as the user cannot reach + // api.mullvad.net. This is not always true of course, but it is + // true so often that we choose to disregard the other edge cases + // for now. + return new CommunicationError(); + } + } else if (e instanceof JsonRpcTransportTimeOutError) { + return new CommunicationError(); + } else if (e instanceof NoDaemonError) { + return e; + } + + return new UnknownError(e.message); + } + _ensureAuthenticated(): Promise<void> { const credentials = this._credentials; if (credentials) { @@ -519,11 +613,41 @@ export class Backend { async _authenticate(sharedSecret: string) { try { - await this._ipc.authenticate(sharedSecret); + await this._daemonRpc.authenticate(sharedSecret); log.info('Authenticated with backend'); } catch (e) { - log.error('Failed to authenticate with backend: ', e.message); + log.error(`Failed to authenticate with backend: ${e.message}`); throw e; } } } + +/* + * Used to calculate the time to wait before reconnecting + * the websocket. + * + * It uses a linear backoff function that goes from 500ms + * to 3000ms + */ +class ReconnectionBackoff { + _attempt: number; + + constructor() { + this._attempt = 0; + } + + attempt(handler: () => void) { + setTimeout(handler, this._getIncreasedBackoff()); + } + + reset() { + this._attempt = 0; + } + + _getIncreasedBackoff() { + if (this._attempt < 6) { + this._attempt++; + } + return this._attempt * 500; + } +} diff --git a/app/lib/daemon-rpc.js b/app/lib/daemon-rpc.js new file mode 100644 index 0000000000..2ef3d06e13 --- /dev/null +++ b/app/lib/daemon-rpc.js @@ -0,0 +1,387 @@ +// @flow + +import JsonRpcTransport from './jsonrpc-transport'; +import { + object, + maybe, + string, + number, + boolean, + enumeration, + arrayOf, + oneOf, +} from 'validated/schema'; +import { validate } from 'validated/object'; + +import type { Node as SchemaNode } from 'validated/schema'; + +export type AccountData = { expiry: string }; +export type AccountToken = string; +export type Ip = string; +export type Location = { + ip: Ip, + country: string, + city: ?string, + latitude: number, + longitude: number, + mullvad_exit_ip: boolean, +}; +const LocationSchema = object({ + ip: string, + country: string, + city: maybe(string), + latitude: number, + longitude: number, + mullvad_exit_ip: boolean, +}); + +export type SecurityState = 'secured' | 'unsecured'; +export type BackendState = { + state: SecurityState, + target_state: SecurityState, +}; + +export type RelayProtocol = 'tcp' | 'udp'; +export type RelayLocation = {| city: [string, string] |} | {| country: string |}; + +type OpenVpnParameters = { + port: 'any' | { only: number }, + protocol: 'any' | { only: RelayProtocol }, +}; + +type TunnelOptions<TOpenVpnParameters> = { + openvpn: TOpenVpnParameters, +}; + +type RelaySettingsNormal<TTunnelOptions> = { + location: + | 'any' + | { + only: RelayLocation, + }, + tunnel: + | 'any' + | { + only: TTunnelOptions, + }, +}; + +// types describing the structure of RelaySettings +export type RelaySettingsCustom = { + host: string, + tunnel: { + openvpn: { + port: number, + protocol: RelayProtocol, + }, + }, +}; +export type RelaySettings = + | {| + normal: RelaySettingsNormal<TunnelOptions<OpenVpnParameters>>, + |} + | {| + custom_tunnel_endpoint: RelaySettingsCustom, + |}; + +// types describing the partial update of RelaySettings +export type RelaySettingsNormalUpdate = $Shape< + RelaySettingsNormal<TunnelOptions<$Shape<OpenVpnParameters>>>, +>; +export type RelaySettingsUpdate = + | {| + normal: RelaySettingsNormalUpdate, + |} + | {| + custom_tunnel_endpoint: RelaySettingsCustom, + |}; + +const constraint = <T>(constraintValue: SchemaNode<T>) => { + return oneOf( + string, // any + object({ + only: constraintValue, + }), + ); +}; + +const RelaySettingsSchema = oneOf( + object({ + normal: object({ + location: constraint( + oneOf( + object({ + city: arrayOf(string), + }), + object({ + country: string, + }), + ), + ), + tunnel: constraint( + object({ + openvpn: object({ + port: constraint(number), + protocol: constraint(enumeration('udp', 'tcp')), + }), + }), + ), + }), + }), + object({ + custom_tunnel_endpoint: object({ + host: string, + tunnel: object({ + openvpn: object({ + port: number, + protocol: enumeration('udp', 'tcp'), + }), + }), + }), + }), +); + +export type RelayList = { + countries: Array<RelayListCountry>, +}; + +export type RelayListCountry = { + name: string, + code: string, + cities: Array<RelayListCity>, +}; + +export type RelayListCity = { + name: string, + code: string, + latitude: number, + longitude: number, + has_active_relays: boolean, +}; + +const RelayListSchema = object({ + countries: arrayOf( + object({ + name: string, + code: string, + cities: arrayOf( + object({ + name: string, + code: string, + latitude: number, + longitude: number, + has_active_relays: boolean, + }), + ), + }), + ), +}); + +const AccountDataSchema = object({ + expiry: string, +}); + +const allSecurityStates: Array<SecurityState> = ['secured', 'unsecured']; +const BackendStateSchema = object({ + state: enumeration(...allSecurityStates), + target_state: enumeration(...allSecurityStates), +}); + +export interface DaemonRpcProtocol { + connect(string): void; + disconnect(): void; + getAccountData(AccountToken): Promise<AccountData>; + getRelayLocations(): Promise<RelayList>; + getAccount(): Promise<?AccountToken>; + setAccount(accountToken: ?AccountToken): Promise<void>; + updateRelaySettings(RelaySettingsUpdate): Promise<void>; + getRelaySettings(): Promise<RelaySettings>; + setAllowLan(boolean): Promise<void>; + getAllowLan(): Promise<boolean>; + connectTunnel(): Promise<void>; + disconnectTunnel(): Promise<void>; + getLocation(): Promise<Location>; + getState(): Promise<BackendState>; + subscribeStateListener((state: ?BackendState, error: ?Error) => void): Promise<void>; + addOpenConnectionObserver(() => void): ConnectionObserver; + addCloseConnectionObserver((error: ?Error) => void): ConnectionObserver; + authenticate(sharedSecret: string): Promise<void>; + getAccountHistory(): Promise<Array<AccountToken>>; + removeAccountFromHistory(accountToken: AccountToken): Promise<void>; +} + +export class ResponseParseError extends Error { + _validationError: ?Error; + + constructor(message: string, validationError: ?Error) { + super(message); + this._validationError = validationError; + } + + get validationError(): ?Error { + return this._validationError; + } +} + +export type ConnectionObserver = { + unsubscribe: () => void, +}; + +export class DaemonRpc implements DaemonRpcProtocol { + _transport = new JsonRpcTransport(); + + async authenticate(sharedSecret: string): Promise<void> { + await this._transport.send('auth', sharedSecret); + } + + connect(connectionString: string) { + this._transport.connect(connectionString); + } + + disconnect() { + this._transport.disconnect(); + } + + addOpenConnectionObserver(handler: () => void): ConnectionObserver { + this._transport.on('open', handler); + return { + unsubscribe: () => { + this._transport.off('open', handler); + }, + }; + } + + addCloseConnectionObserver(handler: (error: ?Error) => void): ConnectionObserver { + this._transport.on('close', handler); + return { + unsubscribe: () => { + this._transport.off('close', handler); + }, + }; + } + + async getAccountData(accountToken: AccountToken): Promise<AccountData> { + // send the IPC with 30s timeout since the backend will wait + // for a HTTP request before replying + const response = await this._transport.send('get_account_data', accountToken, 30000); + try { + return validate(AccountDataSchema, response); + } catch (error) { + throw new ResponseParseError('Invalid response from get_account_data', error); + } + } + + async getRelayLocations(): Promise<RelayList> { + const response = await this._transport.send('get_relay_locations'); + try { + return validate(RelayListSchema, response); + } catch (error) { + throw new ResponseParseError('Invalid response from get_relay_locations', error); + } + } + + async getAccount(): Promise<?AccountToken> { + const response = await this._transport.send('get_account'); + if (response === null || typeof response === 'string') { + return response; + } else { + throw new ResponseParseError('Invalid response from get_account', null); + } + } + + async setAccount(accountToken: ?AccountToken): Promise<void> { + await this._transport.send('set_account', accountToken); + } + + async updateRelaySettings(relaySettings: RelaySettingsUpdate): Promise<void> { + await this._transport.send('update_relay_settings', [relaySettings]); + } + + async getRelaySettings(): Promise<RelaySettings> { + const response = await this._transport.send('get_relay_settings'); + try { + const validatedObject = validate(RelaySettingsSchema, response); + + /* $FlowFixMe: + There is no way to express the constraints with string literals, i.e: + + RelaySettingsSchema constraint: + oneOf(string, object) + + RelaySettings constraint: + 'any' | object + + These two are incompatible so we simply enforce the type for now. + */ + return ((validatedObject: any): RelaySettings); + } catch (e) { + throw new ResponseParseError('Invalid response from get_relay_settings', e); + } + } + + async setAllowLan(allowLan: boolean): Promise<void> { + await this._transport.send('set_allow_lan', [allowLan]); + } + + async getAllowLan(): Promise<boolean> { + const response = await this._transport.send('get_allow_lan'); + if (typeof response === 'boolean') { + return response; + } else { + throw new ResponseParseError('Invalid response from get_allow_lan', null); + } + } + + async connectTunnel(): Promise<void> { + await this._transport.send('connect'); + } + + async disconnectTunnel(): Promise<void> { + await this._transport.send('disconnect'); + } + + async getLocation(): Promise<Location> { + // send the IPC with 30s timeout since the backend will wait + // for a HTTP request before replying + + const response = await this._transport.send('get_current_location', [], 30000); + try { + return validate(LocationSchema, response); + } catch (error) { + throw new ResponseParseError('Invalid response from get_current_location', error); + } + } + + async getState(): Promise<BackendState> { + const response = await this._transport.send('get_state'); + try { + return validate(BackendStateSchema, response); + } catch (error) { + throw new ResponseParseError('Invalid response from get_state', error); + } + } + + subscribeStateListener(listener: (state: ?BackendState, error: ?Error) => void): Promise<void> { + return this._transport.subscribe('new_state', (payload) => { + try { + const newState = validate(BackendStateSchema, payload); + listener(newState, null); + } catch (error) { + listener(null, new ResponseParseError('Invalid payload from new_state', error)); + } + }); + } + + async getAccountHistory(): Promise<Array<AccountToken>> { + const response = await this._transport.send('get_account_history'); + try { + return validate(arrayOf(string), response); + } catch (error) { + throw new ResponseParseError('Invalid response from get_account_history', null); + } + } + + async removeAccountFromHistory(accountToken: AccountToken): Promise<void> { + await this._transport.send('remove_account_from_history', accountToken); + } +} diff --git a/app/lib/ipc-facade.js b/app/lib/ipc-facade.js deleted file mode 100644 index 103ef4d931..0000000000 --- a/app/lib/ipc-facade.js +++ /dev/null @@ -1,356 +0,0 @@ -// @flow - -import JsonRpcWs, { InvalidReply } from './jsonrpc-ws-ipc'; -import { - object, - maybe, - string, - number, - boolean, - enumeration, - arrayOf, - oneOf, -} from 'validated/schema'; -import { validate } from 'validated/object'; - -import type { Node as SchemaNode } from 'validated/schema'; - -export type AccountData = { expiry: string }; -export type AccountToken = string; -export type Ip = string; -export type Location = { - ip: Ip, - country: string, - city: ?string, - latitude: number, - longitude: number, - mullvad_exit_ip: boolean, -}; -const LocationSchema = object({ - ip: string, - country: string, - city: maybe(string), - latitude: number, - longitude: number, - mullvad_exit_ip: boolean, -}); - -export type SecurityState = 'secured' | 'unsecured'; -export type BackendState = { - state: SecurityState, - target_state: SecurityState, -}; - -export type RelayProtocol = 'tcp' | 'udp'; -export type RelayLocation = {| city: [string, string] |} | {| country: string |}; - -type OpenVpnParameters = { - port: 'any' | { only: number }, - protocol: 'any' | { only: RelayProtocol }, -}; - -type TunnelOptions<TOpenVpnParameters> = { - openvpn: TOpenVpnParameters, -}; - -type RelaySettingsNormal<TTunnelOptions> = { - location: - | 'any' - | { - only: RelayLocation, - }, - tunnel: - | 'any' - | { - only: TTunnelOptions, - }, -}; - -// types describing the structure of RelaySettings -export type RelaySettingsCustom = { - host: string, - tunnel: { - openvpn: { - port: number, - protocol: RelayProtocol, - }, - }, -}; -export type RelaySettings = - | {| - normal: RelaySettingsNormal<TunnelOptions<OpenVpnParameters>>, - |} - | {| - custom_tunnel_endpoint: RelaySettingsCustom, - |}; - -// types describing the partial update of RelaySettings -export type RelaySettingsNormalUpdate = $Shape< - RelaySettingsNormal<TunnelOptions<$Shape<OpenVpnParameters>>>, ->; -export type RelaySettingsUpdate = - | {| - normal: RelaySettingsNormalUpdate, - |} - | {| - custom_tunnel_endpoint: RelaySettingsCustom, - |}; - -const constraint = <T>(constraintValue: SchemaNode<T>) => - oneOf( - string, - object({ - only: constraintValue, - }), - ); - -const RelaySettingsSchema = oneOf( - object({ - normal: object({ - location: constraint( - oneOf( - object({ - city: arrayOf(string), - }), - object({ - country: string, - }), - ), - ), - tunnel: constraint( - object({ - openvpn: object({ - port: constraint(number), - protocol: constraint(enumeration('udp', 'tcp')), - }), - }), - ), - }), - }), - object({ - custom_tunnel_endpoint: object({ - host: string, - tunnel: object({ - openvpn: object({ - port: number, - protocol: enumeration('udp', 'tcp'), - }), - }), - }), - }), -); - -export type RelayList = { - countries: Array<RelayListCountry>, -}; - -export type RelayListCountry = { - name: string, - code: string, - cities: Array<RelayListCity>, -}; - -export type RelayListCity = { - name: string, - code: string, - latitude: number, - longitude: number, - has_active_relays: boolean, -}; - -const RelayListSchema = object({ - countries: arrayOf( - object({ - name: string, - code: string, - cities: arrayOf( - object({ - name: string, - code: string, - latitude: number, - longitude: number, - has_active_relays: boolean, - }), - ), - }), - ), -}); - -export interface IpcFacade { - setConnectionString(string): void; - getAccountData(AccountToken): Promise<AccountData>; - getRelayLocations(): Promise<RelayList>; - getAccount(): Promise<?AccountToken>; - setAccount(accountToken: ?AccountToken): Promise<void>; - updateRelaySettings(RelaySettingsUpdate): Promise<void>; - getRelaySettings(): Promise<RelaySettings>; - setAllowLan(boolean): Promise<void>; - getAllowLan(): Promise<boolean>; - connect(): Promise<void>; - disconnect(): Promise<void>; - getLocation(): Promise<Location>; - getState(): Promise<BackendState>; - registerStateListener((BackendState) => void): void; - setCloseConnectionHandler(() => void): void; - authenticate(sharedSecret: string): Promise<void>; - getAccountHistory(): Promise<Array<AccountToken>>; - removeAccountFromHistory(accountToken: AccountToken): Promise<void>; -} - -export class RealIpc implements IpcFacade { - _ipc: JsonRpcWs; - - constructor(connectionString: string) { - this._ipc = new JsonRpcWs(connectionString); - } - - setConnectionString(str: string) { - this._ipc.setConnectionString(str); - } - - getAccountData(accountToken: AccountToken): Promise<AccountData> { - // send the IPC with 30s timeout since the backend will wait - // for a HTTP request before replying - - return this._ipc.send('get_account_data', accountToken, 30000).then((raw) => { - if (typeof raw === 'object' && raw && raw.expiry) { - return raw; - } else { - throw new InvalidReply(raw, 'Expected an object with expiry'); - } - }); - } - - async getRelayLocations(): Promise<RelayList> { - const raw = await this._ipc.send('get_relay_locations'); - try { - const validated: any = validate(RelayListSchema, raw); - return (validated: RelayList); - } catch (e) { - throw new InvalidReply(raw, e); - } - } - - getAccount(): Promise<?AccountToken> { - return this._ipc.send('get_account').then((raw) => { - if (raw === undefined || raw === null || typeof raw === 'string') { - return raw; - } else { - throw new InvalidReply(raw); - } - }); - } - - setAccount(accountToken: ?AccountToken): Promise<void> { - return this._ipc.send('set_account', accountToken).then(this._ignoreResponse); - } - - _ignoreResponse(_response: mixed): void { - return; - } - - updateRelaySettings(relaySettings: RelaySettingsUpdate): Promise<void> { - return this._ipc.send('update_relay_settings', [relaySettings]).then(this._ignoreResponse); - } - - getRelaySettings(): Promise<RelaySettings> { - return this._ipc.send('get_relay_settings').then((raw) => { - try { - const validated: any = validate(RelaySettingsSchema, raw); - return (validated: RelaySettings); - } catch (e) { - throw new InvalidReply(raw, e); - } - }); - } - - setAllowLan(allowLan: boolean): Promise<void> { - return this._ipc.send('set_allow_lan', [allowLan]).then(this._ignoreResponse); - } - - async getAllowLan(): Promise<boolean> { - const raw = await this._ipc.send('get_allow_lan'); - if (typeof raw === 'boolean') { - return raw; - } else { - throw new InvalidReply(raw, 'Expected a boolean'); - } - } - - connect(): Promise<void> { - return this._ipc.send('connect').then(this._ignoreResponse); - } - - disconnect(): Promise<void> { - return this._ipc.send('disconnect').then(this._ignoreResponse); - } - - getLocation(): Promise<Location> { - // send the IPC with 30s timeout since the backend will wait - // for a HTTP request before replying - - return this._ipc.send('get_current_location', [], 30000).then((raw) => { - try { - const validated: any = validate(LocationSchema, raw); - return (validated: Location); - } catch (e) { - throw new InvalidReply(raw, e); - } - }); - } - - getState(): Promise<BackendState> { - return this._ipc.send('get_state').then((raw) => { - return this._parseBackendState(raw); - }); - } - - _parseBackendState(raw: mixed): BackendState { - if (raw && raw.state && raw.target_state) { - const uncheckedRaw: any = raw; - - const states: Array<SecurityState> = ['secured', 'unsecured']; - const correctState = states.includes(uncheckedRaw.state); - const correctTargetState = states.includes(uncheckedRaw.target_state); - - if (!correctState || !correctTargetState) { - throw new InvalidReply(raw); - } - - return (uncheckedRaw: BackendState); - } else { - throw new InvalidReply(raw); - } - } - - registerStateListener(listener: (BackendState) => void) { - this._ipc.on('new_state', (rawEvent) => { - const parsedEvent: BackendState = this._parseBackendState(rawEvent); - - listener(parsedEvent); - }); - } - - setCloseConnectionHandler(handler: () => void) { - this._ipc.setCloseConnectionHandler(handler); - } - - authenticate(sharedSecret: string): Promise<void> { - return this._ipc.send('auth', sharedSecret).then(this._ignoreResponse); - } - - getAccountHistory(): Promise<Array<AccountToken>> { - return this._ipc.send('get_account_history').then((raw) => { - if (Array.isArray(raw) && raw.every((i) => typeof i === 'string')) { - const checked: any = raw; - return (checked: Array<AccountToken>); - } else { - throw new InvalidReply(raw, 'Expected an array of strings'); - } - }); - } - - removeAccountFromHistory(accountToken: AccountToken): Promise<void> { - return this._ipc.send('remove_account_from_history', accountToken).then(this._ignoreResponse); - } -} diff --git a/app/lib/jsonrpc-transport.js b/app/lib/jsonrpc-transport.js new file mode 100644 index 0000000000..2fa98df988 --- /dev/null +++ b/app/lib/jsonrpc-transport.js @@ -0,0 +1,326 @@ +// @flow + +import { EventEmitter } from 'events'; +import jsonrpc from 'jsonrpc-lite'; +import uuid from 'uuid'; +import { log } from '../lib/platform'; + +export type UnansweredRequest = { + resolve: (mixed) => void, + reject: (mixed) => void, + timerId: TimeoutID, + message: Object, +}; + +export type JsonRpcErrorResponse = { + type: 'error', + payload: { + id: string, + error: { + code: number, + message: string, + }, + }, +}; +export type JsonRpcNotification = { + type: 'notification', + payload: { + method: string, + params: { + subscription: string, + result: mixed, + }, + }, +}; +export type JsonRpcSuccess = { + type: 'success', + payload: { + id: string, + result: mixed, + }, +}; +export type JsonRpcMessage = JsonRpcErrorResponse | JsonRpcNotification | JsonRpcSuccess; + +export class RemoteError extends Error { + _code: number; + _details: string; + + constructor(code: number, details: string) { + super(`Remote JSON-RPC error ${code}: ${details}`); + this._code = code; + this._details = details; + } + + get code(): number { + return this._code; + } + + get details(): string { + return this._details; + } +} + +export class TimeOutError extends Error { + _jsonRpcMessage: Object; + + constructor(jsonRpcMessage: Object) { + super('Request timed out'); + + this._jsonRpcMessage = jsonRpcMessage; + } + + get jsonRpcMessage(): Object { + return this._jsonRpcMessage; + } +} + +export class SubscriptionError extends Error { + _reply: mixed; + + constructor(message: string, reply: mixed) { + const replyString = JSON.stringify(reply); + + super(`${message}: ${replyString}`); + + this._reply = reply; + } + + get reply(): mixed { + return this._reply; + } +} + +export class ConnectionError extends Error { + _code: number; + + constructor(code: number) { + super(ConnectionError.reason(code)); + this._code = code; + } + + get code(): number { + return this._code; + } + + static reason(code: number): string { + switch (code) { + case 1006: + return 'Abnormal closure'; + case 1011: + return 'Internal error'; + case 1012: + return 'Service restart'; + case 1014: + return 'Bad gateway'; + default: + return `Unknown (${code})`; + } + } +} + +const DEFAULT_TIMEOUT_MILLIS = 5000; + +export default class JsonRpcTransport extends EventEmitter { + _unansweredRequests: Map<string, UnansweredRequest> = new Map(); + _subscriptions: Map<string | number, (mixed) => void> = new Map(); + _websocket: ?WebSocket; + _websocketFactory: (string) => WebSocket; + + constructor(websocketFactory: ?(string) => WebSocket) { + super(); + this._websocketFactory = + websocketFactory || ((connectionString) => new WebSocket(connectionString)); + } + + /// Connect websocket + connect(connectionString: string) { + this.disconnect(); + + log.info('Connecting to websocket', connectionString); + + const websocket = this._websocketFactory(connectionString); + + websocket.onopen = () => { + log.info('Websocket is connected'); + this.emit('open'); + }; + + websocket.onmessage = (event) => { + const data = event.data; + if (typeof data === 'string') { + this._onMessage(data); + } else { + log.error('Got invalid reply from the server', event); + } + }; + + websocket.onclose = (event) => { + log.info(`The websocket connection closed with code: ${event.code}`); + + // Remove all subscriptions since they are connection based + this._subscriptions.clear(); + + // 1000 is a code used for normal connection closure. + const connectionError = event.code === 1000 ? null : new ConnectionError(event.code); + + this.emit('close', connectionError); + }; + + this._websocket = websocket; + } + + disconnect() { + if (this._websocket) { + this._websocket.close(); + this._websocket = null; + } + } + + async subscribe(event: string, listener: (mixed) => void): Promise<*> { + log.silly(`Adding a listener to ${event}`); + + try { + const subscriptionId = await this.send(`${event}_subscribe`); + if (typeof subscriptionId === 'string' || typeof subscriptionId === 'number') { + this._subscriptions.set(subscriptionId, listener); + } else { + throw new SubscriptionError( + 'The subscription id was not a string or a number', + subscriptionId, + ); + } + } catch (e) { + log.error(`Failed adding listener to ${event}: ${e.message}`); + throw e; + } + } + + async send( + action: string, + data: mixed, + timeout: number = DEFAULT_TIMEOUT_MILLIS, + ): Promise<mixed> { + let socket: WebSocket; + try { + socket = await this._getWebSocket(); + } catch (error) { + throw error; + } + + return new Promise((resolve, reject) => { + const id = uuid.v4(); + const payload = this._prepareParams(data); + const timerId = setTimeout(() => this._onTimeout(id), timeout); + const message = jsonrpc.request(id, action, payload); + this._unansweredRequests.set(id, { + resolve, + reject, + timerId, + message, + }); + + try { + log.silly('Sending message', id, action); + socket.send(JSON.stringify(message)); + } catch (error) { + log.error(`Failed sending RPC message "${action}": ${error.message}`); + throw error; + } + }); + } + + _prepareParams(data: mixed): Array<mixed> | Object { + // JSONRPC only accepts arrays and objects as params, but + // this isn't very nice to use, so this method wraps other + // types in an array. The choice of array is based on try-and-error + + if (data === undefined) { + return []; + } else if (data === null) { + return [null]; + } else if (Array.isArray(data) || typeof data === 'object') { + return data; + } else { + return [data]; + } + } + + _getWebSocket(): Promise<WebSocket> { + if (this._websocket && this._websocket.readyState === 1) { + return Promise.resolve(this._websocket); + } else { + return new Promise((resolve, reject) => { + log.debug('Waiting for websocket to connect'); + + this.once('open', () => { + const ws = this._websocket; + if (ws) { + resolve(ws); + } else { + reject(new Error('Internal error')); + } + }); + }); + } + } + + _onTimeout(requestId) { + const request = this._unansweredRequests.get(requestId); + + this._unansweredRequests.delete(requestId); + + if (request) { + log.warn(`Request ${requestId} timed out: `, request.message); + request.reject(new TimeOutError(request.message)); + } else { + log.warn(`Request ${requestId} timed out but it seems to already have been answered`); + } + } + + _onMessage(message: string) { + const result = jsonrpc.parse(message); + const messages = Array.isArray(result) ? result : [result]; + + for (const message of messages) { + if (message.type === 'notification') { + this._onNotification(message); + } else { + this._onReply(message); + } + } + } + + _onNotification(message: JsonRpcNotification) { + const subscriptionId = message.payload.params.subscription; + const listener = this._subscriptions.get(subscriptionId); + + if (listener) { + log.silly('Got notification', message.payload.method, message.payload.params.result); + listener(message.payload.params.result); + } else { + log.warn('Got notification for', message.payload.method, 'but no one is listening for it'); + } + } + + _onReply(message: JsonRpcErrorResponse | JsonRpcSuccess) { + const id = message.payload.id; + const request = this._unansweredRequests.get(id); + this._unansweredRequests.delete(id); + + if (request) { + log.silly('Got answer to', id, message.type); + + clearTimeout(request.timerId); + + if (message.type === 'error') { + const error = message.payload.error; + request.reject(new RemoteError(error.code, error.message)); + } else { + const reply = message.payload.result; + request.resolve(reply); + } + } else { + log.warn(`Got reply to ${id} but no one was waiting for it`); + } + } +} diff --git a/app/lib/jsonrpc-ws-ipc.js b/app/lib/jsonrpc-ws-ipc.js deleted file mode 100644 index 84dec96c77..0000000000 --- a/app/lib/jsonrpc-ws-ipc.js +++ /dev/null @@ -1,312 +0,0 @@ -// @flow - -import jsonrpc from 'jsonrpc-lite'; -import uuid from 'uuid'; -import { log } from '../lib/platform'; - -export type UnansweredRequest = { - resolve: (mixed) => void, - reject: (mixed) => void, - timerId: TimeoutID, - message: Object, -}; - -export type JsonRpcErrorResponse = { - type: 'error', - payload: { - id: string, - error: { - code: number, - message: string, - }, - }, -}; -export type JsonRpcNotification = { - type: 'notification', - payload: { - method: string, - params: { - subscription: string, - result: mixed, - }, - }, -}; -export type JsonRpcSuccess = { - type: 'success', - payload: { - id: string, - result: mixed, - }, -}; -export type JsonRpcMessage = JsonRpcErrorResponse | JsonRpcNotification | JsonRpcSuccess; - -export class JsonRpcError extends Error { - _code: number; - _details: string; - - constructor(code: number, details: string) { - super(`Remote JSON-RPC error ${code}: ${details}`); - this._code = code; - this._details = details; - } - - get code(): number { - return this._code; - } - - get details(): string { - return this._details; - } -} - -export class TimeOutError extends Error { - jsonRpcMessage: Object; - - constructor(jsonRpcMessage: Object) { - super('Request timed out'); - this.name = 'TimeOutError'; - this.jsonRpcMessage = jsonRpcMessage; - } -} - -export class InvalidReply extends Error { - reply: mixed; - - constructor(reply: mixed, msg: ?string) { - super(msg); - this.name = 'InvalidReply'; - this.reply = reply; - - if (msg) { - this.message = msg + ' - '; - } - this.message += JSON.stringify(reply); - } -} - -const DEFAULT_TIMEOUT_MILLIS = 5000; - -export default class Ipc { - _connectionString: ?string; - _onConnect: Array<{ resolve: () => void }>; - _unansweredRequests: Map<string, UnansweredRequest>; - _subscriptions: Map<string | number, (mixed) => void>; - _websocket: WebSocket; - _backoff: ReconnectionBackoff; - _websocketFactory: (string) => WebSocket; - _closeConnectionHandler: ?() => void; - - constructor(connectionString: string, websocketFactory: ?(string) => WebSocket) { - this._connectionString = connectionString; - this._onConnect = []; - this._unansweredRequests = new Map(); - this._subscriptions = new Map(); - this._websocketFactory = - websocketFactory || ((connectionString) => new WebSocket(connectionString)); - - this._backoff = new ReconnectionBackoff(); - this._reconnect(); - } - - setConnectionString(str: string) { - this._connectionString = str; - } - - setCloseConnectionHandler(handler: ?() => void) { - this._closeConnectionHandler = handler; - } - - async on(event: string, listener: (mixed) => void): Promise<*> { - log.silly(`Adding a listener to ${event}`); - try { - const subscriptionId = await this.send(`${event}_subscribe`); - if (typeof subscriptionId === 'string' || typeof subscriptionId === 'number') { - this._subscriptions.set(subscriptionId, listener); - } else { - throw new InvalidReply(subscriptionId, 'The subscription id was not a string or a number'); - } - } catch (e) { - log.error(`Failed adding listener to ${event}: ${e.message}`); - } - } - - send(action: string, data: mixed, timeout: number = DEFAULT_TIMEOUT_MILLIS): Promise<mixed> { - return new Promise(async (resolve, reject) => { - const id = uuid.v4(); - - const params = this._prepareParams(data); - const timerId = setTimeout(() => this._onTimeout(id), timeout); - const jsonrpcMessage = jsonrpc.request(id, action, params); - this._unansweredRequests.set(id, { - resolve: resolve, - reject: reject, - timerId: timerId, - message: jsonrpcMessage, - }); - - try { - const ws = await this._getWebSocket(); - log.silly('Sending message', id, action); - ws.send(jsonrpcMessage); - } catch (e) { - log.error(`Failed sending RPC message "${action}": ${e.message}`); - reject(e); - } - }); - } - - _prepareParams(data: mixed): Array<mixed> | Object { - // JSONRPC only accepts arrays and objects as params, but - // this isn't very nice to use, so this method wraps other - // types in an array. The choice of array is based on try-and-error - - if (data === undefined) { - return []; - } else if (data === null) { - return [null]; - } else if (Array.isArray(data) || typeof data === 'object') { - return data; - } else { - return [data]; - } - } - - _getWebSocket(): Promise<WebSocket> { - return new Promise((resolve) => { - if (this._websocket && this._websocket.readyState === 1) { - resolve(this._websocket); - } else { - log.debug('Waiting for websocket to connect'); - this._onConnect.push({ - resolve: () => resolve(this._websocket), - }); - } - }); - } - - _onTimeout(requestId) { - const request = this._unansweredRequests.get(requestId); - this._unansweredRequests.delete(requestId); - - if (!request) { - log.warn(requestId, 'timed out but it seems to already have been answered'); - return; - } - - log.warn(request.message, 'timed out'); - request.reject(new TimeOutError(request.message)); - } - - _onMessage(message: string) { - const json = JSON.parse(message); - const c = jsonrpc.parseObject(json); - - if (c.type === 'notification') { - this._onNotification(c); - } else { - this._onReply(c); - } - } - - _onNotification(message: JsonRpcNotification) { - const subscriptionId = message.payload.params.subscription; - const listener = this._subscriptions.get(subscriptionId); - - if (listener) { - log.silly('Got notification', message.payload.method, message.payload.params.result); - listener(message.payload.params.result); - } else { - log.warn('Got notification for', message.payload.method, 'but no one is listening for it'); - } - } - - _onReply(message: JsonRpcErrorResponse | JsonRpcSuccess) { - const id = message.payload.id; - const request = this._unansweredRequests.get(id); - this._unansweredRequests.delete(id); - - if (!request) { - log.warn('Got reply to', id, 'but no one was waiting for it'); - return; - } - - log.silly('Got answer to', id, message.type); - - clearTimeout(request.timerId); - - if (message.type === 'error') { - const error = message.payload.error; - request.reject(new JsonRpcError(error.code, error.message)); - } else { - const reply = message.payload.result; - request.resolve(reply); - } - } - - _reconnect() { - const connectionString = this._connectionString; - if (!connectionString) return; - - log.info('Connecting to websocket', connectionString); - this._websocket = this._websocketFactory(connectionString); - - this._websocket.onopen = () => { - log.info('Websocket is connected'); - this._backoff.successfullyConnected(); - - while (this._onConnect.length > 0) { - this._onConnect.pop().resolve(); - } - }; - - this._websocket.onmessage = (evt) => { - const data = evt.data; - if (typeof data === 'string') { - this._onMessage(data); - } else { - log.error('Got invalid reply from the server', evt); - } - }; - - this._websocket.onclose = () => { - if (this._closeConnectionHandler) { - this._closeConnectionHandler(); - } - - const delay = this._backoff.getIncreasedBackoff(); - log.warn( - 'The websocket connetion closed, attempting to reconnect it in', - delay, - 'milliseconds', - ); - setTimeout(() => this._reconnect(), delay); - }; - } -} - -/* - * Used to calculate the time to wait before reconnecting - * the websocket. - * - * It uses a linear backoff function that goes from 500ms - * to 3000ms - */ -class ReconnectionBackoff { - _attempt: number; - - constructor() { - this._attempt = 0; - } - - successfullyConnected() { - this._attempt = 0; - } - - getIncreasedBackoff() { - if (this._attempt < 6) { - this._attempt++; - } - - return this._attempt * 500; - } -} diff --git a/app/lib/relay-settings-builder.js b/app/lib/relay-settings-builder.js index 0386d1ef4f..5f6c279e3f 100644 --- a/app/lib/relay-settings-builder.js +++ b/app/lib/relay-settings-builder.js @@ -6,7 +6,7 @@ import type { RelaySettingsUpdate, RelaySettingsNormalUpdate, RelaySettingsCustom, -} from './ipc-facade'; +} from './daemon-rpc'; type LocationBuilder<Self> = { country: (country: string) => Self, diff --git a/app/lib/rpc-address-file.js b/app/lib/rpc-address-file.js index 5185ddc749..6eec716591 100644 --- a/app/lib/rpc-address-file.js +++ b/app/lib/rpc-address-file.js @@ -29,7 +29,7 @@ export class RpcAddressFile { return this._filePath; } - poll(): Promise<void> { + waitUntilExists(): Promise<void> { let promise = this._pollPromise; if (!promise) { @@ -52,20 +52,7 @@ export class RpcAddressFile { return promise; } - isTrusted() { - const filePath = this._filePath; - switch (process.platform) { - case 'win32': - return isOwnedByLocalSystem(filePath); - case 'darwin': - case 'linux': - return isOwnedAndOnlyWritableByRoot(filePath); - default: - throw new Error(`Unknown platform: ${process.platform}`); - } - } - - async waitUntilExists(): Promise<RpcCredentials> { + async parse(): Promise<RpcCredentials> { const data = await fsReadFileAsync(this._filePath, 'utf8'); const [connectionString, sharedSecret] = data.split('\n', 2); @@ -78,6 +65,19 @@ export class RpcAddressFile { throw new Error('Cannot parse the RPC address file'); } } + + isTrusted() { + const filePath = this._filePath; + switch (process.platform) { + case 'win32': + return isOwnedByLocalSystem(filePath); + case 'darwin': + case 'linux': + return isOwnedAndOnlyWritableByRoot(filePath); + default: + throw new Error(`Unknown platform: ${process.platform}`); + } + } } function getRpcAddressFilePath() { |
