summaryrefslogtreecommitdiffhomepage
path: root/android
diff options
context:
space:
mode:
authorJanito Vaqueiro Ferreira Filho <janito@mullvad.net>2021-04-28 18:48:02 +0000
committerJanito Vaqueiro Ferreira Filho <janito@mullvad.net>2021-05-14 20:39:01 +0000
commita486e2286ab6685e4e0e6d676587b629fa738404 (patch)
tree1c9795dfcc7acf30ccc289f412d32ad6eecf5b7e /android
parent052d6cfe31191da136fdbab748284cabe5923491 (diff)
downloadmullvadvpn-a486e2286ab6685e4e0e6d676587b629fa738404.tar.xz
mullvadvpn-a486e2286ab6685e4e0e6d676587b629fa738404.zip
Create `DispatchingFlow` helper type
Diffstat (limited to 'android')
-rw-r--r--android/src/main/kotlin/net/mullvad/mullvadvpn/util/DispatchingFlow.kt49
1 files changed, 49 insertions, 0 deletions
diff --git a/android/src/main/kotlin/net/mullvad/mullvadvpn/util/DispatchingFlow.kt b/android/src/main/kotlin/net/mullvad/mullvadvpn/util/DispatchingFlow.kt
new file mode 100644
index 0000000000..af66a092ba
--- /dev/null
+++ b/android/src/main/kotlin/net/mullvad/mullvadvpn/util/DispatchingFlow.kt
@@ -0,0 +1,49 @@
+package net.mullvad.mullvadvpn.util
+
+import java.util.concurrent.ConcurrentHashMap
+import kotlin.reflect.KClass
+import kotlinx.coroutines.InternalCoroutinesApi
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.channels.ClosedSendChannelException
+import kotlinx.coroutines.channels.SendChannel
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.FlowCollector
+import kotlinx.coroutines.flow.collect
+import kotlinx.coroutines.flow.consumeAsFlow
+
+class DispatchingFlow<T : Any>(private val upstream: Flow<T>) : Flow<T> {
+ private val subscribers = ConcurrentHashMap<KClass<out T>, SendChannel<T>>()
+
+ fun <V : T> subscribe(
+ variant: KClass<V>,
+ capacity: Int = Channel.CONFLATED
+ ): Flow<V> {
+ val channel = Channel<V>(capacity)
+
+ // This is safe because `collect` will only send to this channel if the instance class is V
+ @Suppress("UNCHECKED_CAST")
+ subscribers[variant] = channel as SendChannel<T>
+
+ return channel.consumeAsFlow()
+ }
+
+ fun <V : T> unsubscribe(variant: KClass<V>) = subscribers.remove(variant)
+
+ @InternalCoroutinesApi
+ override suspend fun collect(collector: FlowCollector<T>) {
+ upstream.collect { event ->
+ try {
+ subscribers[event::class]?.send(event)
+ } catch (closedException: ClosedSendChannelException) {
+ subscribers.remove(event::class)
+ }
+
+ collector.emit(event)
+ }
+
+ subscribers.clear()
+ }
+}
+
+fun <T : Any> Flow<T>.dispatchTo(configureSubscribers: DispatchingFlow<T>.() -> Unit) =
+ DispatchingFlow(this).also(configureSubscribers)