From 76c1b4eb935969c6cc81ba1184a1bbb4475c085b Mon Sep 17 00:00:00 2001 From: Heesung Sohn Date: Fri, 19 Apr 2024 11:28:19 -0700 Subject: [PATCH] [fix][broker] Use ForkJoinPool.commonPool to handle Metadata operations --- .../metadata/impl/AbstractMetadataStore.java | 28 +++++++++++-------- .../pulsar/metadata/MetadataStoreTest.java | 7 +++-- 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java index 0a35664391455..b0039a3211273 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java @@ -542,27 +542,31 @@ public void invalidateCaches(String...paths) { } /** - * Run the task in the executor thread and fail the future if the executor is shutting down. + * Run the task in the ForkJoinPool.commonPool thread and fail the future if exceptionally. */ @VisibleForTesting public void execute(Runnable task, CompletableFuture future) { - try { - executor.execute(task); - } catch (Throwable t) { - future.completeExceptionally(t); - } + CompletableFuture.runAsync(task).exceptionally(e -> { + if (!future.isDone()) { + future.completeExceptionally(e); + } + return null; + }); } /** - * Run the task in the executor thread and fail the future if the executor is shutting down. + * Run the task in the ForkJoinPool.commonPool thread and fail the future if exceptionally. */ @VisibleForTesting public void execute(Runnable task, Supplier>> futures) { - try { - executor.execute(task); - } catch (final Throwable t) { - futures.get().forEach(f -> f.completeExceptionally(t)); - } + CompletableFuture.runAsync(task).exceptionally(e -> { + futures.get().forEach(f -> { + if (!f.isDone()) { + f.completeExceptionally(e); + } + }); + return null; + }); } protected static String parent(String path) { diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java index b1578188c681d..a8c7cf2735090 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java @@ -458,11 +458,12 @@ public void testThreadSwitchOfZkMetadataStore(boolean hasSynchronizer, boolean e @Cleanup ZKMetadataStore store = (ZKMetadataStore) MetadataStoreFactory.create(zks.getConnectionString(), config); + String forkJoinPoolNamePrefix = "ForkJoinPool.commonPool"; final Runnable verify = () -> { String currentThreadName = Thread.currentThread().getName(); - String errorMessage = String.format("Expect to switch to thread %s, but currently it is thread %s", - metadataStoreName, currentThreadName); - assertTrue(Thread.currentThread().getName().startsWith(metadataStoreName), errorMessage); + String errorMessage = String.format("Expect to switch to thread %s*, but currently it is thread %s", + forkJoinPoolNamePrefix, currentThreadName); + assertTrue(currentThreadName.startsWith(forkJoinPoolNamePrefix), errorMessage); }; // put with node which has parent(but the parent node is not exists).