diff options
Diffstat (limited to 'android/src')
| -rw-r--r-- | android/src/main/kotlin/net/mullvad/mullvadvpn/ipc/HandlerFlow.kt | 45 |
1 files changed, 45 insertions, 0 deletions
diff --git a/android/src/main/kotlin/net/mullvad/mullvadvpn/ipc/HandlerFlow.kt b/android/src/main/kotlin/net/mullvad/mullvadvpn/ipc/HandlerFlow.kt new file mode 100644 index 0000000000..943c55eeff --- /dev/null +++ b/android/src/main/kotlin/net/mullvad/mullvadvpn/ipc/HandlerFlow.kt @@ -0,0 +1,45 @@ +package net.mullvad.mullvadvpn.ipc + +import android.os.Handler +import android.os.Looper +import android.os.Message +import android.util.Log +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.InternalCoroutinesApi +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ClosedSendChannelException +import kotlinx.coroutines.channels.sendBlocking +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.FlowCollector +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.consumeAsFlow +import kotlinx.coroutines.flow.onCompletion + +class HandlerFlow<T>( + looper: Looper, + private val extractor: (Message) -> T +) : Handler(looper), Flow<T> { + private val channel = Channel<T>(Channel.UNLIMITED) + private val flow = channel.consumeAsFlow().onCompletion { + removeCallbacksAndMessages(null) + } + + @InternalCoroutinesApi + override suspend fun collect(collector: FlowCollector<T>) = flow.collect(collector) + + override fun handleMessage(message: Message) { + val extractedData = extractor(message) + + try { + channel.sendBlocking(extractedData) + } catch (exception: Exception) { + when (exception) { + is ClosedSendChannelException, is CancellationException -> { + Log.w("mullvad", "Received a message after HandlerFlow was closed", exception) + removeCallbacksAndMessages(null) + } + else -> throw exception + } + } + } +} |
