diff options
| author | Andrej Mihajlov <and@mullvad.net> | 2018-08-06 17:14:10 +0200 |
|---|---|---|
| committer | Andrej Mihajlov <and@mullvad.net> | 2018-08-08 16:25:34 +0200 |
| commit | 3e6a4de0a1682bbe51300f4bee4f84f2679a8c25 (patch) | |
| tree | a2b25c7791df2a241e7430a8bd6fcc977ff61b00 | |
| parent | 9f01cc07d2d6da86d43a002e52a189f60b63eb5d (diff) | |
| download | mullvadvpn-3e6a4de0a1682bbe51300f4bee4f84f2679a8c25.tar.xz mullvadvpn-3e6a4de0a1682bbe51300f4bee4f84f2679a8c25.zip | |
Stop buffering JSON RPC messages when disconnected
| -rw-r--r-- | app/lib/jsonrpc-transport.js | 117 | ||||
| -rw-r--r-- | package.json | 2 | ||||
| -rw-r--r-- | test/jsonrpc-transport.spec.js | 123 | ||||
| -rw-r--r-- | yarn.lock | 21 |
4 files changed, 126 insertions, 137 deletions
diff --git a/app/lib/jsonrpc-transport.js b/app/lib/jsonrpc-transport.js index 2fa98df988..cbf1e7dce8 100644 --- a/app/lib/jsonrpc-transport.js +++ b/app/lib/jsonrpc-transport.js @@ -123,7 +123,7 @@ const DEFAULT_TIMEOUT_MILLIS = 5000; export default class JsonRpcTransport extends EventEmitter { _unansweredRequests: Map<string, UnansweredRequest> = new Map(); _subscriptions: Map<string | number, (mixed) => void> = new Map(); - _websocket: ?WebSocket; + _webSocket: ?WebSocket; _websocketFactory: (string) => WebSocket; constructor(websocketFactory: ?(string) => WebSocket) { @@ -133,46 +133,60 @@ export default class JsonRpcTransport extends EventEmitter { } /// Connect websocket - connect(connectionString: string) { - this.disconnect(); + connect(connectionString: string): Promise<void> { + return new Promise((resolve, reject) => { + this.disconnect(); - log.info('Connecting to websocket', connectionString); + log.info('Connecting to websocket', connectionString); - const websocket = this._websocketFactory(connectionString); + const webSocket = this._websocketFactory(connectionString); - websocket.onopen = () => { - log.info('Websocket is connected'); - this.emit('open'); - }; + // A flag used to determine if Promise was resolved. + let isPromiseResolved = false; - websocket.onmessage = (event) => { - const data = event.data; - if (typeof data === 'string') { - this._onMessage(data); - } else { - log.error('Got invalid reply from the server', event); - } - }; + webSocket.onopen = () => { + log.info('Websocket is connected'); + this.emit('open'); + + // Resolve the Promise + resolve(); + isPromiseResolved = true; + }; + + webSocket.onmessage = (event) => { + const data = event.data; + if (typeof data === 'string') { + this._onMessage(data); + } else { + log.error('Got invalid reply from the server', event); + } + }; - websocket.onclose = (event) => { - log.info(`The websocket connection closed with code: ${event.code}`); + webSocket.onclose = (event) => { + log.info(`The websocket connection closed with code: ${event.code}`); - // Remove all subscriptions since they are connection based - this._subscriptions.clear(); + // Remove all subscriptions since they are connection based + this._subscriptions.clear(); - // 1000 is a code used for normal connection closure. - const connectionError = event.code === 1000 ? null : new ConnectionError(event.code); + // 1000 is a code used for normal connection closure. + const connectionError = event.code === 1000 ? null : new ConnectionError(event.code); - this.emit('close', connectionError); - }; + this.emit('close', connectionError); - this._websocket = websocket; + // Prevent rejecting a previously resolved Promise. + if (!isPromiseResolved) { + reject(connectionError); + } + }; + + this._webSocket = webSocket; + }); } disconnect() { - if (this._websocket) { - this._websocket.close(); - this._websocket = null; + if (this._webSocket) { + this._webSocket.close(); + this._webSocket = null; } } @@ -195,19 +209,14 @@ export default class JsonRpcTransport extends EventEmitter { } } - async send( - action: string, - data: mixed, - timeout: number = DEFAULT_TIMEOUT_MILLIS, - ): Promise<mixed> { - let socket: WebSocket; - try { - socket = await this._getWebSocket(); - } catch (error) { - throw error; - } - + send(action: string, data: mixed, timeout: number = DEFAULT_TIMEOUT_MILLIS): Promise<mixed> { return new Promise((resolve, reject) => { + const webSocket = this._webSocket; + if (!webSocket) { + reject(new Error('Websocket is not connected.')); + return; + } + const id = uuid.v4(); const payload = this._prepareParams(data); const timerId = setTimeout(() => this._onTimeout(id), timeout); @@ -221,9 +230,14 @@ export default class JsonRpcTransport extends EventEmitter { try { log.silly('Sending message', id, action); - socket.send(JSON.stringify(message)); + webSocket.send(JSON.stringify(message)); } catch (error) { log.error(`Failed sending RPC message "${action}": ${error.message}`); + + // clean up on error + this._unansweredRequests.delete(id); + clearTimeout(timerId); + throw error; } }); @@ -245,25 +259,6 @@ export default class JsonRpcTransport extends EventEmitter { } } - _getWebSocket(): Promise<WebSocket> { - if (this._websocket && this._websocket.readyState === 1) { - return Promise.resolve(this._websocket); - } else { - return new Promise((resolve, reject) => { - log.debug('Waiting for websocket to connect'); - - this.once('open', () => { - const ws = this._websocket; - if (ws) { - resolve(ws); - } else { - reject(new Error('Internal error')); - } - }); - }); - } - } - _onTimeout(requestId) { const request = this._unansweredRequests.get(requestId); diff --git a/package.json b/package.json index 5610fe987d..68e25c2dd9 100644 --- a/package.json +++ b/package.json @@ -64,7 +64,7 @@ "eslint-plugin-react": "^7.9.1", "flow-bin": "^0.78.0", "flow-typed": "^2.4.0", - "mock-socket": "^7.1.0", + "mock-socket": "^8.0.2", "npm-run-all": "^4.0.1", "prettier": "1.13.7", "rimraf": "^2.5.4" diff --git a/test/jsonrpc-transport.spec.js b/test/jsonrpc-transport.spec.js index 1f36be83a6..96db4acec6 100644 --- a/test/jsonrpc-transport.spec.js +++ b/test/jsonrpc-transport.spec.js @@ -1,10 +1,8 @@ // @flow -import JsonRpcTransport, { - TimeOutError as JsonRpcTransportTimeOutError, -} from '../app/lib/jsonrpc-transport'; import jsonrpc from 'jsonrpc-lite'; import { Server, WebSocket as MockWebSocket } from 'mock-socket'; +import JsonRpcTransport, { TimeOutError } from '../app/lib/jsonrpc-transport'; describe('JSON RPC transport', () => { const WEBSOCKET_URL = 'ws://localhost:8080'; @@ -12,104 +10,87 @@ describe('JSON RPC transport', () => { beforeEach(() => { server = new Server(WEBSOCKET_URL); - transport = new JsonRpcTransport((s) => new MockWebSocket(s)); + transport = new JsonRpcTransport((url) => new MockWebSocket(url)); }); afterEach(() => { server.close(); }); - it('should send as soon as the websocket connects', () => { - server.on('message', (msg) => { - const { payload } = jsonrpc.parse(msg); - - if (payload.method === 'hello') { - server.send(JSON.stringify(jsonrpc.success(payload.id, 'ok'))); - } - }); - - const sendPromise = transport.send('hello'); - - transport.connect(WEBSOCKET_URL); - - return expect(sendPromise).to.eventually.be.fulfilled; - }); - - it('should reject failed jsonrpc requests', () => { - server.on('message', (msg) => { - const { payload } = jsonrpc.parse(msg); - - if (payload.method === 'invalid-method') { - server.send( - JSON.stringify( - jsonrpc.error(payload.id, new jsonrpc.JsonRpcError('Method not found', -32601)), - ), - ); - } + it('should reject failed jsonrpc requests', async () => { + server.on('connection', (socket) => { + socket.on('message', (msg) => { + const { payload } = jsonrpc.parse(msg); + if (payload.method === 'invalid-method') { + socket.send( + JSON.stringify( + jsonrpc.error(payload.id, new jsonrpc.JsonRpcError('Method not found', -32601)), + ), + ); + } + }); }); + await transport.connect(WEBSOCKET_URL); const sendPromise = transport.send('invalid-method'); - transport.connect(WEBSOCKET_URL); - return expect(sendPromise).to.eventually.be.rejectedWith('Method not found'); }); - it('should route reply to correct promise', () => { - server.on('message', (msg) => { - const { payload } = jsonrpc.parse(msg); - - if (payload.method === 'a message') { - server.send(JSON.stringify(jsonrpc.success(payload.id, 'a reply'))); - } + it('should route reply to correct promise', async () => { + server.on('connection', (socket) => { + socket.on('message', (msg) => { + const { payload } = jsonrpc.parse(msg); + if (payload.method === 'a message') { + socket.send(JSON.stringify(jsonrpc.success(payload.id, 'a reply'))); + } + }); }); + await transport.connect(WEBSOCKET_URL); + const decoyPromise = transport.send('a decoy', [], 100); const messagePromise = transport.send('a message', [], 100); - transport.connect(WEBSOCKET_URL); - return Promise.all([ expect(messagePromise).to.eventually.be.equal('a reply'), - expect(decoyPromise).to.eventually.be.rejectedWith(JsonRpcTransportTimeOutError), + expect(decoyPromise).to.eventually.be.rejectedWith(TimeOutError), ]); }); - it('should timeout if no response is returned', () => { + it('should timeout if no response is returned', async () => { + await transport.connect(WEBSOCKET_URL); const sendPromise = transport.send('timeout-message', {}, 1); - transport.connect(WEBSOCKET_URL); - - return expect(sendPromise).to.eventually.be.rejectedWith( - JsonRpcTransportTimeOutError, - 'Request timed out', - ); + return expect(sendPromise).to.eventually.be.rejectedWith(TimeOutError, 'Request timed out'); }); - it('should route notifications', () => { - server.on('message', (msg) => { - const { payload } = jsonrpc.parse(msg); - - if (payload.method === 'event_subscribe') { - server.send(JSON.stringify(jsonrpc.success(payload.id, 1))); - } + it('should route notifications', async () => { + server.on('connection', (socket) => { + socket.on('message', (msg) => { + const { payload } = jsonrpc.parse(msg); + if (payload.method === 'event_subscribe') { + socket.send(JSON.stringify(jsonrpc.success(payload.id, 1))); + } + }); }); - transport.connect(WEBSOCKET_URL); + await transport.connect(WEBSOCKET_URL); - let subscribePromise; - const eventPromise = new Promise((resolve) => { - subscribePromise = transport.subscribe('event', resolve).then((value) => { - server.send( - JSON.stringify(jsonrpc.notification('event', { subscription: 1, result: 'beacon' })), - ); - return value; - }); - }); + const eventPromiseHelper = (() => { + let stolenResolve: (any) => void; + const eventHandler = (...args) => stolenResolve(...args); + eventHandler.promise = new Promise((resolve) => (stolenResolve = resolve)); + return eventHandler; + })(); - return Promise.all([ - expect(subscribePromise).to.eventually.be.fulfilled, - expect(eventPromise).to.eventually.be.equal('beacon'), - ]); + await transport.subscribe('event', eventPromiseHelper); + + server.emit( + 'message', + JSON.stringify(jsonrpc.notification('event', { subscription: 1, result: 'beacon' })), + ); + + return expect(eventPromiseHelper.promise).to.eventually.be.equal('beacon'); }); }); @@ -5275,9 +5275,11 @@ mocha@^5.2.0: mkdirp "0.5.1" supports-color "5.4.0" -mock-socket@^7.1.0: - version "7.1.0" - resolved "https://registry.yarnpkg.com/mock-socket/-/mock-socket-7.1.0.tgz#482ecccafb0f0e86b8905ba2aa57c1fe73ba262d" +mock-socket@^8.0.2: + version "8.0.2" + resolved "https://registry.yarnpkg.com/mock-socket/-/mock-socket-8.0.2.tgz#899dbe376a33a10165341939e5dd4653532dcd13" + dependencies: + url-parse "^1.2.0" moment@^2.20.1: version "2.20.1" @@ -6111,6 +6113,10 @@ qs@~6.5.1: version "6.5.1" resolved "https://registry.yarnpkg.com/qs/-/qs-6.5.1.tgz#349cdf6eef89ec45c12d7d5eb3fc0c870343a6d8" +querystringify@^2.0.0: + version "2.0.0" + resolved "https://registry.yarnpkg.com/querystringify/-/querystringify-2.0.0.tgz#fa3ed6e68eb15159457c89b37bc6472833195755" + quickselect@^1.0.1: version "1.0.1" resolved "https://registry.yarnpkg.com/quickselect/-/quickselect-1.0.1.tgz#1e6ceaa9db1ca7c75aafcc863c7bef2037ca62a1" @@ -6725,7 +6731,7 @@ require-uncached@^1.0.3: caller-path "^0.1.0" resolve-from "^1.0.0" -requires-port@1.x.x: +requires-port@1.x.x, requires-port@^1.0.0: version "1.0.0" resolved "https://registry.yarnpkg.com/requires-port/-/requires-port-1.0.0.tgz#925d2601d39ac485e091cf0da5c6e694dc3dcaff" @@ -7841,6 +7847,13 @@ url-parse-lax@^1.0.0: dependencies: prepend-http "^1.0.1" +url-parse@^1.2.0: + version "1.4.3" + resolved "https://registry.yarnpkg.com/url-parse/-/url-parse-1.4.3.tgz#bfaee455c889023219d757e045fa6a684ec36c15" + dependencies: + querystringify "^2.0.0" + requires-port "^1.0.0" + url-to-options@^1.0.1: version "1.0.1" resolved "https://registry.yarnpkg.com/url-to-options/-/url-to-options-1.0.1.tgz#1505a03a289a48cbd7a434efbaeec5055f5633a9" |
