diff --git a/firebase-firestore/src/androidTest/java/com/google/firebase/firestore/AccessHelper.java b/firebase-firestore/src/androidTest/java/com/google/firebase/firestore/AccessHelper.java index d0800b5ce4a..ebb061ebb96 100644 --- a/firebase-firestore/src/androidTest/java/com/google/firebase/firestore/AccessHelper.java +++ b/firebase-firestore/src/androidTest/java/com/google/firebase/firestore/AccessHelper.java @@ -15,6 +15,7 @@ package com.google.firebase.firestore; import android.content.Context; +import androidx.core.util.Supplier; import com.google.firebase.FirebaseApp; import com.google.firebase.firestore.auth.CredentialsProvider; import com.google.firebase.firestore.auth.User; @@ -31,8 +32,8 @@ public static FirebaseFirestore newFirebaseFirestore( Context context, DatabaseId databaseId, String persistenceKey, - CredentialsProvider authProvider, - CredentialsProvider appCheckProvider, + Supplier> authProviderFactory, + Supplier> appCheckTokenProviderFactory, Function componentProviderFactory, FirebaseApp firebaseApp, FirebaseFirestore.InstanceRegistry instanceRegistry) { @@ -40,8 +41,8 @@ public static FirebaseFirestore newFirebaseFirestore( context, databaseId, persistenceKey, - authProvider, - appCheckProvider, + authProviderFactory, + appCheckTokenProviderFactory, componentProviderFactory, firebaseApp, instanceRegistry, diff --git a/firebase-firestore/src/androidTest/java/com/google/firebase/firestore/FirestoreClientProviderTest.java b/firebase-firestore/src/androidTest/java/com/google/firebase/firestore/FirestoreClientProviderTest.java new file mode 100644 index 00000000000..afac6732375 --- /dev/null +++ b/firebase-firestore/src/androidTest/java/com/google/firebase/firestore/FirestoreClientProviderTest.java @@ -0,0 +1,11 @@ +package com.google.firebase.firestore; + +import androidx.test.ext.junit.runners.AndroidJUnit4; +import org.junit.runner.RunWith; + +@RunWith(AndroidJUnit4.class) +public class FirestoreClientProviderTest { + + // TODO(requires backend/emulator support) + +} diff --git a/firebase-firestore/src/androidTest/java/com/google/firebase/firestore/remote/StreamTest.java b/firebase-firestore/src/androidTest/java/com/google/firebase/firestore/remote/StreamTest.java index 0e07233d1be..7c24fc3b01a 100644 --- a/firebase-firestore/src/androidTest/java/com/google/firebase/firestore/remote/StreamTest.java +++ b/firebase-firestore/src/androidTest/java/com/google/firebase/firestore/remote/StreamTest.java @@ -38,12 +38,17 @@ import com.google.firebase.firestore.testutil.IntegrationTestUtil; import com.google.firebase.firestore.util.AsyncQueue; import com.google.firebase.firestore.util.AsyncQueue.TimerId; +import com.google.firestore.v1.InitResponse; +import com.google.protobuf.ByteString; import io.grpc.Status; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.Timeout; import org.junit.runner.RunWith; class MockCredentialsProvider extends EmptyCredentialsProvider { @@ -68,6 +73,9 @@ public List observedStates() { @RunWith(AndroidJUnit4.class) public class StreamTest { + + @Rule public Timeout timeout = new Timeout(10, TimeUnit.SECONDS); + /** Single mutation to send to the write stream. */ private static final List mutations = Collections.singletonList(setMutation("foo/bar", map())); @@ -96,7 +104,7 @@ public void onClose(Status status) { } @Override - public void onHandshakeComplete() { + public void onHandshake(InitResponse initResponse) { handshakeSemaphore.release(); } @@ -131,7 +139,7 @@ private void waitForWriteStreamOpen( AsyncQueue testQueue, WriteStream writeStream, StreamStatusCallback callback) { testQueue.enqueueAndForget(writeStream::start); waitFor(callback.openSemaphore); - testQueue.enqueueAndForget(writeStream::writeHandshake); + testQueue.enqueueAndForget(() -> writeStream.sendHandshake(ByteString.EMPTY)); waitFor(callback.handshakeSemaphore); } @@ -180,9 +188,9 @@ public void testWriteStreamStopAfterHandshake() throws Exception { StreamStatusCallback streamCallback = new StreamStatusCallback() { @Override - public void onHandshakeComplete() { + public void onHandshake(InitResponse initResponse) { assertThat(writeStreamWrapper[0].getLastStreamToken()).isNotEmpty(); - super.onHandshakeComplete(); + super.onHandshake(initResponse); } @Override @@ -202,7 +210,7 @@ public void onWriteResponse( () -> assertThrows(Throwable.class, () -> writeStream.writeMutations(mutations))); // Handshake should always be called - testQueue.enqueueAndForget(writeStream::writeHandshake); + testQueue.enqueueAndForget(() -> writeStream.sendHandshake(ByteString.EMPTY)); waitFor(streamCallback.handshakeSemaphore); // Now writes should succeed diff --git a/firebase-firestore/src/androidTest/java/com/google/firebase/firestore/testutil/IntegrationTestUtil.java b/firebase-firestore/src/androidTest/java/com/google/firebase/firestore/testutil/IntegrationTestUtil.java index a7417d96563..76a5a10fe99 100644 --- a/firebase-firestore/src/androidTest/java/com/google/firebase/firestore/testutil/IntegrationTestUtil.java +++ b/firebase-firestore/src/androidTest/java/com/google/firebase/firestore/testutil/IntegrationTestUtil.java @@ -47,7 +47,6 @@ import com.google.firebase.firestore.core.DatabaseInfo; import com.google.firebase.firestore.model.DatabaseId; import com.google.firebase.firestore.testutil.provider.FirestoreProvider; -import com.google.firebase.firestore.util.AsyncQueue; import com.google.firebase.firestore.util.Listener; import com.google.firebase.firestore.util.Logger; import com.google.firebase.firestore.util.Logger.Level; @@ -311,15 +310,13 @@ public static FirebaseFirestore testFirestore( ensureStrictMode(); - AsyncQueue asyncQueue = new AsyncQueue(); - FirebaseFirestore firestore = AccessHelper.newFirebaseFirestore( context, databaseId, persistenceKey, - MockCredentialsProvider.instance(), - new EmptyAppCheckTokenProvider(), + MockCredentialsProvider::instance, + EmptyAppCheckTokenProvider::new, ComponentProvider::defaultFactory, /* firebaseApp= */ null, /* instanceRegistry= */ (dbId) -> {}); diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/FirebaseFirestore.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/FirebaseFirestore.java index 88617cb2052..8e9bb4ee844 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/FirebaseFirestore.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/FirebaseFirestore.java @@ -20,10 +20,12 @@ import android.annotation.SuppressLint; import android.app.Activity; import android.content.Context; +import androidx.annotation.GuardedBy; import androidx.annotation.Keep; import androidx.annotation.NonNull; import androidx.annotation.Nullable; import androidx.annotation.VisibleForTesting; +import androidx.core.util.Supplier; import com.google.android.gms.tasks.Task; import com.google.android.gms.tasks.TaskCompletionSource; import com.google.android.gms.tasks.Tasks; @@ -56,6 +58,7 @@ import com.google.firebase.firestore.util.Logger.Level; import com.google.firebase.firestore.util.Preconditions; import com.google.firebase.inject.Deferred; +import com.google.protobuf.ByteString; import java.io.ByteArrayInputStream; import java.io.InputStream; import java.nio.ByteBuffer; @@ -76,6 +79,10 @@ public class FirebaseFirestore { private final Function componentProviderFactory; + private volatile ByteString sessionToken; + + @GuardedBy("clientProvider") + private boolean networkEnabled; /** * Provides a registry management interface for {@code FirebaseFirestore} instances. @@ -93,8 +100,8 @@ public interface InstanceRegistry { // databaseId itself that needs locking; it just saves us creating a separate lock object. private final DatabaseId databaseId; private final String persistenceKey; - private final CredentialsProvider authProvider; - private final CredentialsProvider appCheckProvider; + private final Supplier> authProviderFactory; + private final Supplier> appCheckTokenProviderFactory; private final FirebaseApp firebaseApp; private final UserDataReader userDataReader; // When user requests to terminate, use this to notify `FirestoreMultiDbComponent` to deregister @@ -105,6 +112,8 @@ public interface InstanceRegistry { final FirestoreClientProvider clientProvider; private final GrpcMetadataProvider metadataProvider; + @VisibleForTesting Function> clearPersistenceMethod; + @Nullable private PersistentCacheIndexManager persistentCacheIndexManager; @NonNull @@ -193,46 +202,44 @@ static FirebaseFirestore newInstance( } DatabaseId databaseId = DatabaseId.forDatabase(projectId, database); - CredentialsProvider authProvider = - new FirebaseAuthCredentialsProvider(deferredAuthProvider); - CredentialsProvider appCheckProvider = - new FirebaseAppCheckTokenProvider(deferredAppCheckTokenProvider); - // Firestore uses a different database for each app name. Note that we don't use // app.getPersistenceKey() here because it includes the application ID which is related // to the project ID. We already include the project ID when resolving the database, // so there is no need to include it in the persistence key. String persistenceKey = app.getName(); - return new FirebaseFirestore( - context, - databaseId, - persistenceKey, - authProvider, - appCheckProvider, - ComponentProvider::defaultFactory, - app, - instanceRegistry, - metadataProvider); + FirebaseFirestore firestore = + new FirebaseFirestore( + context, + databaseId, + persistenceKey, + () -> new FirebaseAuthCredentialsProvider(deferredAuthProvider), + () -> new FirebaseAppCheckTokenProvider(deferredAppCheckTokenProvider), + ComponentProvider::defaultFactory, + app, + instanceRegistry, + metadataProvider); + return firestore; } @VisibleForTesting FirebaseFirestore( Context context, DatabaseId databaseId, - String persistenceKey, - CredentialsProvider authProvider, - CredentialsProvider appCheckProvider, + @NonNull String persistenceKey, + @NonNull Supplier> authProviderFactory, + @NonNull Supplier> appCheckTokenProviderFactory, @NonNull Function componentProviderFactory, @Nullable FirebaseApp firebaseApp, InstanceRegistry instanceRegistry, @Nullable GrpcMetadataProvider metadataProvider) { + this.networkEnabled = true; this.context = checkNotNull(context); this.databaseId = checkNotNull(checkNotNull(databaseId)); this.userDataReader = new UserDataReader(databaseId); this.persistenceKey = checkNotNull(persistenceKey); - this.authProvider = checkNotNull(authProvider); - this.appCheckProvider = checkNotNull(appCheckProvider); + this.authProviderFactory = checkNotNull(authProviderFactory); + this.appCheckTokenProviderFactory = checkNotNull(appCheckTokenProviderFactory); this.componentProviderFactory = checkNotNull(componentProviderFactory); this.clientProvider = new FirestoreClientProvider(this::newClient); // NOTE: We allow firebaseApp to be null in tests only. @@ -241,6 +248,21 @@ static FirebaseFirestore newInstance( this.metadataProvider = metadataProvider; this.settings = new FirebaseFirestoreSettings.Builder().build(); + + this.clearPersistenceMethod = + executor -> { + final TaskCompletionSource source = new TaskCompletionSource<>(); + executor.execute( + () -> { + try { + SQLitePersistence.clearPersistence(context, databaseId, persistenceKey); + source.setResult(null); + } catch (FirebaseFirestoreException e) { + source.setException(e); + } + }); + return source.getTask(); + }; } /** Returns the settings used by this {@code FirebaseFirestore} object. */ @@ -296,14 +318,36 @@ private FirestoreClient newClient(AsyncQueue asyncQueue) { DatabaseInfo databaseInfo = new DatabaseInfo(databaseId, persistenceKey, settings.getHost(), settings.isSslEnabled()); - return new FirestoreClient( - context, - databaseInfo, - authProvider, - appCheckProvider, - asyncQueue, - metadataProvider, - componentProviderFactory.apply(settings)); + FirestoreClient client = + new FirestoreClient( + context, + databaseInfo, + authProviderFactory.get(), + appCheckTokenProviderFactory.get(), + asyncQueue, + metadataProvider, + componentProviderFactory.apply(settings)); + + client.setClearPersistenceCallback( + sessionToken -> { + synchronized (clientProvider) { + if (client.isTerminated()) return; + this.sessionToken = sessionToken; + clearPersistence(); + } + }); + + // Session token must be set before we enable network, since it is part of stream handshake. + if (sessionToken != null) { + client.setSessionToken(sessionToken); + sessionToken = null; + } + + if (networkEnabled) { + client.enableNetwork(); + } + + return client; } } @@ -624,7 +668,14 @@ public Task waitForPendingWrites() { */ @NonNull public Task enableNetwork() { - return clientProvider.call(FirestoreClient::enableNetwork); + synchronized (clientProvider) { + networkEnabled = true; + if (clientProvider.isConfigured()) { + return clientProvider.call(FirestoreClient::enableNetwork); + } else { + return Tasks.forResult(null); + } + } } /** @@ -636,7 +687,14 @@ public Task enableNetwork() { */ @NonNull public Task disableNetwork() { - return clientProvider.call(FirestoreClient::disableNetwork); + synchronized (clientProvider) { + networkEnabled = false; + if (clientProvider.isConfigured()) { + return clientProvider.call(FirestoreClient::disableNetwork); + } else { + return Tasks.forResult(null); + } + } } /** Globally enables / disables Cloud Firestore logging for the SDK. */ @@ -669,7 +727,7 @@ public static void setLoggingEnabled(boolean loggingEnabled) { @NonNull public Task clearPersistence() { return clientProvider.executeIfShutdown( - this::clearPersistence, + clearPersistenceMethod, executor -> Tasks.forException( new FirebaseFirestoreException( @@ -677,22 +735,6 @@ public Task clearPersistence() { FirebaseFirestoreException.Code.FAILED_PRECONDITION))); } - @NonNull - private Task clearPersistence(Executor executor) { - final TaskCompletionSource source = new TaskCompletionSource<>(); - executor.execute( - () -> { - try { - SQLitePersistence.clearPersistence(context, databaseId, persistenceKey); - source.setResult(null); - } catch (FirebaseFirestoreException e) { - source.setException(e); - } - }); - return source.getTask(); - } - ; - /** * Attaches a listener for a snapshots-in-sync event. The snapshots-in-sync event indicates that * all listeners affected by a given change have fired, even if a single server-generated change diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/FirestoreClientProvider.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/FirestoreClientProvider.java index bebd57e3160..4c3037eaad2 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/FirestoreClientProvider.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/FirestoreClientProvider.java @@ -120,11 +120,23 @@ synchronized void procedure(Consumer call) { */ synchronized T executeIfShutdown( Function callIf, Function callElse) { - Executor executor = command -> asyncQueue.enqueueAndForgetEvenAfterShutdown(command); if (client == null || client.isTerminated()) { - return callIf.apply(executor); + return callIf.apply(asyncQueue.getExecutor()); } else { - return callElse.apply(executor); + return callElse.apply(asyncQueue.getExecutor()); + } + } + + synchronized T executeWhileShutdown(Function call) { + // This will block asyncQueue, prevent a new client from being started. + if (client == null || client.isTerminated()) { + return call.apply(asyncQueue.getExecutor()); + } else { + client.shutdown(); + asyncQueue = asyncQueue.reincarnate(); + T result = call.apply(asyncQueue.getExecutor()); + client = clientFactory.apply(asyncQueue); + return result; } } @@ -136,10 +148,10 @@ synchronized Task terminate() { // The client must be initialized to ensure that all subsequent API usage throws an exception. ensureConfigured(); - Task terminate = client.terminate(); + Task terminate = client.shutdown(); // Will cause the executor to de-reference all threads, the best we can do - asyncQueue.shutdown(); + asyncQueue.terminate(); return terminate; } diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/core/ComponentProvider.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/core/ComponentProvider.java index ea1654d605f..dd06eaba596 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/core/ComponentProvider.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/core/ComponentProvider.java @@ -163,7 +163,6 @@ public void initialize(Configuration configuration) { syncEngine = createSyncEngine(configuration); eventManager = createEventManager(configuration); localStore.start(); - remoteStore.start(); garbageCollectionScheduler = createGarbageCollectionScheduler(configuration); indexBackfiller = createIndexBackfiller(configuration); } diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/core/FirestoreClient.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/core/FirestoreClient.java index 7409af54bd9..4ee4ef5c806 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/core/FirestoreClient.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/core/FirestoreClient.java @@ -47,9 +47,11 @@ import com.google.firebase.firestore.remote.RemoteSerializer; import com.google.firebase.firestore.remote.RemoteStore; import com.google.firebase.firestore.util.AsyncQueue; +import com.google.firebase.firestore.util.Consumer; import com.google.firebase.firestore.util.Function; import com.google.firebase.firestore.util.Logger; import com.google.firestore.v1.Value; +import com.google.protobuf.ByteString; import java.io.InputStream; import java.util.List; import java.util.Map; @@ -75,7 +77,6 @@ public final class FirestoreClient { private RemoteStore remoteStore; private SyncEngine syncEngine; private EventManager eventManager; - // LRU-related @Nullable private Scheduler indexBackfillScheduler; @Nullable private Scheduler gcScheduler; @@ -95,6 +96,8 @@ public FirestoreClient( this.bundleSerializer = new BundleSerializer(new RemoteSerializer(databaseInfo.getDatabaseId())); + asyncQueue.setOnShutdown(this::onAsyncQueueShutdown); + TaskCompletionSource firstUser = new TaskCompletionSource<>(); final AtomicBoolean initialized = new AtomicBoolean(false); @@ -134,6 +137,23 @@ public FirestoreClient( }); } + public void setClearPersistenceCallback(Consumer clearPersistenceCallback) { + this.verifyNotTerminated(); + asyncQueue.enqueueAndForget( + () -> remoteStore.setClearPersistenceCallback(clearPersistenceCallback)); + } + + private void onAsyncQueueShutdown() { + remoteStore.shutdown(); + persistence.shutdown(); + if (gcScheduler != null) { + gcScheduler.stop(); + } + if (indexBackfillScheduler != null) { + indexBackfillScheduler.stop(); + } + } + public Task disableNetwork() { this.verifyNotTerminated(); return asyncQueue.enqueue(() -> remoteStore.disableNetwork()); @@ -145,21 +165,11 @@ public Task enableNetwork() { } /** Terminates this client, cancels all writes / listeners, and releases all resources. */ - public Task terminate() { + public Task shutdown() { authProvider.removeChangeListener(); appCheckProvider.removeChangeListener(); - asyncQueue.enqueueAndForgetEvenAfterShutdown(() -> eventManager.abortAllTargets()); - return asyncQueue.enqueueAndInitiateShutdown( - () -> { - remoteStore.shutdown(); - persistence.shutdown(); - if (gcScheduler != null) { - gcScheduler.stop(); - } - if (indexBackfillScheduler != null) { - indexBackfillScheduler.stop(); - } - }); + asyncQueue.enqueueAndForget(() -> eventManager.abortAllTargets()); + return asyncQueue.shutdown(); } /** Returns true if this client has been terminated. */ @@ -226,13 +236,25 @@ public Task write(final List mutations) { return source.getTask(); } - /** Tries to execute the transaction in updateFunction. */ + /** + * Takes an updateFunction in which a set of reads and writes can be performed atomically. In the + * updateFunction, the client can read and write values using the supplied transaction object. + * After the updateFunction, all changes will be committed. If a retryable error occurs (ex: some + * other client has changed any of the data referenced), then the updateFunction will be called + * again after a backoff. If the updateFunction still fails after all retries, then the + * transaction will be rejected. + * + *

The transaction object passed to the updateFunction contains methods for accessing documents + * and collections. Unlike other datastore access, data accessed with the transaction will not + * reflect local changes that have not been committed. For this reason, it is required that all + * reads are performed before any writes. Transactions must be performed while online. + * + *

The Task returned is resolved when the transaction is fully committed. + */ public Task transaction( TransactionOptions options, Function> updateFunction) { this.verifyNotTerminated(); - return AsyncQueue.callTask( - asyncQueue.getExecutor(), - () -> syncEngine.transaction(asyncQueue, options, updateFunction)); + return new TransactionRunner<>(asyncQueue, remoteStore, options, updateFunction).run(); } // TODO(b/261013682): Use an explicit executor in continuations. @@ -243,7 +265,7 @@ public Task> runAggregateQuery( final TaskCompletionSource> result = new TaskCompletionSource<>(); asyncQueue.enqueueAndForget( () -> - syncEngine + remoteStore .runAggregateQuery(query, aggregateFields) .addOnSuccessListener(data -> result.setResult(data)) .addOnFailureListener(e -> result.setException(e))); @@ -362,4 +384,14 @@ private void verifyNotTerminated() { throw new IllegalStateException("The client has already been terminated"); } } + + public void setSessionToken(ByteString sessionToken) { + verifyNotTerminated(); + asyncQueue.enqueueAndForget(() -> localStore.setSessionsToken(sessionToken)); + } + + public ByteString getSessionToken() { + verifyNotTerminated(); + return localStore.getSessionToken(); + } } diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/core/SyncEngine.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/core/SyncEngine.java index e9bb7bafea6..9c51418fae7 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/core/SyncEngine.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/core/SyncEngine.java @@ -19,15 +19,12 @@ import androidx.annotation.Nullable; import androidx.annotation.VisibleForTesting; -import com.google.android.gms.tasks.Task; import com.google.android.gms.tasks.TaskCompletionSource; import com.google.firebase.database.collection.ImmutableSortedMap; import com.google.firebase.database.collection.ImmutableSortedSet; -import com.google.firebase.firestore.AggregateField; import com.google.firebase.firestore.FirebaseFirestoreException; import com.google.firebase.firestore.LoadBundleTask; import com.google.firebase.firestore.LoadBundleTaskProgress; -import com.google.firebase.firestore.TransactionOptions; import com.google.firebase.firestore.auth.User; import com.google.firebase.firestore.bundle.BundleElement; import com.google.firebase.firestore.bundle.BundleLoader; @@ -51,11 +48,8 @@ import com.google.firebase.firestore.remote.RemoteEvent; import com.google.firebase.firestore.remote.RemoteStore; import com.google.firebase.firestore.remote.TargetChange; -import com.google.firebase.firestore.util.AsyncQueue; -import com.google.firebase.firestore.util.Function; import com.google.firebase.firestore.util.Logger; import com.google.firebase.firestore.util.Util; -import com.google.firestore.v1.Value; import com.google.protobuf.ByteString; import io.grpc.Status; import java.io.IOException; @@ -337,33 +331,6 @@ private void addUserCallback(int batchId, TaskCompletionSource userTask) { userTasks.put(batchId, userTask); } - /** - * Takes an updateFunction in which a set of reads and writes can be performed atomically. In the - * updateFunction, the client can read and write values using the supplied transaction object. - * After the updateFunction, all changes will be committed. If a retryable error occurs (ex: some - * other client has changed any of the data referenced), then the updateFunction will be called - * again after a backoff. If the updateFunction still fails after all retries, then the - * transaction will be rejected. - * - *

The transaction object passed to the updateFunction contains methods for accessing documents - * and collections. Unlike other datastore access, data accessed with the transaction will not - * reflect local changes that have not been committed. For this reason, it is required that all - * reads are performed before any writes. Transactions must be performed while online. - * - *

The Task returned is resolved when the transaction is fully committed. - */ - public Task transaction( - AsyncQueue asyncQueue, - TransactionOptions options, - Function> updateFunction) { - return new TransactionRunner(asyncQueue, remoteStore, options, updateFunction).run(); - } - - public Task> runAggregateQuery( - Query query, List aggregateFields) { - return remoteStore.runAggregateQuery(query, aggregateFields); - } - /** Called by FirestoreClient to notify us of a new remote event. */ @Override public void handleRemoteEvent(RemoteEvent event) { diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLitePersistence.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLitePersistence.java index b9c89d3cb8b..f222f8f6236 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLitePersistence.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLitePersistence.java @@ -242,6 +242,8 @@ T runTransaction(String action, Supplier operation) { public static void clearPersistence(Context context, DatabaseId databaseId, String persistenceKey) throws FirebaseFirestoreException { + // TODO Could we change this with SQLiteDatabase.deleteDatabase(). + String databaseName = SQLitePersistence.databaseName(persistenceKey, databaseId); String sqLitePath = context.getDatabasePath(databaseName).getPath(); String journalPath = sqLitePath + "-journal"; diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java index d2a139d4b6f..9a775b3f55e 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java @@ -41,8 +41,10 @@ import com.google.firebase.firestore.remote.WatchChange.WatchTargetChange; import com.google.firebase.firestore.remote.WatchChange.WatchTargetChangeType; import com.google.firebase.firestore.util.AsyncQueue; +import com.google.firebase.firestore.util.Consumer; import com.google.firebase.firestore.util.Logger; import com.google.firebase.firestore.util.Util; +import com.google.firestore.v1.InitResponse; import com.google.firestore.v1.Value; import com.google.protobuf.ByteString; import io.grpc.Status; @@ -57,7 +59,7 @@ * RemoteStore handles all interaction with the backend through a simple, clean interface. This * class is not thread safe and should be only called from the worker AsyncQueue. */ -public final class RemoteStore implements WatchChangeAggregator.TargetMetadataProvider { +public class RemoteStore implements WatchChangeAggregator.TargetMetadataProvider { /** The maximum number of pending writes to allow. TODO: Negotiate this value with the backend. */ private static final int MAX_PENDING_WRITES = 10; @@ -65,6 +67,8 @@ public final class RemoteStore implements WatchChangeAggregator.TargetMetadataPr /** The log tag to use for this class. */ private static final String LOG_TAG = "RemoteStore"; + private Consumer clearPersistenceCallback; + /** The database ID of the Firestore instance. */ private final DatabaseId databaseId; @@ -178,9 +182,20 @@ public RemoteStore( watchStream = datastore.createWatchStream( new WatchStream.Callback() { + @Override + public void onHandshake(InitResponse initResponse) { + if (initResponse.getClearCache()) { + handleClearCache(initResponse.getSessionToken()); + } else { + handleWatchStreamHandshakeComplete(initResponse.getSessionToken()); + } + } + @Override public void onOpen() { - handleWatchStreamOpen(); + if (!writeStream.isHandshakeInProgress()) { + watchStream.sendHandshake(localStore.getSessionToken()); + } } @Override @@ -198,13 +213,19 @@ public void onClose(Status status) { datastore.createWriteStream( new WriteStream.Callback() { @Override - public void onOpen() { - writeStream.writeHandshake(); + public void onHandshake(InitResponse initResponse) { + if (initResponse.getClearCache()) { + handleClearCache(initResponse.getSessionToken()); + } else { + handleWriteStreamHandshakeComplete(initResponse.getSessionToken()); + } } @Override - public void onHandshakeComplete() { - handleWriteStreamHandshakeComplete(); + public void onOpen() { + if (!watchStream.isHandshakeInProgress()) { + writeStream.sendHandshake(localStore.getSessionToken()); + } } @Override @@ -253,6 +274,18 @@ public void onClose(Status status) { }); } + private void handleClearCache(ByteString sessionToken) { + hardAssert(clearPersistenceCallback != null, "Cannot clear persistence without callback"); + if (sessionToken.isEmpty()) { + sessionToken = localStore.getSessionToken(); + } + clearPersistenceCallback.accept(sessionToken); + } + + public void setClearPersistenceCallback(Consumer clearPersistenceCallback) { + this.clearPersistenceCallback = clearPersistenceCallback; + } + /** Re-enables the network. Only to be called as the counterpart to disableNetwork(). */ public void enableNetwork() { networkEnabled = true; @@ -312,15 +345,6 @@ private void restartNetwork() { enableNetwork(); } - /** - * Starts up the remote store, creating streams, restoring state from LocalStore, etc. This should - * called before using any other API endpoints in this class. - */ - public void start() { - // For now, all setup is handled by enableNetwork(). We might expand on this in the future. - enableNetwork(); - } - /** * Shuts down the remote store, tearing down connections and otherwise cleaning up. This is not * reversible and renders the Remote Store unusable. @@ -370,7 +394,7 @@ public void listen(TargetData targetData) { if (shouldStartWatchStream()) { startWatchStream(); - } else if (watchStream.isOpen()) { + } else if (watchStream.isHandshakeComplete()) { sendWatchRequest(targetData); } } @@ -400,7 +424,7 @@ public void stopListening(int targetId) { targetData != null, "stopListening called on target no currently watched: %d", targetId); // The watch stream might not be started if we're in a disconnected state - if (watchStream.isOpen()) { + if (watchStream.isHandshakeComplete()) { sendUnwatchRequest(targetId); } @@ -454,7 +478,19 @@ private void startWatchStream() { onlineStateTracker.handleWatchStreamStart(); } - private void handleWatchStreamOpen() { + private void handleWatchStreamHandshakeComplete(ByteString sessionToken) { + if (sessionToken.isEmpty()) { + sessionToken = localStore.getSessionToken(); + } else { + localStore.setSessionsToken(sessionToken); + } + + // If write stream started handshake, but was waiting for listen handshake to complete, we + // can continue write handshake now. + if (writeStream.isHandshakeInProgress()) { + writeStream.sendHandshake(sessionToken); + } + // Restore any existing watches. for (TargetData targetData : listenTargets.values()) { sendWatchRequest(targetData); @@ -645,7 +681,7 @@ private void addToWritePipeline(MutationBatch mutationBatch) { writePipeline.add(mutationBatch); - if (writeStream.isOpen() && writeStream.isHandshakeComplete()) { + if (writeStream.isHandshakeComplete()) { writeStream.writeMutations(mutationBatch.getMutations()); } } @@ -661,7 +697,19 @@ private void startWriteStream() { * Handles a successful handshake response from the server, which is our cue to send any pending * writes. */ - private void handleWriteStreamHandshakeComplete() { + private void handleWriteStreamHandshakeComplete(ByteString sessionToken) { + if (!sessionToken.isEmpty()) { + localStore.setSessionsToken(sessionToken); + } else { + sessionToken = localStore.getSessionToken(); + } + + // If listen stream started handshake, but was waiting for write handshake to complete, we + // can continue listen handshake now. + if (watchStream.isHandshakeInProgress()) { + watchStream.sendHandshake(sessionToken); + } + // Record the stream token. localStore.setLastStreamToken(writeStream.getLastStreamToken()); diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/WatchStream.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/WatchStream.java index 515fefa1099..868055757ed 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/WatchStream.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/WatchStream.java @@ -21,6 +21,8 @@ import com.google.firebase.firestore.util.AsyncQueue; import com.google.firebase.firestore.util.AsyncQueue.TimerId; import com.google.firestore.v1.FirestoreGrpc; +import com.google.firestore.v1.InitRequest; +import com.google.firestore.v1.InitResponse; import com.google.firestore.v1.ListenRequest; import com.google.firestore.v1.ListenResponse; import com.google.protobuf.ByteString; @@ -45,11 +47,16 @@ public class WatchStream /** A callback interface for the set of events that can be emitted by the WatchStream */ interface Callback extends AbstractStream.StreamCallback { + + /** The handshake for this write stream has completed */ + void onHandshake(InitResponse initResponse); + /** A new change from the watch stream. Snapshot version will ne non-null if it was set */ void onWatchChange(SnapshotVersion snapshotVersion, WatchChange watchChange); } private final RemoteSerializer serializer; + protected boolean handshakeComplete = false; WatchStream( FirestoreChannel channel, @@ -67,6 +74,42 @@ interface Callback extends AbstractStream.StreamCallback { this.serializer = serializer; } + @Override + public void start() { + this.handshakeComplete = false; + super.start(); + } + + /** + * Sends an InitRequest to the server. + */ + void sendHandshake(ByteString sessionToken) { + hardAssert(isOpen(), "Writing handshake requires an opened stream"); + hardAssert(!handshakeComplete, "Handshake already completed"); + + InitRequest.Builder initRequest = InitRequest.newBuilder(); + if (sessionToken != null) initRequest.setSessionToken(sessionToken); + + ListenRequest.Builder request = + ListenRequest.newBuilder() + .setDatabase(serializer.databaseName()) + .setInitRequest(initRequest); + + writeRequest(request.build()); + } + + boolean isHandshakeInProgress() { + return isOpen() && !handshakeComplete; + } + + /** + * Tracks whether or not a handshake has been successfully exchanged and the stream is ready to + * accept watch queries. + */ + boolean isHandshakeComplete() { + return isOpen() && handshakeComplete; + } + /** * Registers interest in the results of the given query. If the query includes a resumeToken it * will be included in the request. Results that affect the query will be streamed back as @@ -74,6 +117,7 @@ interface Callback extends AbstractStream.StreamCallback { */ public void watchQuery(TargetData targetData) { hardAssert(isOpen(), "Watching queries requires an open stream"); + hardAssert(handshakeComplete, "Handshake must be complete before watching queries"); ListenRequest.Builder request = ListenRequest.newBuilder() .setDatabase(serializer.databaseName()) @@ -100,17 +144,22 @@ public void unwatchTarget(int targetId) { } @Override - public void onFirst(ListenResponse listenResponse) { - onNext(listenResponse); + public void onFirst(ListenResponse response) { + hardAssert(response.hasInitResponse(), "InitResponse expected as part of Handshake response"); + + // The first response is the handshake response + handshakeComplete = true; + + listener.onHandshake(response.getInitResponse()); } @Override - public void onNext(ListenResponse listenResponse) { + public void onNext(ListenResponse response) { // A successful response means the stream is healthy backoff.reset(); - WatchChange watchChange = serializer.decodeWatchChange(listenResponse); - SnapshotVersion snapshotVersion = serializer.decodeVersionFromListenResponse(listenResponse); + WatchChange watchChange = serializer.decodeWatchChange(response); + SnapshotVersion snapshotVersion = serializer.decodeVersionFromListenResponse(response); listener.onWatchChange(snapshotVersion, watchChange); } } diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/WriteStream.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/WriteStream.java index e32545f1f5c..94b00d0bea6 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/WriteStream.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/WriteStream.java @@ -23,6 +23,8 @@ import com.google.firebase.firestore.util.AsyncQueue; import com.google.firebase.firestore.util.AsyncQueue.TimerId; import com.google.firestore.v1.FirestoreGrpc; +import com.google.firestore.v1.InitRequest; +import com.google.firestore.v1.InitResponse; import com.google.firestore.v1.WriteRequest; import com.google.firestore.v1.WriteResponse; import com.google.protobuf.ByteString; @@ -54,7 +56,7 @@ public class WriteStream extends AbstractStream mutationResults); @@ -95,12 +97,16 @@ protected void tearDown() { } } + boolean isHandshakeInProgress() { + return isOpen() && !handshakeComplete; + } + /** * Tracks whether or not a handshake has been successfully exchanged and the stream is ready to * accept mutations. */ boolean isHandshakeComplete() { - return handshakeComplete; + return isOpen() && handshakeComplete; } /** @@ -129,14 +135,21 @@ void setLastStreamToken(ByteString streamToken) { /** * Sends an initial streamToken to the server, performing the handshake required to make the * StreamingWrite RPC work. Subsequent {@link #writeMutations} calls should wait until a response - * has been delivered to {@link WriteStream.Callback#onHandshakeComplete}. + * has been delivered to {@link WriteStream.Callback#onHandshake}. */ - void writeHandshake() { + void sendHandshake(ByteString sessionToken) { hardAssert(isOpen(), "Writing handshake requires an opened stream"); hardAssert(!handshakeComplete, "Handshake already completed"); // TODO: Support stream resumption. We intentionally do not set the stream token on the // handshake, ignoring any stream token we might have. - WriteRequest.Builder request = WriteRequest.newBuilder().setDatabase(serializer.databaseName()); + + InitRequest.Builder initRequest = InitRequest.newBuilder(); + if (sessionToken != null) initRequest.setSessionToken(sessionToken); + + WriteRequest.Builder request = + WriteRequest.newBuilder() + .setDatabase(serializer.databaseName()) + .setInitRequest(initRequest); writeRequest(request.build()); } @@ -161,12 +174,13 @@ void writeMutations(List mutations) { @Override public void onFirst(WriteResponse response) { + hardAssert(response.hasInitResponse(), "InitResponse expected as part of Handshake response"); lastStreamToken = response.getStreamToken(); // The first response is the handshake response handshakeComplete = true; - listener.onHandshakeComplete(); + listener.onHandshake(response.getInitResponse()); } @Override diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/util/AsyncQueue.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/util/AsyncQueue.java index 54629b03e0f..71307945d03 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/util/AsyncQueue.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/util/AsyncQueue.java @@ -14,13 +14,11 @@ package com.google.firebase.firestore.util; -import static com.google.firebase.firestore.util.Assert.fail; import static com.google.firebase.firestore.util.Assert.hardAssert; import android.annotation.SuppressLint; import android.os.Handler; import android.os.Looper; -import androidx.annotation.NonNull; import androidx.annotation.VisibleForTesting; import com.google.android.gms.tasks.Continuation; import com.google.android.gms.tasks.Task; @@ -29,17 +27,9 @@ import java.util.ArrayList; import java.util.Collections; import java.util.concurrent.Callable; -import java.util.concurrent.CancellationException; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.Semaphore; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import javax.annotation.CheckReturnValue; @@ -191,203 +181,6 @@ public static Task callTask(Executor executor, Callable - *

  • Synchronized task scheduling. This is different from function 3, which is about task - * execution in a single thread. - *
  • Ability to do soft-shutdown: only critical tasks related to shutting Firestore SDK down - * can be executed once the shutdown process initiated. - *
  • Single threaded execution service, no concurrent execution among the `Runnable`s - * scheduled in this Executor. - * - */ - private class SynchronizedShutdownAwareExecutor implements Executor { - /** - * The single threaded executor that is backing this Executor. This is also the executor used - * when some tasks explicitly request to run after shutdown has been initiated. - */ - private final ScheduledThreadPoolExecutor internalExecutor; - - /** Whether the shutdown process has initiated, once it is started, it is not revertable. */ - private boolean isShuttingDown; - - /** - * The single thread that will be used by the executor. This is created early and managed - * directly so that it's possible later to make assertions about executing on the correct - * thread. - */ - private final Thread thread; - - /** A ThreadFactory for a single, pre-created thread. */ - private class DelayedStartFactory implements Runnable, ThreadFactory { - private final CountDownLatch latch = new CountDownLatch(1); - private Runnable delegate; - - @Override - public void run() { - try { - latch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - delegate.run(); - } - - @Override - public Thread newThread(@NonNull Runnable runnable) { - hardAssert(delegate == null, "Only one thread may be created in an AsyncQueue."); - delegate = runnable; - latch.countDown(); - return thread; - } - } - - // TODO(b/258277574): Migrate to go/firebase-android-executors - @SuppressLint("ThreadPoolCreation") - SynchronizedShutdownAwareExecutor() { - DelayedStartFactory threadFactory = new DelayedStartFactory(); - - thread = Executors.defaultThreadFactory().newThread(threadFactory); - thread.setName("FirestoreWorker"); - thread.setDaemon(true); - thread.setUncaughtExceptionHandler((crashingThread, throwable) -> panic(throwable)); - - internalExecutor = - new ScheduledThreadPoolExecutor(1, threadFactory) { - @Override - protected void afterExecute(Runnable r, Throwable t) { - super.afterExecute(r, t); - if (t == null && r instanceof Future) { - Future future = (Future) r; - try { - // Not all Futures will be done, for example when used with scheduledAtFixedRate. - if (future.isDone()) { - future.get(); - } - } catch (CancellationException ce) { - // Cancellation exceptions are okay, we expect them to happen sometimes - } catch (ExecutionException ee) { - t = ee.getCause(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - if (t != null) { - panic(t); - } - } - }; - - // Core threads don't time out, this only takes effect when we drop the number of required - // core threads - internalExecutor.setKeepAliveTime(3, TimeUnit.SECONDS); - - isShuttingDown = false; - } - - /** Synchronized access to isShuttingDown */ - private synchronized boolean isShuttingDown() { - return isShuttingDown; - } - - /** - * Check if shutdown is initiated before scheduling. If it is initiated, the command will not be - * executed. - */ - @Override - public synchronized void execute(Runnable command) { - if (!isShuttingDown) { - internalExecutor.execute(command); - } - } - - /** Execute the command, regardless if shutdown has been initiated. */ - public void executeEvenAfterShutdown(Runnable command) { - try { - internalExecutor.execute(command); - } catch (RejectedExecutionException e) { - // The only way we can get here is if the AsyncQueue has panicked and we're now racing with - // the post to the main looper that will crash the app. - Logger.warn(AsyncQueue.class.getSimpleName(), "Refused to enqueue task after panic"); - } - } - - /** - * Run a given `Callable` on this executor, and report the result of the `Callable` in a {@link - * Task}. The `Callable` will not be run if the executor started shutting down already. - * - * @return A {@link Task} resolves when the requested `Callable` completes, or reports error - * when the `Callable` runs into exceptions. - */ - private Task executeAndReportResult(Callable task) { - final TaskCompletionSource completionSource = new TaskCompletionSource<>(); - try { - this.execute( - () -> { - try { - completionSource.setResult(task.call()); - } catch (Exception e) { - completionSource.setException(e); - throw new RuntimeException(e); - } - }); - } catch (RejectedExecutionException e) { - // The only way we can get here is if the AsyncQueue has panicked and we're now racing with - // the post to the main looper that will crash the app. - Logger.warn(AsyncQueue.class.getSimpleName(), "Refused to enqueue task after panic"); - } - return completionSource.getTask(); - } - - /** - * Initiate the shutdown process. Once called, the only possible way to run `Runnable`s are by - * holding the `internalExecutor` reference. - */ - private synchronized Task executeAndInitiateShutdown(Runnable task) { - if (isShuttingDown()) { - TaskCompletionSource source = new TaskCompletionSource<>(); - source.setResult(null); - return source.getTask(); - } - - // Not shutting down yet, execute and return a Task. - Task t = - executeAndReportResult( - () -> { - task.run(); - return null; - }); - - // Mark the initiation of shut down. - isShuttingDown = true; - - return t; - } - - /** - * Wraps {@link ScheduledThreadPoolExecutor#schedule(Runnable, long, TimeUnit)} and provides - * shutdown state check: the command will not be scheduled if the shutdown has been initiated. - */ - private synchronized ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { - if (!isShuttingDown) { - return internalExecutor.schedule(command, delay, unit); - } - return null; - } - - /** Wraps around {@link ScheduledThreadPoolExecutor#shutdownNow()}. */ - private void shutdownNow() { - internalExecutor.shutdownNow(); - } - - /** Wraps around {@link ScheduledThreadPoolExecutor#setCorePoolSize(int)}. */ - private void setCorePoolSize(int size) { - internalExecutor.setCorePoolSize(size); - } - } - /** The executor backing this AsyncQueue. */ private final SynchronizedShutdownAwareExecutor executor; @@ -395,29 +188,31 @@ private void setCorePoolSize(int size) { // or canceled. // NOTE: We disallow duplicates currently, so this could be a Set<> which might have better // theoretical removal speed, except this list will always be small so ArrayList is fine. - private final ArrayList delayedTasks; + private final ArrayList delayedTasks = new ArrayList<>(); // List of TimerIds to fast-forward delays for. private final ArrayList timerIdsToSkip = new ArrayList<>(); public AsyncQueue() { - delayedTasks = new ArrayList<>(); - executor = new SynchronizedShutdownAwareExecutor(); + this(new SynchronizedShutdownAwareExecutor()); + } + + private AsyncQueue(SynchronizedShutdownAwareExecutor executor) { + this.executor = executor; + } + + public void setOnShutdown(Runnable onShutdown) { + executor.setOnShutdown(onShutdown); } public Executor getExecutor() { - return executor; + // Returns internal executor that will continue to work, even after shutdown() + return executor.internalExecutor; } /** Verifies that the current thread is the managed AsyncQueue thread. */ public void verifyIsCurrentThread() { - Thread current = Thread.currentThread(); - if (executor.thread != current) { - throw fail( - "We are running on the wrong thread. Expected to be on the AsyncQueue " - + "thread %s/%d but was %s/%d", - executor.thread.getName(), executor.thread.getId(), current.getName(), current.getId()); - } + executor.verifyIsCurrentThread(); } /** @@ -447,23 +242,6 @@ public Task enqueue(Runnable task) { }); } - /** - * Queue a Runnable and immediately mark the initiation of shutdown process. Tasks queued after - * this method is called are not run unless they explicitly are requested via {@link - * AsyncQueue#enqueueAndForgetEvenAfterShutdown(Runnable)}. - */ - public Task enqueueAndInitiateShutdown(Runnable task) { - return executor.executeAndInitiateShutdown(task); - } - - /** - * Queue and run this Runnable task immediately after every other already queued task, regardless - * if shutdown has been initiated. - */ - public void enqueueAndForgetEvenAfterShutdown(Runnable task) { - executor.executeEvenAfterShutdown(task); - } - /** Has the shutdown process been initiated. */ public boolean isShuttingDown() { return executor.isShuttingDown(); @@ -523,7 +301,10 @@ public void skipDelaysForTimerId(TimerId timerId) { */ public void panic(Throwable t) { executor.shutdownNow(); + halt(t); + } + static void halt(Throwable t) { // TODO(b/258277574): Migrate to go/firebase-android-executors @SuppressLint("ThreadPoolCreation") Handler handler = new Handler(Looper.getMainLooper()); @@ -613,13 +394,23 @@ public void runDelayedTasksUntil(TimerId lastTimerId) throws InterruptedExceptio }); } + /** + * Shuts down the AsyncQueue after which no progress will ever be made again. + */ + public Task shutdown() { + return executor.shutdown(); + } + /** * Shuts down the AsyncQueue and releases resources after which no progress will ever be made - * again. + * again. Attempts to use executor will throw exception. */ - public void shutdown() { - // Will cause the executor to de-reference all threads, the best we can do - executor.setCorePoolSize(0); + public void terminate() { + executor.terminate(); + } + + public AsyncQueue reincarnate() { + return new AsyncQueue(executor.reincarnate()); } /** diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/util/SynchronizedShutdownAwareExecutor.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/util/SynchronizedShutdownAwareExecutor.java new file mode 100644 index 00000000000..39a78cc5e0e --- /dev/null +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/util/SynchronizedShutdownAwareExecutor.java @@ -0,0 +1,270 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.firebase.firestore.util; + +import static com.google.firebase.firestore.util.Assert.fail; +import static com.google.firebase.firestore.util.Assert.hardAssert; + +import android.annotation.SuppressLint; +import androidx.annotation.NonNull; +import com.google.android.gms.tasks.Task; +import com.google.android.gms.tasks.TaskCompletionSource; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +/** + * A wrapper around a {@link ScheduledThreadPoolExecutor} class that provides: + * + *
      + *
    1. Synchronized task scheduling. This is different from function 3, which is about task + * execution in a single thread. + *
    2. Ability to do soft-shutdown: only critical tasks related to shutting Firestore SDK down + * can be executed once the shutdown process initiated. + *
    3. Single threaded execution service, no concurrent execution among the `Runnable`s + * scheduled in this Executor. + *
    + */ +class SynchronizedShutdownAwareExecutor implements Executor { + public static final String ASYNC_QUEUE_IS_SHUTDOWN = "AsyncQueue is shutdown"; + + /** + * The single threaded executor that is backing this Executor. This is also the executor used + * when some tasks explicitly request to run after shutdown has been initiated. + */ + final ScheduledThreadPoolExecutor internalExecutor; + + /** + * Task ss assigned when the shutdown process has been initiated, once it is started, it is not revertable. + */ + private Task shutdownTask; + + private Runnable onShutdown = null; + + /** + * The single thread that will be used by the executor. This is created early and managed + * directly so that it's possible later to make assertions about executing on the correct + * thread. + */ + private final Thread thread; + + /** + * A ThreadFactory for a single, pre-created thread. + */ + private class DelayedStartFactory implements Runnable, ThreadFactory { + private final CountDownLatch latch = new CountDownLatch(1); + private Runnable delegate; + + @Override + public void run() { + try { + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + delegate.run(); + } + + @Override + public Thread newThread(@NonNull Runnable runnable) { + hardAssert(delegate == null, "Only one thread may be created in an AsyncQueue."); + delegate = runnable; + latch.countDown(); + return thread; + } + } + + // TODO(b/258277574): Migrate to go/firebase-android-executors + @SuppressLint("ThreadPoolCreation") + public SynchronizedShutdownAwareExecutor() { + DelayedStartFactory threadFactory = new DelayedStartFactory(); + + thread = Executors.defaultThreadFactory().newThread(threadFactory); + thread.setName("FirestoreWorker"); + thread.setDaemon(true); + thread.setUncaughtExceptionHandler( + (crashingThread, throwable) -> { + shutdownNow(); + AsyncQueue.halt(throwable); + }); + + internalExecutor = + new ScheduledThreadPoolExecutor(1, threadFactory) { + @Override + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + if (t == null && r instanceof Future) { + Future future = (Future) r; + try { + // Not all Futures will be done, for example when used with scheduledAtFixedRate. + if (future.isDone()) { + future.get(); + } + } catch (CancellationException ce) { + // Cancellation exceptions are okay, we expect them to happen sometimes + } catch (ExecutionException ee) { + t = ee.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + if (t != null && !ASYNC_QUEUE_IS_SHUTDOWN.equals(t.getMessage())) { + shutdownNow(); + AsyncQueue.halt(t); + } + } + }; + + // Core threads don't time out, this only takes effect when we drop the number of required + // core threads + internalExecutor.setKeepAliveTime(3, TimeUnit.SECONDS); + } + + private SynchronizedShutdownAwareExecutor( + ScheduledThreadPoolExecutor internalExecutor, Thread thread) { + this.internalExecutor = internalExecutor; + this.thread = thread; + } + + synchronized void verifyNotShutdown() { + if (shutdownTask != null) { + throw new RejectedExecutionException(ASYNC_QUEUE_IS_SHUTDOWN); + } + } + + void setOnShutdown(Runnable onShutdown) { + verifyNotShutdown(); + hardAssert(this.onShutdown == null, "setOnShutdown can only be called once."); + this.onShutdown = onShutdown; + } + + /** + * Synchronized access to isShuttingDown + */ + synchronized boolean isShuttingDown() { + return shutdownTask != null; + } + + /** + * Check if shutdown is initiated before scheduling. If it is initiated, the command will not be + * executed. + */ + void verifyIsCurrentThread() { + Thread current = Thread.currentThread(); + if (thread != current) { + throw fail( + "We are running on the wrong thread. Expected to be on the AsyncQueue " + + "thread %s/%d but was %s/%d", + thread.getName(), thread.getId(), current.getName(), current.getId()); + } + } + + @Override + public synchronized void execute(Runnable command) { + verifyNotShutdown(); + internalExecutor.execute(command); + } + + /** + * Run a given `Callable` on this executor, and report the result of the `Callable` in a {@link + * Task}. The `Callable` will not be run if the executor started shutting down already. + * + * @return A {@link Task} resolves when the requested `Callable` completes, or reports error + * when the `Callable` runs into exceptions. + */ + Task executeAndReportResult(Callable task) { + final TaskCompletionSource completionSource = new TaskCompletionSource<>(); + try { + this.execute( + () -> { + try { + completionSource.setResult(task.call()); + } catch (Exception e) { + completionSource.setException(e); + } + }); + } catch (RejectedExecutionException e) { + // The only way we can get here is if the AsyncQueue has panicked and we're now racing with + // the post to the main looper that will crash the app. + Logger.warn(AsyncQueue.class.getSimpleName(), "Refused to enqueue task after panic"); + completionSource.setException(e); + } + return completionSource.getTask(); + } + + /** + * Initiate the shutdown process. Once called, the only possible way to run `Runnable`s are by + * holding the `internalExecutor` reference. + */ + synchronized Task shutdown() { + if (shutdownTask == null) { + shutdownTask = + executeAndReportResult( + () -> { + if (this.onShutdown != null) { + this.onShutdown.run(); + } + return null; + }); + } + return shutdownTask; + } + + /** + * Initiate the shutdown process and reduce thread pool to 0. + */ + synchronized void terminate() { + shutdown(); + + // Will cause the executor to de-reference all threads, the best we can do + internalExecutor.setCorePoolSize(0); + } + + synchronized SynchronizedShutdownAwareExecutor reincarnate() { + hardAssert( + isShuttingDown(), "Executor must be shutting down to be eligible for reincarnation."); + hardAssert(!isTerminated(), "Cannot reincarnate executor that is terminated."); + return new SynchronizedShutdownAwareExecutor(internalExecutor, thread); + } + + private boolean isTerminated() { + return internalExecutor.getCorePoolSize() == 0; + } + + /** + * Wraps {@link ScheduledThreadPoolExecutor#schedule(Runnable, long, TimeUnit)} and provides + * shutdown state check: the command will not be scheduled if the shutdown has been initiated. + */ + synchronized ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + verifyNotShutdown(); + return internalExecutor.schedule(command, delay, unit); + } + + /** + * Wraps around {@link ScheduledThreadPoolExecutor#shutdownNow()}. + */ + void shutdownNow() { + internalExecutor.shutdownNow(); + } +} diff --git a/firebase-firestore/src/proto/google/firestore/v1/firestore.proto b/firebase-firestore/src/proto/google/firestore/v1/firestore.proto index 1bf75ea3c15..c3e7e82b846 100644 --- a/firebase-firestore/src/proto/google/firestore/v1/firestore.proto +++ b/firebase-firestore/src/proto/google/firestore/v1/firestore.proto @@ -566,6 +566,39 @@ message RunAggregationQueryResponse { google.protobuf.Timestamp read_time = 3; } +// New message +message InitRequest { + // Token for synchronization. + // + // The `session_token`, that was received previously as part of InitResponse, should be + // passed back in the next `InitRequest`. + // + // If this is the first time SDK connects, then the `session_token` should be empty. + // + // The token contains database information used to determine whether SDK is out of + // sync. Contents are opaque and can change in the future. + // + // The `session_token` on the ListenStream has the same contents as the WriteStream. + // Whichever stream was last to receive a `session_token`, is the `session_token` that should + // be used as part of the InitRequest, regardless of whether it was from the other + // stream. + // + // The InitResponse will signal whether to `clear_cache`. + bytes session_token = 1; +} + +// New message +message InitResponse { + // Token for synchronization + // + // The `session_token` should be returned as part of the next InitRequest. + bytes session_token = 1; + + // Depending on `session_token`, changes may have occurred that require SDK to clear + // cache. + bool clear_cache = 2; +} + // The request for [Firestore.Write][google.firestore.v1.Firestore.Write]. // // The first request creates a stream, or resumes an existing one from a token. @@ -613,6 +646,8 @@ message WriteRequest { // Labels associated with this write request. map labels = 5; + + InitRequest init_request = 6; } // The response for [Firestore.Write][google.firestore.v1.Firestore.Write]. @@ -635,6 +670,8 @@ message WriteResponse { // The time at which the commit occurred. google.protobuf.Timestamp commit_time = 4; + + InitResponse init_response = 5; } // A request for [Firestore.Listen][google.firestore.v1.Firestore.Listen] @@ -650,6 +687,8 @@ message ListenRequest { // The ID of a target to remove from this stream. int32 remove_target = 3; + + InitRequest init_request = 5; } // Labels associated with this target change. @@ -679,6 +718,8 @@ message ListenResponse { // Returned when documents may have been removed from the given target, but // the exact documents are unknown. ExistenceFilter filter = 5; + + InitResponse init_response = 7; } } diff --git a/firebase-firestore/src/test/java/com/google/firebase/firestore/FirebaseFirestoreIntegrationTestFactory.java b/firebase-firestore/src/test/java/com/google/firebase/firestore/FirebaseFirestoreIntegrationTestFactory.java index a8afe3d52d3..c94c372c147 100644 --- a/firebase-firestore/src/test/java/com/google/firebase/firestore/FirebaseFirestoreIntegrationTestFactory.java +++ b/firebase-firestore/src/test/java/com/google/firebase/firestore/FirebaseFirestoreIntegrationTestFactory.java @@ -30,11 +30,13 @@ import com.google.firebase.firestore.remote.RemoteComponenetProvider; import com.google.firebase.firestore.testutil.EmptyAppCheckTokenProvider; import com.google.firebase.firestore.testutil.EmptyCredentialsProvider; +import com.google.firebase.firestore.util.Function; import com.google.firestore.v1.FirestoreGrpc; import com.google.firestore.v1.ListenRequest; import com.google.firestore.v1.ListenResponse; import com.google.firestore.v1.WriteRequest; import com.google.firestore.v1.WriteResponse; +import java.util.concurrent.Executor; /** * Factory for producing FirebaseFirestore instances that has mocked gRPC layer. @@ -71,6 +73,9 @@ public static class Instance { private final AsyncTaskAccumulator> writes = new AsyncTaskAccumulator<>(); + /** Mockito mock of GrpcCallProvider. */ + public GrpcCallProvider mockGrpcCallProvider; + private Instance(ComponentProvider componentProvider) { this.componentProvider = componentProvider; } @@ -91,7 +96,7 @@ public Task enqueue(Runnable runnable) { * `FirebaseFirestoreIntegrationTestFactory` will set `Instance.configuration` from within * the ComponentProvider override. */ - private ComponentProvider.Configuration configuration; + public ComponentProvider.Configuration configuration; /** Every listen stream created */ public Task> getListenClient(int i) { @@ -122,8 +127,8 @@ public FirebaseFirestoreIntegrationTestFactory(DatabaseId databaseId) { ApplicationProvider.getApplicationContext(), databaseId, "k", - new EmptyCredentialsProvider(), - new EmptyAppCheckTokenProvider(), + EmptyCredentialsProvider::new, + EmptyAppCheckTokenProvider::new, this::componentProvider, null, instanceRegistry, @@ -137,12 +142,17 @@ public void useMemoryCache() { firestore.setFirestoreSettings(builder.build()); } + public void setClearPersistenceMethod(Function> clearPersistenceMethod) { + firestore.clearPersistenceMethod = clearPersistenceMethod; + } + private GrpcCallProvider mockGrpcCallProvider(Instance instance) { GrpcCallProvider mockGrpcCallProvider = mock(GrpcCallProvider.class); when(mockGrpcCallProvider.createClientCall(eq(FirestoreGrpc.getListenMethod()))) .thenAnswer(invocation -> Tasks.forResult(new TestClientCall<>(instance.listens.next()))); when(mockGrpcCallProvider.createClientCall(eq(FirestoreGrpc.getWriteMethod()))) .thenAnswer(invocation -> Tasks.forResult(new TestClientCall<>(instance.writes.next()))); + instance.mockGrpcCallProvider = mockGrpcCallProvider; return mockGrpcCallProvider; } diff --git a/firebase-firestore/src/test/java/com/google/firebase/firestore/integration/FirebaseFirestoreTest.java b/firebase-firestore/src/test/java/com/google/firebase/firestore/integration/FirebaseFirestoreTest.java index adaee0dcf87..0dd96d74620 100644 --- a/firebase-firestore/src/test/java/com/google/firebase/firestore/integration/FirebaseFirestoreTest.java +++ b/firebase-firestore/src/test/java/com/google/firebase/firestore/integration/FirebaseFirestoreTest.java @@ -17,14 +17,21 @@ import static com.google.common.truth.Truth.assertThat; import static com.google.firebase.firestore.testutil.TestUtil.map; import static com.google.firebase.firestore.util.Executors.BACKGROUND_EXECUTOR; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import androidx.annotation.NonNull; import com.google.android.gms.tasks.Task; +import com.google.android.gms.tasks.TaskCompletionSource; import com.google.firebase.firestore.CollectionReference; import com.google.firebase.firestore.DocumentReference; import com.google.firebase.firestore.FirebaseFirestore; +import com.google.firebase.firestore.FirebaseFirestoreException; import com.google.firebase.firestore.FirebaseFirestoreIntegrationTestFactory; +import com.google.firebase.firestore.QuerySnapshot; import com.google.firebase.firestore.UserDataReader; import com.google.firebase.firestore.core.UserData; import com.google.firebase.firestore.model.DatabaseId; @@ -33,10 +40,17 @@ import com.google.firebase.firestore.model.mutation.Precondition; import com.google.firebase.firestore.model.mutation.SetMutation; import com.google.firebase.firestore.remote.RemoteSerializer; +import com.google.firebase.firestore.util.AsyncQueue; +import com.google.firestore.v1.InitRequest; +import com.google.firestore.v1.InitResponse; +import com.google.firestore.v1.ListenRequest; +import com.google.firestore.v1.ListenResponse; import com.google.firestore.v1.Write; import com.google.firestore.v1.WriteRequest; import com.google.firestore.v1.WriteResponse; import com.google.firestore.v1.WriteResult; +import com.google.protobuf.ByteString; +import io.grpc.ClientCall; import io.grpc.Metadata; import io.grpc.Status; import java.util.ArrayList; @@ -45,6 +59,7 @@ import java.util.Iterator; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Before; @@ -70,6 +85,12 @@ private static T waitForResult(Task task) throws InterruptedException { return waitFor(task).getResult(); } + private static Exception waitForException(Task task) throws InterruptedException { + Exception exception = waitFor(task).getException(); + assertThat(exception).isNotNull(); + return exception; + } + @NonNull public static String getResourcePrefixValue(DatabaseId databaseId) { return String.format( @@ -85,6 +106,11 @@ private static Task waitFor(Task task) throws InterruptedException { return task; } + private T waitForException(Task task, Class clazz) + throws InterruptedException { + return clazz.cast(waitForException(task)); + } + @Before public void before() { databaseId = DatabaseId.forDatabase("p", "d"); @@ -127,10 +153,13 @@ public void preserveWritesWhenDisconnectedWithInternalError() throws Exception { // Wait for WriteRequest handshake. // We expect an empty init request because the database is fresh. - assertThat(waitForResult(requests.next())).isEqualTo(writeRequestHandshake()); + assertThat(waitForResult(requests.next())) + .isEqualTo(writeRequest(InitRequest.getDefaultInstance())); // Simulate a successful InitResponse from server. - waitForSuccess(instance.enqueue(() -> callback.listener.onMessage(writeResponse()))); + waitForSuccess( + instance.enqueue( + () -> callback.listener.onMessage(writeResponse(initResponse("token1"))))); // Expect first write request. Write write1 = serializer.encodeMutation(setMutation(doc1, map("foo", "A"))); @@ -160,10 +189,12 @@ public void preserveWritesWhenDisconnectedWithInternalError() throws Exception { // Wait for WriteRequest handshake. // We expect FirestoreClient to send InitRequest with previous token. - assertThat(waitForResult(requests.next())).isEqualTo(writeRequestHandshake()); + assertThat(waitForResult(requests.next())).isEqualTo(writeRequest(initRequest("token1"))); // Simulate a successful InitResponse from server. - waitForSuccess(instance.enqueue(() -> callback.listener.onMessage(writeResponse()))); + waitForSuccess( + instance.enqueue( + () -> callback.listener.onMessage(writeResponse(initResponse("token2"))))); // Expect second write to be retried. Write write2 = serializer.encodeMutation(setMutation(doc2, map("foo", "B"))); @@ -180,6 +211,365 @@ public void preserveWritesWhenDisconnectedWithInternalError() throws Exception { } } + @Test() + public void clearPersistanceAfterStartupShouldRestartFirestoreClient() throws Exception { + // Trigger instantiation of FirestoreClient + firestore.collection("col"); + + FirebaseFirestoreIntegrationTestFactory.Instance first = + waitForResult(factory.instances.get(0)); + + AsyncQueue firstAsyncQueue = first.configuration.asyncQueue; + + assertFalse(firstAsyncQueue.isShuttingDown()); + + // Clearing persistence will require restarting FirestoreClient. + waitForSuccess(firestore.clearPersistence()); + + // Now we have a history of 2 instances. + FirebaseFirestoreIntegrationTestFactory.Instance second = + waitForResult(factory.instances.get(1)); + AsyncQueue secondAsyncQueue = second.configuration.asyncQueue; + + assertEquals(firstAsyncQueue.getExecutor(), secondAsyncQueue.getExecutor()); + + assertTrue(firstAsyncQueue.isShuttingDown()); + assertFalse(secondAsyncQueue.isShuttingDown()); + + // AsyncQueue of first instance should reject tasks. + Exception firstTask = waitForException(firstAsyncQueue.enqueue(() -> "Hi")); + assertThat(firstTask).isInstanceOf(RejectedExecutionException.class); + assertThat(firstTask).hasMessageThat().isEqualTo("AsyncQueue is shutdown"); + + // AsyncQueue of second instance should be functional. + assertThat(waitFor(secondAsyncQueue.enqueue(() -> "Hello")).getResult()).isEqualTo("Hello"); + + waitForSuccess(firestore.terminate()); + + // After terminate the second instance should also reject tasks. + Exception afterTerminate = waitForException(secondAsyncQueue.enqueue(() -> "Uh oh")); + assertThat(afterTerminate).isInstanceOf(RejectedExecutionException.class); + assertThat(afterTerminate).hasMessageThat().isEqualTo("AsyncQueue is shutdown"); + } + + @Test + public void clearPersistenceDueToInitResponse() throws Exception { + // Create a snapshot listener that will be active during handshake clearing of cache. + TestEventListener snapshotListener1 = new TestEventListener<>(); + firestore.collection("col").addSnapshotListener(BACKGROUND_EXECUTOR, snapshotListener1); + Iterator> snapshots = snapshotListener1.iterator(); + Task firstSnapshot = snapshots.next(); + + // Wait for first FirestoreClient to instantiate + FirebaseFirestoreIntegrationTestFactory.Instance first = + waitForResult(factory.instances.get(0)); + + // Wait for Listen CallClient to be created. + TestClientCall callback1 = + waitForResult(first.getListenClient(0)); + + // Wait for ListenRequest handshake. + // We expect an empty init request because the database is fresh. + assertThat(waitForResult(callback1.getRequest(0))) + .isEqualTo(listenRequest(InitRequest.getDefaultInstance())); + + // Simulate a successful InitResponse from server. + waitForSuccess( + first.enqueue(() -> callback1.listener.onMessage(listenResponse(initResponse("token1"))))); + + // We expect previous addSnapshotListener to cause a, AddTarget request. + assertTrue(waitForResult(callback1.getRequest(1)).hasAddTarget()); + + // TODO(does this make sense?) + // We have a 10 second timeout on raising snapshot from Cache, that is triggered when Listen + // connection is closed. + assertFalse(firstSnapshot.isComplete()); + + // Simulate Database deletion by closing connection with NOT_FOUND. + waitForSuccess( + first.enqueue(() -> callback1.listener.onClose(Status.NOT_FOUND, new Metadata()))); + + // First snapshot is raised from cache immediately after connection is closed. + assertTrue(waitForResult(firstSnapshot).getMetadata().isFromCache()); + + // We expect client to reconnect Listen stream. + TestClientCall callback2 = + waitForResult(first.getListenClient(1)); + + // Wait for ListenRequest. + // We expect FirestoreClient to send InitRequest with previous token. + assertThat(waitForResult(callback2.getRequest(0))) + .isEqualTo(listenRequest(initRequest("token1"))); + + // This task will complete when clearPersistence is invoked on FirebaseFirestore. + Task clearPersistenceTask = setupClearPersistenceTask(); + + // Simulate a clear cache InitResponse from server. + waitForSuccess( + first.enqueue( + () -> callback2.listener.onMessage(listenResponse(initResponse("token2", true))))); + + // Wait for cleanPersistence to be run. + waitForSuccess(clearPersistenceTask); + + // Verify that the first FirestoreClient was shutdown. If the GrpcCallProvider component has + // has it's shutdown method called, then we know shutdown was triggered. + verify(first.mockGrpcCallProvider, times(1)).shutdown(); + + // Snapshot listeners should fail with ABORTED + FirebaseFirestoreException exception = + waitForException(snapshots.next(), FirebaseFirestoreException.class); + assertThat(exception.getCode()).isEqualTo(FirebaseFirestoreException.Code.ABORTED); + + // Start another snapshot listener + TestEventListener snapshotListener2 = new TestEventListener<>(); + firestore.collection("col").addSnapshotListener(BACKGROUND_EXECUTOR, snapshotListener2); + + // Wait for second FirestoreClient to instantiate + FirebaseFirestoreIntegrationTestFactory.Instance second = + waitForResult(factory.instances.get(1)); + + // Wait for Listen CallClient to be created. + TestClientCall callback3 = + waitForResult(second.getListenClient(0)); + + // Wait for ListenRequest. + // We expect FirestoreClient to send InitRequest with previous token. + assertThat(waitForResult(callback3.getRequest(0))) + .isEqualTo(listenRequest(initRequest("token2"))); + } + + @Test + public void preserveWritesWhenDisconnectedWithNotFound() throws Exception { + CollectionReference col = firestore.collection("col"); + DocumentReference doc1 = col.document(); + DocumentReference doc2 = col.document(); + DocumentReference doc3 = col.document(); + doc1.set(map("foo", "A")); + doc2.set(map("foo", "B")); + doc3.set(map("foo", "C")); + + // 1st FirestoreClient instance. + { + // Wait for first FirestoreClient to instantiate + FirebaseFirestoreIntegrationTestFactory.Instance instance = + waitForResult(factory.instances.get(0)); + RemoteSerializer serializer = instance.componentProvider.getRemoteSerializer(); + + // First Write stream connection + { + // Wait for Write CallClient to be created. + TestClientCall callback = + waitForResult(instance.getWriteClient(0)); + Iterator> requests = callback.requestIterator(); + + // Wait for WriteRequest handshake. + // We expect an empty init request because the database is fresh. + assertThat(waitForResult(requests.next())) + .isEqualTo(writeRequest(InitRequest.getDefaultInstance())); + + // Simulate a successful InitResponse from server. + waitForSuccess( + instance.enqueue( + () -> callback.listener.onMessage(writeResponse(initResponse("token1"))))); + + // Expect first write request. + Write write1 = serializer.encodeMutation(setMutation(doc1, map("foo", "A"))); + assertThat(waitForResult(requests.next())).isEqualTo(writeRequest(write1)); + + // Simulate write acknowledgement. + waitForSuccess( + instance.enqueue( + () -> + callback.listener.onMessage(writeResponse(WriteResult.getDefaultInstance())))); + + // Expect second write request. + Write write2 = serializer.encodeMutation(setMutation(doc2, map("foo", "B"))); + assertThat(waitForResult(requests.next())).isEqualTo(writeRequest(write2)); + + // Simulate NOT_FOUND error that was NOT due to database name reuse. ( + waitForSuccess( + instance.enqueue(() -> callback.listener.onClose(Status.NOT_FOUND, new Metadata()))); + } + + // Second Write Stream connection + // Previous connection was closed by server with NOT_FOUND error. + { + // Wait for Write CallClient to be created. + TestClientCall callback = + waitForResult(instance.getWriteClient(1)); + Iterator> requests = callback.requestIterator(); + + // Wait for WriteRequest handshake. + // We expect FirestoreClient to send InitRequest with previous token. + assertThat(waitForResult(requests.next())).isEqualTo(writeRequest(initRequest("token1"))); + + // Simulate a successful InitResponse from server. + waitForSuccess( + instance.enqueue( + () -> callback.listener.onMessage(writeResponse(initResponse("token2"))))); + + // Expect second write to be retried. + Write write2 = serializer.encodeMutation(setMutation(doc2, map("foo", "B"))); + assertThat(waitForResult(requests.next())).isEqualTo(writeRequest(write2)); + + // Simulate write acknowledgement. + waitForSuccess( + instance.enqueue( + () -> + callback.listener.onMessage(writeResponse(WriteResult.getDefaultInstance())))); + + // Simulate NOT_FOUND error. This time we will clear cache. + waitForSuccess( + instance.enqueue(() -> callback.listener.onClose(Status.NOT_FOUND, new Metadata()))); + } + + // Third Write Stream connection + // Previous connection was closed by server with NOT_FOUND error. + { + // Wait for Write CallClient to be created. + TestClientCall callback = + waitForResult(instance.getWriteClient(2)); + Iterator> requests = callback.requestIterator(); + + // Wait for WriteRequest. + // We expect FirestoreClient to send InitRequest with previous token. + assertThat(waitForResult(requests.next())).isEqualTo(writeRequest(initRequest("token2"))); + + // Simulate a clear cache InitResponse from server. + waitForSuccess( + instance.enqueue( + () -> callback.listener.onMessage(writeResponse(initResponse("token3", true))))); + } + } + + // Interaction with 2nd FirestoreClient instance. + // Previous instance was shutdown due to clear cache command from server. + { + // Wait for second FirestoreClient to instantiate + FirebaseFirestoreIntegrationTestFactory.Instance instance = + waitForResult(factory.instances.get(1)); + RemoteSerializer serializer = instance.componentProvider.getRemoteSerializer(); + + // The writes should have been cleared, so we will have to create a new one. + DocumentReference doc4 = col.document(); + doc4.set(map("foo", "D")); + + // Wait for Write CallClient to be created. + TestClientCall callback = + waitForResult(instance.getWriteClient(0)); + Iterator> requests = callback.requestIterator(); + + // Wait for WriteRequest. + // We expect FirestoreClient to send InitRequest with previous token. + assertThat(waitForResult(requests.next())).isEqualTo(writeRequest(initRequest("token3"))); + + // Simulate a successful InitResponse from server. + waitForSuccess( + instance.enqueue( + () -> callback.listener.onMessage(writeResponse(initResponse("token4"))))); + + // Expect the new write request. + Write write4 = serializer.encodeMutation(setMutation(doc4, map("foo", "D"))); + assertThat(waitForResult(requests.next())).isEqualTo(writeRequest(write4)); + + // Simulate write acknowledgement. + waitForSuccess( + instance.enqueue( + () -> callback.listener.onMessage(writeResponse(WriteResult.getDefaultInstance())))); + } + } + + @Test + public void listenHandshakeMustWaitForWriteHandshakeToComplete() throws Exception { + CollectionReference col = firestore.collection("col"); + + // Wait for FirestoreClient to instantiate + FirebaseFirestoreIntegrationTestFactory.Instance instance = + waitForResult(factory.instances.get(0)); + + // Trigger Write Stream First + col.document().set(map("foo", "A")); + + TestClientCall write = waitForResult(instance.getWriteClient(0)); + ClientCall.Listener writeResponses = write.listener; + Iterator> writeRequests = write.requestIterator(); + + // Then Trigger Listen Stream; + TestEventListener snapshotListener = new TestEventListener<>(); + firestore.collection("col").addSnapshotListener(BACKGROUND_EXECUTOR, snapshotListener); + Iterator> snapshots = snapshotListener.iterator(); + + TestClientCall listen = + waitForResult(instance.getListenClient(0)); + Iterator> listenRequests = listen.requestIterator(); + ClientCall.Listener listenResponses = listen.listener; + + // Prepare + Task writeInitRequest = writeRequests.next(); + Task listenInitRequest = listenRequests.next(); + + // Expect empty InitRequest from Write stream. + assertThat(waitForResult(writeInitRequest)) + .isEqualTo(writeRequest(InitRequest.getDefaultInstance())); + + // No request should have come from Listen stream yet. + assertFalse(listenInitRequest.isComplete()); + + // Simulate a successful InitResponse from server. + waitForSuccess( + instance.enqueue(() -> writeResponses.onMessage(writeResponse(initResponse("token1"))))); + + // Now that Write handshake is complete, the Listen stream should send a InitRequest with token + // from Write handshake. + assertThat(waitForResult(listenInitRequest)).isEqualTo(listenRequest(initRequest("token1"))); + } + + @Test + public void writeHandshakeMustWaitForListenHandshakeToComplete() throws Exception { + CollectionReference col = firestore.collection("col"); + + // Wait for FirestoreClient to instantiate + FirebaseFirestoreIntegrationTestFactory.Instance instance = + waitForResult(factory.instances.get(0)); + + // Trigger Listen Stream First + TestEventListener snapshotListener = new TestEventListener<>(); + firestore.collection("col").addSnapshotListener(BACKGROUND_EXECUTOR, snapshotListener); + Iterator> snapshots = snapshotListener.iterator(); + + TestClientCall listen = + waitForResult(instance.getListenClient(0)); + Iterator> listenRequests = listen.requestIterator(); + ClientCall.Listener listenResponses = listen.listener; + + // Then Trigger Write Stream; + col.document().set(map("foo", "A")); + + TestClientCall write = waitForResult(instance.getWriteClient(0)); + ClientCall.Listener writeResponses = write.listener; + Iterator> writeRequests = write.requestIterator(); + + // Prepare + Task writeInitRequest = writeRequests.next(); + Task listenInitRequest = listenRequests.next(); + + // Expect empty InitRequest from Listen stream. + assertThat(waitForResult(listenInitRequest)) + .isEqualTo(listenRequest(InitRequest.getDefaultInstance())); + + // No request should have come from Listen stream yet. + assertFalse(writeInitRequest.isComplete()); + + // Simulate a successful InitResponse from server. + waitForSuccess( + instance.enqueue(() -> listenResponses.onMessage(listenResponse(initResponse("token1"))))); + + // Now that Write handshake is complete, the Listen stream should send a InitRequest with token + // from Write handshake. + assertThat(waitForResult(writeInitRequest)).isEqualTo(writeRequest(initRequest("token1"))); + } + @NonNull private DocumentKey key(DocumentReference doc) { return DocumentKey.fromPathString(doc.getPath()); @@ -199,8 +589,24 @@ public SetMutation setMutation(DocumentReference doc, Map values } @NonNull - private WriteRequest writeRequestHandshake() { - return WriteRequest.newBuilder().setDatabase(getResourcePrefixValue(databaseId)).build(); + private ListenRequest listenRequest(InitRequest initRequest) { + return ListenRequest.newBuilder() + .setDatabase(getResourcePrefixValue(databaseId)) + .setInitRequest(initRequest) + .build(); + } + + @NonNull + private static ListenResponse listenResponse(InitResponse initResponse) { + return ListenResponse.newBuilder().setInitResponse(initResponse).build(); + } + + @NonNull + private WriteRequest writeRequest(InitRequest initRequest) { + return WriteRequest.newBuilder() + .setDatabase(getResourcePrefixValue(databaseId)) + .setInitRequest(initRequest) + .build(); } @NonNull @@ -212,6 +618,11 @@ private WriteRequest writeRequest(Write... writes) { return builder.build(); } + @NonNull + private static WriteResponse writeResponse(InitResponse initResponse) { + return WriteResponse.newBuilder().setInitResponse(initResponse).build(); + } + @NonNull private static WriteResponse writeResponse(WriteResult... writeResults) { WriteResponse.Builder builder = WriteResponse.newBuilder(); @@ -220,4 +631,33 @@ private static WriteResponse writeResponse(WriteResult... writeResults) { } return builder.build(); } + + @NonNull + private static InitResponse initResponse(String token) { + return InitResponse.newBuilder().setSessionToken(ByteString.copyFromUtf8(token)).build(); + } + + @NonNull + private static InitResponse initResponse(String token, boolean clearCache) { + return InitResponse.newBuilder() + .setSessionToken(ByteString.copyFromUtf8(token)) + .setClearCache(clearCache) + .build(); + } + + @NonNull + private static InitRequest initRequest(String token) { + return InitRequest.newBuilder().setSessionToken(ByteString.copyFromUtf8(token)).build(); + } + + @NonNull + private Task setupClearPersistenceTask() { + TaskCompletionSource clearPersistenceTask = new TaskCompletionSource<>(); + factory.setClearPersistenceMethod( + executor -> { + executor.execute(() -> clearPersistenceTask.setResult(null)); + return clearPersistenceTask.getTask(); + }); + return clearPersistenceTask.getTask(); + } } diff --git a/firebase-firestore/src/test/java/com/google/firebase/firestore/integration/TestEventListener.java b/firebase-firestore/src/test/java/com/google/firebase/firestore/integration/TestEventListener.java new file mode 100644 index 00000000000..7fbeca8f3ca --- /dev/null +++ b/firebase-firestore/src/test/java/com/google/firebase/firestore/integration/TestEventListener.java @@ -0,0 +1,49 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.firebase.firestore.integration; + +import androidx.annotation.NonNull; +import androidx.annotation.Nullable; +import com.google.android.gms.tasks.Task; +import com.google.firebase.firestore.EventListener; +import com.google.firebase.firestore.FirebaseFirestoreException; +import java.util.Iterator; + +/** + * EventListener test harness. + */ +class TestEventListener implements EventListener { + + AsyncTaskAccumulator events = new AsyncTaskAccumulator<>(); + + @Override + public synchronized void onEvent(@Nullable T value, @Nullable FirebaseFirestoreException error) { + if (error == null) { + events.onResult(value); + } else { + events.onException(error); + } + } + + @NonNull + public synchronized Task get(int index) { + return events.get(index); + } + + @NonNull + public Iterator> iterator() { + return events.iterator(); + } +} diff --git a/firebase-firestore/src/test/java/com/google/firebase/firestore/remote/MockDatastore.java b/firebase-firestore/src/test/java/com/google/firebase/firestore/remote/MockDatastore.java index 320a5edb1be..f067991cf5b 100644 --- a/firebase-firestore/src/test/java/com/google/firebase/firestore/remote/MockDatastore.java +++ b/firebase-firestore/src/test/java/com/google/firebase/firestore/remote/MockDatastore.java @@ -25,6 +25,8 @@ import com.google.firebase.firestore.spec.SpecTestCase; import com.google.firebase.firestore.util.AsyncQueue; import com.google.firebase.firestore.util.Util; +import com.google.firestore.v1.InitResponse; +import com.google.protobuf.ByteString; import io.grpc.Status; import java.util.ArrayList; import java.util.HashMap; @@ -53,6 +55,7 @@ private class MockWatchStream extends WatchStream { @Override public void start() { hardAssert(!open, "Trying to start already started watch stream"); + handshakeComplete = false; open = true; listener.onOpen(); } @@ -62,6 +65,7 @@ public void stop() { super.stop(); activeTargets.clear(); open = false; + handshakeComplete = false; } @Override @@ -74,6 +78,18 @@ public boolean isOpen() { return open; } + @Override + void sendHandshake(ByteString sessionToken) { + hardAssert(!handshakeComplete, "Handshake already completed"); + handshakeComplete = true; + InitResponse initResponse = + InitResponse.newBuilder() + .setSessionToken(sessionToken == null ? ByteString.EMPTY : sessionToken) + .setClearCache(false) + .build(); + getWorkerQueue().enqueue(() -> listener.onHandshake(initResponse)); + } + @Override public void watchQuery(TargetData targetData) { String resumeToken = Util.toDebugString(targetData.getResumeToken()); @@ -172,11 +188,16 @@ public boolean isOpen() { } @Override - public void writeHandshake() { + public void sendHandshake(ByteString sessionToken) { hardAssert(!handshakeComplete, "Handshake already completed"); writeStreamRequestCount += 1; handshakeComplete = true; - listener.onHandshakeComplete(); + InitResponse initResponse = + InitResponse.newBuilder() + .setSessionToken(sessionToken == null ? ByteString.EMPTY : sessionToken) + .setClearCache(false) + .build(); + getWorkerQueue().enqueue(() -> listener.onHandshake(initResponse)); } @Override diff --git a/firebase-firestore/src/test/java/com/google/firebase/firestore/spec/SpecTestCase.java b/firebase-firestore/src/test/java/com/google/firebase/firestore/spec/SpecTestCase.java index 8814b42baef..e482b5bbc35 100644 --- a/firebase-firestore/src/test/java/com/google/firebase/firestore/spec/SpecTestCase.java +++ b/firebase-firestore/src/test/java/com/google/firebase/firestore/spec/SpecTestCase.java @@ -171,7 +171,7 @@ public abstract class SpecTestCase implements RemoteStoreCallback { private boolean useEagerGcForMemory; private int maxConcurrentLimboResolutions; - private boolean networkEnabled = true; + private boolean networkEnabled = false; // // Parts of the Firestore system that the spec tests need to control. @@ -351,6 +351,7 @@ protected Datastore createDatastore(ComponentProvider.Configuration configuratio localStore = provider.getLocalStore(); syncEngine = provider.getSyncEngine(); eventManager = provider.getEventManager(); + remoteStore.enableNetwork(); } @Override @@ -1304,6 +1305,10 @@ private void runSteps(JSONArray steps, JSONObject config) throws Exception { backgroundExecutor.execute(() -> drainBackgroundQueue.setResult(null)); waitFor(drainBackgroundQueue.getTask()); + // while (!queue.isIdle()) { + // Thread.sleep(1); + // } + if (expectedSnapshotEvents != null) { log(" Validating expected snapshot events " + expectedSnapshotEvents); } diff --git a/firebase-firestore/src/test/java/com/google/firebase/firestore/util/AsyncQueueTest.java b/firebase-firestore/src/test/java/com/google/firebase/firestore/util/AsyncQueueTest.java index b6f817ef4d8..d0aa8923ccb 100644 --- a/firebase-firestore/src/test/java/com/google/firebase/firestore/util/AsyncQueueTest.java +++ b/firebase-firestore/src/test/java/com/google/firebase/firestore/util/AsyncQueueTest.java @@ -131,11 +131,9 @@ public void tasksAreScheduledWithRespectToShutdown() { // From this point on, `normal` tasks are not scheduled. Only those who explicitly request to // run after shutdown initiated will run. - queue.enqueueAndInitiateShutdown(runnableForStep(2)); + queue.shutdown(); queue.enqueueAndForget(runnableForStep(3)); - queue.enqueueAndForgetEvenAfterShutdown(runnableForStep(4)); - queue.getExecutor().execute(runnableForStep(5)); waitForExpectedSteps(); }