diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java index ab26949c04fc6..f624b0105341b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java @@ -25,6 +25,7 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -261,6 +262,67 @@ public void testDeadLetterTopicMessagesWithOrderingKey() throws Exception { consumer.close(); } + @Test + public void testDeadLetterTopicMessagesWithEventTime() throws Exception { + final String topic = "persistent://my-property/my-ns/dead-letter-topic"; + + final int maxRedeliveryCount = 1; + + final int sendMessages = 100; + + Consumer consumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(1, TimeUnit.SECONDS) + .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build()) + .receiverQueueSize(100) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + @Cleanup + PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES) + .topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ") + .subscriptionName("my-subscription") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Producer producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topic) + .create(); + + long testEventTime = Instant.now().toEpochMilli(); + for (int i = 0; i < sendMessages; i++) { + producer.newMessage() + .eventTime(testEventTime) + .value(String.format("Hello Pulsar, eventTime: [%d]", testEventTime).getBytes()) + .send(); + } + + producer.close(); + + int totalReceived = 0; + do { + Message message = consumer.receive(); + log.info("consumer received message : {} {}", message.getMessageId(), + new String(message.getData())); + totalReceived++; + } while (totalReceived < sendMessages * (maxRedeliveryCount + 1)); + + int totalInDeadLetter = 0; + do { + Message message = deadLetterConsumer.receive(); + assertEquals(message.getEventTime(), testEventTime); + log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + deadLetterConsumer.acknowledge(message); + totalInDeadLetter++; + } while (totalInDeadLetter < sendMessages); + + deadLetterConsumer.close(); + consumer.close(); + } + public void testDeadLetterTopicWithProducerName() throws Exception { final String topic = "persistent://my-property/my-ns/dead-letter-topic"; final String subscription = "my-subscription"; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java index 2b897760b6f00..8cb595a685408 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java @@ -24,6 +24,7 @@ import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import java.time.Instant; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -300,6 +301,7 @@ public void testRetryTopicProperties() throws Exception { byte[] key = "key".getBytes(); byte[] orderingKey = "orderingKey".getBytes(); + long eventTime = Instant.now().toEpochMilli(); final int maxRedeliveryCount = 3; @@ -333,6 +335,7 @@ public void testRetryTopicProperties() throws Exception { .value(String.format("Hello Pulsar [%d]", i).getBytes()) .keyBytes(key) .orderingKey(orderingKey) + .eventTime(eventTime) .send(); originMessageIds.add(msgId.toString()); } @@ -350,6 +353,7 @@ public void testRetryTopicProperties() throws Exception { assertEquals(message.getKeyBytes(), key); assertTrue(message.hasOrderingKey()); assertEquals(message.getOrderingKey(), orderingKey); + assertEquals(message.getEventTime(), eventTime); retryMessageIds.add(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID)); } consumer.reconsumeLater(message, 1, TimeUnit.SECONDS); @@ -373,6 +377,7 @@ public void testRetryTopicProperties() throws Exception { assertEquals(message.getKeyBytes(), key); assertTrue(message.hasOrderingKey()); assertEquals(message.getOrderingKey(), orderingKey); + assertEquals(message.getEventTime(), eventTime); deadLetterMessageIds.add(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID)); } deadLetterConsumer.acknowledge(message); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 4691c402b2fef..26414583b90a4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -717,6 +717,7 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a .value(retryMessage.getData()) .properties(propertiesMap); copyMessageKeysIfNeeded(message, typedMessageBuilderNew); + copyMessageEventTime(message, typedMessageBuilderNew); typedMessageBuilderNew.sendAsync().thenAccept(msgId -> { consumerDlqMessagesCounter.increment(); @@ -749,6 +750,7 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a typedMessageBuilderNew.deliverAfter(delayTime, unit); } copyMessageKeysIfNeeded(message, typedMessageBuilderNew); + copyMessageEventTime(message, typedMessageBuilderNew); typedMessageBuilderNew.sendAsync() .thenCompose( __ -> doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null)) @@ -824,6 +826,11 @@ private MessageImpl getMessageImpl(Message message) { return null; } + private static void copyMessageEventTime(Message message, + TypedMessageBuilder typedMessageBuilderNew) { + typedMessageBuilderNew.eventTime(message.getEventTime()); + } + @Override public void negativeAcknowledge(MessageId messageId) { consumerNacksCounter.increment(); @@ -2242,6 +2249,7 @@ private CompletableFuture processPossibleToDLQ(MessageIdAdv messageId) .value(message.getData()) .properties(getPropertiesMap(message, originMessageIdStr, originTopicNameStr)); copyMessageKeysIfNeeded(message, typedMessageBuilderNew); + copyMessageEventTime(message, typedMessageBuilderNew); typedMessageBuilderNew.sendAsync() .thenAccept(messageIdInDLQ -> { possibleSendToDeadLetterTopicMessages.remove(messageId); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java index d90c2e8828364..8ef9079091aa2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java @@ -186,7 +186,6 @@ public TypedMessageBuilder properties(Map properties) { @Override public TypedMessageBuilder eventTime(long timestamp) { - checkArgument(timestamp > 0, "Invalid timestamp : '%s'", timestamp); msgMetadata.setEventTime(timestamp); return this; }