Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(DataStore): serialize IncomingAsyncSubscriptionEventPublisher events #3489

Merged
merged 2 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable {
private let awsAuthService: AWSAuthServiceBehavior

private let consistencyQueue: DispatchQueue

private let taskQueue: TaskQueue<Void>
private let modelName: ModelName

init(modelSchema: ModelSchema,
Expand All @@ -58,6 +58,7 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable {
self.consistencyQueue = DispatchQueue(
label: "com.amazonaws.Amplify.RemoteSyncEngine.\(modelSchema.name)"
)
self.taskQueue = TaskQueue()
self.modelName = modelSchema.name

self.connectionStatusQueue = OperationQueue()
Expand Down Expand Up @@ -170,26 +171,26 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable {

func sendConnectionEventIfConnected(event: Event) {
if combinedConnectionStatusIsConnected {
incomingSubscriptionEvents.send(event)
send(event)
}
}

func genericValueListenerHandler(event: Event, cancelAwareBlock: CancelAwareBlockOperation) {
if case .connection = event {
connectionStatusQueue.addOperation(cancelAwareBlock)
} else {
incomingSubscriptionEvents.send(event)
send(event)
}
}

func genericCompletionListenerHandler(result: Result<Void, APIError>) {
switch result {
case .success:
incomingSubscriptionEvents.send(completion: .finished)
send(completion: .finished)
case .failure(let apiError):
log.verbose("[InitializeSubscription.1] API.subscribe failed for `\(modelName)` error: \(apiError.errorDescription)")
let dataStoreError = DataStoreError(error: apiError)
incomingSubscriptionEvents.send(completion: .failure(dataStoreError))
send(completion: .failure(dataStoreError))
}
}

Expand Down Expand Up @@ -237,6 +238,20 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable {
incomingSubscriptionEvents.subscribe(subscriber)
}

func send(_ event: Event) {
taskQueue.async { [weak self] in
guard let self else { return }
self.incomingSubscriptionEvents.send(event)
}
}

func send(completion: Subscribers.Completion<DataStoreError>) {
taskQueue.async { [weak self] in
guard let self else { return }
self.incomingSubscriptionEvents.send(completion: completion)
}
}

func cancel() {
consistencyQueue.sync {
genericCompletionListenerHandler(result: .successfulVoid)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
//
// Copyright Amazon.com Inc. or its affiliates.
// All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//

import XCTest
@testable import Amplify
@testable import AmplifyTestCommon
@testable import AWSPluginsCore
@testable import AWSDataStorePlugin

final class IncomingAsyncSubscriptionEventPublisherTests: XCTestCase {
var apiPlugin: MockAPICategoryPlugin!
override func setUp() {
apiPlugin = MockAPICategoryPlugin()
}

/// This test was written to to reproduce a bug where the subscribe would miss events emitted by the publisher.
/// The pattern in this test using the publisher (`IncomingAsyncSubscriptionEventPublisher`) and subscriber
/// (`IncomingAsyncSubscriptionEventToAnyModelMapper`) are identical to the usage in `AWSModelReconciliationQueue.init()`.
///
/// See the changes in this PR: https://github.com/aws-amplify/amplify-swift/pull/3489
///
/// Before the PR changes, the publisher would emit events concurrently which caused some of them to be missed
/// by the subscriber even though the subscriber applied back pressure to process one event at a time (demand
/// of `max(1)`). For more details regarding back-pressure, see
/// https://developer.apple.com/documentation/combine/processing-published-elements-with-subscribers
///
/// The change, to publish the events though the same TaskQueue ensures that the events are properly buffered
/// and sent only when the subscriber demands for it.
func testSubscriberRecievedEvents() async throws {
let expectedEvents = expectation(description: "Expected number of ")
let numberOfEvents = 50
expectedEvents.expectedFulfillmentCount = numberOfEvents
let asyncEvents = await IncomingAsyncSubscriptionEventPublisher(
modelSchema: Post.schema,
api: apiPlugin,
modelPredicate: nil,
auth: nil,
authModeStrategy: AWSDefaultAuthModeStrategy(),
awsAuthService: nil)
let mapper = IncomingAsyncSubscriptionEventToAnyModelMapper()
asyncEvents.subscribe(subscriber: mapper)
let sink = mapper
.publisher
.sink(
receiveCompletion: { _ in },
receiveValue: { _ in
expectedEvents.fulfill()
}
)
DispatchQueue.concurrentPerform(iterations: numberOfEvents) { index in
asyncEvents.send(.connection(.connected))
}

await fulfillment(of: [expectedEvents], timeout: 2)
sink.cancel()
}
}
Loading