summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorAndrej Mihajlov <and@mullvad.net>2019-01-17 13:35:34 +0100
committerAndrej Mihajlov <and@mullvad.net>2019-01-18 14:07:20 +0100
commit089dfcc519ff2545e05679008d12a42edc2fa42b (patch)
treec31e8453c078df31e37ecc08e9db6cb0ee761a12
parentc087ca76570696550c788f85d9c4d2d283dc7453 (diff)
downloadmullvadvpn-089dfcc519ff2545e05679008d12a42edc2fa42b.tar.xz
mullvadvpn-089dfcc519ff2545e05679008d12a42edc2fa42b.zip
Refactor the SocketTransport
-rw-r--r--gui/packages/desktop/src/main/jsonrpc-client.js148
1 files changed, 80 insertions, 68 deletions
diff --git a/gui/packages/desktop/src/main/jsonrpc-client.js b/gui/packages/desktop/src/main/jsonrpc-client.js
index 0f51ca57cb..7635b62827 100644
--- a/gui/packages/desktop/src/main/jsonrpc-client.js
+++ b/gui/packages/desktop/src/main/jsonrpc-client.js
@@ -1,5 +1,6 @@
// @flow
+import assert from 'assert';
import { EventEmitter } from 'events';
import log from 'electron-log';
import jsonrpc from 'jsonrpc-lite';
@@ -320,7 +321,7 @@ export default class JsonRpcClient<T> extends EventEmitter {
interface Transport<T> {
close(): void;
- onOpen: (event: Event) => void;
+ onOpen: () => void;
onMessage: (Object) => void;
onClose: (error: ?Error) => void;
send(message: string): void;
@@ -329,15 +330,12 @@ interface Transport<T> {
export class WebsocketTransport implements Transport<string> {
ws: ?WebSocket;
- onOpen: (event: Event) => void;
- onMessage: (Object) => void;
- onClose: (error: ?Error) => void;
+ onOpen = () => {};
+ onMessage = (_message: Object) => {};
+ onClose = (_error: ?Error) => {};
constructor(ws: ?WebSocket) {
this.ws = ws;
- this.onOpen = () => {};
- this.onMessage = () => {};
- this.onClose = () => {};
}
close() {
@@ -355,7 +353,9 @@ export class WebsocketTransport implements Transport<string> {
this.ws.close();
}
this.ws = new WebSocket(params);
- this.ws.onopen = this.onOpen;
+ this.ws.onopen = (_event) => {
+ this.onOpen();
+ };
this.ws.onmessage = (event) => {
try {
const data = event.data;
@@ -381,88 +381,100 @@ export class WebsocketTransport implements Transport<string> {
// Given the correct parameters, this transport supports named pipes/unix
// domain sockets, and also TCP/UDP sockets
export class SocketTransport implements Transport<{ path: string }> {
- connection: ?net.Socket;
- onMessage: (message: Object) => void;
- onClose: (error: ?Error) => void;
- onOpen: (event: Event) => void;
- socketClosed: boolean;
- shouldClose: boolean;
+ onMessage = (_message: Object) => {};
+ onClose = (_error: ?Error) => {};
+ onOpen = () => {};
- constructor() {
- this.connection = null;
- this.onMessage = () => {};
- this.onClose = () => {};
- this.onOpen = () => {};
- this.socketClosed = false;
- this.shouldClose = false;
- }
+ _connection: ?net.Socket;
+ _socketReady = false;
+ _shouldClose = false;
+ _lastError: ?Error;
- _connect(options: { path: string }) {
- const connection = new net.Socket();
+ connect(options: { path: string }) {
+ assert(!this._connection, 'Make sure to close the existing socket');
- connection.on('error', (err) => {
- this._fail(err);
- });
+ const jsonStream = JSONStream.parse()
+ .on('data', this._onJsonStreamData)
+ .on('error', this._onJsonStreamError);
- connection.on('close', (hadErr) => {
- // if there's no error but nobody expected the socket to be closed an
- // error should still be propagated
- let err = null;
- if (!this.shouldClose || hadErr) {
- err = new TransportError('socket closed unexpectedly');
- }
- this._fail(err);
- });
+ const connection = new net.Socket()
+ .on('ready', this._onSocketReady)
+ .on('error', this._onSocketError)
+ .on('close', this._onSocketClose);
- connection.on('connect', (event) => {
- this.connection = connection;
- this.onOpen(event);
- });
+ this._connection = connection;
+ this._socketReady = false;
+ this._shouldClose = false;
+ this._lastError = null;
- const jsonStream = JSONStream.parse();
+ log.debug('Connect socket');
connection.pipe(jsonStream);
-
- jsonStream.on('data', this.onMessage);
-
- jsonStream.on('error', (err) => {
- this._fail(err);
- });
-
- this.socketClosed = false;
- this.shouldClose = false;
connection.connect(options);
}
- _fail(err: ?Error) {
- if (!this.socketClosed) {
- this.socketClosed = true;
- this.onClose(err);
- this.close();
- }
- }
-
close() {
- this.shouldClose = true;
+ this._shouldClose = true;
+
try {
- if (this.connection) {
- this.connection.end();
+ if (this._connection) {
+ this._connection.end();
}
} catch (error) {
- log.error('failed to close the connection: ', error);
+ log.error('Failed to close the socket: ', error);
}
- this.connection = null;
+
+ this._connection = null;
}
send(msg: string) {
- if (this.connection) {
- this.connection.write(msg);
+ if (this._socketReady && this._connection) {
+ this._connection.write(msg);
} else {
throw new TransportError('Socket not connected');
}
}
- connect(options: { path: string }): void {
- this._connect(options);
- }
+ _onSocketReady = () => {
+ this._socketReady = true;
+
+ log.debug('Socket is ready');
+
+ this.onOpen();
+ };
+
+ _onSocketError = (error: Error) => {
+ this._lastError = error;
+
+ log.error('Socket error: ', error);
+ };
+
+ _onSocketClose = (hadError: boolean) => {
+ if (this._shouldClose) {
+ log.debug(`Socket was closed deliberately`);
+
+ this.onClose();
+ } else if (hadError) {
+ log.debug(`Socket was closed due to an error`);
+
+ this.onClose(this._lastError);
+ } else {
+ log.debug(`Socket was closed by peer`);
+
+ this.onClose(new TransportError('Socket was closed by peer'));
+ }
+ };
+
+ _onJsonStreamData = (data) => {
+ this.onMessage(data);
+ };
+
+ _onJsonStreamError = (error: Error) => {
+ log.error('Socket JSON stream error: ', error);
+
+ if (this._connection) {
+ // This will destroy the socket and emit "error" and "close" events
+ this._connection.destroy(error);
+ }
+ };
}