Skip to content

Commit

Permalink
keep compatibility with old logic.
Browse files Browse the repository at this point in the history
  • Loading branch information
thetumbled committed Feb 27, 2025
1 parent 9432a60 commit f2e4ac4
Showing 1 changed file with 34 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2294,13 +2294,25 @@ private CompletableFuture<Producer<byte[]>> initDeadLetterProducerIfNeeded() {
p = deadLetterProducer;
if (p == null || p.isCompletedExceptionally()) {
p = createProducerWithBackOff(() -> {
CompletableFuture<Producer<byte[]>> newProducer =
((ProducerBuilderImpl<byte[]>) this.deadLetterPolicy.getDeadLetterProducerBuilder()
.apply(this.deadLetterPolicy.getDeadLetterTopic()))
.initialSubscriptionName(this.deadLetterPolicy.getInitialSubscriptionName())
.schema(Schema.AUTO_PRODUCE_BYTES(schema))
.topic(this.deadLetterPolicy.getDeadLetterTopic())
.createAsync();
ProducerBuilderImpl<byte[]> builder;
if (deadLetterPolicy.getDeadLetterProducerBuilder() != null) {
builder = (ProducerBuilderImpl<byte[]>) deadLetterPolicy.getDeadLetterProducerBuilder()
.apply(deadLetterPolicy.getDeadLetterTopic());
builder.initialSubscriptionName(this.deadLetterPolicy.getInitialSubscriptionName())
.schema(Schema.AUTO_PRODUCE_BYTES(schema))
.topic(this.deadLetterPolicy.getDeadLetterTopic());
} else {
// keep compatibility with old configuration
builder = (ProducerBuilderImpl<byte[]>) client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema))
.topic(deadLetterPolicy.getDeadLetterTopic())
.producerName(String.format("%s-%s-%s-%s-DLQ", this.topicName, this.subscription,
this.consumerName, RandomStringUtils.randomAlphanumeric(5)))
.blockIfQueueFull(false)
.enableBatching(false)
.enableChunking(true);
builder.initialSubscriptionName(deadLetterPolicy.getInitialSubscriptionName());
}
CompletableFuture<Producer<byte[]>> newProducer = builder.createAsync();
newProducer.whenComplete((producer, ex) -> {
if (ex != null) {
log.error("[{}] [{}] [{}] Failed to create dead letter producer for topic {}",
Expand Down Expand Up @@ -2360,12 +2372,21 @@ private CompletableFuture<Producer<byte[]>> initRetryLetterProducerIfNeeded() {
p = retryLetterProducer;
if (p == null || p.isCompletedExceptionally()) {
p = createProducerWithBackOff(() -> {
CompletableFuture<Producer<byte[]>> newProducer =
((ProducerBuilderImpl<byte[]>) this.deadLetterPolicy.getRetryLetterProducerBuilder()
.apply(this.deadLetterPolicy.getRetryLetterTopic()))
.schema(Schema.AUTO_PRODUCE_BYTES(schema))
.topic(this.deadLetterPolicy.getRetryLetterTopic())
.createAsync();
ProducerBuilderImpl<byte[]> builder;
if (deadLetterPolicy.getRetryLetterProducerBuilder() != null) {
builder = (ProducerBuilderImpl<byte[]>) deadLetterPolicy.getRetryLetterProducerBuilder()
.apply(deadLetterPolicy.getRetryLetterTopic());
builder.schema(Schema.AUTO_PRODUCE_BYTES(schema))
.topic(this.deadLetterPolicy.getRetryLetterTopic());
} else {
// keep compatibility with old configuration
builder = (ProducerBuilderImpl<byte[]>) client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema))
.topic(deadLetterPolicy.getRetryLetterTopic())
.blockIfQueueFull(false)
.enableBatching(false)
.enableChunking(true);
}
CompletableFuture<Producer<byte[]>> newProducer = builder.createAsync();
newProducer.whenComplete((producer, ex) -> {
if (ex != null) {
log.error("[{}] [{}] [{}] Failed to create retry letter producer for topic {}",
Expand Down

0 comments on commit f2e4ac4

Please sign in to comment.