diff options
| author | Oskar <oskar@mullvad.net> | 2024-09-26 08:53:07 +0200 |
|---|---|---|
| committer | Oskar <oskar@mullvad.net> | 2024-09-30 09:14:13 +0200 |
| commit | 4749221bb3c4984b75e1bd42bf67c9d4ac362d01 (patch) | |
| tree | a33cf2e04de39b3db77b39d357beb99306ea14a7 /gui/src | |
| parent | 9c611e94f6d1771a548a0732e78c5e5424aa5d4f (diff) | |
| download | mullvadvpn-4749221bb3c4984b75e1bd42bf67c9d4ac362d01.tar.xz mullvadvpn-4749221bb3c4984b75e1bd42bf67c9d4ac362d01.zip | |
Split DaemonRpc up into two parts
Diffstat (limited to 'gui/src')
| -rw-r--r-- | gui/src/main/daemon-rpc.ts | 296 | ||||
| -rw-r--r-- | gui/src/main/grpc-client.ts | 246 | ||||
| -rw-r--r-- | gui/src/main/index.ts | 3 |
3 files changed, 279 insertions, 266 deletions
diff --git a/gui/src/main/daemon-rpc.ts b/gui/src/main/daemon-rpc.ts index 721551d5b4..76f1f7f112 100644 --- a/gui/src/main/daemon-rpc.ts +++ b/gui/src/main/daemon-rpc.ts @@ -1,11 +1,6 @@ import * as grpc from '@grpc/grpc-js'; import { Empty } from 'google-protobuf/google/protobuf/empty_pb.js'; -import { - BoolValue, - StringValue, - UInt32Value, -} from 'google-protobuf/google/protobuf/wrappers_pb.js'; -import { promisify } from 'util'; +import { BoolValue, StringValue } from 'google-protobuf/google/protobuf/wrappers_pb.js'; import { AccessMethodSetting, @@ -32,7 +27,7 @@ import { TunnelState, VoucherResponse, } from '../shared/daemon-rpc-types'; -import log from '../shared/logging'; +import { GrpcClient, noConnectionError } from './grpc-client'; import { convertFromApiAccessMethodSetting, convertFromDaemonEvent, @@ -49,36 +44,8 @@ import { convertToRelayConstraints, ensureExists, } from './grpc-type-convertions'; -import { ManagementServiceClient } from './management_interface/management_interface_grpc_pb'; import * as grpcTypes from './management_interface/management_interface_pb'; -const DAEMON_RPC_PATH = - process.platform === 'win32' ? 'unix:////./pipe/Mullvad VPN' : 'unix:///var/run/mullvad-vpn'; - -const NETWORK_CALL_TIMEOUT = 10000; -const CHANNEL_STATE_TIMEOUT = 1000 * 60 * 60; - -const noConnectionError = new Error('No connection established to daemon'); - -export class ConnectionObserver { - constructor( - private openHandler: () => void, - private closeHandler: (wasConnected: boolean, error?: Error) => void, - ) {} - - // Only meant to be called by DaemonRpc - // @internal - public onOpen = () => { - this.openHandler(); - }; - - // Only meant to be called by DaemonRpc - // @internal - public onClose = (wasConnected: boolean, error?: Error) => { - this.closeHandler(wasConnected, error); - }; -} - export class SubscriptionListener<T> { // Only meant to be used by DaemonRpc // @internal @@ -102,79 +69,47 @@ export class SubscriptionListener<T> { } } -type CallFunctionArgument<T, R> = - | ((arg: T, callback: (error: Error | null, result: R) => void) => void) - | undefined; - -export class DaemonRpc { - private client: ManagementServiceClient; - private isConnectedValue = false; - private isClosed = false; +export class DaemonRpc extends GrpcClient { private nextSubscriptionId = 0; private subscriptions: Map<number, grpc.ClientReadableStream<grpcTypes.DaemonEvent>> = new Map(); - private reconnectionTimeout?: NodeJS.Timeout; - constructor(private connectionObserver?: ConnectionObserver) { - this.client = new ManagementServiceClient( - DAEMON_RPC_PATH, - grpc.credentials.createInsecure(), - this.channelOptions(), - ); - } + public disconnect() { + for (const subscriptionId of this.subscriptions.keys()) { + this.removeSubscription(subscriptionId); + } - public get isConnected() { - return this.isConnectedValue; + super.disconnect(); } - public reopen(connectionObserver?: ConnectionObserver) { - if (this.isClosed) { - this.isClosed = false; - this.client = new ManagementServiceClient( - DAEMON_RPC_PATH, - grpc.credentials.createInsecure(), - this.channelOptions(), - ); - - this.connectionObserver = connectionObserver; + public subscribeDaemonEventListener(listener: SubscriptionListener<DaemonEvent>) { + const call = this.isConnected && this.client.eventsListen(new Empty()); + if (!call) { + throw noConnectionError; } - } - - public connect(): Promise<void> { - return new Promise((resolve, reject) => { - const usedClient = this.client; - this.client.waitForReady(this.deadlineFromNow(), (error) => { - if (this.client !== usedClient) { - reject(new Error('Stale connection attempt')); - return; - } + const subscriptionId = this.subscriptionId(); + listener.subscriptionId = subscriptionId; + this.subscriptions.set(subscriptionId, call); - if (error) { - this.onClose(error); - this.ensureConnectivity(); - reject(error); - } else { - this.reconnectionTimeout = undefined; - this.isConnectedValue = true; - this.connectionObserver?.onOpen(); - this.setChannelCallback(); - resolve(); - } - }); + call.on('data', (data: grpcTypes.DaemonEvent) => { + try { + const daemonEvent = convertFromDaemonEvent(data); + listener.onEvent(daemonEvent); + } catch (e) { + const error = e as Error; + listener.onError(error); + } }); - } - public disconnect() { - this.isConnectedValue = false; - - for (const subscriptionId of this.subscriptions.keys()) { + call.on('error', (error) => { + listener.onError(error); this.removeSubscription(subscriptionId); - } + }); + } - this.isClosed = true; - this.client.close(); - this.connectionObserver = undefined; - if (this.reconnectionTimeout) { - clearTimeout(this.reconnectionTimeout); + public unsubscribeDaemonEventListener(listener: SubscriptionListener<DaemonEvent>) { + const id = listener.subscriptionId; + if (id !== undefined) { + this.removeSubscription(id); } } @@ -440,38 +375,6 @@ export class DaemonRpc { return convertFromSettings(response)!; } - public subscribeDaemonEventListener(listener: SubscriptionListener<DaemonEvent>) { - const call = this.isConnected && this.client.eventsListen(new Empty()); - if (!call) { - throw noConnectionError; - } - const subscriptionId = this.subscriptionId(); - listener.subscriptionId = subscriptionId; - this.subscriptions.set(subscriptionId, call); - - call.on('data', (data: grpcTypes.DaemonEvent) => { - try { - const daemonEvent = convertFromDaemonEvent(data); - listener.onEvent(daemonEvent); - } catch (e) { - const error = e as Error; - listener.onError(error); - } - }); - - call.on('error', (error) => { - listener.onError(error); - this.removeSubscription(subscriptionId); - }); - } - - public unsubscribeDaemonEventListener(listener: SubscriptionListener<DaemonEvent>) { - const id = listener.subscriptionId; - if (id !== undefined) { - this.removeSubscription(id); - } - } - public async getAccountHistory(): Promise<AccountToken | undefined> { const response = await this.callEmpty<grpcTypes.AccountHistory>(this.client.getAccountHistory); return response.getToken()?.getValue(); @@ -679,66 +582,6 @@ export class DaemonRpc { return current; } - private deadlineFromNow() { - return Date.now() + NETWORK_CALL_TIMEOUT; - } - - private channelStateTimeout(): number { - return Date.now() + CHANNEL_STATE_TIMEOUT; - } - - private callEmpty<R = Empty>(fn: CallFunctionArgument<Empty, R>): Promise<R> { - return this.call<Empty, R>(fn, new Empty()); - } - - private callString<R = Empty>( - fn: CallFunctionArgument<StringValue, R>, - value?: string, - ): Promise<R> { - const googleString = new StringValue(); - - if (value !== undefined) { - googleString.setValue(value); - } - - return this.call<StringValue, R>(fn, googleString); - } - - private callBool<R>(fn: CallFunctionArgument<BoolValue, R>, value?: boolean): Promise<R> { - const googleBool = new BoolValue(); - - if (value !== undefined) { - googleBool.setValue(value); - } - - return this.call<BoolValue, R>(fn, googleBool); - } - - private callNumber<R>(fn: CallFunctionArgument<UInt32Value, R>, value?: number): Promise<R> { - const googleNumber = new UInt32Value(); - - if (value !== undefined) { - googleNumber.setValue(value); - } - - return this.call<UInt32Value, R>(fn, googleNumber); - } - - private call<T, R>(fn: CallFunctionArgument<T, R>, arg: T): Promise<R> { - if (fn && this.isConnected) { - return promisify<T, R>(fn.bind(this.client))(arg); - } else { - throw noConnectionError; - } - } - - private onClose(error?: Error) { - const wasConnected = this.isConnectedValue; - this.isConnectedValue = false; - - this.connectionObserver?.onClose(wasConnected, error); - } - private removeSubscription(id: number) { const subscription = this.subscriptions.get(id); if (subscription !== undefined) { @@ -758,81 +601,4 @@ export class DaemonRpc { setImmediate(() => subscription.cancel()); } } - - private channelOptions(): grpc.ClientOptions { - return { - 'grpc.max_reconnect_backoff_ms': 3000, - 'grpc.initial_reconnect_backoff_ms': 3000, - 'grpc.keepalive_time_ms': Math.pow(2, 30), - 'grpc.keepalive_timeout_ms': Math.pow(2, 30), - 'grpc.client_idle_timeout_ms': Math.pow(2, 30), - }; - } - - private connectivityChangeCallback(timeoutErr?: Error) { - const channel = this.client.getChannel(); - const currentState = channel?.getConnectivityState(true); - log.verbose(`GRPC Channel connectivity state changed to ${currentState}`); - if (channel) { - if (timeoutErr) { - this.setChannelCallback(currentState); - return; - } - const wasConnected = this.isConnected; - if (this.channelDisconnected(currentState)) { - this.onClose(); - // Try and reconnect in case - void this.connect().catch((error) => { - log.error(`Failed to reconnect - ${error}`); - }); - this.setChannelCallback(currentState); - } else if (!wasConnected && currentState === grpc.connectivityState.READY) { - this.isConnectedValue = true; - this.connectionObserver?.onOpen(); - this.setChannelCallback(currentState); - } - } - } - - private channelDisconnected(state: grpc.connectivityState): boolean { - return ( - (state === grpc.connectivityState.SHUTDOWN || - state === grpc.connectivityState.TRANSIENT_FAILURE || - state === grpc.connectivityState.IDLE) && - this.isConnected - ); - } - - private setChannelCallback(currentState?: grpc.connectivityState) { - const channel = this.client.getChannel(); - if (currentState === undefined && channel) { - currentState = channel?.getConnectivityState(false); - } - if (currentState) { - channel.watchConnectivityState(currentState, this.channelStateTimeout(), (error) => - this.connectivityChangeCallback(error), - ); - } - } - - // Since grpc.Channel.watchConnectivityState() isn't always running as intended, whenever the - // client fails to connect at first, `ensureConnectivity()` should be called so that it tries to - // check the connectivity state and nudge the client into connecting. - // `grpc.Channel.getConnectivityState(true)` should make it attempt to connect. - private ensureConnectivity() { - if (this.reconnectionTimeout) { - clearTimeout(this.reconnectionTimeout); - } - this.reconnectionTimeout = setTimeout(() => { - const lastState = this.client.getChannel().getConnectivityState(true); - if (this.channelDisconnected(lastState)) { - this.onClose(); - } - if (!this.isConnected) { - void this.connect().catch((error) => { - log.error(`Failed to reconnect - ${error}`); - }); - } - }, 3000); - } } diff --git a/gui/src/main/grpc-client.ts b/gui/src/main/grpc-client.ts new file mode 100644 index 0000000000..3096522a06 --- /dev/null +++ b/gui/src/main/grpc-client.ts @@ -0,0 +1,246 @@ +import * as grpc from '@grpc/grpc-js'; +import { Empty } from 'google-protobuf/google/protobuf/empty_pb.js'; +import { + BoolValue, + StringValue, + UInt32Value, +} from 'google-protobuf/google/protobuf/wrappers_pb.js'; +import { promisify } from 'util'; + +import log from '../shared/logging'; +import { ManagementServiceClient } from './management_interface/management_interface_grpc_pb'; + +const DAEMON_RPC_PATH = + process.platform === 'win32' ? 'unix:////./pipe/Mullvad VPN' : 'unix:///var/run/mullvad-vpn'; + +const NETWORK_CALL_TIMEOUT = 10000; +const CHANNEL_STATE_TIMEOUT = 1000 * 60 * 60; + +type CallFunctionArgument<T, R> = + | ((arg: T, callback: (error: Error | null, result: R) => void) => void) + | undefined; + +export const noConnectionError = new Error('No connection established to daemon'); + +export class ConnectionObserver { + constructor( + private openHandler: () => void, + private closeHandler: (wasConnected: boolean, error?: Error) => void, + ) {} + + // Only meant to be called by DaemonRpc + // @internal + public onOpen = () => { + this.openHandler(); + }; + + // Only meant to be called by DaemonRpc + // @internal + public onClose = (wasConnected: boolean, error?: Error) => { + this.closeHandler(wasConnected, error); + }; +} + +export class GrpcClient { + protected client: ManagementServiceClient; + private isConnectedValue = false; + private isClosed = false; + private reconnectionTimeout?: NodeJS.Timeout; + + constructor(private connectionObserver?: ConnectionObserver) { + this.client = new ManagementServiceClient( + DAEMON_RPC_PATH, + grpc.credentials.createInsecure(), + this.channelOptions(), + ); + } + + public get isConnected() { + return this.isConnectedValue; + } + + public reopen(connectionObserver?: ConnectionObserver) { + if (this.isClosed) { + this.isClosed = false; + this.client = new ManagementServiceClient( + DAEMON_RPC_PATH, + grpc.credentials.createInsecure(), + this.channelOptions(), + ); + + this.connectionObserver = connectionObserver; + } + } + + public connect(): Promise<void> { + return new Promise((resolve, reject) => { + const usedClient = this.client; + this.client.waitForReady(this.deadlineFromNow(), (error) => { + if (this.client !== usedClient) { + reject(new Error('Stale connection attempt')); + return; + } + + if (error) { + this.onClose(error); + this.ensureConnectivity(); + reject(error); + } else { + this.reconnectionTimeout = undefined; + this.isConnectedValue = true; + this.connectionObserver?.onOpen(); + this.setChannelCallback(); + resolve(); + } + }); + }); + } + + public disconnect() { + this.isConnectedValue = false; + + this.isClosed = true; + this.client.close(); + this.connectionObserver = undefined; + if (this.reconnectionTimeout) { + clearTimeout(this.reconnectionTimeout); + } + } + + protected callEmpty<R = Empty>(fn: CallFunctionArgument<Empty, R>): Promise<R> { + return this.call<Empty, R>(fn, new Empty()); + } + + protected callString<R = Empty>( + fn: CallFunctionArgument<StringValue, R>, + value?: string, + ): Promise<R> { + const googleString = new StringValue(); + + if (value !== undefined) { + googleString.setValue(value); + } + + return this.call<StringValue, R>(fn, googleString); + } + + protected callBool<R>(fn: CallFunctionArgument<BoolValue, R>, value?: boolean): Promise<R> { + const googleBool = new BoolValue(); + + if (value !== undefined) { + googleBool.setValue(value); + } + + return this.call<BoolValue, R>(fn, googleBool); + } + + protected callNumber<R>(fn: CallFunctionArgument<UInt32Value, R>, value?: number): Promise<R> { + const googleNumber = new UInt32Value(); + + if (value !== undefined) { + googleNumber.setValue(value); + } + + return this.call<UInt32Value, R>(fn, googleNumber); + } + + protected call<T, R>(fn: CallFunctionArgument<T, R>, arg: T): Promise<R> { + if (fn && this.isConnected) { + return promisify<T, R>(fn.bind(this.client))(arg); + } else { + throw noConnectionError; + } + } + + private deadlineFromNow() { + return Date.now() + NETWORK_CALL_TIMEOUT; + } + + private channelStateTimeout(): number { + return Date.now() + CHANNEL_STATE_TIMEOUT; + } + + private onClose(error?: Error) { + const wasConnected = this.isConnectedValue; + this.isConnectedValue = false; + + this.connectionObserver?.onClose(wasConnected, error); + } + + private channelOptions(): grpc.ClientOptions { + return { + 'grpc.max_reconnect_backoff_ms': 3000, + 'grpc.initial_reconnect_backoff_ms': 3000, + 'grpc.keepalive_time_ms': Math.pow(2, 30), + 'grpc.keepalive_timeout_ms': Math.pow(2, 30), + 'grpc.client_idle_timeout_ms': Math.pow(2, 30), + }; + } + + private connectivityChangeCallback(timeoutErr?: Error) { + const channel = this.client.getChannel(); + const currentState = channel?.getConnectivityState(true); + log.verbose(`GRPC Channel connectivity state changed to ${currentState}`); + if (channel) { + if (timeoutErr) { + this.setChannelCallback(currentState); + return; + } + const wasConnected = this.isConnected; + if (this.channelDisconnected(currentState)) { + this.onClose(); + // Try and reconnect in case + void this.connect().catch((error) => { + log.error(`Failed to reconnect - ${error}`); + }); + this.setChannelCallback(currentState); + } else if (!wasConnected && currentState === grpc.connectivityState.READY) { + this.isConnectedValue = true; + this.connectionObserver?.onOpen(); + this.setChannelCallback(currentState); + } + } + } + + private channelDisconnected(state: grpc.connectivityState): boolean { + return ( + (state === grpc.connectivityState.SHUTDOWN || + state === grpc.connectivityState.TRANSIENT_FAILURE || + state === grpc.connectivityState.IDLE) && + this.isConnected + ); + } + + private setChannelCallback(currentState?: grpc.connectivityState) { + const channel = this.client.getChannel(); + if (currentState === undefined && channel) { + currentState = channel?.getConnectivityState(false); + } + if (currentState) { + channel.watchConnectivityState(currentState, this.channelStateTimeout(), (error) => + this.connectivityChangeCallback(error), + ); + } + } + + // Since grpc.Channel.watchConnectivityState() isn't always running as intended, whenever the + // client fails to connect at first, `ensureConnectivity()` should be called so that it tries to + // check the connectivity state and nudge the client into connecting. + // `grpc.Channel.getConnectivityState(true)` should make it attempt to connect. + private ensureConnectivity() { + if (this.reconnectionTimeout) { + clearTimeout(this.reconnectionTimeout); + } + this.reconnectionTimeout = setTimeout(() => { + const lastState = this.client.getChannel().getConnectivityState(true); + if (this.channelDisconnected(lastState)) { + this.onClose(); + } + if (!this.isConnected) { + void this.connect().catch((error) => { + log.error(`Failed to reconnect - ${error}`); + }); + } + }, 3000); + } +} diff --git a/gui/src/main/index.ts b/gui/src/main/index.ts index 2401e70b69..3eccc403b1 100644 --- a/gui/src/main/index.ts +++ b/gui/src/main/index.ts @@ -36,8 +36,9 @@ import { printCommandLineOptions, printElectronOptions, } from './command-line-options'; -import { ConnectionObserver, DaemonRpc, SubscriptionListener } from './daemon-rpc'; +import { DaemonRpc, SubscriptionListener } from './daemon-rpc'; import Expectation from './expectation'; +import { ConnectionObserver } from './grpc-client'; import { IpcMainEventChannel } from './ipc-event-channel'; import { findIconPath } from './linux-desktop-entry'; import { loadTranslations } from './load-translations'; |
