diff options
Diffstat (limited to 'android/src')
4 files changed, 107 insertions, 10 deletions
diff --git a/android/src/main/kotlin/net/mullvad/mullvadvpn/service/DaemonInstance.kt b/android/src/main/kotlin/net/mullvad/mullvadvpn/service/DaemonInstance.kt index 0e81b87276..a69715653b 100644 --- a/android/src/main/kotlin/net/mullvad/mullvadvpn/service/DaemonInstance.kt +++ b/android/src/main/kotlin/net/mullvad/mullvadvpn/service/DaemonInstance.kt @@ -9,11 +9,12 @@ import kotlinx.coroutines.channels.ClosedReceiveChannelException import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.channels.actor import kotlinx.coroutines.channels.sendBlocking +import net.mullvad.mullvadvpn.util.Intermittent private const val API_IP_ADDRESS_FILE = "api-ip-address.txt" private const val RELAYS_FILE = "relays.json" -class DaemonInstance(val vpnService: MullvadVpnService, val listener: (MullvadDaemon?) -> Unit) { +class DaemonInstance(val vpnService: MullvadVpnService) { private enum class Command { START, STOP, @@ -21,11 +22,12 @@ class DaemonInstance(val vpnService: MullvadVpnService, val listener: (MullvadDa private val commandChannel = spawnActor() - private var daemon by observable<MullvadDaemon?>(null) { _, oldInstance, newInstance -> + private var daemon by observable<MullvadDaemon?>(null) { _, oldInstance, _ -> oldInstance?.onDestroy() - listener(newInstance) } + val intermittentDaemon = Intermittent<MullvadDaemon>() + fun start() { commandChannel.sendBlocking(Command.START) } @@ -36,6 +38,7 @@ class DaemonInstance(val vpnService: MullvadVpnService, val listener: (MullvadDa fun onDestroy() { commandChannel.close() + intermittentDaemon.onDestroy() } private fun spawnActor() = GlobalScope.actor<Command>(Dispatchers.Default, Channel.UNLIMITED) { @@ -91,12 +94,16 @@ class DaemonInstance(val vpnService: MullvadVpnService, val listener: (MullvadDa } } - private fun startDaemon() { - daemon = MullvadDaemon(vpnService).apply { + private suspend fun startDaemon() { + val newDaemon = MullvadDaemon(vpnService).apply { onDaemonStopped = { + intermittentDaemon.spawnUpdate(null) daemon = null } } + + daemon = newDaemon + intermittentDaemon.update(newDaemon) } private fun stopDaemon() { diff --git a/android/src/main/kotlin/net/mullvad/mullvadvpn/service/MullvadVpnService.kt b/android/src/main/kotlin/net/mullvad/mullvadvpn/service/MullvadVpnService.kt index 4f1f291026..052f65ebd3 100644 --- a/android/src/main/kotlin/net/mullvad/mullvadvpn/service/MullvadVpnService.kt +++ b/android/src/main/kotlin/net/mullvad/mullvadvpn/service/MullvadVpnService.kt @@ -104,11 +104,13 @@ class MullvadVpnService : TalpidVpnService() { notificationManager.acknowledgeStartForegroundService() - daemonInstance = DaemonInstance(this) { daemon -> - handleDaemonInstance(daemon) - } + daemonInstance = DaemonInstance(this).apply { + intermittentDaemon.registerListener(this@MullvadVpnService) { daemon -> + handleDaemonInstance(daemon) + } - daemonInstance.start() + start() + } } override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int { diff --git a/android/src/main/kotlin/net/mullvad/mullvadvpn/util/Intermittent.kt b/android/src/main/kotlin/net/mullvad/mullvadvpn/util/Intermittent.kt new file mode 100644 index 0000000000..864667700c --- /dev/null +++ b/android/src/main/kotlin/net/mullvad/mullvadvpn/util/Intermittent.kt @@ -0,0 +1,88 @@ +package net.mullvad.mullvadvpn.util + +import kotlin.properties.Delegates.observable +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.Semaphore +import kotlinx.coroutines.sync.withLock +import kotlinx.coroutines.sync.withPermit +import net.mullvad.talpid.util.EventNotifier + +// Wrapper to allow awaiting for intermittent values. +// +// Wraps a property that is changed from time to time and that can become unavailable (null). This +// behaves in a way similar to `CompletableDeferred`, but the value can be set and reset multiple +// times. +// +// Calling `await` will either provide the value if it's available, or suspend until it becomes +// available and then return it. +// +// Calling `update` will set the internal value after it guarantees that no other coroutine is +// currently reading the value (through a permit from the semaphore). After the value is set, it +// provides a permit to the semaphore so that suspended coroutines can use the new value. +// +// Extra initialization can be done on the intermittent value when it becomes available and before +// it is provided to the awaiting coroutines, through the use of listener callbacks. These are +// called after the value is updated but before it is made available to the coroutines. +class Intermittent<T> { + private val notifier = EventNotifier<T?>(null) + private val semaphore = Semaphore(1, 1) + private val writeLock = Mutex() + + private var updateJob: Job? = null + private var value by notifier.notifiable() + + // When the internal value is updated, listeners can be notified before the awaiting coroutines + // resume execution. This allows performing any extra initialization before the value is made + // available for usage. + fun registerListener(id: Any, listener: (T?) -> Unit) = notifier.subscribe(id, listener) + fun unregisterListener(id: Any) = notifier.unsubscribe(id) + + suspend fun await(): T { + return semaphore.withPermit { value!! } + } + + suspend fun update(newValue: T?) { + writeLock.withLock { + if (newValue != value) { + if (value != null) { + semaphore.acquire() + } + + // This will trigger the listeners to run before the awaiting coroutines resume + value = newValue + + if (newValue != null) { + semaphore.release() + } + } + } + } + + // Helper method that spawns a coroutine to update the value. + fun spawnUpdate(newValue: T?) { + synchronized(this@Intermittent) { + val previousUpdate = updateJob + + updateJob = GlobalScope.launch(Dispatchers.Default) { + previousUpdate?.join() + update(newValue) + } + } + } + + // Helper method that provides a simple way to change the wrapped value. + // + // The method returns a property delegate that will spawn a coroutine to update the wrapped + // value every time the property is written to. + fun source() = observable<T?>(null) { _, _, newValue -> + spawnUpdate(newValue) + } + + fun onDestroy() { + notifier.unsubscribeAll() + } +} diff --git a/android/src/main/kotlin/net/mullvad/talpid/util/EventNotifier.kt b/android/src/main/kotlin/net/mullvad/talpid/util/EventNotifier.kt index f0084e25d4..e0dc26f972 100644 --- a/android/src/main/kotlin/net/mullvad/talpid/util/EventNotifier.kt +++ b/android/src/main/kotlin/net/mullvad/talpid/util/EventNotifier.kt @@ -11,7 +11,7 @@ import kotlin.properties.Delegates.observable // If the ID object class (or any of its super-classes) overrides `hashCode` or `equals`, // unsubscribe might not work correctly. class EventNotifier<T>(private val initialValue: T) { - private val listeners = HashMap<Any, (T) -> Unit>() + private val listeners = LinkedHashMap<Any, (T) -> Unit>() var latestEvent = initialValue private set |
