diff options
| author | Andrej Mihajlov <and@mullvad.net> | 2019-12-23 13:14:39 +0100 |
|---|---|---|
| committer | Andrej Mihajlov <and@mullvad.net> | 2020-01-03 11:40:30 +0100 |
| commit | 5db5a72c8269a7b5fdf3613d73f402df4aae9303 (patch) | |
| tree | 571b44aad5df096b79df801315959a56dff7f98e | |
| parent | 40665ef1b9bbb6b538ed4e9ff5f652dc745ab32c (diff) | |
| download | mullvadvpn-5db5a72c8269a7b5fdf3613d73f402df4aae9303.tar.xz mullvadvpn-5db5a72c8269a7b5fdf3613d73f402df4aae9303.zip | |
Rework MutallyExclusivePublisher to improve thread safety and Combine integration
| -rw-r--r-- | ios/MullvadVPN/MutuallyExclusive.swift | 108 |
1 files changed, 92 insertions, 16 deletions
diff --git a/ios/MullvadVPN/MutuallyExclusive.swift b/ios/MullvadVPN/MutuallyExclusive.swift index cb3803805a..c93cc5362e 100644 --- a/ios/MullvadVPN/MutuallyExclusive.swift +++ b/ios/MullvadVPN/MutuallyExclusive.swift @@ -13,13 +13,16 @@ extension Publishers { /// A publisher that blocks the given DispatchQueue until the produced publisher reported the /// completion. - final class MutuallyExclusive<PublisherType, Context>: Publisher where PublisherType: Publisher, Context: Scheduler { + final class MutuallyExclusive<PublisherType, Context>: Publisher + where + PublisherType: Publisher, + Context: Scheduler + { + typealias MakePublisherBlock = () -> PublisherType typealias Output = PublisherType.Output typealias Failure = PublisherType.Failure - typealias MakePublisherBlock = () -> PublisherType - private let exclusivityQueue: Context private let executionQueue: Context @@ -32,27 +35,100 @@ extension Publishers { } func receive<S>(subscriber: S) where S : Subscriber, S.Failure == Failure, S.Input == Output { - exclusivityQueue.schedule { - let sema = DispatchSemaphore(value: 0) - let releaseLock = { - _ = sema.signal() - } + let subscription = MutuallyExclusive.Subscription( + subscriber: subscriber, + createPublisher: createPublisher, + exclusivityQueue: exclusivityQueue, + executionQueue: executionQueue) + + subscriber.receive(subscription: subscription) + } + } +} + +private extension Publishers.MutuallyExclusive { + + /// A subscription used by `MutuallyExclusive` publisher + final class Subscription<SubscriberType, PublisherType, Context>: Combine.Subscription + where + SubscriberType: Subscriber, PublisherType: Publisher, + PublisherType.Output == SubscriberType.Input, + PublisherType.Failure == SubscriberType.Failure, + Context: Scheduler + { + typealias MakePublisherBlock = () -> PublisherType + + private let subscriber: SubscriberType + private var innerSubscriber: AnyCancellable? + private let createPublisher: MakePublisherBlock + + private let exclusivityQueue: Context + private let executionQueue: Context + private let sema = DispatchSemaphore(value: 0) + + private let cancelLock = NSLock() + private var isCancelled = false + + init(subscriber: SubscriberType, + createPublisher: @escaping MakePublisherBlock, + exclusivityQueue: Context, + executionQueue: Context) + { + self.subscriber = subscriber + self.createPublisher = createPublisher + self.exclusivityQueue = exclusivityQueue + self.executionQueue = executionQueue + } + func request(_ demand: Subscribers.Demand) { + self.exclusivityQueue.schedule { self.executionQueue.schedule { - self.createPublisher() - .handleEvents(receiveCompletion: { _ in - releaseLock() - }, receiveCancel: { - releaseLock() - }) - .subscribe(subscriber) + self.cancelLock.withCriticalBlock { + guard !self.isCancelled else { return } + + self.innerSubscriber = self.createPublisher() + .sink(receiveCompletion: { [weak self] (completion) in + guard let self = self else { return } + + self.subscriber.receive(completion: completion) + self.signalSemaphore() + }, receiveValue: { [weak self] (output) in + _ = self?.subscriber.receive(output) + }) + } } + self.sema.wait() + } + } + + func cancel() { + cancelLock.withCriticalBlock { + guard !isCancelled else { return } - sema.wait() + isCancelled = true + + innerSubscriber?.cancel() + innerSubscriber = nil + + signalSemaphore() } } + + private func signalSemaphore() { + _ = sema.signal() + } + } } +private extension NSLock { + func withCriticalBlock<T>(_ body: () -> T) -> T { + lock() + defer { unlock() } + + return body() + } +} + typealias MutuallyExclusive = Publishers.MutuallyExclusive |
