diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java index c81ebe8d6ffcd..bc480635babb3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java @@ -152,6 +152,18 @@ protected boolean replicateEntries(List entries) { continue; } + if (msg.isExpired(messageTTLInSeconds)) { + msgExpired.recordEvent(0 /* no value stat */); + if (log.isDebugEnabled()) { + log.debug("[{}] Discarding expired message at position {}, replicateTo {}", + replicatorId, entry.getPosition(), msg.getReplicateTo()); + } + cursor.asyncDelete(entry.getPosition(), this, entry.getPosition()); + entry.release(); + msg.recycle(); + continue; + } + if (STATE_UPDATER.get(this) != State.Started || isLocalMessageSkippedOnce) { // The producer is not ready yet after having stopped/restarted. Drop the message because it will // recovered when the producer is ready diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index bfc95b94e9631..2c6e2dec3ff4e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2009,6 +2009,10 @@ public void checkMessageExpiry() { sub.expireMessages(messageTtlInSeconds); } }); + replicators.forEach((__, replicator) + -> ((PersistentReplicator) replicator).expireMessages(messageTtlInSeconds)); + shadowReplicators.forEach((__, replicator) + -> ((PersistentReplicator) replicator).expireMessages(messageTtlInSeconds)); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java index f8a602f68b908..cb2e0457e36bf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java @@ -77,6 +77,18 @@ protected boolean replicateEntries(List entries) { continue; } + if (msg.isExpired(messageTTLInSeconds)) { + msgExpired.recordEvent(0 /* no value stat */); + if (log.isDebugEnabled()) { + log.debug("[{}] Discarding expired message at position {}, replicateTo {}", + replicatorId, entry.getPosition(), msg.getReplicateTo()); + } + cursor.asyncDelete(entry.getPosition(), this, entry.getPosition()); + entry.release(); + msg.recycle(); + continue; + } + if (STATE_UPDATER.get(this) != State.Started || isLocalMessageSkippedOnce) { // The producer is not ready yet after having stopped/restarted. Drop the message because it will // recovered when the producer is ready diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index bf2276fdf4155..d1d7358f346f3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -1781,36 +1781,36 @@ public void testReplicatorWithTTL() throws Exception { @Cleanup Producer persistentProducer1 = client1.newProducer().topic(topic.toString()).create(); + // Send V1 message, which will be replicated to the remote cluster by the replicator. persistentProducer1.send("V1".getBytes()); - waitReplicateFinish(topic, admin1); + // Pause replicator PersistentTopic persistentTopic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(topic.toString()).get(); persistentTopic.getReplicators().forEach((cluster, replicator) -> { PersistentReplicator persistentReplicator = (PersistentReplicator) replicator; - // Pause replicator pauseReplicator(persistentReplicator); }); + // Send V2 and V3 messages, then let them expire. These messages will not be replicated to the remote cluster. persistentProducer1.send("V2".getBytes()); persistentProducer1.send("V3".getBytes()); - Thread.sleep(1000); - admin1.topics().expireMessagesForAllSubscriptions(topic.toString(), 1); + // Start replicator persistentTopic.getReplicators().forEach((cluster, replicator) -> { PersistentReplicator persistentReplicator = (PersistentReplicator) replicator; persistentReplicator.startProducer(); }); - waitReplicateFinish(topic, admin1); + // Send V4 message, which will be replicated to the remote cluster. persistentProducer1.send("V4".getBytes()); - waitReplicateFinish(topic, admin1); + // Receive messages from the remote cluster: only V1 and V4 messages should be received. @Cleanup PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS) .build(); @@ -1828,7 +1828,7 @@ public void testReplicatorWithTTL() throws Exception { result.add(new String(receive.getValue())); } - assertEquals(result, Lists.newArrayList("V1", "V2", "V3", "V4")); + assertEquals(result, Lists.newArrayList("V1", "V4")); } @Test