summaryrefslogtreecommitdiffhomepage
path: root/gui/src
diff options
context:
space:
mode:
authorOskar <oskar@mullvad.net>2024-09-26 08:53:07 +0200
committerOskar <oskar@mullvad.net>2024-09-30 09:14:13 +0200
commit4749221bb3c4984b75e1bd42bf67c9d4ac362d01 (patch)
treea33cf2e04de39b3db77b39d357beb99306ea14a7 /gui/src
parent9c611e94f6d1771a548a0732e78c5e5424aa5d4f (diff)
downloadmullvadvpn-4749221bb3c4984b75e1bd42bf67c9d4ac362d01.tar.xz
mullvadvpn-4749221bb3c4984b75e1bd42bf67c9d4ac362d01.zip
Split DaemonRpc up into two parts
Diffstat (limited to 'gui/src')
-rw-r--r--gui/src/main/daemon-rpc.ts296
-rw-r--r--gui/src/main/grpc-client.ts246
-rw-r--r--gui/src/main/index.ts3
3 files changed, 279 insertions, 266 deletions
diff --git a/gui/src/main/daemon-rpc.ts b/gui/src/main/daemon-rpc.ts
index 721551d5b4..76f1f7f112 100644
--- a/gui/src/main/daemon-rpc.ts
+++ b/gui/src/main/daemon-rpc.ts
@@ -1,11 +1,6 @@
import * as grpc from '@grpc/grpc-js';
import { Empty } from 'google-protobuf/google/protobuf/empty_pb.js';
-import {
- BoolValue,
- StringValue,
- UInt32Value,
-} from 'google-protobuf/google/protobuf/wrappers_pb.js';
-import { promisify } from 'util';
+import { BoolValue, StringValue } from 'google-protobuf/google/protobuf/wrappers_pb.js';
import {
AccessMethodSetting,
@@ -32,7 +27,7 @@ import {
TunnelState,
VoucherResponse,
} from '../shared/daemon-rpc-types';
-import log from '../shared/logging';
+import { GrpcClient, noConnectionError } from './grpc-client';
import {
convertFromApiAccessMethodSetting,
convertFromDaemonEvent,
@@ -49,36 +44,8 @@ import {
convertToRelayConstraints,
ensureExists,
} from './grpc-type-convertions';
-import { ManagementServiceClient } from './management_interface/management_interface_grpc_pb';
import * as grpcTypes from './management_interface/management_interface_pb';
-const DAEMON_RPC_PATH =
- process.platform === 'win32' ? 'unix:////./pipe/Mullvad VPN' : 'unix:///var/run/mullvad-vpn';
-
-const NETWORK_CALL_TIMEOUT = 10000;
-const CHANNEL_STATE_TIMEOUT = 1000 * 60 * 60;
-
-const noConnectionError = new Error('No connection established to daemon');
-
-export class ConnectionObserver {
- constructor(
- private openHandler: () => void,
- private closeHandler: (wasConnected: boolean, error?: Error) => void,
- ) {}
-
- // Only meant to be called by DaemonRpc
- // @internal
- public onOpen = () => {
- this.openHandler();
- };
-
- // Only meant to be called by DaemonRpc
- // @internal
- public onClose = (wasConnected: boolean, error?: Error) => {
- this.closeHandler(wasConnected, error);
- };
-}
-
export class SubscriptionListener<T> {
// Only meant to be used by DaemonRpc
// @internal
@@ -102,79 +69,47 @@ export class SubscriptionListener<T> {
}
}
-type CallFunctionArgument<T, R> =
- | ((arg: T, callback: (error: Error | null, result: R) => void) => void)
- | undefined;
-
-export class DaemonRpc {
- private client: ManagementServiceClient;
- private isConnectedValue = false;
- private isClosed = false;
+export class DaemonRpc extends GrpcClient {
private nextSubscriptionId = 0;
private subscriptions: Map<number, grpc.ClientReadableStream<grpcTypes.DaemonEvent>> = new Map();
- private reconnectionTimeout?: NodeJS.Timeout;
- constructor(private connectionObserver?: ConnectionObserver) {
- this.client = new ManagementServiceClient(
- DAEMON_RPC_PATH,
- grpc.credentials.createInsecure(),
- this.channelOptions(),
- );
- }
+ public disconnect() {
+ for (const subscriptionId of this.subscriptions.keys()) {
+ this.removeSubscription(subscriptionId);
+ }
- public get isConnected() {
- return this.isConnectedValue;
+ super.disconnect();
}
- public reopen(connectionObserver?: ConnectionObserver) {
- if (this.isClosed) {
- this.isClosed = false;
- this.client = new ManagementServiceClient(
- DAEMON_RPC_PATH,
- grpc.credentials.createInsecure(),
- this.channelOptions(),
- );
-
- this.connectionObserver = connectionObserver;
+ public subscribeDaemonEventListener(listener: SubscriptionListener<DaemonEvent>) {
+ const call = this.isConnected && this.client.eventsListen(new Empty());
+ if (!call) {
+ throw noConnectionError;
}
- }
-
- public connect(): Promise<void> {
- return new Promise((resolve, reject) => {
- const usedClient = this.client;
- this.client.waitForReady(this.deadlineFromNow(), (error) => {
- if (this.client !== usedClient) {
- reject(new Error('Stale connection attempt'));
- return;
- }
+ const subscriptionId = this.subscriptionId();
+ listener.subscriptionId = subscriptionId;
+ this.subscriptions.set(subscriptionId, call);
- if (error) {
- this.onClose(error);
- this.ensureConnectivity();
- reject(error);
- } else {
- this.reconnectionTimeout = undefined;
- this.isConnectedValue = true;
- this.connectionObserver?.onOpen();
- this.setChannelCallback();
- resolve();
- }
- });
+ call.on('data', (data: grpcTypes.DaemonEvent) => {
+ try {
+ const daemonEvent = convertFromDaemonEvent(data);
+ listener.onEvent(daemonEvent);
+ } catch (e) {
+ const error = e as Error;
+ listener.onError(error);
+ }
});
- }
- public disconnect() {
- this.isConnectedValue = false;
-
- for (const subscriptionId of this.subscriptions.keys()) {
+ call.on('error', (error) => {
+ listener.onError(error);
this.removeSubscription(subscriptionId);
- }
+ });
+ }
- this.isClosed = true;
- this.client.close();
- this.connectionObserver = undefined;
- if (this.reconnectionTimeout) {
- clearTimeout(this.reconnectionTimeout);
+ public unsubscribeDaemonEventListener(listener: SubscriptionListener<DaemonEvent>) {
+ const id = listener.subscriptionId;
+ if (id !== undefined) {
+ this.removeSubscription(id);
}
}
@@ -440,38 +375,6 @@ export class DaemonRpc {
return convertFromSettings(response)!;
}
- public subscribeDaemonEventListener(listener: SubscriptionListener<DaemonEvent>) {
- const call = this.isConnected && this.client.eventsListen(new Empty());
- if (!call) {
- throw noConnectionError;
- }
- const subscriptionId = this.subscriptionId();
- listener.subscriptionId = subscriptionId;
- this.subscriptions.set(subscriptionId, call);
-
- call.on('data', (data: grpcTypes.DaemonEvent) => {
- try {
- const daemonEvent = convertFromDaemonEvent(data);
- listener.onEvent(daemonEvent);
- } catch (e) {
- const error = e as Error;
- listener.onError(error);
- }
- });
-
- call.on('error', (error) => {
- listener.onError(error);
- this.removeSubscription(subscriptionId);
- });
- }
-
- public unsubscribeDaemonEventListener(listener: SubscriptionListener<DaemonEvent>) {
- const id = listener.subscriptionId;
- if (id !== undefined) {
- this.removeSubscription(id);
- }
- }
-
public async getAccountHistory(): Promise<AccountToken | undefined> {
const response = await this.callEmpty<grpcTypes.AccountHistory>(this.client.getAccountHistory);
return response.getToken()?.getValue();
@@ -679,66 +582,6 @@ export class DaemonRpc {
return current;
}
- private deadlineFromNow() {
- return Date.now() + NETWORK_CALL_TIMEOUT;
- }
-
- private channelStateTimeout(): number {
- return Date.now() + CHANNEL_STATE_TIMEOUT;
- }
-
- private callEmpty<R = Empty>(fn: CallFunctionArgument<Empty, R>): Promise<R> {
- return this.call<Empty, R>(fn, new Empty());
- }
-
- private callString<R = Empty>(
- fn: CallFunctionArgument<StringValue, R>,
- value?: string,
- ): Promise<R> {
- const googleString = new StringValue();
-
- if (value !== undefined) {
- googleString.setValue(value);
- }
-
- return this.call<StringValue, R>(fn, googleString);
- }
-
- private callBool<R>(fn: CallFunctionArgument<BoolValue, R>, value?: boolean): Promise<R> {
- const googleBool = new BoolValue();
-
- if (value !== undefined) {
- googleBool.setValue(value);
- }
-
- return this.call<BoolValue, R>(fn, googleBool);
- }
-
- private callNumber<R>(fn: CallFunctionArgument<UInt32Value, R>, value?: number): Promise<R> {
- const googleNumber = new UInt32Value();
-
- if (value !== undefined) {
- googleNumber.setValue(value);
- }
-
- return this.call<UInt32Value, R>(fn, googleNumber);
- }
-
- private call<T, R>(fn: CallFunctionArgument<T, R>, arg: T): Promise<R> {
- if (fn && this.isConnected) {
- return promisify<T, R>(fn.bind(this.client))(arg);
- } else {
- throw noConnectionError;
- }
- }
-
- private onClose(error?: Error) {
- const wasConnected = this.isConnectedValue;
- this.isConnectedValue = false;
-
- this.connectionObserver?.onClose(wasConnected, error);
- }
-
private removeSubscription(id: number) {
const subscription = this.subscriptions.get(id);
if (subscription !== undefined) {
@@ -758,81 +601,4 @@ export class DaemonRpc {
setImmediate(() => subscription.cancel());
}
}
-
- private channelOptions(): grpc.ClientOptions {
- return {
- 'grpc.max_reconnect_backoff_ms': 3000,
- 'grpc.initial_reconnect_backoff_ms': 3000,
- 'grpc.keepalive_time_ms': Math.pow(2, 30),
- 'grpc.keepalive_timeout_ms': Math.pow(2, 30),
- 'grpc.client_idle_timeout_ms': Math.pow(2, 30),
- };
- }
-
- private connectivityChangeCallback(timeoutErr?: Error) {
- const channel = this.client.getChannel();
- const currentState = channel?.getConnectivityState(true);
- log.verbose(`GRPC Channel connectivity state changed to ${currentState}`);
- if (channel) {
- if (timeoutErr) {
- this.setChannelCallback(currentState);
- return;
- }
- const wasConnected = this.isConnected;
- if (this.channelDisconnected(currentState)) {
- this.onClose();
- // Try and reconnect in case
- void this.connect().catch((error) => {
- log.error(`Failed to reconnect - ${error}`);
- });
- this.setChannelCallback(currentState);
- } else if (!wasConnected && currentState === grpc.connectivityState.READY) {
- this.isConnectedValue = true;
- this.connectionObserver?.onOpen();
- this.setChannelCallback(currentState);
- }
- }
- }
-
- private channelDisconnected(state: grpc.connectivityState): boolean {
- return (
- (state === grpc.connectivityState.SHUTDOWN ||
- state === grpc.connectivityState.TRANSIENT_FAILURE ||
- state === grpc.connectivityState.IDLE) &&
- this.isConnected
- );
- }
-
- private setChannelCallback(currentState?: grpc.connectivityState) {
- const channel = this.client.getChannel();
- if (currentState === undefined && channel) {
- currentState = channel?.getConnectivityState(false);
- }
- if (currentState) {
- channel.watchConnectivityState(currentState, this.channelStateTimeout(), (error) =>
- this.connectivityChangeCallback(error),
- );
- }
- }
-
- // Since grpc.Channel.watchConnectivityState() isn't always running as intended, whenever the
- // client fails to connect at first, `ensureConnectivity()` should be called so that it tries to
- // check the connectivity state and nudge the client into connecting.
- // `grpc.Channel.getConnectivityState(true)` should make it attempt to connect.
- private ensureConnectivity() {
- if (this.reconnectionTimeout) {
- clearTimeout(this.reconnectionTimeout);
- }
- this.reconnectionTimeout = setTimeout(() => {
- const lastState = this.client.getChannel().getConnectivityState(true);
- if (this.channelDisconnected(lastState)) {
- this.onClose();
- }
- if (!this.isConnected) {
- void this.connect().catch((error) => {
- log.error(`Failed to reconnect - ${error}`);
- });
- }
- }, 3000);
- }
}
diff --git a/gui/src/main/grpc-client.ts b/gui/src/main/grpc-client.ts
new file mode 100644
index 0000000000..3096522a06
--- /dev/null
+++ b/gui/src/main/grpc-client.ts
@@ -0,0 +1,246 @@
+import * as grpc from '@grpc/grpc-js';
+import { Empty } from 'google-protobuf/google/protobuf/empty_pb.js';
+import {
+ BoolValue,
+ StringValue,
+ UInt32Value,
+} from 'google-protobuf/google/protobuf/wrappers_pb.js';
+import { promisify } from 'util';
+
+import log from '../shared/logging';
+import { ManagementServiceClient } from './management_interface/management_interface_grpc_pb';
+
+const DAEMON_RPC_PATH =
+ process.platform === 'win32' ? 'unix:////./pipe/Mullvad VPN' : 'unix:///var/run/mullvad-vpn';
+
+const NETWORK_CALL_TIMEOUT = 10000;
+const CHANNEL_STATE_TIMEOUT = 1000 * 60 * 60;
+
+type CallFunctionArgument<T, R> =
+ | ((arg: T, callback: (error: Error | null, result: R) => void) => void)
+ | undefined;
+
+export const noConnectionError = new Error('No connection established to daemon');
+
+export class ConnectionObserver {
+ constructor(
+ private openHandler: () => void,
+ private closeHandler: (wasConnected: boolean, error?: Error) => void,
+ ) {}
+
+ // Only meant to be called by DaemonRpc
+ // @internal
+ public onOpen = () => {
+ this.openHandler();
+ };
+
+ // Only meant to be called by DaemonRpc
+ // @internal
+ public onClose = (wasConnected: boolean, error?: Error) => {
+ this.closeHandler(wasConnected, error);
+ };
+}
+
+export class GrpcClient {
+ protected client: ManagementServiceClient;
+ private isConnectedValue = false;
+ private isClosed = false;
+ private reconnectionTimeout?: NodeJS.Timeout;
+
+ constructor(private connectionObserver?: ConnectionObserver) {
+ this.client = new ManagementServiceClient(
+ DAEMON_RPC_PATH,
+ grpc.credentials.createInsecure(),
+ this.channelOptions(),
+ );
+ }
+
+ public get isConnected() {
+ return this.isConnectedValue;
+ }
+
+ public reopen(connectionObserver?: ConnectionObserver) {
+ if (this.isClosed) {
+ this.isClosed = false;
+ this.client = new ManagementServiceClient(
+ DAEMON_RPC_PATH,
+ grpc.credentials.createInsecure(),
+ this.channelOptions(),
+ );
+
+ this.connectionObserver = connectionObserver;
+ }
+ }
+
+ public connect(): Promise<void> {
+ return new Promise((resolve, reject) => {
+ const usedClient = this.client;
+ this.client.waitForReady(this.deadlineFromNow(), (error) => {
+ if (this.client !== usedClient) {
+ reject(new Error('Stale connection attempt'));
+ return;
+ }
+
+ if (error) {
+ this.onClose(error);
+ this.ensureConnectivity();
+ reject(error);
+ } else {
+ this.reconnectionTimeout = undefined;
+ this.isConnectedValue = true;
+ this.connectionObserver?.onOpen();
+ this.setChannelCallback();
+ resolve();
+ }
+ });
+ });
+ }
+
+ public disconnect() {
+ this.isConnectedValue = false;
+
+ this.isClosed = true;
+ this.client.close();
+ this.connectionObserver = undefined;
+ if (this.reconnectionTimeout) {
+ clearTimeout(this.reconnectionTimeout);
+ }
+ }
+
+ protected callEmpty<R = Empty>(fn: CallFunctionArgument<Empty, R>): Promise<R> {
+ return this.call<Empty, R>(fn, new Empty());
+ }
+
+ protected callString<R = Empty>(
+ fn: CallFunctionArgument<StringValue, R>,
+ value?: string,
+ ): Promise<R> {
+ const googleString = new StringValue();
+
+ if (value !== undefined) {
+ googleString.setValue(value);
+ }
+
+ return this.call<StringValue, R>(fn, googleString);
+ }
+
+ protected callBool<R>(fn: CallFunctionArgument<BoolValue, R>, value?: boolean): Promise<R> {
+ const googleBool = new BoolValue();
+
+ if (value !== undefined) {
+ googleBool.setValue(value);
+ }
+
+ return this.call<BoolValue, R>(fn, googleBool);
+ }
+
+ protected callNumber<R>(fn: CallFunctionArgument<UInt32Value, R>, value?: number): Promise<R> {
+ const googleNumber = new UInt32Value();
+
+ if (value !== undefined) {
+ googleNumber.setValue(value);
+ }
+
+ return this.call<UInt32Value, R>(fn, googleNumber);
+ }
+
+ protected call<T, R>(fn: CallFunctionArgument<T, R>, arg: T): Promise<R> {
+ if (fn && this.isConnected) {
+ return promisify<T, R>(fn.bind(this.client))(arg);
+ } else {
+ throw noConnectionError;
+ }
+ }
+
+ private deadlineFromNow() {
+ return Date.now() + NETWORK_CALL_TIMEOUT;
+ }
+
+ private channelStateTimeout(): number {
+ return Date.now() + CHANNEL_STATE_TIMEOUT;
+ }
+
+ private onClose(error?: Error) {
+ const wasConnected = this.isConnectedValue;
+ this.isConnectedValue = false;
+
+ this.connectionObserver?.onClose(wasConnected, error);
+ }
+
+ private channelOptions(): grpc.ClientOptions {
+ return {
+ 'grpc.max_reconnect_backoff_ms': 3000,
+ 'grpc.initial_reconnect_backoff_ms': 3000,
+ 'grpc.keepalive_time_ms': Math.pow(2, 30),
+ 'grpc.keepalive_timeout_ms': Math.pow(2, 30),
+ 'grpc.client_idle_timeout_ms': Math.pow(2, 30),
+ };
+ }
+
+ private connectivityChangeCallback(timeoutErr?: Error) {
+ const channel = this.client.getChannel();
+ const currentState = channel?.getConnectivityState(true);
+ log.verbose(`GRPC Channel connectivity state changed to ${currentState}`);
+ if (channel) {
+ if (timeoutErr) {
+ this.setChannelCallback(currentState);
+ return;
+ }
+ const wasConnected = this.isConnected;
+ if (this.channelDisconnected(currentState)) {
+ this.onClose();
+ // Try and reconnect in case
+ void this.connect().catch((error) => {
+ log.error(`Failed to reconnect - ${error}`);
+ });
+ this.setChannelCallback(currentState);
+ } else if (!wasConnected && currentState === grpc.connectivityState.READY) {
+ this.isConnectedValue = true;
+ this.connectionObserver?.onOpen();
+ this.setChannelCallback(currentState);
+ }
+ }
+ }
+
+ private channelDisconnected(state: grpc.connectivityState): boolean {
+ return (
+ (state === grpc.connectivityState.SHUTDOWN ||
+ state === grpc.connectivityState.TRANSIENT_FAILURE ||
+ state === grpc.connectivityState.IDLE) &&
+ this.isConnected
+ );
+ }
+
+ private setChannelCallback(currentState?: grpc.connectivityState) {
+ const channel = this.client.getChannel();
+ if (currentState === undefined && channel) {
+ currentState = channel?.getConnectivityState(false);
+ }
+ if (currentState) {
+ channel.watchConnectivityState(currentState, this.channelStateTimeout(), (error) =>
+ this.connectivityChangeCallback(error),
+ );
+ }
+ }
+
+ // Since grpc.Channel.watchConnectivityState() isn't always running as intended, whenever the
+ // client fails to connect at first, `ensureConnectivity()` should be called so that it tries to
+ // check the connectivity state and nudge the client into connecting.
+ // `grpc.Channel.getConnectivityState(true)` should make it attempt to connect.
+ private ensureConnectivity() {
+ if (this.reconnectionTimeout) {
+ clearTimeout(this.reconnectionTimeout);
+ }
+ this.reconnectionTimeout = setTimeout(() => {
+ const lastState = this.client.getChannel().getConnectivityState(true);
+ if (this.channelDisconnected(lastState)) {
+ this.onClose();
+ }
+ if (!this.isConnected) {
+ void this.connect().catch((error) => {
+ log.error(`Failed to reconnect - ${error}`);
+ });
+ }
+ }, 3000);
+ }
+}
diff --git a/gui/src/main/index.ts b/gui/src/main/index.ts
index 2401e70b69..3eccc403b1 100644
--- a/gui/src/main/index.ts
+++ b/gui/src/main/index.ts
@@ -36,8 +36,9 @@ import {
printCommandLineOptions,
printElectronOptions,
} from './command-line-options';
-import { ConnectionObserver, DaemonRpc, SubscriptionListener } from './daemon-rpc';
+import { DaemonRpc, SubscriptionListener } from './daemon-rpc';
import Expectation from './expectation';
+import { ConnectionObserver } from './grpc-client';
import { IpcMainEventChannel } from './ipc-event-channel';
import { findIconPath } from './linux-desktop-entry';
import { loadTranslations } from './load-translations';