From bc3e7f60574dd25b0d3511c56d09d233d0c15f42 Mon Sep 17 00:00:00 2001 From: Rajan Dhabalia Date: Thu, 10 Oct 2024 12:30:16 -0700 Subject: [PATCH] [fix][broker] Allow broker to handle non-recoverable schema error only if SchemaLedgerForceRecovery flag is enabled (#23428) --- .../java/org/apache/pulsar/broker/service/AbstractTopic.java | 3 ++- .../pulsar/broker/service/schema/ClientGetSchemaTest.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 76dd277159cf4..11f00fb28e34b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -671,7 +671,8 @@ public CompletableFuture hasSchema() { return brokerService.pulsar().getSchemaRegistryService().getSchema(getSchemaId()).thenApply(Objects::nonNull) .exceptionally(e -> { Throwable ex = e.getCause(); - if (ex instanceof SchemaException || !((SchemaException) ex).isRecoverable()) { + if (brokerService.pulsar().getConfig().isSchemaLedgerForceRecovery() + && (ex instanceof SchemaException && !((SchemaException) ex).isRecoverable())) { return false; } throw ex instanceof CompletionException ? (CompletionException) ex : new CompletionException(ex); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java index ec81f39fef92c..f9c1042b0e97e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java @@ -186,7 +186,7 @@ public void testAddProducerOnDeletedSchemaLedgerTopic() throws Exception { final String topicOne = "test-deleted-schema-ledger"; final String fqtnOne = TopicName.get(TopicDomain.persistent.value(), tenant, namespace, topicOne).toString(); - //pulsar.getConfig().setManagedLedgerForceRecovery(true); + pulsar.getConfig().setSchemaLedgerForceRecovery(true); admin.namespaces().createNamespace(tenant + "/" + namespace, Sets.newHashSet("test")); // (1) create topic with schema