diff options
| author | Janito Vaqueiro Ferreira Filho <janito@mullvad.net> | 2021-04-28 18:48:02 +0000 |
|---|---|---|
| committer | Janito Vaqueiro Ferreira Filho <janito@mullvad.net> | 2021-05-14 20:39:01 +0000 |
| commit | a486e2286ab6685e4e0e6d676587b629fa738404 (patch) | |
| tree | 1c9795dfcc7acf30ccc289f412d32ad6eecf5b7e | |
| parent | 052d6cfe31191da136fdbab748284cabe5923491 (diff) | |
| download | mullvadvpn-a486e2286ab6685e4e0e6d676587b629fa738404.tar.xz mullvadvpn-a486e2286ab6685e4e0e6d676587b629fa738404.zip | |
Create `DispatchingFlow` helper type
| -rw-r--r-- | android/src/main/kotlin/net/mullvad/mullvadvpn/util/DispatchingFlow.kt | 49 |
1 files changed, 49 insertions, 0 deletions
diff --git a/android/src/main/kotlin/net/mullvad/mullvadvpn/util/DispatchingFlow.kt b/android/src/main/kotlin/net/mullvad/mullvadvpn/util/DispatchingFlow.kt new file mode 100644 index 0000000000..af66a092ba --- /dev/null +++ b/android/src/main/kotlin/net/mullvad/mullvadvpn/util/DispatchingFlow.kt @@ -0,0 +1,49 @@ +package net.mullvad.mullvadvpn.util + +import java.util.concurrent.ConcurrentHashMap +import kotlin.reflect.KClass +import kotlinx.coroutines.InternalCoroutinesApi +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ClosedSendChannelException +import kotlinx.coroutines.channels.SendChannel +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.FlowCollector +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.consumeAsFlow + +class DispatchingFlow<T : Any>(private val upstream: Flow<T>) : Flow<T> { + private val subscribers = ConcurrentHashMap<KClass<out T>, SendChannel<T>>() + + fun <V : T> subscribe( + variant: KClass<V>, + capacity: Int = Channel.CONFLATED + ): Flow<V> { + val channel = Channel<V>(capacity) + + // This is safe because `collect` will only send to this channel if the instance class is V + @Suppress("UNCHECKED_CAST") + subscribers[variant] = channel as SendChannel<T> + + return channel.consumeAsFlow() + } + + fun <V : T> unsubscribe(variant: KClass<V>) = subscribers.remove(variant) + + @InternalCoroutinesApi + override suspend fun collect(collector: FlowCollector<T>) { + upstream.collect { event -> + try { + subscribers[event::class]?.send(event) + } catch (closedException: ClosedSendChannelException) { + subscribers.remove(event::class) + } + + collector.emit(event) + } + + subscribers.clear() + } +} + +fun <T : Any> Flow<T>.dispatchTo(configureSubscribers: DispatchingFlow<T>.() -> Unit) = + DispatchingFlow(this).also(configureSubscribers) |
