Skip to content

Commit

Permalink
[fix][broker] fix delay queue sequence issue. (apache#24035)
Browse files Browse the repository at this point in the history
  • Loading branch information
thetumbled authored Mar 3, 2025
1 parent 1eb7866 commit 998bb51
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import io.netty.util.Timer;
import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectRBTreeMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap;
import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
import it.unimi.dsi.fastutil.longs.LongSet;
Expand All @@ -42,7 +42,7 @@ public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTrack

// timestamp -> ledgerId -> entryId
// AVL tree -> OpenHashMap -> RoaringBitmap
protected final Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>>
protected final Long2ObjectSortedMap<Long2ObjectSortedMap<Roaring64Bitmap>>
delayedMessageMap = new Long2ObjectAVLTreeMap<>();

// If we detect that all messages have fixed delay time, such that the delivery is
Expand Down Expand Up @@ -122,7 +122,7 @@ public boolean addMessage(long ledgerId, long entryId, long deliverAt) {
}

long timestamp = trimLowerBit(deliverAt, timestampPrecisionBitCnt);
delayedMessageMap.computeIfAbsent(timestamp, k -> new Long2ObjectOpenHashMap<>())
delayedMessageMap.computeIfAbsent(timestamp, k -> new Long2ObjectRBTreeMap<>())
.computeIfAbsent(ledgerId, k -> new Roaring64Bitmap())
.add(entryId);
updateTimer();
Expand Down Expand Up @@ -173,7 +173,7 @@ public NavigableSet<Position> getScheduledMessages(int maxMessages) {
}

LongSet ledgerIdToDelete = new LongOpenHashSet();
Long2ObjectMap<Roaring64Bitmap> ledgerMap = delayedMessageMap.get(timestamp);
Long2ObjectSortedMap<Roaring64Bitmap> ledgerMap = delayedMessageMap.get(timestamp);
for (Long2ObjectMap.Entry<Roaring64Bitmap> ledgerEntry : ledgerMap.long2ObjectEntrySet()) {
long ledgerId = ledgerEntry.getLongKey();
Roaring64Bitmap entryIds = ledgerEntry.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@
import java.lang.reflect.Method;
import java.time.Clock;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -249,4 +252,27 @@ public void run(Timeout timeout) throws Exception {

timer.stop();
}

@Test(dataProvider = "delayedTracker")
public void testDelaySequence(InMemoryDelayedDeliveryTracker tracker) throws Exception {
assertFalse(tracker.hasMessageAvailable());

int messageCount = 5;
for(int i = 1; i <= messageCount; i++) {
assertTrue(tracker.addMessage(i, i, 1));
}
clockTime.set(10);
assertTrue(tracker.hasMessageAvailable());
assertEquals(tracker.getNumberOfDelayedMessages(), messageCount);

for (int i = 1; i <= messageCount; i++) {
Set<Position> scheduled = tracker.getScheduledMessages(1);
assertEquals(scheduled.size(), 1);
Position position = scheduled.iterator().next();
assertEquals(position.getLedgerId(), i);
assertEquals(position.getEntryId(), i);
}
tracker.close();
}

}

0 comments on commit 998bb51

Please sign in to comment.