diff options
| author | Andrej Mihajlov <and@mullvad.net> | 2019-01-17 13:35:34 +0100 |
|---|---|---|
| committer | Andrej Mihajlov <and@mullvad.net> | 2019-01-18 14:07:20 +0100 |
| commit | 089dfcc519ff2545e05679008d12a42edc2fa42b (patch) | |
| tree | c31e8453c078df31e37ecc08e9db6cb0ee761a12 | |
| parent | c087ca76570696550c788f85d9c4d2d283dc7453 (diff) | |
| download | mullvadvpn-089dfcc519ff2545e05679008d12a42edc2fa42b.tar.xz mullvadvpn-089dfcc519ff2545e05679008d12a42edc2fa42b.zip | |
Refactor the SocketTransport
| -rw-r--r-- | gui/packages/desktop/src/main/jsonrpc-client.js | 148 |
1 files changed, 80 insertions, 68 deletions
diff --git a/gui/packages/desktop/src/main/jsonrpc-client.js b/gui/packages/desktop/src/main/jsonrpc-client.js index 0f51ca57cb..7635b62827 100644 --- a/gui/packages/desktop/src/main/jsonrpc-client.js +++ b/gui/packages/desktop/src/main/jsonrpc-client.js @@ -1,5 +1,6 @@ // @flow +import assert from 'assert'; import { EventEmitter } from 'events'; import log from 'electron-log'; import jsonrpc from 'jsonrpc-lite'; @@ -320,7 +321,7 @@ export default class JsonRpcClient<T> extends EventEmitter { interface Transport<T> { close(): void; - onOpen: (event: Event) => void; + onOpen: () => void; onMessage: (Object) => void; onClose: (error: ?Error) => void; send(message: string): void; @@ -329,15 +330,12 @@ interface Transport<T> { export class WebsocketTransport implements Transport<string> { ws: ?WebSocket; - onOpen: (event: Event) => void; - onMessage: (Object) => void; - onClose: (error: ?Error) => void; + onOpen = () => {}; + onMessage = (_message: Object) => {}; + onClose = (_error: ?Error) => {}; constructor(ws: ?WebSocket) { this.ws = ws; - this.onOpen = () => {}; - this.onMessage = () => {}; - this.onClose = () => {}; } close() { @@ -355,7 +353,9 @@ export class WebsocketTransport implements Transport<string> { this.ws.close(); } this.ws = new WebSocket(params); - this.ws.onopen = this.onOpen; + this.ws.onopen = (_event) => { + this.onOpen(); + }; this.ws.onmessage = (event) => { try { const data = event.data; @@ -381,88 +381,100 @@ export class WebsocketTransport implements Transport<string> { // Given the correct parameters, this transport supports named pipes/unix // domain sockets, and also TCP/UDP sockets export class SocketTransport implements Transport<{ path: string }> { - connection: ?net.Socket; - onMessage: (message: Object) => void; - onClose: (error: ?Error) => void; - onOpen: (event: Event) => void; - socketClosed: boolean; - shouldClose: boolean; + onMessage = (_message: Object) => {}; + onClose = (_error: ?Error) => {}; + onOpen = () => {}; - constructor() { - this.connection = null; - this.onMessage = () => {}; - this.onClose = () => {}; - this.onOpen = () => {}; - this.socketClosed = false; - this.shouldClose = false; - } + _connection: ?net.Socket; + _socketReady = false; + _shouldClose = false; + _lastError: ?Error; - _connect(options: { path: string }) { - const connection = new net.Socket(); + connect(options: { path: string }) { + assert(!this._connection, 'Make sure to close the existing socket'); - connection.on('error', (err) => { - this._fail(err); - }); + const jsonStream = JSONStream.parse() + .on('data', this._onJsonStreamData) + .on('error', this._onJsonStreamError); - connection.on('close', (hadErr) => { - // if there's no error but nobody expected the socket to be closed an - // error should still be propagated - let err = null; - if (!this.shouldClose || hadErr) { - err = new TransportError('socket closed unexpectedly'); - } - this._fail(err); - }); + const connection = new net.Socket() + .on('ready', this._onSocketReady) + .on('error', this._onSocketError) + .on('close', this._onSocketClose); - connection.on('connect', (event) => { - this.connection = connection; - this.onOpen(event); - }); + this._connection = connection; + this._socketReady = false; + this._shouldClose = false; + this._lastError = null; - const jsonStream = JSONStream.parse(); + log.debug('Connect socket'); connection.pipe(jsonStream); - - jsonStream.on('data', this.onMessage); - - jsonStream.on('error', (err) => { - this._fail(err); - }); - - this.socketClosed = false; - this.shouldClose = false; connection.connect(options); } - _fail(err: ?Error) { - if (!this.socketClosed) { - this.socketClosed = true; - this.onClose(err); - this.close(); - } - } - close() { - this.shouldClose = true; + this._shouldClose = true; + try { - if (this.connection) { - this.connection.end(); + if (this._connection) { + this._connection.end(); } } catch (error) { - log.error('failed to close the connection: ', error); + log.error('Failed to close the socket: ', error); } - this.connection = null; + + this._connection = null; } send(msg: string) { - if (this.connection) { - this.connection.write(msg); + if (this._socketReady && this._connection) { + this._connection.write(msg); } else { throw new TransportError('Socket not connected'); } } - connect(options: { path: string }): void { - this._connect(options); - } + _onSocketReady = () => { + this._socketReady = true; + + log.debug('Socket is ready'); + + this.onOpen(); + }; + + _onSocketError = (error: Error) => { + this._lastError = error; + + log.error('Socket error: ', error); + }; + + _onSocketClose = (hadError: boolean) => { + if (this._shouldClose) { + log.debug(`Socket was closed deliberately`); + + this.onClose(); + } else if (hadError) { + log.debug(`Socket was closed due to an error`); + + this.onClose(this._lastError); + } else { + log.debug(`Socket was closed by peer`); + + this.onClose(new TransportError('Socket was closed by peer')); + } + }; + + _onJsonStreamData = (data) => { + this.onMessage(data); + }; + + _onJsonStreamError = (error: Error) => { + log.error('Socket JSON stream error: ', error); + + if (this._connection) { + // This will destroy the socket and emit "error" and "close" events + this._connection.destroy(error); + } + }; } |
