diff options
| author | Andrej Mihajlov <and@mullvad.net> | 2020-05-08 15:19:09 +0200 |
|---|---|---|
| committer | Andrej Mihajlov <and@mullvad.net> | 2020-05-08 15:34:45 +0200 |
| commit | 36b4a9d258ee2d02307b41c8f9def15e82abb883 (patch) | |
| tree | a69bbc8b3ddea9362a88251f0528cf94043ba9d9 /ios | |
| parent | f6308d6de1d4a012d1676a6701ce95ca3a4a829a (diff) | |
| download | mullvadvpn-36b4a9d258ee2d02307b41c8f9def15e82abb883.tar.xz mullvadvpn-36b4a9d258ee2d02307b41c8f9def15e82abb883.zip | |
Rework cancellable delay
Diffstat (limited to 'ios')
| -rw-r--r-- | ios/MullvadVPN/CancellableDelayPublisher.swift | 138 |
1 files changed, 13 insertions, 125 deletions
diff --git a/ios/MullvadVPN/CancellableDelayPublisher.swift b/ios/MullvadVPN/CancellableDelayPublisher.swift index cc2ce9d62f..c177bd4251 100644 --- a/ios/MullvadVPN/CancellableDelayPublisher.swift +++ b/ios/MullvadVPN/CancellableDelayPublisher.swift @@ -9,136 +9,24 @@ import Foundation import Combine -extension Publishers { - - class CancellableDelay<Upstream, Context>: Publisher where Upstream: Publisher, Context: Scheduler - { - typealias Output = Upstream.Output - typealias Failure = Upstream.Failure - - private let scheduler: Context - private let delay: Context.SchedulerTimeType.Stride - - private let upstream: Upstream - - init(upstream: Upstream, scheduler: Context, delay: Context.SchedulerTimeType.Stride) { - self.scheduler = scheduler - self.delay = delay - self.upstream = upstream - } - - func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input { - let subscription = Subscription( - upstream: upstream, - downstream: subscriber, - scheduler: scheduler, - delay: delay) - - subscriber.receive(subscription: subscription) - } - } - -} - -private extension Publishers.CancellableDelay { +extension Publisher { - class Subscription<Upstream, Downstream, Context>: Combine.Subscription, Subscriber - where - Upstream: Publisher, - Downstream: Subscriber, - Context: Scheduler, - Upstream.Output == Downstream.Input, - Upstream.Failure == Downstream.Failure + /// This is a `delay` operator implementation that respects cancellation + func cancellableDelay<S>(for delay: S.SchedulerTimeType.Stride, scheduler: S) + -> Publishers.FlatMap<PassthroughSubject<Self.Output, Self.Failure>, Self> + where S: Scheduler { - typealias Input = Downstream.Input - typealias Failure = Downstream.Failure - - private let upstream: Upstream - private let downstream: Downstream - - private let cancelLock = NSRecursiveLock() - private let scheduler: Context - private let delay: Context.SchedulerTimeType.Stride - private var demand: Subscribers.Demand = .unlimited - private var isCancelled = false - private var innerSubscription: Combine.Subscription? - - init(upstream: Upstream, downstream: Downstream, scheduler: Context, delay: Context.SchedulerTimeType.Stride) { - self.upstream = upstream - self.downstream = downstream - self.scheduler = scheduler - self.delay = delay - } - - func request(_ demand: Subscribers.Demand) { - cancelLock.withCriticalBlock { - guard !self.isCancelled else { return } - - self.demand = demand - self.upstream.subscribe(self) - } - } - - func receive(_ input: Input) -> Subscribers.Demand { - return self.cancelLock.withCriticalBlock { () -> Subscribers.Demand in - delay { [weak self] in - _ = self?.downstream.receive(input) - } - - // Expects the demand to decrease linearly - self.demand -= 1 + return self.flatMap { (value) -> PassthroughSubject<Output, Failure> in + let subject = PassthroughSubject<Output, Failure>() + let date = scheduler.now.advanced(by: delay) - return self.demand + // `PassthroughSubject` does not emit values, nor completion after cancellation + scheduler.schedule(after: date) { + subject.send(value) + subject.send(completion: .finished) } - } - - func receive(completion: Subscribers.Completion<Failure>) { - delay { [weak self] in - self?.downstream.receive(completion: completion) - } - } - func receive(subscription: Combine.Subscription) { - self.cancelLock.withCriticalBlock { - guard !self.isCancelled else { return } - - subscription.request(self.demand) - - self.innerSubscription = subscription - } + return subject } - - func cancel() { - cancelLock.withCriticalBlock { - isCancelled = true - - innerSubscription?.cancel() - } - } - - private func delay(_ action: @escaping () -> Void) { - let date = self.scheduler.now.advanced(by: self.delay) - - self.scheduler.schedule(after: date) { [weak self] in - guard let self = self else { return } - - self.cancelLock.withCriticalBlock { - if !self.isCancelled { - action() - } - } - } - } - - } -} - -extension Publisher { - - func cancellableDelay<S>(for delay: S.SchedulerTimeType.Stride, scheduler: S) - -> Publishers.CancellableDelay<Self, S> where S: Scheduler - { - return Publishers.CancellableDelay(upstream: self, scheduler: scheduler, delay: delay) } - } |
