Skip to content

Commit

Permalink
Revert "[improve] [broker] Make the estimated entry size more accurate (apache#23931)"
Browse files Browse the repository at this point in the history

This reverts commit 42aab41.
  • Loading branch information
mukesh-ctds committed Feb 28, 2025
1 parent 261ead1 commit ca3f6c1
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_RETRIES;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
Expand Down Expand Up @@ -3711,51 +3710,26 @@ public int applyMaxSizeCap(int maxEntries, long maxSizeBytes) {
if (maxSizeBytes == NO_MAX_SIZE_LIMIT) {
return maxEntries;
}
int maxEntriesBasedOnSize =
Long.valueOf(estimateEntryCountBySize(maxSizeBytes, readPosition, ledger)).intValue();
return Math.min(maxEntriesBasedOnSize, maxEntries);
}

static long estimateEntryCountBySize(long bytesSize, PositionImpl readPosition, ManagedLedgerImpl ml) {
Position posToRead = readPosition;
if (!ml.isValidPosition(readPosition)) {
posToRead = ml.getNextValidPosition(readPosition);
double avgEntrySize = ledger.getStats().getEntrySizeAverage();
if (!Double.isFinite(avgEntrySize)) {
// We don't yet have any stats on the topic entries. Let's try to use the cursor avg size stats
avgEntrySize = (double) entriesReadSize / (double) entriesReadCount;
}
long result = 0;
long remainingBytesSize = bytesSize;

while (remainingBytesSize > 0) {
// Last ledger.
if (posToRead.getLedgerId() == ml.getCurrentLedger().getId()) {
if (ml.getCurrentLedgerSize() == 0 || ml.getCurrentLedgerEntries() == 0) {
// Only read 1 entry if no entries to read.
return 1;
}
long avg = Math.max(1, ml.getCurrentLedgerSize() / ml.getCurrentLedgerEntries())
+ BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
result += remainingBytesSize / avg;
break;
}
// Skip empty ledger.
LedgerInfo ledgerInfo = ml.getLedgersInfo().get(posToRead.getLedgerId());
if (ledgerInfo.getSize() == 0 || ledgerInfo.getEntries() == 0) {
posToRead = ml.getNextValidPosition(PositionImpl.get(posToRead.getLedgerId(), Long.MAX_VALUE));
continue;
}
// Calculate entries by average of ledgers.
long avg = Math.max(1, ledgerInfo.getSize() / ledgerInfo.getEntries()) + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
long remainEntriesOfLedger = ledgerInfo.getEntries() - posToRead.getEntryId();
if (remainEntriesOfLedger * avg >= remainingBytesSize) {
result += remainingBytesSize / avg;
break;
} else {
// Calculate for the next ledger.
result += remainEntriesOfLedger;
remainingBytesSize -= remainEntriesOfLedger * avg;
posToRead = ml.getNextValidPosition(PositionImpl.get(posToRead.getLedgerId(), Long.MAX_VALUE));
}
if (!Double.isFinite(avgEntrySize)) {
// If we still don't have any information, it means this is the first time we attempt reading
// and there are no writes. Let's start with 1 to avoid any overflow and start the avg stats
return 1;
}

int maxEntriesBasedOnSize = (int) (maxSizeBytes / avgEntrySize);
if (maxEntriesBasedOnSize < 1) {
// We need to read at least one entry
return 1;
}
return Math.max(result, 1);

return Math.min(maxEntriesBasedOnSize, maxEntries);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private final CallbackMutex offloadMutex = new CallbackMutex();
private static final CompletableFuture<PositionImpl> NULL_OFFLOAD_PROMISE = CompletableFuture
.completedFuture(PositionImpl.LATEST);
@VisibleForTesting
@Getter
protected volatile LedgerHandle currentLedger;
protected volatile long currentLedgerEntries = 0;
protected volatile long currentLedgerSize = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ void asyncReadEntriesByPosition(ReadHandle lh, PositionImpl firstPosition, Posit
doAsyncReadEntriesByPosition(lh, firstPosition, lastPosition, numberOfEntries, shouldCacheEntry,
originalCallback, ctx);
} else {
long estimatedEntrySize = getEstimatedEntrySize(lh);
long estimatedEntrySize = getEstimatedEntrySize();
long estimatedReadSize = numberOfEntries * estimatedEntrySize;
if (log.isDebugEnabled()) {
log.debug("Estimated read size: {} bytes for {} entries with {} estimated entry size",
Expand Down Expand Up @@ -418,12 +418,12 @@ void doAsyncReadEntriesByPosition(ReadHandle lh, PositionImpl firstPosition, Pos
}

@VisibleForTesting
public long getEstimatedEntrySize(ReadHandle lh) {
if (lh.getLength() == 0 || lh.getLastAddConfirmed() < 0) {
// No entries stored.
return Math.max(getAvgEntrySize(), DEFAULT_ESTIMATED_ENTRY_SIZE) + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
public long getEstimatedEntrySize() {
long estimatedEntrySize = getAvgEntrySize();
if (estimatedEntrySize == 0) {
estimatedEntrySize = DEFAULT_ESTIMATED_ENTRY_SIZE;
}
return Math.max(1, lh.getLength() / (lh.getLastAddConfirmed() + 1)) + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
return estimatedEntrySize + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
}

private long getAvgEntrySize() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,10 @@ public void testPreciseLimitation(String missingCase) throws Exception {
SimpleReadEntriesCallback cb0 = new SimpleReadEntriesCallback();
entryCache.asyncReadEntry(spyCurrentLedger, 125, 125, true, cb0, ctx);
cb0.entries.join();
int sizePerEntry = Long.valueOf(entryCache.getEstimatedEntrySize(ml.currentLedger)).intValue();
Long sizePerEntry1 = entryCache.getEstimatedEntrySize();
Assert.assertEquals(sizePerEntry1, 1 + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
Awaitility.await().untilAsserted(() -> {
long remainingBytes = limiter.getRemainingBytes();
long remainingBytes =limiter.getRemainingBytes();
Assert.assertEquals(remainingBytes, totalCapacity);
});
log.info("remainingBytes 0: {}", limiter.getRemainingBytes());
Expand All @@ -164,7 +165,7 @@ public void testPreciseLimitation(String missingCase) throws Exception {
entryCache.asyncReadEntry(spyCurrentLedger, start2, end2, true, cb2, ctx);
}).start();

long bytesAcquired1 = calculateBytesSizeBeforeFirstReading(readCount1 + readCount2, sizePerEntry);
long bytesAcquired1 = calculateBytesSizeBeforeFirstReading(readCount1 + readCount2, 1);
long remainingBytesExpected1 = totalCapacity - bytesAcquired1;
log.info("acquired : {}", bytesAcquired1);
log.info("remainingBytesExpected 0 : {}", remainingBytesExpected1);
Expand All @@ -177,7 +178,9 @@ public void testPreciseLimitation(String missingCase) throws Exception {
Thread.sleep(3000);
readCompleteSignal1.countDown();
cb1.entries.join();
long bytesAcquired2 = calculateBytesSizeBeforeFirstReading(readCount2, sizePerEntry);
Long sizePerEntry2 = entryCache.getEstimatedEntrySize();
Assert.assertEquals(sizePerEntry2, 1 + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
long bytesAcquired2 = calculateBytesSizeBeforeFirstReading(readCount2, 1);
long remainingBytesExpected2 = totalCapacity - bytesAcquired2;
log.info("acquired : {}", bytesAcquired2);
log.info("remainingBytesExpected 1: {}", remainingBytesExpected2);
Expand All @@ -188,6 +191,8 @@ public void testPreciseLimitation(String missingCase) throws Exception {

readCompleteSignal2.countDown();
cb2.entries.join();
Long sizePerEntry3 = entryCache.getEstimatedEntrySize();
Assert.assertEquals(sizePerEntry3, 1 + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
Awaitility.await().untilAsserted(() -> {
long remainingBytes = limiter.getRemainingBytes();
log.info("remainingBytes 2: {}", remainingBytes);
Expand All @@ -199,7 +204,7 @@ public void testPreciseLimitation(String missingCase) throws Exception {
}

private long calculateBytesSizeBeforeFirstReading(int entriesCount, int perEntrySize) {
return entriesCount * perEntrySize;
return entriesCount * (perEntrySize + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
}

class SimpleReadEntriesCallback implements AsyncCallbacks.ReadEntriesCallback {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.bookkeeper.mledger.impl;

import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
Expand Down Expand Up @@ -682,15 +681,13 @@ void testAsyncReadWithMaxSizeByte() throws Exception {
ManagedCursor cursor = ledger.openCursor("c1");

for (int i = 0; i < 100; i++) {
ledger.addEntry(new byte[(int) (1024)]);
ledger.addEntry(new byte[1024]);
}

// Since https://github.com/apache/pulsar/pull/23931 improved the performance of delivery, the consumer
// will get more messages than before (previously it received only 1 message at the first delivery).
int avg = (int) (BOOKKEEPER_READ_OVERHEAD_PER_ENTRY + 1024);
readAndCheck(cursor, 10, 3 * avg, 3);
// First time, since we don't have info, we'll get 1 single entry
readAndCheck(cursor, 10, 3 * 1024, 1);
// We should only return 3 entries, based on the max size
readAndCheck(cursor, 20, 3 * avg, 3);
readAndCheck(cursor, 20, 3 * 1024, 3);
// If maxSize is < avg, we should get 1 entry
readAndCheck(cursor, 10, 500, 1);
}
Expand Down Expand Up @@ -3888,15 +3885,13 @@ public void testReadEntriesOrWaitWithMaxSize() throws Exception {
ledger.addEntry(new byte[1024]);
}

// Since https://github.com/apache/pulsar/pull/23931 improved the performance of delivery, the consumer
// will get more messages than before (previously it received only 1 message at the first delivery).
int avg = (int) (1024 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
List<Entry> entries = c.readEntriesOrWait(10, 3 * avg);
assertEquals(entries.size(), 3);
// First time, since we don't have info, we'll get 1 single entry
List<Entry> entries = c.readEntriesOrWait(10, 3 * 1024);
assertEquals(entries.size(), 1);
entries.forEach(Entry::release);

// We should only return 3 entries, based on the max size
entries = c.readEntriesOrWait(10, 3 * avg);
entries = c.readEntriesOrWait(10, 3 * 1024);
assertEquals(entries.size(), 3);
entries.forEach(Entry::release);

Expand Down Expand Up @@ -4803,82 +4798,5 @@ public void operationFailed(ManagedLedgerException exception) {
assertEquals(cursor.getReadPosition(), markDeletedPosition.getNext());
}

@Test
public void testEstimateEntryCountBySize() throws Exception {
final String mlName = "ml-" + UUID.randomUUID().toString().replaceAll("-", "");
ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName);
long entryCount0 =
ManagedCursorImpl.estimateEntryCountBySize(16, PositionImpl.get(ml.getCurrentLedger().getId(), 0), ml);
assertEquals(entryCount0, 1);
// Avoid trimming ledgers.
ml.openCursor("c1");

// Build data.
for (int i = 0; i < 100; i++) {
ml.addEntry(new byte[]{1});
}
long ledger1 = ml.getCurrentLedger().getId();
ml.getCurrentLedger().close();
ml.ledgerClosed(ml.getCurrentLedger());
for (int i = 0; i < 100; i++) {
ml.addEntry(new byte[]{1, 2});
}
long ledger2 = ml.getCurrentLedger().getId();
ml.getCurrentLedger().close();
ml.ledgerClosed(ml.getCurrentLedger());
for (int i = 0; i < 100; i++) {
ml.addEntry(new byte[]{1, 2, 3, 4});
}
long ledger3 = ml.getCurrentLedger().getId();
MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo1 = ml.getLedgersInfo().get(ledger1);
MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo2 = ml.getLedgersInfo().get(ledger2);
long average1 = ledgerInfo1.getSize() / ledgerInfo1.getEntries() + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
long average2 = ledgerInfo2.getSize() / ledgerInfo2.getEntries() + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
long average3 = ml.getCurrentLedgerSize() / ml.getCurrentLedgerEntries() + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
assertEquals(average1, 1 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
assertEquals(average2, 2 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
assertEquals(average3, 4 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);

// Test: the individual ledgers.
long entryCount1 =
ManagedCursorImpl.estimateEntryCountBySize(average1 * 16, PositionImpl.get(ledger1, 0), ml);
assertEquals(entryCount1, 16);
long entryCount2 =
ManagedCursorImpl.estimateEntryCountBySize(average2 * 8, PositionImpl.get(ledger2, 0), ml);
assertEquals(entryCount2, 8);
long entryCount3 =
ManagedCursorImpl.estimateEntryCountBySize(average3 * 4, PositionImpl.get(ledger3, 0), ml);
assertEquals(entryCount3, 4);

// Test: across ledgers.
long entryCount4 =
ManagedCursorImpl.estimateEntryCountBySize((average1 * 100) + (average2 * 8), PositionImpl.get(ledger1, 0), ml);
assertEquals(entryCount4, 108);
long entryCount5 =
ManagedCursorImpl.estimateEntryCountBySize((average2 * 100) + (average3 * 4), PositionImpl.get(ledger2, 0), ml);
assertEquals(entryCount5, 104);
long entryCount6 =
ManagedCursorImpl.estimateEntryCountBySize((average1 * 100) + (average2 * 100) + (average3 * 4), PositionImpl.get(ledger1, 0), ml);
assertEquals(entryCount6, 204);

long entryCount7 =
ManagedCursorImpl.estimateEntryCountBySize((average1 * 20) + (average2 * 8), PositionImpl.get(ledger1, 80), ml);
assertEquals(entryCount7, 28);
long entryCount8 =
ManagedCursorImpl.estimateEntryCountBySize((average2 * 20) + (average3 * 4), PositionImpl.get(ledger2, 80), ml);
assertEquals(entryCount8, 24);
long entryCount9 =
ManagedCursorImpl.estimateEntryCountBySize((average1 * 20) + (average2 * 100) + (average3 * 4), PositionImpl.get(ledger1, 80), ml);
assertEquals(entryCount9, 124);

// Test: read more than entries written.
long entryCount10 =
ManagedCursorImpl.estimateEntryCountBySize((average1 * 100) + (average2 * 100) + (average3 * 100) + (average3 * 4) , PositionImpl.get(ledger1, 0), ml);
assertEquals(entryCount10, 304);

// cleanup.
ml.delete();
}

private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void testBatchMessageAck() {
.newConsumer()
.topic(topicName)
.subscriptionName(subscriptionName)
.receiverQueueSize(50)
.receiverQueueSize(10)
.subscriptionType(SubscriptionType.Shared)
.enableBatchIndexAcknowledgment(true)
.negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
Expand Down Expand Up @@ -114,29 +114,27 @@ public void testBatchMessageAck() {
consumer.acknowledge(receive1);
consumer.acknowledge(receive2);
Awaitility.await().untilAsserted(() -> {
// Since https://github.com/apache/pulsar/pull/23931 improved the mechanism for estimating the average entry size,
// the broker will deliver more messages than before, so the expected value changed from 18 to 38 here.
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 38);
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 18);
});
Message<byte[]> receive3 = consumer.receive();
Message<byte[]> receive4 = consumer.receive();
consumer.acknowledge(receive3);
consumer.acknowledge(receive4);
Awaitility.await().untilAsserted(() -> {
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 36);
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16);
});
// Block cmd-flow send until verify finish. see: https://github.com/apache/pulsar/pull/17436.
consumer.pause();
Message<byte[]> receive5 = consumer.receive();
consumer.negativeAcknowledge(receive5);
Awaitility.await().pollInterval(1, TimeUnit.MILLISECONDS).untilAsserted(() -> {
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 20);
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 0);
});
// Unblock cmd-flow.
consumer.resume();
consumer.receive();
Awaitility.await().untilAsserted(() -> {
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 36);
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,13 +388,8 @@ public void testAvgMessagesPerEntry() throws Exception {
.batchingMaxPublishDelay(5, TimeUnit.SECONDS)
.batchingMaxBytes(Integer.MAX_VALUE)
.create();
// The first messages deliver: 20 msgs.
// Average of "messages per batch" is "1".
for (int i = 0; i < 20; i++) {
producer.send("first-message");
}
// The second messages deliver: 20 msgs.
// Average of "messages per batch" is "Math.round(1 * 0.9 + 20 * 0.1) = 2.9 ~ 3".

producer.send("first-message");
List<CompletableFuture<MessageId>> futures = new ArrayList<>();
for (int i = 0; i < 20; i++) {
futures.add(producer.sendAsync("message"));
Expand Down Expand Up @@ -428,7 +423,6 @@ public void testAvgMessagesPerEntry() throws Exception {
metadataConsumer.put("matchValueReschedule", "producer2");
@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).properties(metadataConsumer)
.receiverQueueSize(20)
.subscriptionName(subName).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();

int counter = 0;
Expand All @@ -443,17 +437,14 @@ public void testAvgMessagesPerEntry() throws Exception {
}
}

assertEquals(40, counter);
assertEquals(21, counter);

ConsumerStats consumerStats =
admin.topics().getStats(topic).getSubscriptions().get(subName).getConsumers().get(0);

assertEquals(40, consumerStats.getMsgOutCounter());
assertEquals(21, consumerStats.getMsgOutCounter());

// The first messages deliver: 20 msgs.
// Average of "messages per batch" is "1".
// The second messages deliver: 20 msgs.
// Average of "messages per batch" is "Math.round(1 * 0.9 + 20 * 0.1) = 2.9 ~ 3".
// Math.round(1 * 0.9 + 0.1 * (20 / 1))
int avgMessagesPerEntry = consumerStats.getAvgMessagesPerEntry();
assertEquals(3, avgMessagesPerEntry);

Check failure on line 449 in pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java

View workflow job for this annotation

GitHub Actions / CI - Unit - Brokers - Broker Group 1

ConsumerStatsTest.testAvgMessagesPerEntry

expected:<3> but was:<11>
}
Expand Down

0 comments on commit ca3f6c1

Please sign in to comment.