Skip to content

Commit

Permalink
refactor implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
thetumbled committed Feb 26, 2025
1 parent 315264d commit 9432a60
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pulsar.client.api;

import java.io.Serializable;
import java.util.function.Function;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
Expand Down Expand Up @@ -64,12 +66,14 @@ public class DeadLetterPolicy implements Serializable {
private String initialSubscriptionName;

/**
* Configuration used to create a producer that will send messages to the dead letter topic.
* Function to build the producer for the retry letter topic.
* The input parameter is the topic name.
*/
private DeadLetterProducerConfig deadLetterProducerConfig;
private Function<String, ProducerBuilder<byte[]>> retryLetterProducerBuilder;

/**
* Configuration used to create a producer that will send messages to the retry letter topic.
* Function to build the producer for the dead letter topic.
* The input parameter is the topic name.
*/
private DeadLetterProducerConfig retryLetterProducerConfig;
private Function<String, ProducerBuilder<byte[]>> deadLetterProducerBuilder;
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2295,13 +2295,11 @@ private CompletableFuture<Producer<byte[]>> initDeadLetterProducerIfNeeded() {
if (p == null || p.isCompletedExceptionally()) {
p = createProducerWithBackOff(() -> {
CompletableFuture<Producer<byte[]>> newProducer =
((ProducerBuilderImpl<byte[]>) client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema)))
((ProducerBuilderImpl<byte[]>) this.deadLetterPolicy.getDeadLetterProducerBuilder()
.apply(this.deadLetterPolicy.getDeadLetterTopic()))
.initialSubscriptionName(this.deadLetterPolicy.getInitialSubscriptionName())
.schema(Schema.AUTO_PRODUCE_BYTES(schema))
.topic(this.deadLetterPolicy.getDeadLetterTopic())
.producerName(
String.format("%s-%s-%s-%s-DLQ", this.topicName, this.subscription,
this.consumerName, RandomStringUtils.randomAlphanumeric(5)))
.loadConf(this.deadLetterPolicy.getDeadLetterProducerConfig().toMap())
.createAsync();
newProducer.whenComplete((producer, ex) -> {
if (ex != null) {
Expand Down Expand Up @@ -2362,11 +2360,12 @@ private CompletableFuture<Producer<byte[]>> initRetryLetterProducerIfNeeded() {
p = retryLetterProducer;
if (p == null || p.isCompletedExceptionally()) {
p = createProducerWithBackOff(() -> {
CompletableFuture<Producer<byte[]>> newProducer = client
.newProducer(Schema.AUTO_PRODUCE_BYTES(schema))
.topic(this.deadLetterPolicy.getRetryLetterTopic())
.loadConf(this.deadLetterPolicy.getRetryLetterProducerConfig().toMap())
.createAsync();
CompletableFuture<Producer<byte[]>> newProducer =
((ProducerBuilderImpl<byte[]>) this.deadLetterPolicy.getRetryLetterProducerBuilder()
.apply(this.deadLetterPolicy.getRetryLetterTopic()))
.schema(Schema.AUTO_PRODUCE_BYTES(schema))
.topic(this.deadLetterPolicy.getRetryLetterTopic())
.createAsync();
newProducer.whenComplete((producer, ex) -> {
if (ex != null) {
log.error("[{}] [{}] [{}] Failed to create retry letter producer for topic {}",
Expand Down

0 comments on commit 9432a60

Please sign in to comment.