Skip to content

Commit

Permalink
update pip.
Browse files Browse the repository at this point in the history
  • Loading branch information
thetumbled committed Mar 6, 2025
1 parent dcdcb35 commit 6064fc2
Showing 1 changed file with 14 additions and 12 deletions.
26 changes: 14 additions & 12 deletions pip/pip-409.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ retry/dead letter topic.

## Design & Implementation Details

- Add `ProducerBuilderContext` and `ProducerBuilderCustomizer` interface:
- Add `DeadLetterProducerBuilderContext` and `DeadLetterProducerBuilderCustomizer` interface:
```java
public interface ProducerBuilderContext {
public interface DeadLetterProducerBuilderContext {
/**
* Returns the default name of topic for the dead letter or retry letter producer. This topic name is used
* unless the ProducerBuilderCustomizer overrides it.
Expand All @@ -57,9 +57,16 @@ public interface ProducerBuilderContext {
* @return a {@code String} representing the subscription name
*/
String getInputTopicSubscriptionName();

/**
* Returns the name of the consumer for which the dead letter or
* retry letter producer is being configured.
* @return a {@code String} representing the consumer name
*/
String getInputTopicConsumerName();
}

public interface ProducerBuilderCustomizer {
public interface DeadLetterProducerBuilderCustomizer {
/**
* Customize the given producer builder with settings specific to the topic context provided.
*
Expand All @@ -81,7 +88,7 @@ public class DeadLetterPolicy implements Serializable {
* can use the provided context (which includes input topic and subscription details) to adjust
* configurations such as timeouts, batching, or message routing.
*/
private ProducerBuilderCustomizer retryLetterProducerBuilderCustomizer = DEFAULT_PRODUCER_BUILDER_CUSTOMIZER;
private DeadLetterProducerBuilderCustomizer retryLetterProducerBuilderCustomizer;
/**
* Customizer for configuring the producer builder for the dead letter topic.
*
Expand All @@ -90,19 +97,14 @@ public class DeadLetterPolicy implements Serializable {
* implementations can perform specific adjustments that ensure the dead letter queue operates
* with the appropriate configurations tailored for handling undeliverable messages.
*/
private ProducerBuilderCustomizer deadLetterProducerBuilderCustomizer = DEFAULT_PRODUCER_BUILDER_CUSTOMIZER;

// keep compatibility with old configuration
private static final ProducerBuilderCustomizer DEFAULT_PRODUCER_BUILDER_CUSTOMIZER =
(context, builder) ->
builder.enableBatching(false).enableChunking(true).blockIfQueueFull(false);
private DeadLetterProducerBuilderCustomizer deadLetterProducerBuilderCustomizer;
}
```

- use the `ProducerBuilderCustomizer` to customize the producer of retry/dead letter topic like this:
- use the `DeadLetterProducerBuilderCustomizer` to customize the producer of retry/dead letter topic like this:
```java
// enable batch
ProducerBuilderCustomizer producerBuilderCustomizer = (context, producerBuilder) -> {
DeadLetterProducerBuilderCustomizer producerBuilderCustomizer = (context, producerBuilder) -> {
producerBuilder.enableBatching(true);
};
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
Expand Down

0 comments on commit 6064fc2

Please sign in to comment.