Skip to content

Commit

Permalink
[improve][client] PIP-409: support producer configuration for retry/d…
Browse files Browse the repository at this point in the history
…ead letter topic producer (#24020)

Co-authored-by: Lari Hotari <lhotari@apache.org>
  • Loading branch information
thetumbled and lhotari authored Mar 6, 2025
1 parent 3f45154 commit 302c1d5
Show file tree
Hide file tree
Showing 8 changed files with 433 additions and 20 deletions.
93 changes: 79 additions & 14 deletions pip/pip-409.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,25 +33,90 @@ retry/dead letter topic.

## Design & Implementation Details

- Add two new configurations in `DeadLetterPolicy`:
- Add `DeadLetterProducerBuilderContext` and `DeadLetterProducerBuilderCustomizer` interface:
```java
public interface DeadLetterProducerBuilderContext {
/**
* Returns the default name of topic for the dead letter or retry letter producer. This topic name is used
* unless the DeadLetterProducerBuilderCustomizer overrides it.
*
* @return a {@code String} representing the input topic name
*/
String getDefaultTopicName();

/**
* Returns the name of the input topic for which the dead letter or retry letter producer is being configured.
*
* @return a {@code String} representing the input topic name
*/
String getInputTopicName();

/**
* Returns the name of the subscription for which the dead letter or retry letter producer is being configured.
*
* @return a {@code String} representing the subscription name
*/
String getInputTopicSubscriptionName();

/**
* Returns the name of the consumer for which the dead letter or
* retry letter producer is being configured.
* @return a {@code String} representing the consumer name
*/
String getInputTopicConsumerName();
}

public interface DeadLetterProducerBuilderCustomizer {
/**
* Customize the given producer builder with settings specific to the topic context provided.
*
* @param context the context containing information about the input topic and the subscription
* @param producerBuilder the producer builder instance to be customized
*/
void customize(DeadLetterProducerBuilderContext context, ProducerBuilder<byte[]> producerBuilder);
}
```

- Add two fields in `DeadLetterPolicy`:
```java
public class DeadLetterPolicy implements Serializable {
/**
* Function to build the producer for the retry letter topic.
* The input parameter is the topic name.
*/
private Function<String, ProducerBuilder<byte[]>> retryLetterProducerBuilder;

/**
* Function to build the producer for the dead letter topic.
* The input parameter is the topic name.
*/
private Function<String, ProducerBuilder<byte[]>> deadLetterProducerBuilder;
/**
* Customizer for configuring the producer builder for the retry letter topic.
*
* <p>This field holds a function that allows the caller to customize the producer builder
* settings for the retry letter topic before the producer is created. The customization logic
* can use the provided context (which includes input topic and subscription details) to adjust
* configurations such as timeouts, batching, or message routing.
*/
private DeadLetterProducerBuilderCustomizer retryLetterProducerBuilderCustomizer;
/**
* Customizer for configuring the producer builder for the dead letter topic.
*
* <p>This field holds a function that allows the caller to customize the producer builder
* settings for the dead letter topic before the producer is created. Using the provided context,
* implementations can perform specific adjustments that ensure the dead letter queue operates
* with the appropriate configurations tailored for handling undeliverable messages.
*/
private DeadLetterProducerBuilderCustomizer deadLetterProducerBuilderCustomizer;
}
```

- use the `retryLetterProducerBuilder` to build the producer for retry topic, and use the
`deadLetterProducerBuilder` to build the producer for dead letter topic.
- use the `DeadLetterProducerBuilderCustomizer` to customize the producer of retry/dead letter topic like this:
```java
// enable batch
DeadLetterProducerBuilderCustomizer producerBuilderCustomizer = (context, producerBuilder) -> {
producerBuilder.enableBatching(true);
};
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName(subscriptionName)
.enableRetry(true)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.retryLetterProducerBuilderCustomizer(producerBuilderCustomizer)
.build())
.subscribe();
```


# Backward & Forward Compatibility
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1114,6 +1114,94 @@ public void testDeadLetterTopicWithInitialSubscription() throws Exception {
consumer.close();
}

@Test()
public void testDeadLetterTopicWithProducerBuilder() throws Exception {
final String topic = "persistent://my-property/my-ns/dead-letter-topic-with-producer-builder";
final int maxRedeliveryCount = 2;
final int sendMessages = 100;

// enable batch
DeadLetterProducerBuilderCustomizer producerBuilderCustomizer = (context, producerBuilder) -> {
producerBuilder.enableBatching(true);
};
String subscriptionName = "my-subscription";
String subscriptionNameDLQ = "my-subscription-DLQ";
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Shared)
.enableRetry(true)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.initialSubscriptionName(subscriptionNameDLQ)
.deadLetterProducerBuilderCustomizer(producerBuilderCustomizer)
.retryLetterProducerBuilderCustomizer(producerBuilderCustomizer)
.build())
.receiverQueueSize(100)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

@Cleanup
PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
.topic(topic + "-" + subscriptionName + "-DLQ")
.subscriptionName(subscriptionNameDLQ)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topic)
.enableBatching(true)
.create();

Map<Integer, String> messageContent = new HashMap<>();
for (int i = 0; i < sendMessages; i++) {
String data = String.format("Hello Pulsar [%d]", i);
producer.newMessage().key(String.valueOf(i)).value(data.getBytes()).send();
messageContent.put(i, data);
}
producer.close();

int totalReceived = 0;
do {
Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
assertNotNull(message, "The consumer should be able to receive messages.");
log.info("consumer received message : {}", message.getMessageId());
totalReceived++;
consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
} while (totalReceived < sendMessages * (maxRedeliveryCount + 1));

int totalInDeadLetter = 0;
do {
Message message = deadLetterConsumer.receive(5, TimeUnit.SECONDS);
assertNotNull(message, "the deadLetterConsumer should receive messages.");
assertEquals(new String(message.getData()), messageContent.get(Integer.parseInt(message.getKey())));
messageContent.remove(Integer.parseInt(message.getKey()));
log.info("dead letter consumer received message : {}", message.getMessageId());
deadLetterConsumer.acknowledge(message);
totalInDeadLetter++;
} while (totalInDeadLetter < sendMessages);
assertTrue(messageContent.isEmpty());

deadLetterConsumer.close();
consumer.close();

Consumer<byte[]> checkConsumer = this.pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

Message<byte[]> checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS);
if (checkMessage != null) {
log.info("check consumer received message : {} {}", checkMessage.getMessageId(), new String(checkMessage.getData()));
}
assertNull(checkMessage);

checkConsumer.close();
}

private CompletableFuture<Void> consumerReceiveForDLQ(Consumer<byte[]> consumer, AtomicInteger totalReceived,
int sendMessages, int maxRedeliveryCount) {
return CompletableFuture.runAsync(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,83 @@ public void testRetryTopic() throws Exception {
checkConsumer.close();
}

@Test
public void testRetryTopicWithProducerBuilder() throws Exception {
final String topic = "persistent://my-property/my-ns/retry-topic-with-producer-builder";
final int maxRedeliveryCount = 2;
final int sendMessages = 100;

// enable batch
DeadLetterProducerBuilderCustomizer producerBuilderCustomizer = (context, producerBuilder) -> {
producerBuilder.enableBatching(true);
};
String subscriptionName = "my-subscription";
String subscriptionNameDLQ = "my-subscription-DLQ";
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Shared)
.enableRetry(true)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.retryLetterProducerBuilderCustomizer(producerBuilderCustomizer)
.build())
.receiverQueueSize(100)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

@Cleanup
PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
.topic(topic + "-" + subscriptionName + "-DLQ")
.subscriptionName(subscriptionNameDLQ)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topic)
.create();

for (int i = 0; i < sendMessages; i++) {
producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
}
producer.close();

int totalReceived = 0;
do {
Message<byte[]> message = consumer.receive();
log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
totalReceived++;
} while (totalReceived < sendMessages * (maxRedeliveryCount + 1));

int totalInDeadLetter = 0;
do {
Message<byte[]> message = deadLetterConsumer.receive();
log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
deadLetterConsumer.acknowledge(message);
totalInDeadLetter++;
} while (totalInDeadLetter < sendMessages);

deadLetterConsumer.close();
consumer.close();

Consumer<byte[]> checkConsumer = this.pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Shared)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

Message<byte[]> checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS);
if (checkMessage != null) {
log.info("check consumer received message : {} {}", checkMessage.getMessageId(), new String(checkMessage.getData()));
}
assertNull(checkMessage);

checkConsumer.close();
}

/**
* Retry topic feature relies on the delay queue feature when consumer produce a delayed message
* to the retry topic. The delay queue feature is only supported in shared and key-shared subscription type.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,24 @@ public class DeadLetterPolicy implements Serializable {
* to be created.
*/
private String initialSubscriptionName;

/**
* Customizer for configuring the producer builder for the retry letter topic.
*
* <p>This field holds a function that allows the caller to customize the producer builder
* settings for the retry letter topic before the producer is created. The customization logic
* can use the provided context (which includes input topic and subscription details) to adjust
* configurations such as timeouts, batching, or message routing.
*/
private DeadLetterProducerBuilderCustomizer retryLetterProducerBuilderCustomizer;

/**
* Customizer for configuring the producer builder for the dead letter topic.
*
* <p>This field holds a function that allows the caller to customize the producer builder
* settings for the dead letter topic before the producer is created. Using the provided context,
* implementations can perform specific adjustments that ensure the dead letter queue operates
* with the appropriate configurations tailored for handling undeliverable messages.
*/
private DeadLetterProducerBuilderCustomizer deadLetterProducerBuilderCustomizer;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

/**
* Provides context information required for customizing a producer builder.
*
* <p>This interface supplies relevant details such as the name of the input topic and associated subscription name.
* This contextual information helps in correctly configuring the producer for the appropriate topic.
*/
public interface DeadLetterProducerBuilderContext {
/**
* Returns the default name of topic for the dead letter or retry letter producer. This topic name is used
* unless the ProducerBuilderCustomizer overrides it.
*
* @return a {@code String} representing the input topic name
*/
String getDefaultTopicName();

/**
* Returns the name of the input topic for which the dead letter or retry letter producer is being configured.
*
* @return a {@code String} representing the input topic name
*/
String getInputTopicName();

/**
* Returns the name of the subscription for which the dead letter or retry letter producer is being configured.
*
* @return a {@code String} representing the subscription name
*/
String getInputTopicSubscriptionName();

/**
* Returns the name of the consumer for which the dead letter or
* retry letter producer is being configured.
* @return a {@code String} representing the consumer name
*/
String getInputTopicConsumerName();
}


Loading

0 comments on commit 302c1d5

Please sign in to comment.