From 5cfcce15031ce1e9ce391f4a4d39fbc8a1fbeac8 Mon Sep 17 00:00:00 2001 From: Sebastian Villena <97059974+ruisebas@users.noreply.github.com> Date: Tue, 9 Jan 2024 14:40:18 -0500 Subject: [PATCH] Using existing TaskQueue, refactoring it to support non-throwing closures. --- Amplify/Core/Support/TaskQueue.swift | 46 ++++++++++++--- .../AWSCognitoAuthPlugin.swift | 2 +- .../CredentialStoreOperationClient.swift | 2 +- .../Sync/RemoteSyncEngine.swift | 2 +- .../Session/PinpointSession.swift | 40 ++++++++----- .../Session/SessionClient.swift | 56 ++++--------------- .../SessionClientTests.swift | 41 -------------- .../CoreTests/AmplifyTaskQueueTests.swift | 2 +- 8 files changed, 79 insertions(+), 112 deletions(-) diff --git a/Amplify/Core/Support/TaskQueue.swift b/Amplify/Core/Support/TaskQueue.swift index f281c57e1c..630fad7458 100644 --- a/Amplify/Core/Support/TaskQueue.swift +++ b/Amplify/Core/Support/TaskQueue.swift @@ -8,34 +8,64 @@ import Foundation /// A helper for executing asynchronous work serially. -public actor TaskQueue { - private var previousTask: Task? - +public actor TaskQueue where Failure: Error { + private var previousTask: Task? + public init() {} +} +public extension TaskQueue where Failure == any Error { /// Serializes asynchronous requests made from an async context /// /// Given an invocation like /// ```swift - /// let tq = TaskQueue() + /// let tq = TaskQueue() /// let v1 = try await tq.sync { try await doAsync1() } /// let v2 = try await tq.sync { try await doAsync2() } /// let v3 = try await tq.sync { try await doAsync3() } /// ``` /// TaskQueue serializes this work so that `doAsync1` is performed before `doAsync2`, /// which is performed before `doAsync3`. - public func sync(block: @Sendable @escaping () async throws -> Success) async throws -> Success { - let currentTask: Task = Task { [previousTask] in + func sync(block: @Sendable @escaping () async throws -> Success) async throws -> Success { + let currentTask: Task = Task { [previousTask] in _ = await previousTask?.result return try await block() } previousTask = currentTask return try await currentTask.value } - - public nonisolated func async(block: @Sendable @escaping () async throws -> Success) rethrows { + + nonisolated func async(block: @Sendable @escaping () async throws -> Success) rethrows { Task { try await sync(block: block) } } } + +public extension TaskQueue where Failure == Never { + /// Serializes asynchronous requests made from an async context + /// + /// Given an invocation like + /// ```swift + /// let tq = TaskQueue() + /// let v1 = await tq.sync { await doAsync1() } + /// let v2 = await tq.sync { await doAsync2() } + /// let v3 = await tq.sync { await doAsync3() } + /// ``` + /// TaskQueue serializes this work so that `doAsync1` is performed before `doAsync2`, + /// which is performed before `doAsync3`. + func sync(block: @Sendable @escaping () async -> Success) async -> Success { + let currentTask: Task = Task { [previousTask] in + _ = await previousTask?.result + return await block() + } + previousTask = currentTask + return await currentTask.value + } + + nonisolated func async(block: @Sendable @escaping () async -> Success) { + Task { + await sync(block: block) + } + } +} diff --git a/AmplifyPlugins/Auth/Sources/AWSCognitoAuthPlugin/AWSCognitoAuthPlugin.swift b/AmplifyPlugins/Auth/Sources/AWSCognitoAuthPlugin/AWSCognitoAuthPlugin.swift index f32209ba9c..b0f8c94f2b 100644 --- a/AmplifyPlugins/Auth/Sources/AWSCognitoAuthPlugin/AWSCognitoAuthPlugin.swift +++ b/AmplifyPlugins/Auth/Sources/AWSCognitoAuthPlugin/AWSCognitoAuthPlugin.swift @@ -28,7 +28,7 @@ public final class AWSCognitoAuthPlugin: AWSCognitoAuthPluginBehavior { var analyticsHandler: UserPoolAnalyticsBehavior! - var taskQueue: TaskQueue! + var taskQueue: TaskQueue! var httpClientEngineProxy: HttpClientEngineProxy? diff --git a/AmplifyPlugins/Auth/Sources/AWSCognitoAuthPlugin/CredentialStorage/CredentialStoreOperationClient.swift b/AmplifyPlugins/Auth/Sources/AWSCognitoAuthPlugin/CredentialStorage/CredentialStoreOperationClient.swift index d1a0bba2fd..8b3134a5e4 100644 --- a/AmplifyPlugins/Auth/Sources/AWSCognitoAuthPlugin/CredentialStorage/CredentialStoreOperationClient.swift +++ b/AmplifyPlugins/Auth/Sources/AWSCognitoAuthPlugin/CredentialStorage/CredentialStoreOperationClient.swift @@ -23,7 +23,7 @@ class CredentialStoreOperationClient: CredentialStoreStateBehavior { // Task queue is being used to manage CRUD operations to the credential store synchronously // This will help us keeping the CRUD methods atomic - private let taskQueue = TaskQueue() + private let taskQueue = TaskQueue() init(credentialStoreStateMachine: CredentialStoreStateMachine) { self.credentialStoreStateMachine = credentialStoreStateMachine diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngine.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngine.swift index 26bb453571..1b7d8e9d15 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngine.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngine.swift @@ -46,7 +46,7 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior { } /// Synchronizes startup operations - let taskQueue = TaskQueue() + let taskQueue = TaskQueue() // Assigned at `setUpCloudSubscriptions` var reconciliationQueue: IncomingEventReconciliationQueue? diff --git a/AmplifyPlugins/Internal/Sources/InternalAWSPinpoint/Session/PinpointSession.swift b/AmplifyPlugins/Internal/Sources/InternalAWSPinpoint/Session/PinpointSession.swift index b351720d67..3a43fc8bce 100644 --- a/AmplifyPlugins/Internal/Sources/InternalAWSPinpoint/Session/PinpointSession.swift +++ b/AmplifyPlugins/Internal/Sources/InternalAWSPinpoint/Session/PinpointSession.swift @@ -11,14 +11,23 @@ import Foundation public struct PinpointSession: Codable { private enum State: Codable { case active - case paused - case stopped + case paused(date: Date) + case stopped(date: Date) } typealias SessionId = String let sessionId: SessionId let startTime: Date - private(set) var stopTime: Date? + var stopTime: Date? { + switch state { + case .active: + return nil + case .paused(let stopTime), + .stopped(let stopTime): + return stopTime + } + } + private var state: State = .active init(appId: String, @@ -26,7 +35,6 @@ public struct PinpointSession: Codable { sessionId = Self.generateSessionId(appId: appId, uniqueId: uniqueId) startTime = Date() - stopTime = nil } init(sessionId: SessionId, @@ -34,18 +42,25 @@ public struct PinpointSession: Codable { stopTime: Date?) { self.sessionId = sessionId self.startTime = startTime - self.stopTime = stopTime - if stopTime != nil { - state = .stopped + if let stopTime { + self.state = .stopped(date: stopTime) } } var isPaused: Bool { - return stopTime != nil && state == .paused + if case .paused = state { + return true + } + + return false } var isStopped: Bool { - return stopTime != nil && state == .stopped + if case .stopped = state { + return true + } + + return false } var duration: Date.Millisecond? { @@ -56,18 +71,15 @@ public struct PinpointSession: Codable { mutating func stop() { guard !isStopped else { return } - stopTime = stopTime ?? Date() - state = .stopped + state = .stopped(date: stopTime ?? Date()) } mutating func pause() { guard !isPaused else { return } - stopTime = Date() - state = .paused + state = .paused(date: Date()) } mutating func resume() { - stopTime = nil state = .active } diff --git a/AmplifyPlugins/Internal/Sources/InternalAWSPinpoint/Session/SessionClient.swift b/AmplifyPlugins/Internal/Sources/InternalAWSPinpoint/Session/SessionClient.swift index 761475fb8d..e022c79fb9 100644 --- a/AmplifyPlugins/Internal/Sources/InternalAWSPinpoint/Session/SessionClient.swift +++ b/AmplifyPlugins/Internal/Sources/InternalAWSPinpoint/Session/SessionClient.swift @@ -6,6 +6,7 @@ // import Amplify +import AWSPluginsCore import Foundation @_spi(InternalAWSPinpoint) @@ -14,7 +15,6 @@ public protocol SessionClientBehaviour: AnyObject { var analyticsClient: AnalyticsClientBehaviour? { get set } func startPinpointSession() - func validateOrRetrieveSession(_ session: PinpointSession?) -> PinpointSession func startTrackingSessions(backgroundTimeout: TimeInterval) } @@ -34,7 +34,7 @@ class SessionClient: SessionClientBehaviour { private let configuration: SessionClientConfiguration private let sessionClientQueue = DispatchQueue(label: Constants.queue, attributes: .concurrent) - private let analyticsTaskQueue = TaskQueue() + private let analyticsTaskQueue = TaskQueue() private let userDefaults: UserDefaultsBehaviour private var sessionBackgroundTimeout: TimeInterval = .zero @@ -82,7 +82,7 @@ class SessionClient: SessionClientBehaviour { sessionBackgroundTimeout = backgroundTimeout activityTracker.backgroundTrackingTimeout = backgroundTimeout activityTracker.beginActivityTracking { [weak self] newState in - guard let self = self else { return } + guard let self else { return } self.log.verbose("New state received: \(newState)") self.sessionClientQueue.sync(flags: .barrier) { self.respond(to: newState) @@ -90,20 +90,6 @@ class SessionClient: SessionClientBehaviour { } } - func validateOrRetrieveSession(_ session: PinpointSession?) -> PinpointSession { - if let session = session, !session.sessionId.isEmpty { - return session - } - - if let storedSession = Self.retrieveStoredSession(from: userDefaults, using: archiver) { - return storedSession - } - - return PinpointSession(sessionId: PinpointSession.Constants.defaultSessionId, - startTime: Date(), - stopTime: Date()) - } - private static func retrieveStoredSession(from userDefaults: UserDefaultsBehaviour, using archiver: AmplifyArchiverBehaviour) -> PinpointSession? { guard let sessionData = userDefaults.data(forKey: Constants.sessionKey), @@ -122,8 +108,8 @@ class SessionClient: SessionClientBehaviour { log.info("Session Started.") // Update Endpoint and record Session Start event - analyticsTaskQueue.task { [weak self] in - guard let self = self else { return } + analyticsTaskQueue.async { [weak self] in + guard let self else { return } try? await self.endpointClient.updateEndpointProfile() self.log.verbose("Firing Session Event: Start") await self.record(eventType: Constants.Events.start) @@ -144,8 +130,8 @@ class SessionClient: SessionClientBehaviour { session.pause() saveSession() log.info("Session Paused.") - analyticsTaskQueue.task { [weak self] in - guard let self = self else { return } + analyticsTaskQueue.async { [weak self] in + guard let self else { return } self.log.verbose("Firing Session Event: Pause") await self.record(eventType: Constants.Events.pause) } @@ -174,8 +160,8 @@ class SessionClient: SessionClientBehaviour { session.resume() saveSession() log.info("Session Resumed.") - analyticsTaskQueue.task { [weak self] in - guard let self = self else { return } + analyticsTaskQueue.async { [weak self] in + guard let self else { return } self.log.verbose("Firing Session Event: Resume") await self.record(eventType: Constants.Events.resume) } @@ -189,7 +175,7 @@ class SessionClient: SessionClientBehaviour { } session.stop() log.info("Session Stopped.") - analyticsTaskQueue.task { [weak self, session] in + analyticsTaskQueue.async { [weak self, session] in guard let self = self, let analyticsClient = self.analyticsClient else { return @@ -236,7 +222,7 @@ class SessionClient: SessionClientBehaviour { case .runningInBackground(let isStale): if isStale { endSession() - analyticsTaskQueue.task { [weak self] in + analyticsTaskQueue.async { [weak self] in _ = try? await self?.analyticsClient?.submitEvents() } } else { @@ -278,23 +264,3 @@ extension SessionClient { extension PinpointSession { static var none = PinpointSession(sessionId: "InvalidId", startTime: Date(), stopTime: nil) } - -/// This actor allows to queue async operations to only run one at a time. -private actor TaskQueue { - private var currentTask: Task? - - nonisolated func task(_ closure: @escaping () async -> ()) { - Task { - await addToQueue(closure) - } - } - - private func addToQueue(_ closure: @escaping () async -> ()) async { - let newTask = Task { [currentTask] in - await currentTask?.value - await closure() - } - currentTask = newTask - await newTask.value - } -} diff --git a/AmplifyPlugins/Internal/Tests/InternalAWSPinpointUnitTests/SessionClientTests.swift b/AmplifyPlugins/Internal/Tests/InternalAWSPinpointUnitTests/SessionClientTests.swift index b2aac76db3..9799d3a172 100644 --- a/AmplifyPlugins/Internal/Tests/InternalAWSPinpointUnitTests/SessionClientTests.swift +++ b/AmplifyPlugins/Internal/Tests/InternalAWSPinpointUnitTests/SessionClientTests.swift @@ -114,47 +114,6 @@ class SessionClientTests: XCTestCase { XCTAssertEqual(userDefaults.saveCount, 0) } - func testValidateSession_withValidSession_andStoredSession_shouldReturnValidSession() async { - storeSession() - await resetCounters() - let session = PinpointSession(sessionId: "valid", startTime: Date(), stopTime: nil) - let retrievedSession = client.validateOrRetrieveSession(session) - - XCTAssertEqual(userDefaults.dataForKeyCount, 0) - XCTAssertEqual(archiver.decodeCount, 0) - XCTAssertEqual(retrievedSession.sessionId, "valid") - } - - func testValidateSession_withInvalidSession_andStoredSession_shouldReturnStoredSession() async { - storeSession() - await resetCounters() - let session = PinpointSession(sessionId: "", startTime: Date(), stopTime: nil) - let retrievedSession = client.validateOrRetrieveSession(session) - - XCTAssertEqual(userDefaults.dataForKeyCount, 1) - XCTAssertEqual(archiver.decodeCount, 1) - XCTAssertEqual(retrievedSession.sessionId, "stored") - } - - func testValidateSession_withInvalidSession_andWithoutStoredSession_shouldCreateDefaultSession() async { - await resetCounters() - let session = PinpointSession(sessionId: "", startTime: Date(), stopTime: nil) - let retrievedSession = client.validateOrRetrieveSession(session) - - XCTAssertEqual(userDefaults.dataForKeyCount, 1) - XCTAssertEqual(archiver.decodeCount, 0) - XCTAssertEqual(retrievedSession.sessionId, PinpointSession.Constants.defaultSessionId) - } - - func testValidateSession_withNilSession_andWithoutStoredSession_shouldCreateDefaultSession() async { - await resetCounters() - let retrievedSession = client.validateOrRetrieveSession(nil) - - XCTAssertEqual(userDefaults.dataForKeyCount, 1) - XCTAssertEqual(archiver.decodeCount, 0) - XCTAssertEqual(retrievedSession.sessionId, PinpointSession.Constants.defaultSessionId) - } - func testStartPinpointSession_shouldRecordStartEvent() async { await resetCounters() let expectationStartSession = expectation(description: "Start event for new session") diff --git a/AmplifyTests/CoreTests/AmplifyTaskQueueTests.swift b/AmplifyTests/CoreTests/AmplifyTaskQueueTests.swift index a3c92f8dcc..35f53da13d 100644 --- a/AmplifyTests/CoreTests/AmplifyTaskQueueTests.swift +++ b/AmplifyTests/CoreTests/AmplifyTaskQueueTests.swift @@ -47,7 +47,7 @@ final class AmplifyTaskQueueTests: XCTestCase { let expectation2 = expectation(description: "expectation2") let expectation3 = expectation(description: "expectation3") - let taskQueue = TaskQueue() + let taskQueue = TaskQueue() try await taskQueue.sync { try await Task.sleep(nanoseconds: 1) expectation1.fulfill()