summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorAndrej Mihajlov <and@mullvad.net>2018-08-06 17:14:10 +0200
committerAndrej Mihajlov <and@mullvad.net>2018-08-08 16:25:34 +0200
commit3e6a4de0a1682bbe51300f4bee4f84f2679a8c25 (patch)
treea2b25c7791df2a241e7430a8bd6fcc977ff61b00
parent9f01cc07d2d6da86d43a002e52a189f60b63eb5d (diff)
downloadmullvadvpn-3e6a4de0a1682bbe51300f4bee4f84f2679a8c25.tar.xz
mullvadvpn-3e6a4de0a1682bbe51300f4bee4f84f2679a8c25.zip
Stop buffering JSON RPC messages when disconnected
-rw-r--r--app/lib/jsonrpc-transport.js117
-rw-r--r--package.json2
-rw-r--r--test/jsonrpc-transport.spec.js123
-rw-r--r--yarn.lock21
4 files changed, 126 insertions, 137 deletions
diff --git a/app/lib/jsonrpc-transport.js b/app/lib/jsonrpc-transport.js
index 2fa98df988..cbf1e7dce8 100644
--- a/app/lib/jsonrpc-transport.js
+++ b/app/lib/jsonrpc-transport.js
@@ -123,7 +123,7 @@ const DEFAULT_TIMEOUT_MILLIS = 5000;
export default class JsonRpcTransport extends EventEmitter {
_unansweredRequests: Map<string, UnansweredRequest> = new Map();
_subscriptions: Map<string | number, (mixed) => void> = new Map();
- _websocket: ?WebSocket;
+ _webSocket: ?WebSocket;
_websocketFactory: (string) => WebSocket;
constructor(websocketFactory: ?(string) => WebSocket) {
@@ -133,46 +133,60 @@ export default class JsonRpcTransport extends EventEmitter {
}
/// Connect websocket
- connect(connectionString: string) {
- this.disconnect();
+ connect(connectionString: string): Promise<void> {
+ return new Promise((resolve, reject) => {
+ this.disconnect();
- log.info('Connecting to websocket', connectionString);
+ log.info('Connecting to websocket', connectionString);
- const websocket = this._websocketFactory(connectionString);
+ const webSocket = this._websocketFactory(connectionString);
- websocket.onopen = () => {
- log.info('Websocket is connected');
- this.emit('open');
- };
+ // A flag used to determine if Promise was resolved.
+ let isPromiseResolved = false;
- websocket.onmessage = (event) => {
- const data = event.data;
- if (typeof data === 'string') {
- this._onMessage(data);
- } else {
- log.error('Got invalid reply from the server', event);
- }
- };
+ webSocket.onopen = () => {
+ log.info('Websocket is connected');
+ this.emit('open');
+
+ // Resolve the Promise
+ resolve();
+ isPromiseResolved = true;
+ };
+
+ webSocket.onmessage = (event) => {
+ const data = event.data;
+ if (typeof data === 'string') {
+ this._onMessage(data);
+ } else {
+ log.error('Got invalid reply from the server', event);
+ }
+ };
- websocket.onclose = (event) => {
- log.info(`The websocket connection closed with code: ${event.code}`);
+ webSocket.onclose = (event) => {
+ log.info(`The websocket connection closed with code: ${event.code}`);
- // Remove all subscriptions since they are connection based
- this._subscriptions.clear();
+ // Remove all subscriptions since they are connection based
+ this._subscriptions.clear();
- // 1000 is a code used for normal connection closure.
- const connectionError = event.code === 1000 ? null : new ConnectionError(event.code);
+ // 1000 is a code used for normal connection closure.
+ const connectionError = event.code === 1000 ? null : new ConnectionError(event.code);
- this.emit('close', connectionError);
- };
+ this.emit('close', connectionError);
- this._websocket = websocket;
+ // Prevent rejecting a previously resolved Promise.
+ if (!isPromiseResolved) {
+ reject(connectionError);
+ }
+ };
+
+ this._webSocket = webSocket;
+ });
}
disconnect() {
- if (this._websocket) {
- this._websocket.close();
- this._websocket = null;
+ if (this._webSocket) {
+ this._webSocket.close();
+ this._webSocket = null;
}
}
@@ -195,19 +209,14 @@ export default class JsonRpcTransport extends EventEmitter {
}
}
- async send(
- action: string,
- data: mixed,
- timeout: number = DEFAULT_TIMEOUT_MILLIS,
- ): Promise<mixed> {
- let socket: WebSocket;
- try {
- socket = await this._getWebSocket();
- } catch (error) {
- throw error;
- }
-
+ send(action: string, data: mixed, timeout: number = DEFAULT_TIMEOUT_MILLIS): Promise<mixed> {
return new Promise((resolve, reject) => {
+ const webSocket = this._webSocket;
+ if (!webSocket) {
+ reject(new Error('Websocket is not connected.'));
+ return;
+ }
+
const id = uuid.v4();
const payload = this._prepareParams(data);
const timerId = setTimeout(() => this._onTimeout(id), timeout);
@@ -221,9 +230,14 @@ export default class JsonRpcTransport extends EventEmitter {
try {
log.silly('Sending message', id, action);
- socket.send(JSON.stringify(message));
+ webSocket.send(JSON.stringify(message));
} catch (error) {
log.error(`Failed sending RPC message "${action}": ${error.message}`);
+
+ // clean up on error
+ this._unansweredRequests.delete(id);
+ clearTimeout(timerId);
+
throw error;
}
});
@@ -245,25 +259,6 @@ export default class JsonRpcTransport extends EventEmitter {
}
}
- _getWebSocket(): Promise<WebSocket> {
- if (this._websocket && this._websocket.readyState === 1) {
- return Promise.resolve(this._websocket);
- } else {
- return new Promise((resolve, reject) => {
- log.debug('Waiting for websocket to connect');
-
- this.once('open', () => {
- const ws = this._websocket;
- if (ws) {
- resolve(ws);
- } else {
- reject(new Error('Internal error'));
- }
- });
- });
- }
- }
-
_onTimeout(requestId) {
const request = this._unansweredRequests.get(requestId);
diff --git a/package.json b/package.json
index 5610fe987d..68e25c2dd9 100644
--- a/package.json
+++ b/package.json
@@ -64,7 +64,7 @@
"eslint-plugin-react": "^7.9.1",
"flow-bin": "^0.78.0",
"flow-typed": "^2.4.0",
- "mock-socket": "^7.1.0",
+ "mock-socket": "^8.0.2",
"npm-run-all": "^4.0.1",
"prettier": "1.13.7",
"rimraf": "^2.5.4"
diff --git a/test/jsonrpc-transport.spec.js b/test/jsonrpc-transport.spec.js
index 1f36be83a6..96db4acec6 100644
--- a/test/jsonrpc-transport.spec.js
+++ b/test/jsonrpc-transport.spec.js
@@ -1,10 +1,8 @@
// @flow
-import JsonRpcTransport, {
- TimeOutError as JsonRpcTransportTimeOutError,
-} from '../app/lib/jsonrpc-transport';
import jsonrpc from 'jsonrpc-lite';
import { Server, WebSocket as MockWebSocket } from 'mock-socket';
+import JsonRpcTransport, { TimeOutError } from '../app/lib/jsonrpc-transport';
describe('JSON RPC transport', () => {
const WEBSOCKET_URL = 'ws://localhost:8080';
@@ -12,104 +10,87 @@ describe('JSON RPC transport', () => {
beforeEach(() => {
server = new Server(WEBSOCKET_URL);
- transport = new JsonRpcTransport((s) => new MockWebSocket(s));
+ transport = new JsonRpcTransport((url) => new MockWebSocket(url));
});
afterEach(() => {
server.close();
});
- it('should send as soon as the websocket connects', () => {
- server.on('message', (msg) => {
- const { payload } = jsonrpc.parse(msg);
-
- if (payload.method === 'hello') {
- server.send(JSON.stringify(jsonrpc.success(payload.id, 'ok')));
- }
- });
-
- const sendPromise = transport.send('hello');
-
- transport.connect(WEBSOCKET_URL);
-
- return expect(sendPromise).to.eventually.be.fulfilled;
- });
-
- it('should reject failed jsonrpc requests', () => {
- server.on('message', (msg) => {
- const { payload } = jsonrpc.parse(msg);
-
- if (payload.method === 'invalid-method') {
- server.send(
- JSON.stringify(
- jsonrpc.error(payload.id, new jsonrpc.JsonRpcError('Method not found', -32601)),
- ),
- );
- }
+ it('should reject failed jsonrpc requests', async () => {
+ server.on('connection', (socket) => {
+ socket.on('message', (msg) => {
+ const { payload } = jsonrpc.parse(msg);
+ if (payload.method === 'invalid-method') {
+ socket.send(
+ JSON.stringify(
+ jsonrpc.error(payload.id, new jsonrpc.JsonRpcError('Method not found', -32601)),
+ ),
+ );
+ }
+ });
});
+ await transport.connect(WEBSOCKET_URL);
const sendPromise = transport.send('invalid-method');
- transport.connect(WEBSOCKET_URL);
-
return expect(sendPromise).to.eventually.be.rejectedWith('Method not found');
});
- it('should route reply to correct promise', () => {
- server.on('message', (msg) => {
- const { payload } = jsonrpc.parse(msg);
-
- if (payload.method === 'a message') {
- server.send(JSON.stringify(jsonrpc.success(payload.id, 'a reply')));
- }
+ it('should route reply to correct promise', async () => {
+ server.on('connection', (socket) => {
+ socket.on('message', (msg) => {
+ const { payload } = jsonrpc.parse(msg);
+ if (payload.method === 'a message') {
+ socket.send(JSON.stringify(jsonrpc.success(payload.id, 'a reply')));
+ }
+ });
});
+ await transport.connect(WEBSOCKET_URL);
+
const decoyPromise = transport.send('a decoy', [], 100);
const messagePromise = transport.send('a message', [], 100);
- transport.connect(WEBSOCKET_URL);
-
return Promise.all([
expect(messagePromise).to.eventually.be.equal('a reply'),
- expect(decoyPromise).to.eventually.be.rejectedWith(JsonRpcTransportTimeOutError),
+ expect(decoyPromise).to.eventually.be.rejectedWith(TimeOutError),
]);
});
- it('should timeout if no response is returned', () => {
+ it('should timeout if no response is returned', async () => {
+ await transport.connect(WEBSOCKET_URL);
const sendPromise = transport.send('timeout-message', {}, 1);
- transport.connect(WEBSOCKET_URL);
-
- return expect(sendPromise).to.eventually.be.rejectedWith(
- JsonRpcTransportTimeOutError,
- 'Request timed out',
- );
+ return expect(sendPromise).to.eventually.be.rejectedWith(TimeOutError, 'Request timed out');
});
- it('should route notifications', () => {
- server.on('message', (msg) => {
- const { payload } = jsonrpc.parse(msg);
-
- if (payload.method === 'event_subscribe') {
- server.send(JSON.stringify(jsonrpc.success(payload.id, 1)));
- }
+ it('should route notifications', async () => {
+ server.on('connection', (socket) => {
+ socket.on('message', (msg) => {
+ const { payload } = jsonrpc.parse(msg);
+ if (payload.method === 'event_subscribe') {
+ socket.send(JSON.stringify(jsonrpc.success(payload.id, 1)));
+ }
+ });
});
- transport.connect(WEBSOCKET_URL);
+ await transport.connect(WEBSOCKET_URL);
- let subscribePromise;
- const eventPromise = new Promise((resolve) => {
- subscribePromise = transport.subscribe('event', resolve).then((value) => {
- server.send(
- JSON.stringify(jsonrpc.notification('event', { subscription: 1, result: 'beacon' })),
- );
- return value;
- });
- });
+ const eventPromiseHelper = (() => {
+ let stolenResolve: (any) => void;
+ const eventHandler = (...args) => stolenResolve(...args);
+ eventHandler.promise = new Promise((resolve) => (stolenResolve = resolve));
+ return eventHandler;
+ })();
- return Promise.all([
- expect(subscribePromise).to.eventually.be.fulfilled,
- expect(eventPromise).to.eventually.be.equal('beacon'),
- ]);
+ await transport.subscribe('event', eventPromiseHelper);
+
+ server.emit(
+ 'message',
+ JSON.stringify(jsonrpc.notification('event', { subscription: 1, result: 'beacon' })),
+ );
+
+ return expect(eventPromiseHelper.promise).to.eventually.be.equal('beacon');
});
});
diff --git a/yarn.lock b/yarn.lock
index 00bddf05b1..3febd43401 100644
--- a/yarn.lock
+++ b/yarn.lock
@@ -5275,9 +5275,11 @@ mocha@^5.2.0:
mkdirp "0.5.1"
supports-color "5.4.0"
-mock-socket@^7.1.0:
- version "7.1.0"
- resolved "https://registry.yarnpkg.com/mock-socket/-/mock-socket-7.1.0.tgz#482ecccafb0f0e86b8905ba2aa57c1fe73ba262d"
+mock-socket@^8.0.2:
+ version "8.0.2"
+ resolved "https://registry.yarnpkg.com/mock-socket/-/mock-socket-8.0.2.tgz#899dbe376a33a10165341939e5dd4653532dcd13"
+ dependencies:
+ url-parse "^1.2.0"
moment@^2.20.1:
version "2.20.1"
@@ -6111,6 +6113,10 @@ qs@~6.5.1:
version "6.5.1"
resolved "https://registry.yarnpkg.com/qs/-/qs-6.5.1.tgz#349cdf6eef89ec45c12d7d5eb3fc0c870343a6d8"
+querystringify@^2.0.0:
+ version "2.0.0"
+ resolved "https://registry.yarnpkg.com/querystringify/-/querystringify-2.0.0.tgz#fa3ed6e68eb15159457c89b37bc6472833195755"
+
quickselect@^1.0.1:
version "1.0.1"
resolved "https://registry.yarnpkg.com/quickselect/-/quickselect-1.0.1.tgz#1e6ceaa9db1ca7c75aafcc863c7bef2037ca62a1"
@@ -6725,7 +6731,7 @@ require-uncached@^1.0.3:
caller-path "^0.1.0"
resolve-from "^1.0.0"
-requires-port@1.x.x:
+requires-port@1.x.x, requires-port@^1.0.0:
version "1.0.0"
resolved "https://registry.yarnpkg.com/requires-port/-/requires-port-1.0.0.tgz#925d2601d39ac485e091cf0da5c6e694dc3dcaff"
@@ -7841,6 +7847,13 @@ url-parse-lax@^1.0.0:
dependencies:
prepend-http "^1.0.1"
+url-parse@^1.2.0:
+ version "1.4.3"
+ resolved "https://registry.yarnpkg.com/url-parse/-/url-parse-1.4.3.tgz#bfaee455c889023219d757e045fa6a684ec36c15"
+ dependencies:
+ querystringify "^2.0.0"
+ requires-port "^1.0.0"
+
url-to-options@^1.0.1:
version "1.0.1"
resolved "https://registry.yarnpkg.com/url-to-options/-/url-to-options-1.0.1.tgz#1505a03a289a48cbd7a434efbaeec5055f5633a9"