From 9432a60d6e07b6a412911601433eb2dd4549ef2c Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Wed, 26 Feb 2025 17:17:11 +0800 Subject: [PATCH] refactor implementation. --- .../pulsar/client/api/DeadLetterPolicy.java | 12 ++- .../client/api/DeadLetterProducerConfig.java | 75 ------------------- .../pulsar/client/impl/ConsumerImpl.java | 19 +++-- 3 files changed, 17 insertions(+), 89 deletions(-) delete mode 100644 pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DeadLetterProducerConfig.java diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DeadLetterPolicy.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DeadLetterPolicy.java index c68e3e059ec2b..139b231d0e10b 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DeadLetterPolicy.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DeadLetterPolicy.java @@ -19,6 +19,8 @@ package org.apache.pulsar.client.api; import java.io.Serializable; +import java.util.function.Function; + import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; @@ -64,12 +66,14 @@ public class DeadLetterPolicy implements Serializable { private String initialSubscriptionName; /** - * Configuration used to create a producer that will send messages to the dead letter topic. + * Function to build the producer for the retry letter topic. + * The input parameter is the topic name. */ - private DeadLetterProducerConfig deadLetterProducerConfig; + private Function> retryLetterProducerBuilder; /** - * Configuration used to create a producer that will send messages to the retry letter topic. + * Function to build the producer for the dead letter topic. + * The input parameter is the topic name. */ - private DeadLetterProducerConfig retryLetterProducerConfig; + private Function> deadLetterProducerBuilder; } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DeadLetterProducerConfig.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DeadLetterProducerConfig.java deleted file mode 100644 index b2502afe438d4..0000000000000 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DeadLetterProducerConfig.java +++ /dev/null @@ -1,75 +0,0 @@ -package org.apache.pulsar.client.api; - -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - -import java.io.Serializable; -import java.util.HashMap; -import java.util.Map; - -/** - * Configuration used to create a producer that will send messages to - * the dead letter topic and retry topic. - */ -@Data -@NoArgsConstructor -@AllArgsConstructor -public class DeadLetterProducerConfig implements Serializable { - private static final long serialVersionUID = 1L; - - // refer to default values in ProducerConfigurationData - public static final int DEFAULT_BATCHING_MAX_MESSAGES = 1000; - public static final int DEFAULT_MAX_PENDING_MESSAGES = 0; - - /** - * @see ProducerBuilder#blockIfQueueFull(boolean) - */ - private boolean blockIfQueueFull = false; - - /** - * @see ProducerBuilder#batchingMaxMessages(int) - */ - private int maxPendingMessages = DEFAULT_MAX_PENDING_MESSAGES; - - /** - * @see ProducerBuilder#enableBatching(boolean) - * default is false to keep the same behavior as before - * while the default value in ProducerConfigurationData is true - */ - private boolean batchingEnabled = false; - - /** - * @see ProducerBuilder#batchingMaxMessages(int) - */ - private int batchingMaxMessages = DEFAULT_BATCHING_MAX_MESSAGES; - - /** - * @see ProducerBuilder#batchingMaxBytes(int) - */ - private int batchingMaxBytes = 128 * 1024; - - /** - * @see ProducerBuilder#enableChunking(boolean) - * default is true to keep the same behavior as before - * while the default value in ProducerConfigurationData is false - */ - private boolean chunkingEnabled = true; - - /** - * @see ProducerBuilder#chunkMaxMessageSize(int) - */ - private int chunkMaxMessageSize = -1; - - public Map toMap() { - Map map = new HashMap<>(); - map.put("blockIfQueueFull", blockIfQueueFull); - map.put("maxPendingMessages", maxPendingMessages); - map.put("batchingEnabled", batchingEnabled); - map.put("batchingMaxMessages", batchingMaxMessages); - map.put("batchingMaxBytes", batchingMaxBytes); - map.put("chunkingEnabled", chunkingEnabled); - map.put("chunkMaxMessageSize", chunkMaxMessageSize); - return map; - } -} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 422dda620a135..e224513093943 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -2295,13 +2295,11 @@ private CompletableFuture> initDeadLetterProducerIfNeeded() { if (p == null || p.isCompletedExceptionally()) { p = createProducerWithBackOff(() -> { CompletableFuture> newProducer = - ((ProducerBuilderImpl) client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema))) + ((ProducerBuilderImpl) this.deadLetterPolicy.getDeadLetterProducerBuilder() + .apply(this.deadLetterPolicy.getDeadLetterTopic())) .initialSubscriptionName(this.deadLetterPolicy.getInitialSubscriptionName()) + .schema(Schema.AUTO_PRODUCE_BYTES(schema)) .topic(this.deadLetterPolicy.getDeadLetterTopic()) - .producerName( - String.format("%s-%s-%s-%s-DLQ", this.topicName, this.subscription, - this.consumerName, RandomStringUtils.randomAlphanumeric(5))) - .loadConf(this.deadLetterPolicy.getDeadLetterProducerConfig().toMap()) .createAsync(); newProducer.whenComplete((producer, ex) -> { if (ex != null) { @@ -2362,11 +2360,12 @@ private CompletableFuture> initRetryLetterProducerIfNeeded() { p = retryLetterProducer; if (p == null || p.isCompletedExceptionally()) { p = createProducerWithBackOff(() -> { - CompletableFuture> newProducer = client - .newProducer(Schema.AUTO_PRODUCE_BYTES(schema)) - .topic(this.deadLetterPolicy.getRetryLetterTopic()) - .loadConf(this.deadLetterPolicy.getRetryLetterProducerConfig().toMap()) - .createAsync(); + CompletableFuture> newProducer = + ((ProducerBuilderImpl) this.deadLetterPolicy.getRetryLetterProducerBuilder() + .apply(this.deadLetterPolicy.getRetryLetterTopic())) + .schema(Schema.AUTO_PRODUCE_BYTES(schema)) + .topic(this.deadLetterPolicy.getRetryLetterTopic()) + .createAsync(); newProducer.whenComplete((producer, ex) -> { if (ex != null) { log.error("[{}] [{}] [{}] Failed to create retry letter producer for topic {}",