Skip to content

Commit

Permalink
[improve][broker] Reduce memory occupation of the delayed message que…
Browse files Browse the repository at this point in the history
…ue (#23611)

(cherry picked from commit d33cc20)
  • Loading branch information
thetumbled authored and lhotari committed Nov 22, 2024
1 parent c73a29d commit fcd4436
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@

import com.google.common.annotations.VisibleForTesting;
import io.netty.util.Timer;
import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap;
import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
import it.unimi.dsi.fastutil.longs.LongSet;
import java.time.Clock;
import java.util.NavigableSet;
import java.util.TreeSet;
Expand All @@ -29,12 +35,15 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
import org.roaringbitmap.longlong.Roaring64Bitmap;

@Slf4j
public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker {

protected final TripleLongPriorityQueue priorityQueue = new TripleLongPriorityQueue();
// timestamp -> ledgerId -> entryId
// AVL tree -> OpenHashMap -> RoaringBitmap
protected final Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>>
delayedMessageMap = new Long2ObjectAVLTreeMap<>();

// If we detect that all messages have fixed delay time, such that the delivery is
// always going to be in FIFO order, then we can avoid pulling all the messages in
Expand All @@ -52,6 +61,9 @@ public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTrack
// Track whether we have seen all messages with fixed delay so far.
private boolean messagesHaveFixedDelay = true;

// The bit count to trim to reduce memory occupation.
private final int timestampPrecisionBitCnt;

InMemoryDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher, Timer timer,
long tickTimeMillis,
boolean isDelayedDeliveryDeliverAtTimeStrict,
Expand All @@ -66,6 +78,35 @@ public InMemoryDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsum
long fixedDelayDetectionLookahead) {
super(dispatcher, timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict);
this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
this.timestampPrecisionBitCnt = calculateTimestampPrecisionBitCnt(tickTimeMillis);
}

/**
* The tick time is used to determine the precision of the delivery time. As the redelivery time
* is not accurate, we can bucket the delivery time and group multiple message ids into the same
* bucket to reduce the memory usage. THe default value is 1 second, which means we accept 1 second
* deviation for the delivery time, so that we can trim the lower 9 bits of the delivery time, because
* 2**9ms = 512ms < 1s, 2**10ms = 1024ms > 1s.
* @param tickTimeMillis
* @return
*/
private static int calculateTimestampPrecisionBitCnt(long tickTimeMillis) {
int bitCnt = 0;
while (tickTimeMillis > 0) {
tickTimeMillis >>= 1;
bitCnt++;
}
return bitCnt > 0 ? bitCnt - 1 : 0;
}

/**
* trim the lower bits of the timestamp to reduce the memory usage.
* @param timestamp
* @param bits
* @return
*/
private static long trimLowerBit(long timestamp, int bits) {
return timestamp & (-1L << bits);
}

@Override
Expand All @@ -80,7 +121,10 @@ public boolean addMessage(long ledgerId, long entryId, long deliverAt) {
deliverAt - clock.millis());
}

priorityQueue.add(deliverAt, ledgerId, entryId);
long timestamp = trimLowerBit(deliverAt, timestampPrecisionBitCnt);
delayedMessageMap.computeIfAbsent(timestamp, k -> new Long2ObjectOpenHashMap<>())
.computeIfAbsent(ledgerId, k -> new Roaring64Bitmap())
.add(entryId);
updateTimer();

checkAndUpdateHighest(deliverAt);
Expand All @@ -105,7 +149,8 @@ private void checkAndUpdateHighest(long deliverAt) {
*/
@Override
public boolean hasMessageAvailable() {
boolean hasMessageAvailable = !priorityQueue.isEmpty() && priorityQueue.peekN1() <= getCutoffTime();
boolean hasMessageAvailable = !delayedMessageMap.isEmpty()
&& delayedMessageMap.firstLongKey() <= getCutoffTime();
if (!hasMessageAvailable) {
updateTimer();
}
Expand All @@ -121,25 +166,49 @@ public NavigableSet<Position> getScheduledMessages(int maxMessages) {
NavigableSet<Position> positions = new TreeSet<>();
long cutoffTime = getCutoffTime();

while (n > 0 && !priorityQueue.isEmpty()) {
long timestamp = priorityQueue.peekN1();
while (n > 0 && !delayedMessageMap.isEmpty()) {
long timestamp = delayedMessageMap.firstLongKey();
if (timestamp > cutoffTime) {
break;
}

long ledgerId = priorityQueue.peekN2();
long entryId = priorityQueue.peekN3();
positions.add(PositionFactory.create(ledgerId, entryId));

priorityQueue.pop();
--n;
LongSet ledgerIdToDelete = new LongOpenHashSet();
Long2ObjectMap<Roaring64Bitmap> ledgerMap = delayedMessageMap.get(timestamp);
for (Long2ObjectMap.Entry<Roaring64Bitmap> ledgerEntry : ledgerMap.long2ObjectEntrySet()) {
long ledgerId = ledgerEntry.getLongKey();
Roaring64Bitmap entryIds = ledgerEntry.getValue();
int cardinality = (int) entryIds.getLongCardinality();
if (cardinality <= n) {
entryIds.forEach(entryId -> {
positions.add(PositionFactory.create(ledgerId, entryId));
});
n -= cardinality;
ledgerIdToDelete.add(ledgerId);
} else {
long[] entryIdsArray = entryIds.toArray();
for (int i = 0; i < n; i++) {
positions.add(PositionFactory.create(ledgerId, entryIdsArray[i]));
entryIds.removeLong(entryIdsArray[i]);
}
n = 0;
}
if (n <= 0) {
break;
}
}
for (long ledgerId : ledgerIdToDelete) {
ledgerMap.remove(ledgerId);
}
if (ledgerMap.isEmpty()) {
delayedMessageMap.remove(timestamp);
}
}

if (log.isDebugEnabled()) {
log.debug("[{}] Get scheduled messages - found {}", dispatcher.getName(), positions.size());
}

if (priorityQueue.isEmpty()) {
if (delayedMessageMap.isEmpty()) {
// Reset to initial state
highestDeliveryTimeTracked = 0;
messagesHaveFixedDelay = true;
Expand All @@ -151,24 +220,33 @@ public NavigableSet<Position> getScheduledMessages(int maxMessages) {

@Override
public CompletableFuture<Void> clear() {
this.priorityQueue.clear();
this.delayedMessageMap.clear();
return CompletableFuture.completedFuture(null);
}

@Override
public long getNumberOfDelayedMessages() {
return priorityQueue.size();
return delayedMessageMap.values().stream().mapToLong(
ledgerMap -> ledgerMap.values().stream().mapToLong(
Roaring64Bitmap::getLongCardinality).sum()).sum();
}

/**
* This method rely on Roaring64Bitmap::getLongSizeInBytes to calculate the memory usage of the buffer.
* The memory usage of the buffer is not accurate, because Roaring64Bitmap::getLongSizeInBytes will
* overestimate the memory usage of the buffer a lot.
* @return the memory usage of the buffer
*/
@Override
public long getBufferMemoryUsage() {
return priorityQueue.bytesCapacity();
return delayedMessageMap.values().stream().mapToLong(
ledgerMap -> ledgerMap.values().stream().mapToLong(
Roaring64Bitmap::getLongSizeInBytes).sum()).sum();
}

@Override
public void close() {
super.close();
priorityQueue.close();
}

@Override
Expand All @@ -181,6 +259,6 @@ && getNumberOfDelayedMessages() >= fixedDelayDetectionLookahead
}

protected long nextDeliveryTime() {
return priorityQueue.peekN1();
return delayedMessageMap.firstLongKey();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,23 +92,23 @@ public Object[][] provider(Method method) throws Exception {
false, 0)
}};
case "testAddMessageWithStrictDelay" -> new Object[][]{{
new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock,
new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
true, 0)
}};
case "testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithStrict" -> new Object[][]{{
new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1000, clock,
true, 0)
}};
case "testAddMessageWithDeliverAtTimeAfterNowAfterTickTimeFrequencyWithStrict" -> new Object[][]{{
new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100000, clock,
new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
true, 0)
}};
case "testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict" -> new Object[][]{{
new InMemoryDelayedDeliveryTracker(dispatcher, timer, 500, clock,
true, 0)
}};
case "testWithFixedDelays", "testWithMixedDelays","testWithNoDelays" -> new Object[][]{{
new InMemoryDelayedDeliveryTracker(dispatcher, timer, 500, clock,
new InMemoryDelayedDeliveryTracker(dispatcher, timer, 8, clock,
true, 100)
}};
default -> new Object[][]{{
Expand Down Expand Up @@ -230,7 +230,7 @@ public void run(Timeout timeout) throws Exception {
return;
}
try {
this.priorityQueue.peekN1();
this.delayedMessageMap.firstLongKey();
} catch (Exception e) {
e.printStackTrace();
exceptions[0] = e;
Expand Down

0 comments on commit fcd4436

Please sign in to comment.