summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorAndrej Mihajlov <and@mullvad.net>2019-12-23 13:14:39 +0100
committerAndrej Mihajlov <and@mullvad.net>2020-01-03 11:40:30 +0100
commit5db5a72c8269a7b5fdf3613d73f402df4aae9303 (patch)
tree571b44aad5df096b79df801315959a56dff7f98e
parent40665ef1b9bbb6b538ed4e9ff5f652dc745ab32c (diff)
downloadmullvadvpn-5db5a72c8269a7b5fdf3613d73f402df4aae9303.tar.xz
mullvadvpn-5db5a72c8269a7b5fdf3613d73f402df4aae9303.zip
Rework MutallyExclusivePublisher to improve thread safety and Combine integration
-rw-r--r--ios/MullvadVPN/MutuallyExclusive.swift108
1 files changed, 92 insertions, 16 deletions
diff --git a/ios/MullvadVPN/MutuallyExclusive.swift b/ios/MullvadVPN/MutuallyExclusive.swift
index cb3803805a..c93cc5362e 100644
--- a/ios/MullvadVPN/MutuallyExclusive.swift
+++ b/ios/MullvadVPN/MutuallyExclusive.swift
@@ -13,13 +13,16 @@ extension Publishers {
/// A publisher that blocks the given DispatchQueue until the produced publisher reported the
/// completion.
- final class MutuallyExclusive<PublisherType, Context>: Publisher where PublisherType: Publisher, Context: Scheduler {
+ final class MutuallyExclusive<PublisherType, Context>: Publisher
+ where
+ PublisherType: Publisher,
+ Context: Scheduler
+ {
+ typealias MakePublisherBlock = () -> PublisherType
typealias Output = PublisherType.Output
typealias Failure = PublisherType.Failure
- typealias MakePublisherBlock = () -> PublisherType
-
private let exclusivityQueue: Context
private let executionQueue: Context
@@ -32,27 +35,100 @@ extension Publishers {
}
func receive<S>(subscriber: S) where S : Subscriber, S.Failure == Failure, S.Input == Output {
- exclusivityQueue.schedule {
- let sema = DispatchSemaphore(value: 0)
- let releaseLock = {
- _ = sema.signal()
- }
+ let subscription = MutuallyExclusive.Subscription(
+ subscriber: subscriber,
+ createPublisher: createPublisher,
+ exclusivityQueue: exclusivityQueue,
+ executionQueue: executionQueue)
+
+ subscriber.receive(subscription: subscription)
+ }
+ }
+}
+
+private extension Publishers.MutuallyExclusive {
+
+ /// A subscription used by `MutuallyExclusive` publisher
+ final class Subscription<SubscriberType, PublisherType, Context>: Combine.Subscription
+ where
+ SubscriberType: Subscriber, PublisherType: Publisher,
+ PublisherType.Output == SubscriberType.Input,
+ PublisherType.Failure == SubscriberType.Failure,
+ Context: Scheduler
+ {
+ typealias MakePublisherBlock = () -> PublisherType
+
+ private let subscriber: SubscriberType
+ private var innerSubscriber: AnyCancellable?
+ private let createPublisher: MakePublisherBlock
+
+ private let exclusivityQueue: Context
+ private let executionQueue: Context
+ private let sema = DispatchSemaphore(value: 0)
+
+ private let cancelLock = NSLock()
+ private var isCancelled = false
+
+ init(subscriber: SubscriberType,
+ createPublisher: @escaping MakePublisherBlock,
+ exclusivityQueue: Context,
+ executionQueue: Context)
+ {
+ self.subscriber = subscriber
+ self.createPublisher = createPublisher
+ self.exclusivityQueue = exclusivityQueue
+ self.executionQueue = executionQueue
+ }
+ func request(_ demand: Subscribers.Demand) {
+ self.exclusivityQueue.schedule {
self.executionQueue.schedule {
- self.createPublisher()
- .handleEvents(receiveCompletion: { _ in
- releaseLock()
- }, receiveCancel: {
- releaseLock()
- })
- .subscribe(subscriber)
+ self.cancelLock.withCriticalBlock {
+ guard !self.isCancelled else { return }
+
+ self.innerSubscriber = self.createPublisher()
+ .sink(receiveCompletion: { [weak self] (completion) in
+ guard let self = self else { return }
+
+ self.subscriber.receive(completion: completion)
+ self.signalSemaphore()
+ }, receiveValue: { [weak self] (output) in
+ _ = self?.subscriber.receive(output)
+ })
+ }
}
+ self.sema.wait()
+ }
+ }
+
+ func cancel() {
+ cancelLock.withCriticalBlock {
+ guard !isCancelled else { return }
- sema.wait()
+ isCancelled = true
+
+ innerSubscriber?.cancel()
+ innerSubscriber = nil
+
+ signalSemaphore()
}
}
+
+ private func signalSemaphore() {
+ _ = sema.signal()
+ }
+
}
}
+private extension NSLock {
+ func withCriticalBlock<T>(_ body: () -> T) -> T {
+ lock()
+ defer { unlock() }
+
+ return body()
+ }
+}
+
typealias MutuallyExclusive = Publishers.MutuallyExclusive