diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisher.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisher.swift index c45ba79650..d5dae69b37 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisher.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisher.swift @@ -43,7 +43,7 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable { private let awsAuthService: AWSAuthServiceBehavior private let consistencyQueue: DispatchQueue - + private let taskQueue: TaskQueue private let modelName: ModelName init(modelSchema: ModelSchema, @@ -58,6 +58,7 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable { self.consistencyQueue = DispatchQueue( label: "com.amazonaws.Amplify.RemoteSyncEngine.\(modelSchema.name)" ) + self.taskQueue = TaskQueue() self.modelName = modelSchema.name self.connectionStatusQueue = OperationQueue() @@ -170,7 +171,7 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable { func sendConnectionEventIfConnected(event: Event) { if combinedConnectionStatusIsConnected { - incomingSubscriptionEvents.send(event) + send(event) } } @@ -178,18 +179,18 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable { if case .connection = event { connectionStatusQueue.addOperation(cancelAwareBlock) } else { - incomingSubscriptionEvents.send(event) + send(event) } } func genericCompletionListenerHandler(result: Result) { switch result { case .success: - incomingSubscriptionEvents.send(completion: .finished) + send(completion: .finished) case .failure(let apiError): log.verbose("[InitializeSubscription.1] API.subscribe failed for `\(modelName)` error: \(apiError.errorDescription)") let dataStoreError = DataStoreError(error: apiError) - incomingSubscriptionEvents.send(completion: .failure(dataStoreError)) + send(completion: .failure(dataStoreError)) } } @@ -237,6 +238,20 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable { incomingSubscriptionEvents.subscribe(subscriber) } + func send(_ event: Event) { + taskQueue.async { [weak self] in + guard let self else { return } + self.incomingSubscriptionEvents.send(event) + } + } + + func send(completion: Subscribers.Completion) { + taskQueue.async { [weak self] in + guard let self else { return } + self.incomingSubscriptionEvents.send(completion: completion) + } + } + func cancel() { consistencyQueue.sync { genericCompletionListenerHandler(result: .successfulVoid) diff --git a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisherTests.swift b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisherTests.swift new file mode 100644 index 0000000000..48100ba687 --- /dev/null +++ b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisherTests.swift @@ -0,0 +1,109 @@ +// +// Copyright Amazon.com Inc. or its affiliates. +// All Rights Reserved. +// +// SPDX-License-Identifier: Apache-2.0 +// + +import XCTest +@testable import Amplify +@testable import AmplifyTestCommon +@testable import AWSPluginsCore +@testable import AWSDataStorePlugin + +final class IncomingAsyncSubscriptionEventPublisherTests: XCTestCase { + var apiPlugin: MockAPICategoryPlugin! + override func setUp() { + apiPlugin = MockAPICategoryPlugin() + ModelRegistry.register(modelType: Post.self) + } + + /// This test was written to to reproduce a bug where the subscribe would miss events emitted by the publisher. + /// The pattern in this test using the publisher (`IncomingAsyncSubscriptionEventPublisher`) and subscriber + /// (`IncomingAsyncSubscriptionEventToAnyModelMapper`) are identical to the usage in `AWSModelReconciliationQueue.init()`. + /// + /// See the changes in this PR: https://github.com/aws-amplify/amplify-swift/pull/3489 + /// + /// Before the PR changes, the publisher would emit events concurrently which caused some of them to be missed + /// by the subscriber even though the subscriber applied back pressure to process one event at a time (demand + /// of `max(1)`). For more details regarding back-pressure, see + /// https://developer.apple.com/documentation/combine/processing-published-elements-with-subscribers + /// + /// The change, to publish the events though the same TaskQueue ensures that the events are properly buffered + /// and sent only when the subscriber demands for it. + func testSubscriberRecievedEvents() async throws { + let expectedEvents = expectation(description: "Expected number of ") + let numberOfEvents = 50 + expectedEvents.expectedFulfillmentCount = numberOfEvents + let asyncEvents = await IncomingAsyncSubscriptionEventPublisher( + modelSchema: Post.schema, + api: apiPlugin, + modelPredicate: nil, + auth: nil, + authModeStrategy: AWSDefaultAuthModeStrategy(), + awsAuthService: nil) + let mapper = IncomingAsyncSubscriptionEventToAnyModelMapper() + asyncEvents.subscribe(subscriber: mapper) + let sink = mapper + .publisher + .sink( + receiveCompletion: { _ in }, + receiveValue: { _ in + expectedEvents.fulfill() + } + ) + DispatchQueue.concurrentPerform(iterations: numberOfEvents) { index in + asyncEvents.send(.connection(.connected)) + } + + await fulfillment(of: [expectedEvents], timeout: 2) + sink.cancel() + } + + /// Ensure that the publisher-subscriber with back pressure is receiving all the events in the order in which they were sent. + func testSubscriberRecievedEventsInOrder() async throws { + let expectedEvents = expectation(description: "Expected number of ") + let expectedOrder = AtomicValue<[String]>(initialValue: []) + let actualOrder = AtomicValue<[String]>(initialValue: []) + let numberOfEvents = 50 + expectedEvents.expectedFulfillmentCount = numberOfEvents + let asyncEvents = await IncomingAsyncSubscriptionEventPublisher( + modelSchema: Post.schema, + api: apiPlugin, + modelPredicate: nil, + auth: nil, + authModeStrategy: AWSDefaultAuthModeStrategy(), + awsAuthService: nil) + let mapper = IncomingAsyncSubscriptionEventToAnyModelMapper() + asyncEvents.subscribe(subscriber: mapper) + let sink = mapper + .publisher + .sink( + receiveCompletion: { _ in }, + receiveValue: { event in + switch event { + case .payload(let mutationSync): + actualOrder.append(mutationSync.syncMetadata.modelId) + default: + break + } + expectedEvents.fulfill() + } + ) + + for index in 0..