summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorAndrej Mihajlov <and@mullvad.net>2018-06-20 15:34:40 +0200
committerAndrej Mihajlov <and@mullvad.net>2018-07-03 13:37:54 +0200
commit73840e98952dd3f7c005fcec44c971455da179eb (patch)
tree15a887410858007a802450ad2870d8bf495dd16d
parent67e82627564f8e4a2d8da4bcf5f0fd00867876bc (diff)
downloadmullvadvpn-73840e98952dd3f7c005fcec44c971455da179eb.tar.xz
mullvadvpn-73840e98952dd3f7c005fcec44c971455da179eb.zip
Refactor IpcFacade to DaemonRpc and JsonRpcWs to JsonRpcTransport
-rw-r--r--app/app.android.js2
-rw-r--r--app/app.js40
-rw-r--r--app/components/Account.js2
-rw-r--r--app/components/Login.js2
-rw-r--r--app/components/SelectLocation.js2
-rw-r--r--app/containers/SelectLocationPage.js2
-rw-r--r--app/lib/backend.js412
-rw-r--r--app/lib/daemon-rpc.js387
-rw-r--r--app/lib/ipc-facade.js356
-rw-r--r--app/lib/jsonrpc-transport.js326
-rw-r--r--app/lib/jsonrpc-ws-ipc.js312
-rw-r--r--app/lib/relay-settings-builder.js2
-rw-r--r--app/lib/rpc-address-file.js30
-rw-r--r--app/main.js4
-rw-r--r--app/redux/account/actions.js2
-rw-r--r--app/redux/account/reducers.js2
-rw-r--r--app/redux/connection/actions.js6
-rw-r--r--app/redux/connection/reducers.js2
-rw-r--r--app/redux/settings/reducers.js2
-rw-r--r--package.json1
-rw-r--r--test/auth.spec.js20
-rw-r--r--test/autologin.spec.js2
-rw-r--r--test/connect.spec.js4
-rw-r--r--test/helpers/IpcChain.js18
-rw-r--r--test/helpers/ipc-helpers.js14
-rw-r--r--test/ipc.spec.js134
-rw-r--r--test/jsonrpc-transport.spec.js144
-rw-r--r--test/login.spec.js2
-rw-r--r--test/logout.spec.js2
-rw-r--r--test/mocks/ipc.js58
-rw-r--r--yarn.lock12
31 files changed, 1247 insertions, 1057 deletions
diff --git a/app/app.android.js b/app/app.android.js
index dbbb0c1c26..8158922673 100644
--- a/app/app.android.js
+++ b/app/app.android.js
@@ -29,7 +29,7 @@ DeviceEventEmitter.addListener('com.mullvad.daemon-connection', async (_event, a
await backend.autologin();
await backend.fetchRelaySettings();
await backend.fetchSecurityState();
- await backend.connect();
+ await backend.connectTunnel();
} catch (e) {
if (e instanceof NoAccountError) {
log.debug('No previously configured account set, showing window');
diff --git a/app/app.js b/app/app.js
index 70d1052560..5ebc1137a1 100644
--- a/app/app.js
+++ b/app/app.js
@@ -10,39 +10,55 @@ import { log } from './lib/platform';
import makeRoutes from './routes';
import configureStore from './redux/store';
import { Backend, NoAccountError } from './lib/backend';
+import { DaemonRpc } from './lib/daemon-rpc';
import { setShutdownHandler } from './shutdown-handler';
import type { ConnectionState } from './redux/connection/reducers';
import type { TrayIconType } from './tray-icon-controller';
+import type { RpcCredentialsProvider, RpcCredentials } from './lib/backend';
const initialState = null;
const memoryHistory = createMemoryHistory();
const store = configureStore(initialState, memoryHistory);
-const backend = new Backend(store);
-ipcRenderer.on('daemon-connection', async (_event, args) => {
- backend.setCredentials(args.credentials);
- backend.sync();
+class CredentialsProvider implements RpcCredentialsProvider {
+ async request(): Promise<RpcCredentials> {
+ return new Promise((resolve, _reject) => {
+ ipcRenderer.once('daemon-connection', async (_event, credentials: RpcCredentials) => {
+ log.debug('Got credentials: ', credentials);
+ resolve(credentials);
+ });
+ ipcRenderer.send('daemon-connection');
+ });
+ }
+}
+
+const rpc = new DaemonRpc();
+const credentialsProvider = new CredentialsProvider();
+const backend = new Backend(store, rpc, credentialsProvider);
+
+(async function() {
+ backend.connect();
+
try {
await backend.autologin();
- await backend.fetchRelaySettings();
- await backend.fetchSecurityState();
- await backend.connect();
- } catch (e) {
- if (e instanceof NoAccountError) {
+ } catch (error) {
+ if (error instanceof NoAccountError) {
log.debug('No previously configured account set, showing window');
ipcRenderer.send('show-window');
+ } else {
+ log.error(`Failed to autologin: ${error.message}`);
}
}
-});
-ipcRenderer.send('daemon-connection');
+ backend.connectTunnel();
+})();
setShutdownHandler(async () => {
log.info('Executing a shutdown handler');
try {
- await backend.disconnect();
+ await backend.disconnectTunnel();
log.info('Disconnected the tunnel');
} catch (e) {
log.error(`Failed to shutdown tunnel: ${e.message}`);
diff --git a/app/components/Account.js b/app/components/Account.js
index fc72ff5b30..f8ea3dc19b 100644
--- a/app/components/Account.js
+++ b/app/components/Account.js
@@ -8,7 +8,7 @@ import styles from './AccountStyles';
import Img from './Img';
import { formatAccount } from '../lib/formatters';
-import type { AccountToken } from '../lib/ipc-facade';
+import type { AccountToken } from '../lib/daemon-rpc';
export type AccountProps = {
accountToken: AccountToken,
diff --git a/app/components/Login.js b/app/components/Login.js
index dd297c8e9c..d15681c58b 100644
--- a/app/components/Login.js
+++ b/app/components/Login.js
@@ -11,7 +11,7 @@ import styles from './LoginStyles';
import { colors } from '../config';
import type { AccountReduxState } from '../redux/account/reducers';
-import type { AccountToken } from '../lib/ipc-facade';
+import type { AccountToken } from '../lib/daemon-rpc';
export type Props = {
account: AccountReduxState,
diff --git a/app/components/SelectLocation.js b/app/components/SelectLocation.js
index 5864ed55a3..14d0a04f9c 100644
--- a/app/components/SelectLocation.js
+++ b/app/components/SelectLocation.js
@@ -14,7 +14,7 @@ import type {
RelayLocationRedux,
RelayLocationCityRedux,
} from '../redux/settings/reducers';
-import type { RelayLocation } from '../lib/ipc-facade';
+import type { RelayLocation } from '../lib/daemon-rpc';
export type SelectLocationProps = {
settings: SettingsReduxState,
diff --git a/app/containers/SelectLocationPage.js b/app/containers/SelectLocationPage.js
index b2ae476912..29b47c14a3 100644
--- a/app/containers/SelectLocationPage.js
+++ b/app/containers/SelectLocationPage.js
@@ -26,7 +26,7 @@ const mapDispatchToProps = (dispatch: ReduxDispatch, props: SharedRouteProps) =>
await backend.updateRelaySettings(relayUpdate);
await backend.fetchRelaySettings();
- await backend.connect();
+ await backend.connectTunnel();
pushHistory('/connect');
} catch (e) {
diff --git a/app/lib/backend.js b/app/lib/backend.js
index 39521d8edb..4fb9eb3cfa 100644
--- a/app/lib/backend.js
+++ b/app/lib/backend.js
@@ -1,16 +1,23 @@
// @flow
-import { log } from '../lib/platform';
-import { IpcFacade, RealIpc } from './ipc-facade';
-import { JsonRpcError, TimeOutError } from './jsonrpc-ws-ipc';
+import { bindActionCreators } from 'redux';
+import { push } from 'react-router-redux';
+import {
+ RemoteError as JsonRpcTransportRemoteError,
+ TimeOutError as JsonRpcTransportTimeOutError,
+} from './jsonrpc-transport';
import accountActions from '../redux/account/actions';
import connectionActions from '../redux/connection/actions';
import settingsActions from '../redux/settings/actions';
-import { push } from 'react-router-redux';
+import { log } from '../lib/platform';
-import type { RpcCredentials } from './rpc-address-file';
+import type { RpcCredentials as OriginalRpcCredentials } from './rpc-address-file';
+import type {
+ DaemonRpcProtocol,
+ ConnectionObserver as DaemonConnectionObserver,
+} from './daemon-rpc';
import type { ReduxStore } from '../redux/store';
-import type { AccountToken, BackendState, RelaySettingsUpdate } from './ipc-facade';
+import type { AccountToken, BackendState, RelaySettingsUpdate } from './daemon-rpc';
import type { ConnectionState } from '../redux/connection/reducers';
export class NoCreditError extends Error {
@@ -75,71 +82,80 @@ export class UnknownError extends Error {
}
}
+export class CredentialsRequestError extends Error {
+ _reason: Error;
+
+ constructor(reason: Error) {
+ super('Failed to request the RPC credentials');
+ this._reason = reason;
+ }
+
+ get reason(): Error {
+ return this._reason;
+ }
+}
+
+export type RpcCredentials = OriginalRpcCredentials;
+export interface RpcCredentialsProvider {
+ request(): Promise<RpcCredentials>;
+}
+
/**
* Backend implementation
*/
export class Backend {
- _ipc: IpcFacade;
- _credentials: ?RpcCredentials;
- _authenticationPromise: ?Promise<void>;
_store: ReduxStore;
+ _daemonRpc: DaemonRpcProtocol;
+ _credentialsProvider: RpcCredentialsProvider;
+ _reconnectBackoff = new ReconnectionBackoff();
- constructor(store: ReduxStore, credentials?: RpcCredentials, ipc: ?IpcFacade) {
- this._store = store;
- this._credentials = credentials;
+ _credentials: ?RpcCredentials;
+ _authenticationPromise: ?Promise<void>;
- if (ipc) {
- this._ipc = ipc;
+ _openConnectionObserver: ?DaemonConnectionObserver;
+ _closeConnectionObserver: ?DaemonConnectionObserver;
- // force to re-authenticate when connection closed
- this._ipc.setCloseConnectionHandler(() => {
- this._authenticationPromise = null;
- });
-
- this._registerIpcListeners();
- this._startReachability();
- }
- }
+ constructor(
+ store: ReduxStore,
+ rpc: DaemonRpcProtocol,
+ credentialsProvider: RpcCredentialsProvider,
+ ) {
+ this._store = store;
+ this._daemonRpc = rpc;
+ this._credentialsProvider = credentialsProvider;
- setCredentials(credentials: RpcCredentials) {
- log.debug('Got connection info to backend', credentials.connectionString);
- this._credentials = credentials;
+ this._openConnectionObserver = rpc.addOpenConnectionObserver(() => {
+ this._onOpenConnection();
+ });
- if (this._ipc) {
- this._credentials = credentials;
- } else {
- this._ipc = new RealIpc(credentials.connectionString);
+ this._closeConnectionObserver = rpc.addCloseConnectionObserver((error) => {
+ this._onCloseConnection(error);
+ });
- // force to re-authenticate when connection closed
- this._ipc.setCloseConnectionHandler(() => {
- this._authenticationPromise = null;
- });
- }
- this._registerIpcListeners();
+ this._setupReachability();
}
- async sync() {
- log.info('Syncing with the backend...');
+ dispose() {
+ const openConnectionObserver = this._openConnectionObserver;
+ const closeConnectionObserver = this._closeConnectionObserver;
- try {
- await this._fetchRelayLocations();
- } catch (e) {
- log.error('Failed to fetch the relay locations: ', e.message);
+ if (openConnectionObserver) {
+ openConnectionObserver.unsubscribe();
+ this._openConnectionObserver = null;
}
- try {
- await this._fetchLocation();
- } catch (e) {
- log.error('Failed to fetch the location: ', e.message);
+ if (closeConnectionObserver) {
+ closeConnectionObserver.unsubscribe();
+ this._closeConnectionObserver = null;
}
+ }
- try {
- await this._fetchAllowLan();
- } catch (e) {
- log.error('Failed to fetch the LAN sharing policy: ', e.message);
- }
+ connect() {
+ this._connectToDaemon();
+ }
- await this._fetchAccountHistory();
+ disconnect() {
+ this._daemonRpc.disconnect();
}
async login(accountToken: AccountToken) {
@@ -150,11 +166,11 @@ export class Backend {
try {
await this._ensureAuthenticated();
- const accountData = await this._ipc.getAccountData(accountToken);
+ const accountData = await this._daemonRpc.getAccountData(accountToken);
log.debug('Account exists', accountData);
- await this._ipc.setAccount(accountToken);
+ await this._daemonRpc.setAccount(accountToken);
log.info('Log in complete');
@@ -166,10 +182,10 @@ export class Backend {
setTimeout(() => {
this._store.dispatch(push('/connect'));
log.debug('Autoconnecting...');
- this.connect();
+ this.connectTunnel();
}, 1000);
- await this._fetchAccountHistory();
+ await this.fetchAccountHistory();
} catch (e) {
log.error('Failed to log in,', e.message);
@@ -178,27 +194,6 @@ export class Backend {
}
}
- _rpcErrorToBackendError(e) {
- if (e instanceof JsonRpcError) {
- switch (e.code) {
- case -200: // Account doesn't exist
- return new InvalidAccountError();
- case -32603: // Internal error
- // We treat all internal backend errors as the user cannot reach
- // api.mullvad.net. This is not always true of course, but it is
- // true so often that we choose to disregard the other edge cases
- // for now.
- return new CommunicationError();
- }
- } else if (e instanceof TimeOutError) {
- return new CommunicationError();
- } else if (e instanceof NoDaemonError) {
- return e;
- }
-
- return new UnknownError(e.message);
- }
-
async autologin() {
try {
log.debug('Attempting to log in automatically');
@@ -207,7 +202,7 @@ export class Backend {
this._store.dispatch(accountActions.startLogin());
- const accountToken = await this._ipc.getAccount();
+ const accountToken = await this._daemonRpc.getAccount();
if (!accountToken) {
throw new NoAccountError();
}
@@ -215,7 +210,7 @@ export class Backend {
log.debug('The backend had an account number stored: ', accountToken);
this._store.dispatch(accountActions.startLogin(accountToken));
- const accountData = await this._ipc.getAccountData(accountToken);
+ const accountData = await this._daemonRpc.getAccountData(accountToken);
log.debug('The stored account number still exists', accountData);
this._store.dispatch(accountActions.loginSuccessful(accountData.expiry));
@@ -234,12 +229,12 @@ export class Backend {
// @TODO: What does it mean for a logout to be successful or failed?
try {
await this._ensureAuthenticated();
- await this._ipc.setAccount(null);
+ await this._daemonRpc.setAccount(null);
this._store.dispatch(accountActions.loggedOut());
// disconnect user during logout
- await this.disconnect();
+ await this.disconnectTunnel();
this._store.dispatch(push('/'));
} catch (e) {
@@ -247,9 +242,9 @@ export class Backend {
}
}
- async connect() {
+ async connectTunnel() {
try {
- const currentState = await this._ipc.getState();
+ const currentState = await this._daemonRpc.getState();
if (currentState.state === 'secured') {
log.debug('Refusing to connect as connection is already secured');
this._store.dispatch(connectionActions.connected());
@@ -259,18 +254,18 @@ export class Backend {
this._store.dispatch(connectionActions.connecting());
await this._ensureAuthenticated();
- await this._ipc.connect();
+ await this._daemonRpc.connectTunnel();
} catch (e) {
log.error('Failed to connect: ', e.message);
this._store.dispatch(connectionActions.disconnected());
}
}
- async disconnect() {
+ async disconnectTunnel() {
// @TODO: Failure modes
try {
await this._ensureAuthenticated();
- await this._ipc.disconnect();
+ await this._daemonRpc.disconnectTunnel();
} catch (e) {
log.error('Failed to disconnect: ', e.message);
}
@@ -279,7 +274,7 @@ export class Backend {
async updateRelaySettings(relaySettings: RelaySettingsUpdate) {
try {
await this._ensureAuthenticated();
- await this._ipc.updateRelaySettings(relaySettings);
+ await this._daemonRpc.updateRelaySettings(relaySettings);
} catch (e) {
log.error('Failed to update relay settings: ', e.message);
}
@@ -288,7 +283,7 @@ export class Backend {
async fetchRelaySettings() {
await this._ensureAuthenticated();
- const relaySettings = await this._ipc.getRelaySettings();
+ const relaySettings = await this._daemonRpc.getRelaySettings();
log.debug('Got relay settings from backend', JSON.stringify(relaySettings));
if (relaySettings.normal) {
@@ -339,13 +334,13 @@ export class Backend {
}
async updateAccountExpiry() {
- const ipc = this._ipc;
+ const ipc = this._daemonRpc;
const store = this._store;
try {
await this._ensureAuthenticated();
- const accountToken = await this._ipc.getAccount();
+ const accountToken = await this._daemonRpc.getAccount();
if (!accountToken) {
throw new NoAccountError();
}
@@ -357,31 +352,22 @@ export class Backend {
}
}
- async removeAccountFromHistory(accountToken: AccountToken) {
- try {
- await this._ensureAuthenticated();
- await this._ipc.removeAccountFromHistory(accountToken);
- await this._fetchAccountHistory();
- } catch (e) {
- log.error('Failed to remove account token from history', e.message);
- }
+ async removeAccountFromHistory(accountToken: AccountToken): Promise<void> {
+ await this._ensureAuthenticated();
+ await this._daemonRpc.removeAccountFromHistory(accountToken);
+ await this.fetchAccountHistory();
}
- async _fetchAccountHistory() {
- try {
- await this._ensureAuthenticated();
- const accountHistory = await this._ipc.getAccountHistory();
- this._store.dispatch(accountActions.updateAccountHistory(accountHistory));
- } catch (e) {
- log.info('Failed to fetch account history,', e.message);
- throw e;
- }
+ async fetchAccountHistory(): Promise<void> {
+ await this._ensureAuthenticated();
+ const accountHistory = await this._daemonRpc.getAccountHistory();
+ this._store.dispatch(accountActions.updateAccountHistory(accountHistory));
}
- async _fetchRelayLocations() {
+ async fetchRelayLocations() {
await this._ensureAuthenticated();
- const locations = await this._ipc.getRelayLocations();
+ const locations = await this._daemonRpc.getRelayLocations();
log.info('Got relay locations');
@@ -401,10 +387,10 @@ export class Backend {
this._store.dispatch(settingsActions.updateRelayLocations(storedLocations));
}
- async _fetchLocation() {
+ async fetchLocation() {
await this._ensureAuthenticated();
- const location = await this._ipc.getLocation();
+ const location = await this._daemonRpc.getLocation();
log.info('Got location from daemon');
@@ -421,65 +407,148 @@ export class Backend {
}
async setAllowLan(allowLan: boolean) {
- try {
- await this._ensureAuthenticated();
- await this._ipc.setAllowLan(allowLan);
+ await this._ensureAuthenticated();
+ await this._daemonRpc.setAllowLan(allowLan);
- this._store.dispatch(settingsActions.updateAllowLan(allowLan));
- } catch (e) {
- log.error('Failed to change the LAN sharing policy: ', e.message);
- }
+ this._store.dispatch(settingsActions.updateAllowLan(allowLan));
}
- async _fetchAllowLan() {
+ async fetchAllowLan() {
await this._ensureAuthenticated();
- const allowLan = await this._ipc.getAllowLan();
+ const allowLan = await this._daemonRpc.getAllowLan();
this._store.dispatch(settingsActions.updateAllowLan(allowLan));
}
async fetchSecurityState() {
await this._ensureAuthenticated();
- const securityState = await this._ipc.getState();
+ const securityState = await this._daemonRpc.getState();
const connectionState = this._securityStateToConnectionState(securityState);
this._dispatchConnectionState(connectionState);
}
+ async _requestCredentials(): Promise<RpcCredentials> {
+ try {
+ return await this._credentialsProvider.request();
+ } catch (providerError) {
+ throw new CredentialsRequestError(providerError);
+ }
+ }
+
+ async _connectToDaemon(): Promise<void> {
+ let credentials;
+ try {
+ credentials = await this._requestCredentials();
+ } catch (error) {
+ log.error(`Cannot request the RPC credentials: ${error.message}`);
+ return;
+ }
+
+ this._credentials = credentials;
+ this._daemonRpc.connect(credentials.connectionString);
+ }
+
+ async _onOpenConnection() {
+ this._reconnectBackoff.reset();
+
+ // make sure to re-subscribe to state notifications when connection is re-established.
+ try {
+ await this._subscribeStateListener();
+ } catch (error) {
+ log.error(`Cannot subscribe for RPC notifications: ${error.message}`);
+ }
+
+ this._fetchInitialState();
+ }
+
+ async _onCloseConnection(error: ?Error) {
+ // force to re-authenticate when connection closed
+ this._authenticationPromise = null;
+
+ if (error) {
+ log.debug(`Lost connection to daemon: ${error.message}`);
+
+ const recover = async () => {
+ try {
+ await this.connect();
+ } catch (error) {
+ log.error(`Failed to reconnect: ${error.message}`);
+ }
+ };
+
+ this._reconnectBackoff.attempt(() => {
+ recover();
+ });
+ }
+ }
+
/**
* Start reachability monitoring for online/offline detection
* This is currently done via HTML5 APIs but will be replaced later
* with proper backend integration.
*/
- _startReachability() {
+ _setupReachability() {
+ const { online, offline } = bindActionCreators(connectionActions, this._store.dispatch);
+
window.addEventListener('online', () => {
- this._store.dispatch(connectionActions.online());
+ online();
});
window.addEventListener('offline', () => {
- // force disconnect since there is no real connection anyway.
- this.disconnect();
- this._store.dispatch(connectionActions.offline());
+ offline();
});
- // update online status in background
- setTimeout(() => {
- const action = navigator.onLine ? connectionActions.online() : connectionActions.offline();
-
- this._store.dispatch(action);
- }, 0);
+ if (navigator.onLine) {
+ online();
+ } else {
+ offline();
+ }
}
- async _registerIpcListeners() {
+ async _subscribeStateListener() {
await this._ensureAuthenticated();
- this._ipc.registerStateListener((newState) => {
- const connectionState = this._securityStateToConnectionState(newState);
- log.debug(`Got new state from backend {state: ${newState.state}, \
- target_state: ${newState.target_state}}, translated to '${connectionState}'`);
- this._dispatchConnectionState(connectionState);
- this.sync();
+ await this._daemonRpc.subscribeStateListener((newState, error) => {
+ if (error) {
+ log.error(`Received an error when processing the incoming state change: ${error.message}`);
+ }
+
+ if (newState) {
+ const connectionState = this._securityStateToConnectionState(newState);
+
+ log.debug(
+ `Got new state from backend {state: ${newState.state}, target_state: ${
+ newState.target_state
+ }}, translated to '${connectionState}'`,
+ );
+
+ this._dispatchConnectionState(connectionState);
+ this._refreshStateOnChange();
+ }
});
}
+ async _fetchInitialState() {
+ try {
+ await Promise.all([
+ this.fetchSecurityState(),
+ this.fetchRelaySettings(),
+ this.fetchRelayLocations(),
+ this.fetchAllowLan(),
+ this.fetchLocation(),
+ ]);
+ } catch (error) {
+ log.error(`Cannot prefetch data: ${error.message}`);
+ }
+ }
+
+ async _refreshStateOnChange() {
+ try {
+ await this.fetchLocation();
+ } catch (error) {
+ log.error(`Failed to fetch the location: ${error.message}`);
+ }
+ }
+
_securityStateToConnectionState(backendState: BackendState): ConnectionState {
if (backendState.state === 'unsecured' && backendState.target_state === 'secured') {
return 'connecting';
@@ -492,19 +561,44 @@ export class Backend {
}
_dispatchConnectionState(connectionState: ConnectionState) {
+ const { connecting, connected, disconnected } = bindActionCreators(
+ connectionActions,
+ this._store.dispatch,
+ );
switch (connectionState) {
case 'connecting':
- this._store.dispatch(connectionActions.connecting());
+ connecting();
break;
case 'connected':
- this._store.dispatch(connectionActions.connected());
+ connected();
break;
case 'disconnected':
- this._store.dispatch(connectionActions.disconnected());
+ disconnected();
break;
}
}
+ _rpcErrorToBackendError(e) {
+ if (e instanceof JsonRpcTransportRemoteError) {
+ switch (e.code) {
+ case -200: // Account doesn't exist
+ return new InvalidAccountError();
+ case -32603: // Internal error
+ // We treat all internal backend errors as the user cannot reach
+ // api.mullvad.net. This is not always true of course, but it is
+ // true so often that we choose to disregard the other edge cases
+ // for now.
+ return new CommunicationError();
+ }
+ } else if (e instanceof JsonRpcTransportTimeOutError) {
+ return new CommunicationError();
+ } else if (e instanceof NoDaemonError) {
+ return e;
+ }
+
+ return new UnknownError(e.message);
+ }
+
_ensureAuthenticated(): Promise<void> {
const credentials = this._credentials;
if (credentials) {
@@ -519,11 +613,41 @@ export class Backend {
async _authenticate(sharedSecret: string) {
try {
- await this._ipc.authenticate(sharedSecret);
+ await this._daemonRpc.authenticate(sharedSecret);
log.info('Authenticated with backend');
} catch (e) {
- log.error('Failed to authenticate with backend: ', e.message);
+ log.error(`Failed to authenticate with backend: ${e.message}`);
throw e;
}
}
}
+
+/*
+ * Used to calculate the time to wait before reconnecting
+ * the websocket.
+ *
+ * It uses a linear backoff function that goes from 500ms
+ * to 3000ms
+ */
+class ReconnectionBackoff {
+ _attempt: number;
+
+ constructor() {
+ this._attempt = 0;
+ }
+
+ attempt(handler: () => void) {
+ setTimeout(handler, this._getIncreasedBackoff());
+ }
+
+ reset() {
+ this._attempt = 0;
+ }
+
+ _getIncreasedBackoff() {
+ if (this._attempt < 6) {
+ this._attempt++;
+ }
+ return this._attempt * 500;
+ }
+}
diff --git a/app/lib/daemon-rpc.js b/app/lib/daemon-rpc.js
new file mode 100644
index 0000000000..2ef3d06e13
--- /dev/null
+++ b/app/lib/daemon-rpc.js
@@ -0,0 +1,387 @@
+// @flow
+
+import JsonRpcTransport from './jsonrpc-transport';
+import {
+ object,
+ maybe,
+ string,
+ number,
+ boolean,
+ enumeration,
+ arrayOf,
+ oneOf,
+} from 'validated/schema';
+import { validate } from 'validated/object';
+
+import type { Node as SchemaNode } from 'validated/schema';
+
+export type AccountData = { expiry: string };
+export type AccountToken = string;
+export type Ip = string;
+export type Location = {
+ ip: Ip,
+ country: string,
+ city: ?string,
+ latitude: number,
+ longitude: number,
+ mullvad_exit_ip: boolean,
+};
+const LocationSchema = object({
+ ip: string,
+ country: string,
+ city: maybe(string),
+ latitude: number,
+ longitude: number,
+ mullvad_exit_ip: boolean,
+});
+
+export type SecurityState = 'secured' | 'unsecured';
+export type BackendState = {
+ state: SecurityState,
+ target_state: SecurityState,
+};
+
+export type RelayProtocol = 'tcp' | 'udp';
+export type RelayLocation = {| city: [string, string] |} | {| country: string |};
+
+type OpenVpnParameters = {
+ port: 'any' | { only: number },
+ protocol: 'any' | { only: RelayProtocol },
+};
+
+type TunnelOptions<TOpenVpnParameters> = {
+ openvpn: TOpenVpnParameters,
+};
+
+type RelaySettingsNormal<TTunnelOptions> = {
+ location:
+ | 'any'
+ | {
+ only: RelayLocation,
+ },
+ tunnel:
+ | 'any'
+ | {
+ only: TTunnelOptions,
+ },
+};
+
+// types describing the structure of RelaySettings
+export type RelaySettingsCustom = {
+ host: string,
+ tunnel: {
+ openvpn: {
+ port: number,
+ protocol: RelayProtocol,
+ },
+ },
+};
+export type RelaySettings =
+ | {|
+ normal: RelaySettingsNormal<TunnelOptions<OpenVpnParameters>>,
+ |}
+ | {|
+ custom_tunnel_endpoint: RelaySettingsCustom,
+ |};
+
+// types describing the partial update of RelaySettings
+export type RelaySettingsNormalUpdate = $Shape<
+ RelaySettingsNormal<TunnelOptions<$Shape<OpenVpnParameters>>>,
+>;
+export type RelaySettingsUpdate =
+ | {|
+ normal: RelaySettingsNormalUpdate,
+ |}
+ | {|
+ custom_tunnel_endpoint: RelaySettingsCustom,
+ |};
+
+const constraint = <T>(constraintValue: SchemaNode<T>) => {
+ return oneOf(
+ string, // any
+ object({
+ only: constraintValue,
+ }),
+ );
+};
+
+const RelaySettingsSchema = oneOf(
+ object({
+ normal: object({
+ location: constraint(
+ oneOf(
+ object({
+ city: arrayOf(string),
+ }),
+ object({
+ country: string,
+ }),
+ ),
+ ),
+ tunnel: constraint(
+ object({
+ openvpn: object({
+ port: constraint(number),
+ protocol: constraint(enumeration('udp', 'tcp')),
+ }),
+ }),
+ ),
+ }),
+ }),
+ object({
+ custom_tunnel_endpoint: object({
+ host: string,
+ tunnel: object({
+ openvpn: object({
+ port: number,
+ protocol: enumeration('udp', 'tcp'),
+ }),
+ }),
+ }),
+ }),
+);
+
+export type RelayList = {
+ countries: Array<RelayListCountry>,
+};
+
+export type RelayListCountry = {
+ name: string,
+ code: string,
+ cities: Array<RelayListCity>,
+};
+
+export type RelayListCity = {
+ name: string,
+ code: string,
+ latitude: number,
+ longitude: number,
+ has_active_relays: boolean,
+};
+
+const RelayListSchema = object({
+ countries: arrayOf(
+ object({
+ name: string,
+ code: string,
+ cities: arrayOf(
+ object({
+ name: string,
+ code: string,
+ latitude: number,
+ longitude: number,
+ has_active_relays: boolean,
+ }),
+ ),
+ }),
+ ),
+});
+
+const AccountDataSchema = object({
+ expiry: string,
+});
+
+const allSecurityStates: Array<SecurityState> = ['secured', 'unsecured'];
+const BackendStateSchema = object({
+ state: enumeration(...allSecurityStates),
+ target_state: enumeration(...allSecurityStates),
+});
+
+export interface DaemonRpcProtocol {
+ connect(string): void;
+ disconnect(): void;
+ getAccountData(AccountToken): Promise<AccountData>;
+ getRelayLocations(): Promise<RelayList>;
+ getAccount(): Promise<?AccountToken>;
+ setAccount(accountToken: ?AccountToken): Promise<void>;
+ updateRelaySettings(RelaySettingsUpdate): Promise<void>;
+ getRelaySettings(): Promise<RelaySettings>;
+ setAllowLan(boolean): Promise<void>;
+ getAllowLan(): Promise<boolean>;
+ connectTunnel(): Promise<void>;
+ disconnectTunnel(): Promise<void>;
+ getLocation(): Promise<Location>;
+ getState(): Promise<BackendState>;
+ subscribeStateListener((state: ?BackendState, error: ?Error) => void): Promise<void>;
+ addOpenConnectionObserver(() => void): ConnectionObserver;
+ addCloseConnectionObserver((error: ?Error) => void): ConnectionObserver;
+ authenticate(sharedSecret: string): Promise<void>;
+ getAccountHistory(): Promise<Array<AccountToken>>;
+ removeAccountFromHistory(accountToken: AccountToken): Promise<void>;
+}
+
+export class ResponseParseError extends Error {
+ _validationError: ?Error;
+
+ constructor(message: string, validationError: ?Error) {
+ super(message);
+ this._validationError = validationError;
+ }
+
+ get validationError(): ?Error {
+ return this._validationError;
+ }
+}
+
+export type ConnectionObserver = {
+ unsubscribe: () => void,
+};
+
+export class DaemonRpc implements DaemonRpcProtocol {
+ _transport = new JsonRpcTransport();
+
+ async authenticate(sharedSecret: string): Promise<void> {
+ await this._transport.send('auth', sharedSecret);
+ }
+
+ connect(connectionString: string) {
+ this._transport.connect(connectionString);
+ }
+
+ disconnect() {
+ this._transport.disconnect();
+ }
+
+ addOpenConnectionObserver(handler: () => void): ConnectionObserver {
+ this._transport.on('open', handler);
+ return {
+ unsubscribe: () => {
+ this._transport.off('open', handler);
+ },
+ };
+ }
+
+ addCloseConnectionObserver(handler: (error: ?Error) => void): ConnectionObserver {
+ this._transport.on('close', handler);
+ return {
+ unsubscribe: () => {
+ this._transport.off('close', handler);
+ },
+ };
+ }
+
+ async getAccountData(accountToken: AccountToken): Promise<AccountData> {
+ // send the IPC with 30s timeout since the backend will wait
+ // for a HTTP request before replying
+ const response = await this._transport.send('get_account_data', accountToken, 30000);
+ try {
+ return validate(AccountDataSchema, response);
+ } catch (error) {
+ throw new ResponseParseError('Invalid response from get_account_data', error);
+ }
+ }
+
+ async getRelayLocations(): Promise<RelayList> {
+ const response = await this._transport.send('get_relay_locations');
+ try {
+ return validate(RelayListSchema, response);
+ } catch (error) {
+ throw new ResponseParseError('Invalid response from get_relay_locations', error);
+ }
+ }
+
+ async getAccount(): Promise<?AccountToken> {
+ const response = await this._transport.send('get_account');
+ if (response === null || typeof response === 'string') {
+ return response;
+ } else {
+ throw new ResponseParseError('Invalid response from get_account', null);
+ }
+ }
+
+ async setAccount(accountToken: ?AccountToken): Promise<void> {
+ await this._transport.send('set_account', accountToken);
+ }
+
+ async updateRelaySettings(relaySettings: RelaySettingsUpdate): Promise<void> {
+ await this._transport.send('update_relay_settings', [relaySettings]);
+ }
+
+ async getRelaySettings(): Promise<RelaySettings> {
+ const response = await this._transport.send('get_relay_settings');
+ try {
+ const validatedObject = validate(RelaySettingsSchema, response);
+
+ /* $FlowFixMe:
+ There is no way to express the constraints with string literals, i.e:
+
+ RelaySettingsSchema constraint:
+ oneOf(string, object)
+
+ RelaySettings constraint:
+ 'any' | object
+
+ These two are incompatible so we simply enforce the type for now.
+ */
+ return ((validatedObject: any): RelaySettings);
+ } catch (e) {
+ throw new ResponseParseError('Invalid response from get_relay_settings', e);
+ }
+ }
+
+ async setAllowLan(allowLan: boolean): Promise<void> {
+ await this._transport.send('set_allow_lan', [allowLan]);
+ }
+
+ async getAllowLan(): Promise<boolean> {
+ const response = await this._transport.send('get_allow_lan');
+ if (typeof response === 'boolean') {
+ return response;
+ } else {
+ throw new ResponseParseError('Invalid response from get_allow_lan', null);
+ }
+ }
+
+ async connectTunnel(): Promise<void> {
+ await this._transport.send('connect');
+ }
+
+ async disconnectTunnel(): Promise<void> {
+ await this._transport.send('disconnect');
+ }
+
+ async getLocation(): Promise<Location> {
+ // send the IPC with 30s timeout since the backend will wait
+ // for a HTTP request before replying
+
+ const response = await this._transport.send('get_current_location', [], 30000);
+ try {
+ return validate(LocationSchema, response);
+ } catch (error) {
+ throw new ResponseParseError('Invalid response from get_current_location', error);
+ }
+ }
+
+ async getState(): Promise<BackendState> {
+ const response = await this._transport.send('get_state');
+ try {
+ return validate(BackendStateSchema, response);
+ } catch (error) {
+ throw new ResponseParseError('Invalid response from get_state', error);
+ }
+ }
+
+ subscribeStateListener(listener: (state: ?BackendState, error: ?Error) => void): Promise<void> {
+ return this._transport.subscribe('new_state', (payload) => {
+ try {
+ const newState = validate(BackendStateSchema, payload);
+ listener(newState, null);
+ } catch (error) {
+ listener(null, new ResponseParseError('Invalid payload from new_state', error));
+ }
+ });
+ }
+
+ async getAccountHistory(): Promise<Array<AccountToken>> {
+ const response = await this._transport.send('get_account_history');
+ try {
+ return validate(arrayOf(string), response);
+ } catch (error) {
+ throw new ResponseParseError('Invalid response from get_account_history', null);
+ }
+ }
+
+ async removeAccountFromHistory(accountToken: AccountToken): Promise<void> {
+ await this._transport.send('remove_account_from_history', accountToken);
+ }
+}
diff --git a/app/lib/ipc-facade.js b/app/lib/ipc-facade.js
deleted file mode 100644
index 103ef4d931..0000000000
--- a/app/lib/ipc-facade.js
+++ /dev/null
@@ -1,356 +0,0 @@
-// @flow
-
-import JsonRpcWs, { InvalidReply } from './jsonrpc-ws-ipc';
-import {
- object,
- maybe,
- string,
- number,
- boolean,
- enumeration,
- arrayOf,
- oneOf,
-} from 'validated/schema';
-import { validate } from 'validated/object';
-
-import type { Node as SchemaNode } from 'validated/schema';
-
-export type AccountData = { expiry: string };
-export type AccountToken = string;
-export type Ip = string;
-export type Location = {
- ip: Ip,
- country: string,
- city: ?string,
- latitude: number,
- longitude: number,
- mullvad_exit_ip: boolean,
-};
-const LocationSchema = object({
- ip: string,
- country: string,
- city: maybe(string),
- latitude: number,
- longitude: number,
- mullvad_exit_ip: boolean,
-});
-
-export type SecurityState = 'secured' | 'unsecured';
-export type BackendState = {
- state: SecurityState,
- target_state: SecurityState,
-};
-
-export type RelayProtocol = 'tcp' | 'udp';
-export type RelayLocation = {| city: [string, string] |} | {| country: string |};
-
-type OpenVpnParameters = {
- port: 'any' | { only: number },
- protocol: 'any' | { only: RelayProtocol },
-};
-
-type TunnelOptions<TOpenVpnParameters> = {
- openvpn: TOpenVpnParameters,
-};
-
-type RelaySettingsNormal<TTunnelOptions> = {
- location:
- | 'any'
- | {
- only: RelayLocation,
- },
- tunnel:
- | 'any'
- | {
- only: TTunnelOptions,
- },
-};
-
-// types describing the structure of RelaySettings
-export type RelaySettingsCustom = {
- host: string,
- tunnel: {
- openvpn: {
- port: number,
- protocol: RelayProtocol,
- },
- },
-};
-export type RelaySettings =
- | {|
- normal: RelaySettingsNormal<TunnelOptions<OpenVpnParameters>>,
- |}
- | {|
- custom_tunnel_endpoint: RelaySettingsCustom,
- |};
-
-// types describing the partial update of RelaySettings
-export type RelaySettingsNormalUpdate = $Shape<
- RelaySettingsNormal<TunnelOptions<$Shape<OpenVpnParameters>>>,
->;
-export type RelaySettingsUpdate =
- | {|
- normal: RelaySettingsNormalUpdate,
- |}
- | {|
- custom_tunnel_endpoint: RelaySettingsCustom,
- |};
-
-const constraint = <T>(constraintValue: SchemaNode<T>) =>
- oneOf(
- string,
- object({
- only: constraintValue,
- }),
- );
-
-const RelaySettingsSchema = oneOf(
- object({
- normal: object({
- location: constraint(
- oneOf(
- object({
- city: arrayOf(string),
- }),
- object({
- country: string,
- }),
- ),
- ),
- tunnel: constraint(
- object({
- openvpn: object({
- port: constraint(number),
- protocol: constraint(enumeration('udp', 'tcp')),
- }),
- }),
- ),
- }),
- }),
- object({
- custom_tunnel_endpoint: object({
- host: string,
- tunnel: object({
- openvpn: object({
- port: number,
- protocol: enumeration('udp', 'tcp'),
- }),
- }),
- }),
- }),
-);
-
-export type RelayList = {
- countries: Array<RelayListCountry>,
-};
-
-export type RelayListCountry = {
- name: string,
- code: string,
- cities: Array<RelayListCity>,
-};
-
-export type RelayListCity = {
- name: string,
- code: string,
- latitude: number,
- longitude: number,
- has_active_relays: boolean,
-};
-
-const RelayListSchema = object({
- countries: arrayOf(
- object({
- name: string,
- code: string,
- cities: arrayOf(
- object({
- name: string,
- code: string,
- latitude: number,
- longitude: number,
- has_active_relays: boolean,
- }),
- ),
- }),
- ),
-});
-
-export interface IpcFacade {
- setConnectionString(string): void;
- getAccountData(AccountToken): Promise<AccountData>;
- getRelayLocations(): Promise<RelayList>;
- getAccount(): Promise<?AccountToken>;
- setAccount(accountToken: ?AccountToken): Promise<void>;
- updateRelaySettings(RelaySettingsUpdate): Promise<void>;
- getRelaySettings(): Promise<RelaySettings>;
- setAllowLan(boolean): Promise<void>;
- getAllowLan(): Promise<boolean>;
- connect(): Promise<void>;
- disconnect(): Promise<void>;
- getLocation(): Promise<Location>;
- getState(): Promise<BackendState>;
- registerStateListener((BackendState) => void): void;
- setCloseConnectionHandler(() => void): void;
- authenticate(sharedSecret: string): Promise<void>;
- getAccountHistory(): Promise<Array<AccountToken>>;
- removeAccountFromHistory(accountToken: AccountToken): Promise<void>;
-}
-
-export class RealIpc implements IpcFacade {
- _ipc: JsonRpcWs;
-
- constructor(connectionString: string) {
- this._ipc = new JsonRpcWs(connectionString);
- }
-
- setConnectionString(str: string) {
- this._ipc.setConnectionString(str);
- }
-
- getAccountData(accountToken: AccountToken): Promise<AccountData> {
- // send the IPC with 30s timeout since the backend will wait
- // for a HTTP request before replying
-
- return this._ipc.send('get_account_data', accountToken, 30000).then((raw) => {
- if (typeof raw === 'object' && raw && raw.expiry) {
- return raw;
- } else {
- throw new InvalidReply(raw, 'Expected an object with expiry');
- }
- });
- }
-
- async getRelayLocations(): Promise<RelayList> {
- const raw = await this._ipc.send('get_relay_locations');
- try {
- const validated: any = validate(RelayListSchema, raw);
- return (validated: RelayList);
- } catch (e) {
- throw new InvalidReply(raw, e);
- }
- }
-
- getAccount(): Promise<?AccountToken> {
- return this._ipc.send('get_account').then((raw) => {
- if (raw === undefined || raw === null || typeof raw === 'string') {
- return raw;
- } else {
- throw new InvalidReply(raw);
- }
- });
- }
-
- setAccount(accountToken: ?AccountToken): Promise<void> {
- return this._ipc.send('set_account', accountToken).then(this._ignoreResponse);
- }
-
- _ignoreResponse(_response: mixed): void {
- return;
- }
-
- updateRelaySettings(relaySettings: RelaySettingsUpdate): Promise<void> {
- return this._ipc.send('update_relay_settings', [relaySettings]).then(this._ignoreResponse);
- }
-
- getRelaySettings(): Promise<RelaySettings> {
- return this._ipc.send('get_relay_settings').then((raw) => {
- try {
- const validated: any = validate(RelaySettingsSchema, raw);
- return (validated: RelaySettings);
- } catch (e) {
- throw new InvalidReply(raw, e);
- }
- });
- }
-
- setAllowLan(allowLan: boolean): Promise<void> {
- return this._ipc.send('set_allow_lan', [allowLan]).then(this._ignoreResponse);
- }
-
- async getAllowLan(): Promise<boolean> {
- const raw = await this._ipc.send('get_allow_lan');
- if (typeof raw === 'boolean') {
- return raw;
- } else {
- throw new InvalidReply(raw, 'Expected a boolean');
- }
- }
-
- connect(): Promise<void> {
- return this._ipc.send('connect').then(this._ignoreResponse);
- }
-
- disconnect(): Promise<void> {
- return this._ipc.send('disconnect').then(this._ignoreResponse);
- }
-
- getLocation(): Promise<Location> {
- // send the IPC with 30s timeout since the backend will wait
- // for a HTTP request before replying
-
- return this._ipc.send('get_current_location', [], 30000).then((raw) => {
- try {
- const validated: any = validate(LocationSchema, raw);
- return (validated: Location);
- } catch (e) {
- throw new InvalidReply(raw, e);
- }
- });
- }
-
- getState(): Promise<BackendState> {
- return this._ipc.send('get_state').then((raw) => {
- return this._parseBackendState(raw);
- });
- }
-
- _parseBackendState(raw: mixed): BackendState {
- if (raw && raw.state && raw.target_state) {
- const uncheckedRaw: any = raw;
-
- const states: Array<SecurityState> = ['secured', 'unsecured'];
- const correctState = states.includes(uncheckedRaw.state);
- const correctTargetState = states.includes(uncheckedRaw.target_state);
-
- if (!correctState || !correctTargetState) {
- throw new InvalidReply(raw);
- }
-
- return (uncheckedRaw: BackendState);
- } else {
- throw new InvalidReply(raw);
- }
- }
-
- registerStateListener(listener: (BackendState) => void) {
- this._ipc.on('new_state', (rawEvent) => {
- const parsedEvent: BackendState = this._parseBackendState(rawEvent);
-
- listener(parsedEvent);
- });
- }
-
- setCloseConnectionHandler(handler: () => void) {
- this._ipc.setCloseConnectionHandler(handler);
- }
-
- authenticate(sharedSecret: string): Promise<void> {
- return this._ipc.send('auth', sharedSecret).then(this._ignoreResponse);
- }
-
- getAccountHistory(): Promise<Array<AccountToken>> {
- return this._ipc.send('get_account_history').then((raw) => {
- if (Array.isArray(raw) && raw.every((i) => typeof i === 'string')) {
- const checked: any = raw;
- return (checked: Array<AccountToken>);
- } else {
- throw new InvalidReply(raw, 'Expected an array of strings');
- }
- });
- }
-
- removeAccountFromHistory(accountToken: AccountToken): Promise<void> {
- return this._ipc.send('remove_account_from_history', accountToken).then(this._ignoreResponse);
- }
-}
diff --git a/app/lib/jsonrpc-transport.js b/app/lib/jsonrpc-transport.js
new file mode 100644
index 0000000000..2fa98df988
--- /dev/null
+++ b/app/lib/jsonrpc-transport.js
@@ -0,0 +1,326 @@
+// @flow
+
+import { EventEmitter } from 'events';
+import jsonrpc from 'jsonrpc-lite';
+import uuid from 'uuid';
+import { log } from '../lib/platform';
+
+export type UnansweredRequest = {
+ resolve: (mixed) => void,
+ reject: (mixed) => void,
+ timerId: TimeoutID,
+ message: Object,
+};
+
+export type JsonRpcErrorResponse = {
+ type: 'error',
+ payload: {
+ id: string,
+ error: {
+ code: number,
+ message: string,
+ },
+ },
+};
+export type JsonRpcNotification = {
+ type: 'notification',
+ payload: {
+ method: string,
+ params: {
+ subscription: string,
+ result: mixed,
+ },
+ },
+};
+export type JsonRpcSuccess = {
+ type: 'success',
+ payload: {
+ id: string,
+ result: mixed,
+ },
+};
+export type JsonRpcMessage = JsonRpcErrorResponse | JsonRpcNotification | JsonRpcSuccess;
+
+export class RemoteError extends Error {
+ _code: number;
+ _details: string;
+
+ constructor(code: number, details: string) {
+ super(`Remote JSON-RPC error ${code}: ${details}`);
+ this._code = code;
+ this._details = details;
+ }
+
+ get code(): number {
+ return this._code;
+ }
+
+ get details(): string {
+ return this._details;
+ }
+}
+
+export class TimeOutError extends Error {
+ _jsonRpcMessage: Object;
+
+ constructor(jsonRpcMessage: Object) {
+ super('Request timed out');
+
+ this._jsonRpcMessage = jsonRpcMessage;
+ }
+
+ get jsonRpcMessage(): Object {
+ return this._jsonRpcMessage;
+ }
+}
+
+export class SubscriptionError extends Error {
+ _reply: mixed;
+
+ constructor(message: string, reply: mixed) {
+ const replyString = JSON.stringify(reply);
+
+ super(`${message}: ${replyString}`);
+
+ this._reply = reply;
+ }
+
+ get reply(): mixed {
+ return this._reply;
+ }
+}
+
+export class ConnectionError extends Error {
+ _code: number;
+
+ constructor(code: number) {
+ super(ConnectionError.reason(code));
+ this._code = code;
+ }
+
+ get code(): number {
+ return this._code;
+ }
+
+ static reason(code: number): string {
+ switch (code) {
+ case 1006:
+ return 'Abnormal closure';
+ case 1011:
+ return 'Internal error';
+ case 1012:
+ return 'Service restart';
+ case 1014:
+ return 'Bad gateway';
+ default:
+ return `Unknown (${code})`;
+ }
+ }
+}
+
+const DEFAULT_TIMEOUT_MILLIS = 5000;
+
+export default class JsonRpcTransport extends EventEmitter {
+ _unansweredRequests: Map<string, UnansweredRequest> = new Map();
+ _subscriptions: Map<string | number, (mixed) => void> = new Map();
+ _websocket: ?WebSocket;
+ _websocketFactory: (string) => WebSocket;
+
+ constructor(websocketFactory: ?(string) => WebSocket) {
+ super();
+ this._websocketFactory =
+ websocketFactory || ((connectionString) => new WebSocket(connectionString));
+ }
+
+ /// Connect websocket
+ connect(connectionString: string) {
+ this.disconnect();
+
+ log.info('Connecting to websocket', connectionString);
+
+ const websocket = this._websocketFactory(connectionString);
+
+ websocket.onopen = () => {
+ log.info('Websocket is connected');
+ this.emit('open');
+ };
+
+ websocket.onmessage = (event) => {
+ const data = event.data;
+ if (typeof data === 'string') {
+ this._onMessage(data);
+ } else {
+ log.error('Got invalid reply from the server', event);
+ }
+ };
+
+ websocket.onclose = (event) => {
+ log.info(`The websocket connection closed with code: ${event.code}`);
+
+ // Remove all subscriptions since they are connection based
+ this._subscriptions.clear();
+
+ // 1000 is a code used for normal connection closure.
+ const connectionError = event.code === 1000 ? null : new ConnectionError(event.code);
+
+ this.emit('close', connectionError);
+ };
+
+ this._websocket = websocket;
+ }
+
+ disconnect() {
+ if (this._websocket) {
+ this._websocket.close();
+ this._websocket = null;
+ }
+ }
+
+ async subscribe(event: string, listener: (mixed) => void): Promise<*> {
+ log.silly(`Adding a listener to ${event}`);
+
+ try {
+ const subscriptionId = await this.send(`${event}_subscribe`);
+ if (typeof subscriptionId === 'string' || typeof subscriptionId === 'number') {
+ this._subscriptions.set(subscriptionId, listener);
+ } else {
+ throw new SubscriptionError(
+ 'The subscription id was not a string or a number',
+ subscriptionId,
+ );
+ }
+ } catch (e) {
+ log.error(`Failed adding listener to ${event}: ${e.message}`);
+ throw e;
+ }
+ }
+
+ async send(
+ action: string,
+ data: mixed,
+ timeout: number = DEFAULT_TIMEOUT_MILLIS,
+ ): Promise<mixed> {
+ let socket: WebSocket;
+ try {
+ socket = await this._getWebSocket();
+ } catch (error) {
+ throw error;
+ }
+
+ return new Promise((resolve, reject) => {
+ const id = uuid.v4();
+ const payload = this._prepareParams(data);
+ const timerId = setTimeout(() => this._onTimeout(id), timeout);
+ const message = jsonrpc.request(id, action, payload);
+ this._unansweredRequests.set(id, {
+ resolve,
+ reject,
+ timerId,
+ message,
+ });
+
+ try {
+ log.silly('Sending message', id, action);
+ socket.send(JSON.stringify(message));
+ } catch (error) {
+ log.error(`Failed sending RPC message "${action}": ${error.message}`);
+ throw error;
+ }
+ });
+ }
+
+ _prepareParams(data: mixed): Array<mixed> | Object {
+ // JSONRPC only accepts arrays and objects as params, but
+ // this isn't very nice to use, so this method wraps other
+ // types in an array. The choice of array is based on try-and-error
+
+ if (data === undefined) {
+ return [];
+ } else if (data === null) {
+ return [null];
+ } else if (Array.isArray(data) || typeof data === 'object') {
+ return data;
+ } else {
+ return [data];
+ }
+ }
+
+ _getWebSocket(): Promise<WebSocket> {
+ if (this._websocket && this._websocket.readyState === 1) {
+ return Promise.resolve(this._websocket);
+ } else {
+ return new Promise((resolve, reject) => {
+ log.debug('Waiting for websocket to connect');
+
+ this.once('open', () => {
+ const ws = this._websocket;
+ if (ws) {
+ resolve(ws);
+ } else {
+ reject(new Error('Internal error'));
+ }
+ });
+ });
+ }
+ }
+
+ _onTimeout(requestId) {
+ const request = this._unansweredRequests.get(requestId);
+
+ this._unansweredRequests.delete(requestId);
+
+ if (request) {
+ log.warn(`Request ${requestId} timed out: `, request.message);
+ request.reject(new TimeOutError(request.message));
+ } else {
+ log.warn(`Request ${requestId} timed out but it seems to already have been answered`);
+ }
+ }
+
+ _onMessage(message: string) {
+ const result = jsonrpc.parse(message);
+ const messages = Array.isArray(result) ? result : [result];
+
+ for (const message of messages) {
+ if (message.type === 'notification') {
+ this._onNotification(message);
+ } else {
+ this._onReply(message);
+ }
+ }
+ }
+
+ _onNotification(message: JsonRpcNotification) {
+ const subscriptionId = message.payload.params.subscription;
+ const listener = this._subscriptions.get(subscriptionId);
+
+ if (listener) {
+ log.silly('Got notification', message.payload.method, message.payload.params.result);
+ listener(message.payload.params.result);
+ } else {
+ log.warn('Got notification for', message.payload.method, 'but no one is listening for it');
+ }
+ }
+
+ _onReply(message: JsonRpcErrorResponse | JsonRpcSuccess) {
+ const id = message.payload.id;
+ const request = this._unansweredRequests.get(id);
+ this._unansweredRequests.delete(id);
+
+ if (request) {
+ log.silly('Got answer to', id, message.type);
+
+ clearTimeout(request.timerId);
+
+ if (message.type === 'error') {
+ const error = message.payload.error;
+ request.reject(new RemoteError(error.code, error.message));
+ } else {
+ const reply = message.payload.result;
+ request.resolve(reply);
+ }
+ } else {
+ log.warn(`Got reply to ${id} but no one was waiting for it`);
+ }
+ }
+}
diff --git a/app/lib/jsonrpc-ws-ipc.js b/app/lib/jsonrpc-ws-ipc.js
deleted file mode 100644
index 84dec96c77..0000000000
--- a/app/lib/jsonrpc-ws-ipc.js
+++ /dev/null
@@ -1,312 +0,0 @@
-// @flow
-
-import jsonrpc from 'jsonrpc-lite';
-import uuid from 'uuid';
-import { log } from '../lib/platform';
-
-export type UnansweredRequest = {
- resolve: (mixed) => void,
- reject: (mixed) => void,
- timerId: TimeoutID,
- message: Object,
-};
-
-export type JsonRpcErrorResponse = {
- type: 'error',
- payload: {
- id: string,
- error: {
- code: number,
- message: string,
- },
- },
-};
-export type JsonRpcNotification = {
- type: 'notification',
- payload: {
- method: string,
- params: {
- subscription: string,
- result: mixed,
- },
- },
-};
-export type JsonRpcSuccess = {
- type: 'success',
- payload: {
- id: string,
- result: mixed,
- },
-};
-export type JsonRpcMessage = JsonRpcErrorResponse | JsonRpcNotification | JsonRpcSuccess;
-
-export class JsonRpcError extends Error {
- _code: number;
- _details: string;
-
- constructor(code: number, details: string) {
- super(`Remote JSON-RPC error ${code}: ${details}`);
- this._code = code;
- this._details = details;
- }
-
- get code(): number {
- return this._code;
- }
-
- get details(): string {
- return this._details;
- }
-}
-
-export class TimeOutError extends Error {
- jsonRpcMessage: Object;
-
- constructor(jsonRpcMessage: Object) {
- super('Request timed out');
- this.name = 'TimeOutError';
- this.jsonRpcMessage = jsonRpcMessage;
- }
-}
-
-export class InvalidReply extends Error {
- reply: mixed;
-
- constructor(reply: mixed, msg: ?string) {
- super(msg);
- this.name = 'InvalidReply';
- this.reply = reply;
-
- if (msg) {
- this.message = msg + ' - ';
- }
- this.message += JSON.stringify(reply);
- }
-}
-
-const DEFAULT_TIMEOUT_MILLIS = 5000;
-
-export default class Ipc {
- _connectionString: ?string;
- _onConnect: Array<{ resolve: () => void }>;
- _unansweredRequests: Map<string, UnansweredRequest>;
- _subscriptions: Map<string | number, (mixed) => void>;
- _websocket: WebSocket;
- _backoff: ReconnectionBackoff;
- _websocketFactory: (string) => WebSocket;
- _closeConnectionHandler: ?() => void;
-
- constructor(connectionString: string, websocketFactory: ?(string) => WebSocket) {
- this._connectionString = connectionString;
- this._onConnect = [];
- this._unansweredRequests = new Map();
- this._subscriptions = new Map();
- this._websocketFactory =
- websocketFactory || ((connectionString) => new WebSocket(connectionString));
-
- this._backoff = new ReconnectionBackoff();
- this._reconnect();
- }
-
- setConnectionString(str: string) {
- this._connectionString = str;
- }
-
- setCloseConnectionHandler(handler: ?() => void) {
- this._closeConnectionHandler = handler;
- }
-
- async on(event: string, listener: (mixed) => void): Promise<*> {
- log.silly(`Adding a listener to ${event}`);
- try {
- const subscriptionId = await this.send(`${event}_subscribe`);
- if (typeof subscriptionId === 'string' || typeof subscriptionId === 'number') {
- this._subscriptions.set(subscriptionId, listener);
- } else {
- throw new InvalidReply(subscriptionId, 'The subscription id was not a string or a number');
- }
- } catch (e) {
- log.error(`Failed adding listener to ${event}: ${e.message}`);
- }
- }
-
- send(action: string, data: mixed, timeout: number = DEFAULT_TIMEOUT_MILLIS): Promise<mixed> {
- return new Promise(async (resolve, reject) => {
- const id = uuid.v4();
-
- const params = this._prepareParams(data);
- const timerId = setTimeout(() => this._onTimeout(id), timeout);
- const jsonrpcMessage = jsonrpc.request(id, action, params);
- this._unansweredRequests.set(id, {
- resolve: resolve,
- reject: reject,
- timerId: timerId,
- message: jsonrpcMessage,
- });
-
- try {
- const ws = await this._getWebSocket();
- log.silly('Sending message', id, action);
- ws.send(jsonrpcMessage);
- } catch (e) {
- log.error(`Failed sending RPC message "${action}": ${e.message}`);
- reject(e);
- }
- });
- }
-
- _prepareParams(data: mixed): Array<mixed> | Object {
- // JSONRPC only accepts arrays and objects as params, but
- // this isn't very nice to use, so this method wraps other
- // types in an array. The choice of array is based on try-and-error
-
- if (data === undefined) {
- return [];
- } else if (data === null) {
- return [null];
- } else if (Array.isArray(data) || typeof data === 'object') {
- return data;
- } else {
- return [data];
- }
- }
-
- _getWebSocket(): Promise<WebSocket> {
- return new Promise((resolve) => {
- if (this._websocket && this._websocket.readyState === 1) {
- resolve(this._websocket);
- } else {
- log.debug('Waiting for websocket to connect');
- this._onConnect.push({
- resolve: () => resolve(this._websocket),
- });
- }
- });
- }
-
- _onTimeout(requestId) {
- const request = this._unansweredRequests.get(requestId);
- this._unansweredRequests.delete(requestId);
-
- if (!request) {
- log.warn(requestId, 'timed out but it seems to already have been answered');
- return;
- }
-
- log.warn(request.message, 'timed out');
- request.reject(new TimeOutError(request.message));
- }
-
- _onMessage(message: string) {
- const json = JSON.parse(message);
- const c = jsonrpc.parseObject(json);
-
- if (c.type === 'notification') {
- this._onNotification(c);
- } else {
- this._onReply(c);
- }
- }
-
- _onNotification(message: JsonRpcNotification) {
- const subscriptionId = message.payload.params.subscription;
- const listener = this._subscriptions.get(subscriptionId);
-
- if (listener) {
- log.silly('Got notification', message.payload.method, message.payload.params.result);
- listener(message.payload.params.result);
- } else {
- log.warn('Got notification for', message.payload.method, 'but no one is listening for it');
- }
- }
-
- _onReply(message: JsonRpcErrorResponse | JsonRpcSuccess) {
- const id = message.payload.id;
- const request = this._unansweredRequests.get(id);
- this._unansweredRequests.delete(id);
-
- if (!request) {
- log.warn('Got reply to', id, 'but no one was waiting for it');
- return;
- }
-
- log.silly('Got answer to', id, message.type);
-
- clearTimeout(request.timerId);
-
- if (message.type === 'error') {
- const error = message.payload.error;
- request.reject(new JsonRpcError(error.code, error.message));
- } else {
- const reply = message.payload.result;
- request.resolve(reply);
- }
- }
-
- _reconnect() {
- const connectionString = this._connectionString;
- if (!connectionString) return;
-
- log.info('Connecting to websocket', connectionString);
- this._websocket = this._websocketFactory(connectionString);
-
- this._websocket.onopen = () => {
- log.info('Websocket is connected');
- this._backoff.successfullyConnected();
-
- while (this._onConnect.length > 0) {
- this._onConnect.pop().resolve();
- }
- };
-
- this._websocket.onmessage = (evt) => {
- const data = evt.data;
- if (typeof data === 'string') {
- this._onMessage(data);
- } else {
- log.error('Got invalid reply from the server', evt);
- }
- };
-
- this._websocket.onclose = () => {
- if (this._closeConnectionHandler) {
- this._closeConnectionHandler();
- }
-
- const delay = this._backoff.getIncreasedBackoff();
- log.warn(
- 'The websocket connetion closed, attempting to reconnect it in',
- delay,
- 'milliseconds',
- );
- setTimeout(() => this._reconnect(), delay);
- };
- }
-}
-
-/*
- * Used to calculate the time to wait before reconnecting
- * the websocket.
- *
- * It uses a linear backoff function that goes from 500ms
- * to 3000ms
- */
-class ReconnectionBackoff {
- _attempt: number;
-
- constructor() {
- this._attempt = 0;
- }
-
- successfullyConnected() {
- this._attempt = 0;
- }
-
- getIncreasedBackoff() {
- if (this._attempt < 6) {
- this._attempt++;
- }
-
- return this._attempt * 500;
- }
-}
diff --git a/app/lib/relay-settings-builder.js b/app/lib/relay-settings-builder.js
index 0386d1ef4f..5f6c279e3f 100644
--- a/app/lib/relay-settings-builder.js
+++ b/app/lib/relay-settings-builder.js
@@ -6,7 +6,7 @@ import type {
RelaySettingsUpdate,
RelaySettingsNormalUpdate,
RelaySettingsCustom,
-} from './ipc-facade';
+} from './daemon-rpc';
type LocationBuilder<Self> = {
country: (country: string) => Self,
diff --git a/app/lib/rpc-address-file.js b/app/lib/rpc-address-file.js
index 5185ddc749..6eec716591 100644
--- a/app/lib/rpc-address-file.js
+++ b/app/lib/rpc-address-file.js
@@ -29,7 +29,7 @@ export class RpcAddressFile {
return this._filePath;
}
- poll(): Promise<void> {
+ waitUntilExists(): Promise<void> {
let promise = this._pollPromise;
if (!promise) {
@@ -52,20 +52,7 @@ export class RpcAddressFile {
return promise;
}
- isTrusted() {
- const filePath = this._filePath;
- switch (process.platform) {
- case 'win32':
- return isOwnedByLocalSystem(filePath);
- case 'darwin':
- case 'linux':
- return isOwnedAndOnlyWritableByRoot(filePath);
- default:
- throw new Error(`Unknown platform: ${process.platform}`);
- }
- }
-
- async waitUntilExists(): Promise<RpcCredentials> {
+ async parse(): Promise<RpcCredentials> {
const data = await fsReadFileAsync(this._filePath, 'utf8');
const [connectionString, sharedSecret] = data.split('\n', 2);
@@ -78,6 +65,19 @@ export class RpcAddressFile {
throw new Error('Cannot parse the RPC address file');
}
}
+
+ isTrusted() {
+ const filePath = this._filePath;
+ switch (process.platform) {
+ case 'win32':
+ return isOwnedByLocalSystem(filePath);
+ case 'darwin':
+ case 'linux':
+ return isOwnedAndOnlyWritableByRoot(filePath);
+ default:
+ throw new Error(`Unknown platform: ${process.platform}`);
+ }
+ }
}
function getRpcAddressFilePath() {
diff --git a/app/main.js b/app/main.js
index 3a10f6808c..ee685e819c 100644
--- a/app/main.js
+++ b/app/main.js
@@ -163,7 +163,7 @@ const ApplicationMain = {
ipcMain.on('daemon-connection', async (event) => {
const addressFile = new RpcAddressFile();
- log.debug(`Reading the RPC connection info from "${addressFile.filePath}"`);
+ log.debug(`Waiting for RPC address file: "${addressFile.filePath}"`);
try {
await addressFile.waitUntilExists();
@@ -192,7 +192,7 @@ const ApplicationMain = {
log.debug('Read RPC connection info', credentials.connectionString);
- event.sender.send('daemon-connection', { credentials });
+ event.sender.send('daemon-connection', credentials);
} catch (error) {
log.error(`Cannot parse the RPC address file: ${error.message}`);
return;
diff --git a/app/redux/account/actions.js b/app/redux/account/actions.js
index c778b111fd..df25c1e823 100644
--- a/app/redux/account/actions.js
+++ b/app/redux/account/actions.js
@@ -1,6 +1,6 @@
// @flow
-import type { AccountToken } from '../../lib/ipc-facade';
+import type { AccountToken } from '../../lib/daemon-rpc';
import type { Backend } from '../../lib/backend';
type StartLoginAction = {
diff --git a/app/redux/account/reducers.js b/app/redux/account/reducers.js
index 094864c9a8..dbd854a6c0 100644
--- a/app/redux/account/reducers.js
+++ b/app/redux/account/reducers.js
@@ -1,7 +1,7 @@
// @flow
import type { ReduxAction } from '../store';
-import type { AccountToken } from '../../lib/ipc-facade';
+import type { AccountToken } from '../../lib/daemon-rpc';
export type LoginState = 'none' | 'logging in' | 'failed' | 'ok';
export type AccountReduxState = {
diff --git a/app/redux/connection/actions.js b/app/redux/connection/actions.js
index b4f1df09be..e16a50e47f 100644
--- a/app/redux/connection/actions.js
+++ b/app/redux/connection/actions.js
@@ -4,10 +4,10 @@ import { Clipboard } from 'reactxp';
import type { Backend } from '../../lib/backend';
import type { ReduxThunk } from '../store';
-import type { Ip } from '../../lib/ipc-facade';
+import type { Ip } from '../../lib/daemon-rpc';
-const connect = (backend: Backend): ReduxThunk => () => backend.connect();
-const disconnect = (backend: Backend) => () => backend.disconnect();
+const connect = (backend: Backend): ReduxThunk => () => backend.connectTunnel();
+const disconnect = (backend: Backend) => () => backend.disconnectTunnel();
const copyIPAddress = (): ReduxThunk => {
return (_, getState) => {
const ip = getState().connection.ip;
diff --git a/app/redux/connection/reducers.js b/app/redux/connection/reducers.js
index a50fbf0757..c75d0f2a8f 100644
--- a/app/redux/connection/reducers.js
+++ b/app/redux/connection/reducers.js
@@ -1,7 +1,7 @@
// @flow
import type { ReduxAction } from '../store';
-import type { Ip } from '../../lib/ipc-facade';
+import type { Ip } from '../../lib/daemon-rpc';
export type ConnectionState = 'disconnected' | 'connecting' | 'connected';
export type ConnectionReduxState = {
diff --git a/app/redux/settings/reducers.js b/app/redux/settings/reducers.js
index f559c1f725..4a25bf9502 100644
--- a/app/redux/settings/reducers.js
+++ b/app/redux/settings/reducers.js
@@ -1,7 +1,7 @@
// @flow
import type { ReduxAction } from '../store';
-import type { RelayProtocol, RelayLocation } from '../../lib/ipc-facade';
+import type { RelayProtocol, RelayLocation } from '../../lib/daemon-rpc';
export type RelaySettingsRedux =
| {|
diff --git a/package.json b/package.json
index 3c2a6182de..75e9382085 100644
--- a/package.json
+++ b/package.json
@@ -61,6 +61,7 @@
"eslint-plugin-react": "^7.9.1",
"flow-bin": "^0.66.0",
"flow-typed": "^2.4.0",
+ "mock-socket": "^7.1.0",
"npm-run-all": "^4.0.1",
"prettier": "1.13.3",
"redux-mock-store": "^1.3.0",
diff --git a/test/auth.spec.js b/test/auth.spec.js
index e6b2c25486..4f919dff87 100644
--- a/test/auth.spec.js
+++ b/test/auth.spec.js
@@ -8,30 +8,34 @@ describe('authentication', () => {
it('authenticates before ipc call if unauthenticated', (done) => {
const { store, mockIpc } = setupIpcAndStore();
+ const credentials = {
+ connectionString: 'ws://localhost:1234/',
+ sharedSecret: '1234',
+ };
+
const chain = new IpcChain(mockIpc);
- chain.onSuccessOrFailure(done);
chain.expect('authenticate').withInputValidation((secret) => {
expect(secret).to.equal(credentials.sharedSecret);
});
chain.expect('connect');
+ chain.end(done);
- const credentials = {
- sharedSecret: '',
- connectionString: '',
- };
- const backend = new Backend(store, credentials, mockIpc);
- backend.connect();
+ const backend = new Backend(store, mockIpc);
+ backend.connect(credentials);
+
+ backend.connectTunnel();
});
it('reauthenticates on reconnect', async () => {
const { mockIpc, backend } = setupBackendAndStore();
mockIpc.authenticate = spy(mockIpc.authenticate);
+ await mockIpc.connectTunnel();
mockIpc.killWebSocket();
expect(mockIpc.authenticate).to.not.have.been.called();
- await backend.connect();
+ await backend.connectTunnel();
expect(mockIpc.authenticate).to.have.been.called.once;
});
});
diff --git a/test/autologin.spec.js b/test/autologin.spec.js
index 60c5cc5bde..505ad285c0 100644
--- a/test/autologin.spec.js
+++ b/test/autologin.spec.js
@@ -16,7 +16,7 @@ describe('autologin', () => {
expect(num).to.equal(randomAccountToken);
});
- chain.onSuccessOrFailure(done);
+ chain.end(done);
backend.autologin();
});
diff --git a/test/connect.spec.js b/test/connect.spec.js
index f1621dbeb6..409a04c0fb 100644
--- a/test/connect.spec.js
+++ b/test/connect.spec.js
@@ -7,7 +7,7 @@ describe('connect', () => {
it("should set the connection state to 'disconnected' on failed attempts", (done) => {
const { store, mockIpc, backend } = setupBackendAndStore();
- mockIpc.connect = () => new Promise((_, reject) => reject('Some error'));
+ mockIpc.connectTunnel = () => new Promise((_, reject) => reject('Some error'));
store.dispatch(connectionActions.connected());
@@ -23,7 +23,7 @@ describe('connect', () => {
it('should update the state with the server address', () => {
const { store, backend } = setupBackendAndStore();
- return backend.connect().then(() => {
+ return backend.connectTunnel().then(() => {
const state = store.getState().connection;
expect(state.status).to.equal('connecting');
});
diff --git a/test/helpers/IpcChain.js b/test/helpers/IpcChain.js
index 1505621cc0..547851fdcf 100644
--- a/test/helpers/IpcChain.js
+++ b/test/helpers/IpcChain.js
@@ -1,7 +1,5 @@
// @flow
-import { check, failFast } from './ipc-helpers';
-
export class IpcChain {
_expectedCalls: Array<string>;
_recordedCalls: Array<string>;
@@ -39,12 +37,11 @@ export class IpcChain {
const inputValidation = step.inputValidation;
if (inputValidation) {
- const failedInputValidation = failFast(() => {
+ try {
inputValidation(...args);
- }, this._done);
-
- if (failedInputValidation) {
+ } catch (error) {
this._abort();
+ this._done(error);
return;
}
}
@@ -65,16 +62,19 @@ export class IpcChain {
}
_ensureChainCalledCorrectly() {
- check(() => {
+ try {
expect(this._expectedCalls).to.deep.equal(this._recordedCalls);
- }, this._done);
+ this._done();
+ } catch (error) {
+ this._done(error);
+ }
}
_registerCall(ipcCall: string) {
this._recordedCalls.push(ipcCall);
}
- onSuccessOrFailure(done: (*) => void) {
+ end(done: (?Error) => void) {
this._done = done;
}
}
diff --git a/test/helpers/ipc-helpers.js b/test/helpers/ipc-helpers.js
index 4466c06eb2..5bfe1d0267 100644
--- a/test/helpers/ipc-helpers.js
+++ b/test/helpers/ipc-helpers.js
@@ -12,7 +12,6 @@ type Check = () => void;
export function setupIpcAndStore() {
const memoryHistory = createMemoryHistory();
const store = configureStore(null, memoryHistory);
-
const mockIpc = newMockIpc();
return { store, mockIpc };
@@ -20,12 +19,7 @@ export function setupIpcAndStore() {
export function setupBackendAndStore() {
const { store, mockIpc } = setupIpcAndStore();
-
- const credentials = {
- sharedSecret: '',
- connectionString: '',
- };
- const backend = new Backend(store, credentials, mockIpc);
+ const backend = new Backend(store, mockIpc);
return { store, mockIpc, backend };
}
@@ -33,11 +27,7 @@ export function setupBackendAndStore() {
export function setupBackendAndMockStore() {
const store = mockStore(_initialState());
const mockIpc = newMockIpc();
- const credentials = {
- sharedSecret: '',
- connectionString: '',
- };
- const backend = new Backend(store, credentials, mockIpc);
+ const backend = new Backend(store, mockIpc);
return { store, mockIpc, backend };
}
diff --git a/test/ipc.spec.js b/test/ipc.spec.js
deleted file mode 100644
index 1c8b19b5a6..0000000000
--- a/test/ipc.spec.js
+++ /dev/null
@@ -1,134 +0,0 @@
-// @flow
-
-import Ipc from '../app/lib/jsonrpc-ws-ipc';
-import jsonrpc from 'jsonrpc-lite';
-import type { JsonRpcMessage } from '../app/lib/jsonrpc-ws-ipc';
-
-describe('The IPC server', () => {
- it('should send as soon as the websocket connects', () => {
- const { ws, ipc } = setupIpc();
- ws.close();
-
- let sent = false;
- const p = ipc.send('hello').then(() => {
- expect(sent).to.be.true;
- });
-
- ws.on('hello', (msg) => {
- sent = true;
-
- ws.replyOk(msg.id);
- });
- ws.acceptConnection();
-
- return p;
- });
-
- it('should reject failed jsonrpc requests', () => {
- const { ws, ipc } = setupIpc();
- ws.on('WHAT_IS_THIS', (msg) => {
- ws.replyFail(msg.id, 'Method not found', -32601);
- });
-
- return ipc.send('WHAT_IS_THIS').catch((e) => {
- expect(e.code).to.equal(-32601);
- expect(e.message).to.contain('Method not found');
- });
- });
-
- it('should route reply to correct promise', () => {
- const { ws, ipc } = setupIpc();
-
- ws.on('a message', (msg) => ws.replyOk(msg.id, 'a reply'));
-
- const decoy = ipc
- .send('a decoy', [], 1)
- .then(() => {
- throw new Error('Should not be called');
- })
- .catch((e) => {
- if (e.name !== 'TimeOutError') {
- throw e;
- }
- });
- const message = ipc.send('a message', [], 1).then((reply) => expect(reply).to.equal('a reply'));
-
- return Promise.all([message, decoy]);
- });
-
- it('should timeout if no response is returned', () => {
- const { ipc } = setupIpc();
-
- return ipc.send('a message', [], 1).catch((e) => {
- expect(e.name).to.equal('TimeOutError');
- expect(e.message).to.contain('timed out');
- });
- });
-
- it('should route notifications', (done) => {
- const { ws, ipc } = setupIpc();
-
- const eventListener = (event) => {
- try {
- expect(event).to.equal('an event!');
- done();
- } catch (ex) {
- done(ex);
- }
- };
-
- ws.on('event_subscribe', (msg) => ws.replyOk(msg.id, 1));
- ipc
- .on('event', eventListener)
- .then(() => {
- ws.reply(jsonrpc.notification('event', { subscription: 1, result: 'an event!' }));
- })
- .catch((e) => done(e));
- });
-});
-
-function mockWebsocket() {
- const ws: any = {
- listeners: {},
- readyState: 1,
- };
-
- ws.on = (event, listener) => (ws.listeners[event] = listener);
- ws.send = (data) => {
- const listener = ws.listeners[data.method];
- if (listener) {
- listener(data);
- }
- };
-
- ws.factory = () => ws;
-
- ws.acceptConnection = () => {
- ws.readyState = 1;
- ws.onopen();
- };
- ws.close = () => {
- ws.readyState = 3;
- ws.onclose();
- };
-
- ws.reply = (msg: JsonRpcMessage) => {
- ws.onmessage({ data: JSON.stringify(msg) });
- };
- ws.replyOk = (id: string, msg) => {
- ws.reply(jsonrpc.success(id, msg || ''));
- };
- ws.replyFail = (id: string, msg: string, code: number) => {
- ws.reply(jsonrpc.error(id, new jsonrpc.JsonRpcError(msg, code)));
- };
-
- return ws;
-}
-
-function setupIpc() {
- const ws = mockWebsocket();
- return {
- ws: ws,
- ipc: new Ipc('1.2.3.4', ws.factory),
- };
-}
diff --git a/test/jsonrpc-transport.spec.js b/test/jsonrpc-transport.spec.js
new file mode 100644
index 0000000000..124bba8f00
--- /dev/null
+++ b/test/jsonrpc-transport.spec.js
@@ -0,0 +1,144 @@
+// @flow
+
+import JsonRpcTransport, {
+ TimeOutError as JsonRpcTransportTimeOutError,
+} from '../app/lib/jsonrpc-transport';
+import jsonrpc from 'jsonrpc-lite';
+import { Server, WebSocket as MockWebSocket } from 'mock-socket';
+
+describe('JSON RPC transport', () => {
+ const WEBSOCKET_URL = 'ws://localhost:8080';
+ let server: Server, transport: JsonRpcTransport;
+
+ beforeEach(() => {
+ server = new Server(WEBSOCKET_URL);
+ transport = new JsonRpcTransport((s) => new MockWebSocket(s));
+ });
+
+ afterEach(() => {
+ server.close();
+ });
+
+ it('should send as soon as the websocket connects', (done) => {
+ server.on('message', (msg) => {
+ const { payload } = jsonrpc.parse(msg);
+
+ if (payload.method === 'hello') {
+ server.send(JSON.stringify(jsonrpc.success(payload.id, 'ok')));
+ }
+ });
+
+ transport
+ .send('hello')
+ .then(() => {
+ done();
+ })
+ .catch((error) => {
+ done(error);
+ });
+
+ transport.connect(WEBSOCKET_URL);
+ });
+
+ it('should reject failed jsonrpc requests', (done) => {
+ server.on('message', (msg) => {
+ const { payload } = jsonrpc.parse(msg);
+
+ if (payload.method === 'invalid-method') {
+ server.send(
+ JSON.stringify(
+ jsonrpc.error(payload.id, new jsonrpc.JsonRpcError('Method not found', -32601)),
+ ),
+ );
+ }
+ });
+
+ transport.send('invalid-method').catch((error) => {
+ try {
+ expect(error.code).to.equal(-32601);
+ expect(error.message).to.contain('Method not found');
+ done();
+ } catch (error) {
+ done(error);
+ }
+ });
+
+ transport.connect(WEBSOCKET_URL);
+ });
+
+ it('should route reply to correct promise', () => {
+ server.on('message', (msg) => {
+ const { payload } = jsonrpc.parse(msg);
+
+ if (payload.method === 'a message') {
+ server.send(JSON.stringify(jsonrpc.success(payload.id, 'a reply')));
+ }
+ });
+
+ const decoy = transport
+ .send('a decoy', [], 100)
+ .then(() => {
+ throw new Error('Should not be called');
+ })
+ .catch((error) => {
+ expect(error).to.be.an.instanceof(JsonRpcTransportTimeOutError);
+ });
+
+ const message = transport.send('a message', [], 100).then((reply) => {
+ expect(reply).to.equal('a reply');
+ });
+
+ transport.connect(WEBSOCKET_URL);
+
+ return Promise.all([message, decoy]);
+ });
+
+ it('should timeout if no response is returned', (done) => {
+ transport
+ .send('timeout-message', {}, 1)
+ .then(() => {
+ done(new Error('Should not be called'));
+ })
+ .catch((error) => {
+ try {
+ expect(error).to.be.an.instanceof(JsonRpcTransportTimeOutError);
+ expect(error.message).to.contain('Request timed out');
+ done();
+ } catch (error) {
+ done(error);
+ }
+ });
+
+ transport.connect(WEBSOCKET_URL);
+ });
+
+ it('should route notifications', (done) => {
+ server.on('message', (msg) => {
+ const { payload } = jsonrpc.parse(msg);
+
+ if (payload.method === 'event_subscribe') {
+ server.send(JSON.stringify(jsonrpc.success(payload.id, 1)));
+ }
+ });
+
+ transport
+ .subscribe('event', (event) => {
+ try {
+ expect(event).to.equal('an event!');
+ done();
+ } catch (error) {
+ done(error);
+ }
+ })
+ .then(() => {
+ server.send(
+ JSON.stringify(jsonrpc.notification('event', { subscription: 1, result: 'an event!' })),
+ );
+ })
+ .catch((error) => {
+ done(error);
+ });
+
+ transport.connect(WEBSOCKET_URL);
+ });
+});
diff --git a/test/login.spec.js b/test/login.spec.js
index 68db14494b..a2390b5241 100644
--- a/test/login.spec.js
+++ b/test/login.spec.js
@@ -24,7 +24,7 @@ describe('Logging in', () => {
expect(an).to.equal('123');
});
- chain.onSuccessOrFailure(done);
+ chain.end(done);
store.dispatch(accountActions.login(backend, '123'));
});
diff --git a/test/logout.spec.js b/test/logout.spec.js
index 65bc3aba09..c973c6a81a 100644
--- a/test/logout.spec.js
+++ b/test/logout.spec.js
@@ -19,7 +19,7 @@ describe('logging out', () => {
expect(num).to.be.null;
});
chain.expect('disconnect');
- chain.onSuccessOrFailure(done);
+ chain.end(done);
backend.logout();
});
diff --git a/test/mocks/ipc.js b/test/mocks/ipc.js
index 590313ec13..b610244462 100644
--- a/test/mocks/ipc.js
+++ b/test/mocks/ipc.js
@@ -1,39 +1,39 @@
// @flow
-import type { IpcFacade, AccountToken, AccountData, BackendState } from '../../app/lib/ipc-facade';
+import type {
+ DaemonRpcProtocol,
+ AccountToken,
+ AccountData,
+ BackendState,
+} from '../../app/lib/daemon-rpc';
interface MockIpc {
sendNewState: (BackendState) => void;
killWebSocket: () => void;
-getAccountData: (AccountToken) => Promise<AccountData>;
- -connect: () => Promise<void>;
+ -connectTunnel: () => Promise<void>;
-getAccount: () => Promise<?AccountToken>;
-authenticate: (string) => Promise<void>;
}
export function newMockIpc() {
const stateListeners = [];
- const connectionCloseListeners = [];
+ let connectionOpenListener: ?() => void;
+ let connectionCloseListener: ?(error: ?Error) => void;
- const mockIpc: IpcFacade & MockIpc = {
+ const mockIpc: DaemonRpcProtocol & MockIpc = {
setConnectionString: (_str: string) => {},
-
getAccountData: (accountToken) =>
Promise.resolve({
accountToken: accountToken,
expiry: '',
}),
-
getRelayLocations: () =>
Promise.resolve({
countries: [],
}),
-
getAccount: () => Promise.resolve('1111'),
-
setAccount: () => Promise.resolve(),
-
updateRelaySettings: () => Promise.resolve(),
-
getRelaySettings: () =>
Promise.resolve({
custom_tunnel_endpoint: {
@@ -46,17 +46,16 @@ export function newMockIpc() {
},
},
}),
-
setAllowLan: (_allowLan: boolean) => Promise.resolve(),
-
getAllowLan: () => Promise.resolve(true),
-
- connect: () => Promise.resolve(),
-
- disconnect: () => Promise.resolve(),
-
- shutdown: () => Promise.resolve(),
-
+ connect: () => {
+ if (connectionOpenListener) {
+ connectionOpenListener();
+ }
+ },
+ disconnect: () => {},
+ connectTunnel: () => Promise.resolve(),
+ disconnectTunnel: () => Promise.resolve(),
getLocation: () =>
Promise.resolve({
ip: '',
@@ -66,36 +65,33 @@ export function newMockIpc() {
longitude: 0.0,
mullvad_exit_ip: false,
}),
-
getState: () =>
Promise.resolve({
state: 'unsecured',
target_state: 'unsecured',
}),
-
- registerStateListener: (listener: (BackendState) => void) => {
+ subscribeStateListener: (listener: (state: ?BackendState, error: ?Error) => void) => {
stateListeners.push(listener);
+ return Promise.resolve();
},
-
sendNewState: (state: BackendState) => {
for (const listener of stateListeners) {
listener(state);
}
},
-
- setCloseConnectionHandler: (listener: () => void) => {
- connectionCloseListeners.push(listener);
+ addOpenConnectionObserver: (listener: () => void) => {
+ connectionOpenListener = listener;
+ },
+ addCloseConnectionObserver: (listener: (error: ?Error) => void) => {
+ connectionCloseListener = listener;
},
-
authenticate: (_secret: string) => Promise.resolve(),
-
getAccountHistory: () => Promise.resolve([]),
-
removeAccountFromHistory: (_accountToken) => Promise.resolve(),
killWebSocket: () => {
- for (const listener of connectionCloseListeners) {
- listener();
+ if (connectionCloseListener) {
+ connectionCloseListener();
}
},
};
diff --git a/yarn.lock b/yarn.lock
index e489cb06c4..9973ea0614 100644
--- a/yarn.lock
+++ b/yarn.lock
@@ -1822,7 +1822,7 @@ commander@2:
version "2.14.1"
resolved "https://registry.yarnpkg.com/commander/-/commander-2.14.1.tgz#2235123e37af8ca3c65df45b026dbd357b01b9aa"
-commander@2.15.1, commander@^2.15.1:
+commander@2.15.1, commander@^2.15.1, commander@^2.9.0:
version "2.15.1"
resolved "https://registry.yarnpkg.com/commander/-/commander-2.15.1.tgz#df46e867d0fc2aec66a34662b406a9ccafff5b0f"
@@ -1830,7 +1830,7 @@ commander@^2.11.0:
version "2.12.2"
resolved "https://registry.yarnpkg.com/commander/-/commander-2.12.2.tgz#0f5946c427ed9ec0d91a46bb9def53e54650e555"
-commander@^2.2.0, commander@^2.9.0:
+commander@^2.2.0:
version "2.9.0"
resolved "https://registry.yarnpkg.com/commander/-/commander-2.9.0.tgz#9c99094176e12240cb22d6c5146098400fe0f7d4"
dependencies:
@@ -5265,6 +5265,10 @@ mocha@^5.2.0:
mkdirp "0.5.1"
supports-color "5.4.0"
+mock-socket@^7.1.0:
+ version "7.1.0"
+ resolved "https://registry.yarnpkg.com/mock-socket/-/mock-socket-7.1.0.tgz#482ecccafb0f0e86b8905ba2aa57c1fe73ba262d"
+
moment@^2.20.1:
version "2.20.1"
resolved "https://registry.yarnpkg.com/moment/-/moment-2.20.1.tgz#d6eb1a46cbcc14a2b2f9434112c1ff8907f313fd"
@@ -7911,8 +7915,8 @@ validate-npm-package-license@^3.0.1:
spdx-expression-parse "^3.0.0"
validated@^1.1.0:
- version "1.1.1"
- resolved "https://registry.yarnpkg.com/validated/-/validated-1.1.1.tgz#176daaf2537d51cee708160238fd774a32b4a20a"
+ version "1.2.0"
+ resolved "https://registry.yarnpkg.com/validated/-/validated-1.2.0.tgz#44ca3791cd5b2fc24433f7d1ae3a8896dfe1aec3"
dependencies:
commander "^2.9.0"
custom-error-instance "^2.1.1"