diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java index bdc6e4c814e33..5796fcbd78550 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java @@ -20,6 +20,12 @@ import com.google.common.annotations.VisibleForTesting; import io.netty.util.Timer; +import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap; +import it.unimi.dsi.fastutil.longs.LongOpenHashSet; +import it.unimi.dsi.fastutil.longs.LongSet; import java.time.Clock; import java.util.NavigableSet; import java.util.TreeSet; @@ -29,12 +35,15 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers; -import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue; +import org.roaringbitmap.longlong.Roaring64Bitmap; @Slf4j public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker { - protected final TripleLongPriorityQueue priorityQueue = new TripleLongPriorityQueue(); + // timestamp -> ledgerId -> entryId + // AVL tree -> OpenHashMap -> RoaringBitmap + protected final Long2ObjectSortedMap> + delayedMessageMap = new Long2ObjectAVLTreeMap<>(); // If we detect that all messages have fixed delay time, such that the delivery is // always going to be in FIFO order, then we can avoid pulling all the messages in @@ -52,6 +61,9 @@ public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTrack // Track whether we have seen all messages with fixed delay so far. private boolean messagesHaveFixedDelay = true; + // The bit count to trim to reduce memory occupation. + private final int timestampPrecisionBitCnt; + InMemoryDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis, boolean isDelayedDeliveryDeliverAtTimeStrict, @@ -66,6 +78,35 @@ public InMemoryDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsum long fixedDelayDetectionLookahead) { super(dispatcher, timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict); this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead; + this.timestampPrecisionBitCnt = calculateTimestampPrecisionBitCnt(tickTimeMillis); + } + + /** + * The tick time is used to determine the precision of the delivery time. As the redelivery time + * is not accurate, we can bucket the delivery time and group multiple message ids into the same + * bucket to reduce the memory usage. THe default value is 1 second, which means we accept 1 second + * deviation for the delivery time, so that we can trim the lower 9 bits of the delivery time, because + * 2**9ms = 512ms < 1s, 2**10ms = 1024ms > 1s. + * @param tickTimeMillis + * @return + */ + private static int calculateTimestampPrecisionBitCnt(long tickTimeMillis) { + int bitCnt = 0; + while (tickTimeMillis > 0) { + tickTimeMillis >>= 1; + bitCnt++; + } + return bitCnt > 0 ? bitCnt - 1 : 0; + } + + /** + * trim the lower bits of the timestamp to reduce the memory usage. + * @param timestamp + * @param bits + * @return + */ + private static long trimLowerBit(long timestamp, int bits) { + return timestamp & (-1L << bits); } @Override @@ -80,7 +121,10 @@ public boolean addMessage(long ledgerId, long entryId, long deliverAt) { deliverAt - clock.millis()); } - priorityQueue.add(deliverAt, ledgerId, entryId); + long timestamp = trimLowerBit(deliverAt, timestampPrecisionBitCnt); + delayedMessageMap.computeIfAbsent(timestamp, k -> new Long2ObjectOpenHashMap<>()) + .computeIfAbsent(ledgerId, k -> new Roaring64Bitmap()) + .add(entryId); updateTimer(); checkAndUpdateHighest(deliverAt); @@ -105,7 +149,8 @@ private void checkAndUpdateHighest(long deliverAt) { */ @Override public boolean hasMessageAvailable() { - boolean hasMessageAvailable = !priorityQueue.isEmpty() && priorityQueue.peekN1() <= getCutoffTime(); + boolean hasMessageAvailable = !delayedMessageMap.isEmpty() + && delayedMessageMap.firstLongKey() <= getCutoffTime(); if (!hasMessageAvailable) { updateTimer(); } @@ -121,25 +166,49 @@ public NavigableSet getScheduledMessages(int maxMessages) { NavigableSet positions = new TreeSet<>(); long cutoffTime = getCutoffTime(); - while (n > 0 && !priorityQueue.isEmpty()) { - long timestamp = priorityQueue.peekN1(); + while (n > 0 && !delayedMessageMap.isEmpty()) { + long timestamp = delayedMessageMap.firstLongKey(); if (timestamp > cutoffTime) { break; } - long ledgerId = priorityQueue.peekN2(); - long entryId = priorityQueue.peekN3(); - positions.add(PositionFactory.create(ledgerId, entryId)); - - priorityQueue.pop(); - --n; + LongSet ledgerIdToDelete = new LongOpenHashSet(); + Long2ObjectMap ledgerMap = delayedMessageMap.get(timestamp); + for (Long2ObjectMap.Entry ledgerEntry : ledgerMap.long2ObjectEntrySet()) { + long ledgerId = ledgerEntry.getLongKey(); + Roaring64Bitmap entryIds = ledgerEntry.getValue(); + int cardinality = (int) entryIds.getLongCardinality(); + if (cardinality <= n) { + entryIds.forEach(entryId -> { + positions.add(PositionFactory.create(ledgerId, entryId)); + }); + n -= cardinality; + ledgerIdToDelete.add(ledgerId); + } else { + long[] entryIdsArray = entryIds.toArray(); + for (int i = 0; i < n; i++) { + positions.add(PositionFactory.create(ledgerId, entryIdsArray[i])); + entryIds.removeLong(entryIdsArray[i]); + } + n = 0; + } + if (n <= 0) { + break; + } + } + for (long ledgerId : ledgerIdToDelete) { + ledgerMap.remove(ledgerId); + } + if (ledgerMap.isEmpty()) { + delayedMessageMap.remove(timestamp); + } } if (log.isDebugEnabled()) { log.debug("[{}] Get scheduled messages - found {}", dispatcher.getName(), positions.size()); } - if (priorityQueue.isEmpty()) { + if (delayedMessageMap.isEmpty()) { // Reset to initial state highestDeliveryTimeTracked = 0; messagesHaveFixedDelay = true; @@ -151,24 +220,33 @@ public NavigableSet getScheduledMessages(int maxMessages) { @Override public CompletableFuture clear() { - this.priorityQueue.clear(); + this.delayedMessageMap.clear(); return CompletableFuture.completedFuture(null); } @Override public long getNumberOfDelayedMessages() { - return priorityQueue.size(); + return delayedMessageMap.values().stream().mapToLong( + ledgerMap -> ledgerMap.values().stream().mapToLong( + Roaring64Bitmap::getLongCardinality).sum()).sum(); } + /** + * This method rely on Roaring64Bitmap::getLongSizeInBytes to calculate the memory usage of the buffer. + * The memory usage of the buffer is not accurate, because Roaring64Bitmap::getLongSizeInBytes will + * overestimate the memory usage of the buffer a lot. + * @return the memory usage of the buffer + */ @Override public long getBufferMemoryUsage() { - return priorityQueue.bytesCapacity(); + return delayedMessageMap.values().stream().mapToLong( + ledgerMap -> ledgerMap.values().stream().mapToLong( + Roaring64Bitmap::getLongSizeInBytes).sum()).sum(); } @Override public void close() { super.close(); - priorityQueue.close(); } @Override @@ -181,6 +259,6 @@ && getNumberOfDelayedMessages() >= fixedDelayDetectionLookahead } protected long nextDeliveryTime() { - return priorityQueue.peekN1(); + return delayedMessageMap.firstLongKey(); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java index ff7763927d888..dc6f623c82b57 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java @@ -92,7 +92,7 @@ public Object[][] provider(Method method) throws Exception { false, 0) }}; case "testAddMessageWithStrictDelay" -> new Object[][]{{ - new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock, + new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock, true, 0) }}; case "testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithStrict" -> new Object[][]{{ @@ -100,7 +100,7 @@ public Object[][] provider(Method method) throws Exception { true, 0) }}; case "testAddMessageWithDeliverAtTimeAfterNowAfterTickTimeFrequencyWithStrict" -> new Object[][]{{ - new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100000, clock, + new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock, true, 0) }}; case "testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict" -> new Object[][]{{ @@ -108,7 +108,7 @@ public Object[][] provider(Method method) throws Exception { true, 0) }}; case "testWithFixedDelays", "testWithMixedDelays","testWithNoDelays" -> new Object[][]{{ - new InMemoryDelayedDeliveryTracker(dispatcher, timer, 500, clock, + new InMemoryDelayedDeliveryTracker(dispatcher, timer, 8, clock, true, 100) }}; default -> new Object[][]{{ @@ -230,7 +230,7 @@ public void run(Timeout timeout) throws Exception { return; } try { - this.priorityQueue.peekN1(); + this.delayedMessageMap.firstLongKey(); } catch (Exception e) { e.printStackTrace(); exceptions[0] = e;