diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java index 5796fcbd78550..a48ed4161386d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java @@ -22,7 +22,7 @@ import io.netty.util.Timer; import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap; import it.unimi.dsi.fastutil.longs.Long2ObjectMap; -import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectRBTreeMap; import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap; import it.unimi.dsi.fastutil.longs.LongOpenHashSet; import it.unimi.dsi.fastutil.longs.LongSet; @@ -42,7 +42,7 @@ public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTrack // timestamp -> ledgerId -> entryId // AVL tree -> OpenHashMap -> RoaringBitmap - protected final Long2ObjectSortedMap> + protected final Long2ObjectSortedMap> delayedMessageMap = new Long2ObjectAVLTreeMap<>(); // If we detect that all messages have fixed delay time, such that the delivery is @@ -122,7 +122,7 @@ public boolean addMessage(long ledgerId, long entryId, long deliverAt) { } long timestamp = trimLowerBit(deliverAt, timestampPrecisionBitCnt); - delayedMessageMap.computeIfAbsent(timestamp, k -> new Long2ObjectOpenHashMap<>()) + delayedMessageMap.computeIfAbsent(timestamp, k -> new Long2ObjectRBTreeMap<>()) .computeIfAbsent(ledgerId, k -> new Roaring64Bitmap()) .add(entryId); updateTimer(); @@ -173,7 +173,7 @@ public NavigableSet getScheduledMessages(int maxMessages) { } LongSet ledgerIdToDelete = new LongOpenHashSet(); - Long2ObjectMap ledgerMap = delayedMessageMap.get(timestamp); + Long2ObjectSortedMap ledgerMap = delayedMessageMap.get(timestamp); for (Long2ObjectMap.Entry ledgerEntry : ledgerMap.long2ObjectEntrySet()) { long ledgerId = ledgerEntry.getLongKey(); Roaring64Bitmap entryIds = ledgerEntry.getValue(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java index dc6f623c82b57..92d4719a5a248 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java @@ -34,9 +34,12 @@ import java.lang.reflect.Method; import java.time.Clock; import java.util.NavigableMap; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; + +import org.apache.bookkeeper.mledger.Position; import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -249,4 +252,27 @@ public void run(Timeout timeout) throws Exception { timer.stop(); } + + @Test(dataProvider = "delayedTracker") + public void testDelaySequence(InMemoryDelayedDeliveryTracker tracker) throws Exception { + assertFalse(tracker.hasMessageAvailable()); + + int messageCount = 5; + for(int i = 1; i <= messageCount; i++) { + assertTrue(tracker.addMessage(i, i, 1)); + } + clockTime.set(10); + assertTrue(tracker.hasMessageAvailable()); + assertEquals(tracker.getNumberOfDelayedMessages(), messageCount); + + for (int i = 1; i <= messageCount; i++) { + Set scheduled = tracker.getScheduledMessages(1); + assertEquals(scheduled.size(), 1); + Position position = scheduled.iterator().next(); + assertEquals(position.getLedgerId(), i); + assertEquals(position.getEntryId(), i); + } + tracker.close(); + } + }