Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][misc] Sync commits from apache into 3.1_ds (27 Feb) #371

Merged
merged 13 commits into from
Mar 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .mvn/extensions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<extension>
<groupId>com.gradle</groupId>
<artifactId>develocity-maven-extension</artifactId>
<version>1.22.2</version>
<version>1.23.1</version>
</extension>
<extension>
<groupId>com.gradle</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_RETRIES;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
Expand Down Expand Up @@ -3710,26 +3711,51 @@ public int applyMaxSizeCap(int maxEntries, long maxSizeBytes) {
if (maxSizeBytes == NO_MAX_SIZE_LIMIT) {
return maxEntries;
}
int maxEntriesBasedOnSize =
Long.valueOf(estimateEntryCountBySize(maxSizeBytes, readPosition, ledger)).intValue();
return Math.min(maxEntriesBasedOnSize, maxEntries);
}

double avgEntrySize = ledger.getStats().getEntrySizeAverage();
if (!Double.isFinite(avgEntrySize)) {
// We don't have yet any stats on the topic entries. Let's try to use the cursor avg size stats
avgEntrySize = (double) entriesReadSize / (double) entriesReadCount;
}

if (!Double.isFinite(avgEntrySize)) {
// If we still don't have any information, it means this is the first time we attempt reading
// and there are no writes. Let's start with 1 to avoid any overflow and start the avg stats
return 1;
static long estimateEntryCountBySize(long bytesSize, PositionImpl readPosition, ManagedLedgerImpl ml) {
Position posToRead = readPosition;
if (!ml.isValidPosition(readPosition)) {
posToRead = ml.getNextValidPosition(readPosition);
}
long result = 0;
long remainingBytesSize = bytesSize;

int maxEntriesBasedOnSize = (int) (maxSizeBytes / avgEntrySize);
if (maxEntriesBasedOnSize < 1) {
// We need to read at least one entry
return 1;
while (remainingBytesSize > 0) {
// Last ledger.
if (posToRead.getLedgerId() == ml.getCurrentLedger().getId()) {
if (ml.getCurrentLedgerSize() == 0 || ml.getCurrentLedgerEntries() == 0) {
// Only read 1 entry if no entries to read.
return 1;
}
long avg = Math.max(1, ml.getCurrentLedgerSize() / ml.getCurrentLedgerEntries())
+ BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
result += remainingBytesSize / avg;
break;
}
// Skip empty ledger.
LedgerInfo ledgerInfo = ml.getLedgersInfo().get(posToRead.getLedgerId());
if (ledgerInfo.getSize() == 0 || ledgerInfo.getEntries() == 0) {
posToRead = ml.getNextValidPosition(PositionImpl.get(posToRead.getLedgerId(), Long.MAX_VALUE));
continue;
}
// Calculate entries by average of ledgers.
long avg = Math.max(1, ledgerInfo.getSize() / ledgerInfo.getEntries()) + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
long remainEntriesOfLedger = ledgerInfo.getEntries() - posToRead.getEntryId();
if (remainEntriesOfLedger * avg >= remainingBytesSize) {
result += remainingBytesSize / avg;
break;
} else {
// Calculate for the next ledger.
result += remainEntriesOfLedger;
remainingBytesSize -= remainEntriesOfLedger * avg;
posToRead = ml.getNextValidPosition(PositionImpl.get(posToRead.getLedgerId(), Long.MAX_VALUE));
}
}

return Math.min(maxEntriesBasedOnSize, maxEntries);
return Math.max(result, 1);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private final CallbackMutex offloadMutex = new CallbackMutex();
private static final CompletableFuture<PositionImpl> NULL_OFFLOAD_PROMISE = CompletableFuture
.completedFuture(PositionImpl.LATEST);
@VisibleForTesting
@Getter
protected volatile LedgerHandle currentLedger;
protected volatile long currentLedgerEntries = 0;
protected volatile long currentLedgerSize = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@

import com.google.common.annotations.VisibleForTesting;
import io.prometheus.client.Gauge;
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
import org.jctools.queues.SpscArrayQueue;

@Slf4j
public class InflightReadsLimiter {
Expand All @@ -37,6 +37,7 @@ public class InflightReadsLimiter {
.help("Estimated number of bytes retained by data read from storage or cache")
.register();

private final int maxReadsInFlightAcquireQueueSize;
private static final Gauge PULSAR_ML_READS_AVAILABLE_BUFFER_SIZE = Gauge
.build()
.name("pulsar_ml_reads_available_inflight_bytes")
Expand Down Expand Up @@ -64,9 +65,10 @@ public InflightReadsLimiter(long maxReadsInFlightSize, int maxReadsInFlightAcqui
this.remainingBytes = maxReadsInFlightSize;
this.acquireTimeoutMillis = acquireTimeoutMillis;
this.timeOutExecutor = timeOutExecutor;
this.maxReadsInFlightAcquireQueueSize = maxReadsInFlightAcquireQueueSize;
if (maxReadsInFlightSize > 0) {
enabled = true;
this.queuedHandles = new SpscArrayQueue<>(maxReadsInFlightAcquireQueueSize);
this.queuedHandles = new ArrayDeque<>();
} else {
enabled = false;
this.queuedHandles = null;
Expand Down Expand Up @@ -129,13 +131,14 @@ private synchronized Optional<Handle> internalAcquire(long permits, Consumer<Han
updateMetrics();
return Optional.of(new Handle(maxReadsInFlightSize, handle.creationTime, true));
} else {
if (queuedHandles.offer(new QueuedHandle(handle, callback))) {
scheduleTimeOutCheck(acquireTimeoutMillis);
return Optional.empty();
} else {
if (queuedHandles.size() >= maxReadsInFlightAcquireQueueSize) {
log.warn("Failed to queue handle for acquiring permits: {}, creationTime: {}, remainingBytes:{}",
permits, handle.creationTime, remainingBytes);
return Optional.of(new Handle(0, handle.creationTime, false));
} else {
queuedHandles.offer(new QueuedHandle(handle, callback));
scheduleTimeOutCheck(acquireTimeoutMillis);
return Optional.empty();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ void asyncReadEntriesByPosition(ReadHandle lh, PositionImpl firstPosition, Posit
doAsyncReadEntriesByPosition(lh, firstPosition, lastPosition, numberOfEntries, shouldCacheEntry,
originalCallback, ctx);
} else {
long estimatedEntrySize = getEstimatedEntrySize();
long estimatedEntrySize = getEstimatedEntrySize(lh);
long estimatedReadSize = numberOfEntries * estimatedEntrySize;
if (log.isDebugEnabled()) {
log.debug("Estimated read size: {} bytes for {} entries with {} estimated entry size",
Expand Down Expand Up @@ -418,12 +418,12 @@ void doAsyncReadEntriesByPosition(ReadHandle lh, PositionImpl firstPosition, Pos
}

@VisibleForTesting
public long getEstimatedEntrySize() {
long estimatedEntrySize = getAvgEntrySize();
if (estimatedEntrySize == 0) {
estimatedEntrySize = DEFAULT_ESTIMATED_ENTRY_SIZE;
public long getEstimatedEntrySize(ReadHandle lh) {
if (lh.getLength() == 0 || lh.getLastAddConfirmed() < 0) {
// No entries stored.
return Math.max(getAvgEntrySize(), DEFAULT_ESTIMATED_ENTRY_SIZE) + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
}
return estimatedEntrySize + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
return Math.max(1, lh.getLength() / (lh.getLastAddConfirmed() + 1)) + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
}

private long getAvgEntrySize() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,9 @@ public void testPreciseLimitation(String missingCase) throws Exception {
SimpleReadEntriesCallback cb0 = new SimpleReadEntriesCallback();
entryCache.asyncReadEntry(spyCurrentLedger, 125, 125, true, cb0, ctx);
cb0.entries.join();
Long sizePerEntry1 = entryCache.getEstimatedEntrySize();
Assert.assertEquals(sizePerEntry1, 1 + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
int sizePerEntry = Long.valueOf(entryCache.getEstimatedEntrySize(ml.currentLedger)).intValue();
Awaitility.await().untilAsserted(() -> {
long remainingBytes =limiter.getRemainingBytes();
long remainingBytes = limiter.getRemainingBytes();
Assert.assertEquals(remainingBytes, totalCapacity);
});
log.info("remainingBytes 0: {}", limiter.getRemainingBytes());
Expand All @@ -165,7 +164,7 @@ public void testPreciseLimitation(String missingCase) throws Exception {
entryCache.asyncReadEntry(spyCurrentLedger, start2, end2, true, cb2, ctx);
}).start();

long bytesAcquired1 = calculateBytesSizeBeforeFirstReading(readCount1 + readCount2, 1);
long bytesAcquired1 = calculateBytesSizeBeforeFirstReading(readCount1 + readCount2, sizePerEntry);
long remainingBytesExpected1 = totalCapacity - bytesAcquired1;
log.info("acquired : {}", bytesAcquired1);
log.info("remainingBytesExpected 0 : {}", remainingBytesExpected1);
Expand All @@ -178,9 +177,7 @@ public void testPreciseLimitation(String missingCase) throws Exception {
Thread.sleep(3000);
readCompleteSignal1.countDown();
cb1.entries.join();
Long sizePerEntry2 = entryCache.getEstimatedEntrySize();
Assert.assertEquals(sizePerEntry2, 1 + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
long bytesAcquired2 = calculateBytesSizeBeforeFirstReading(readCount2, 1);
long bytesAcquired2 = calculateBytesSizeBeforeFirstReading(readCount2, sizePerEntry);
long remainingBytesExpected2 = totalCapacity - bytesAcquired2;
log.info("acquired : {}", bytesAcquired2);
log.info("remainingBytesExpected 1: {}", remainingBytesExpected2);
Expand All @@ -191,8 +188,6 @@ public void testPreciseLimitation(String missingCase) throws Exception {

readCompleteSignal2.countDown();
cb2.entries.join();
Long sizePerEntry3 = entryCache.getEstimatedEntrySize();
Assert.assertEquals(sizePerEntry3, 1 + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
Awaitility.await().untilAsserted(() -> {
long remainingBytes = limiter.getRemainingBytes();
log.info("remainingBytes 2: {}", remainingBytes);
Expand All @@ -204,7 +199,7 @@ public void testPreciseLimitation(String missingCase) throws Exception {
}

private long calculateBytesSizeBeforeFirstReading(int entriesCount, int perEntrySize) {
return entriesCount * (perEntrySize + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
return entriesCount * perEntrySize;
}

class SimpleReadEntriesCallback implements AsyncCallbacks.ReadEntriesCallback {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.mledger.impl;

import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
Expand Down Expand Up @@ -681,13 +682,15 @@ void testAsyncReadWithMaxSizeByte() throws Exception {
ManagedCursor cursor = ledger.openCursor("c1");

for (int i = 0; i < 100; i++) {
ledger.addEntry(new byte[1024]);
ledger.addEntry(new byte[(int) (1024)]);
}

// First time, since we don't have info, we'll get 1 single entry
readAndCheck(cursor, 10, 3 * 1024, 1);
// Since https://github.com/apache/pulsar/pull/23931 improved the performance of delivery, the consumer
// gets more messages on the first delivery than before (previously it received only 1 message).
int avg = (int) (BOOKKEEPER_READ_OVERHEAD_PER_ENTRY + 1024);
readAndCheck(cursor, 10, 3 * avg, 3);
// We should only return 3 entries, based on the max size
readAndCheck(cursor, 20, 3 * 1024, 3);
readAndCheck(cursor, 20, 3 * avg, 3);
// If maxSize is < avg, we should get 1 entry
readAndCheck(cursor, 10, 500, 1);
}
Expand Down Expand Up @@ -3885,13 +3888,15 @@ public void testReadEntriesOrWaitWithMaxSize() throws Exception {
ledger.addEntry(new byte[1024]);
}

// First time, since we don't have info, we'll get 1 single entry
List<Entry> entries = c.readEntriesOrWait(10, 3 * 1024);
assertEquals(entries.size(), 1);
// Since https://github.com/apache/pulsar/pull/23931 improved the performance of delivery, the consumer
// gets more messages on the first delivery than before (previously it received only 1 message).
int avg = (int) (1024 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
List<Entry> entries = c.readEntriesOrWait(10, 3 * avg);
assertEquals(entries.size(), 3);
entries.forEach(Entry::release);

// We should only return 3 entries, based on the max size
entries = c.readEntriesOrWait(10, 3 * 1024);
entries = c.readEntriesOrWait(10, 3 * avg);
assertEquals(entries.size(), 3);
entries.forEach(Entry::release);

Expand Down Expand Up @@ -4798,5 +4803,82 @@ public void operationFailed(ManagedLedgerException exception) {
assertEquals(cursor.getReadPosition(), markDeletedPosition.getNext());
}

/**
 * Verifies {@code ManagedCursorImpl.estimateEntryCountBySize} for a managed ledger whose data is written in
 * three batches of 100 entries with payloads of 1, 2 and 4 bytes.
 *
 * <p>Every byte budget passed to the estimator is built from the per-batch average entry size plus
 * {@code BOOKKEEPER_READ_OVERHEAD_PER_ENTRY}, so each expected entry count is an exact number.
 */
@Test
public void testEstimateEntryCountBySize() throws Exception {
final String mlName = "ml-" + UUID.randomUUID().toString().replaceAll("-", "");
ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName);
// Nothing has been written yet: the estimate is expected to be the minimum of one entry.
long entryCount0 =
ManagedCursorImpl.estimateEntryCountBySize(16, PositionImpl.get(ml.getCurrentLedger().getId(), 0), ml);
assertEquals(entryCount0, 1);
// Avoid trimming ledgers.
ml.openCursor("c1");

// Build data.
// First batch: 100 entries of 1 byte each.
for (int i = 0; i < 100; i++) {
ml.addEntry(new byte[]{1});
}
long ledger1 = ml.getCurrentLedger().getId();
// NOTE(review): closing the handle and calling ledgerClosed presumably forces a rollover so that the
// next batch lands in a new ledger; the test relies on ledger1/ledger2/ledger3 being distinct.
ml.getCurrentLedger().close();
ml.ledgerClosed(ml.getCurrentLedger());
// Second batch: 100 entries of 2 bytes each.
for (int i = 0; i < 100; i++) {
ml.addEntry(new byte[]{1, 2});
}
long ledger2 = ml.getCurrentLedger().getId();
ml.getCurrentLedger().close();
ml.ledgerClosed(ml.getCurrentLedger());
// Third batch: 100 entries of 4 bytes each; this ledger is left open as the current ledger.
for (int i = 0; i < 100; i++) {
ml.addEntry(new byte[]{1, 2, 3, 4});
}
long ledger3 = ml.getCurrentLedger().getId();
// Stats of the first two ledgers are read from the ledgers info map; the third one is still the current
// ledger, so its size and entry count are read from the managed ledger's current-ledger counters.
MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo1 = ml.getLedgersInfo().get(ledger1);
MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo2 = ml.getLedgersInfo().get(ledger2);
long average1 = ledgerInfo1.getSize() / ledgerInfo1.getEntries() + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
long average2 = ledgerInfo2.getSize() / ledgerInfo2.getEntries() + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
long average3 = ml.getCurrentLedgerSize() / ml.getCurrentLedgerEntries() + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
// Sanity check of the fixture before using the averages to build byte budgets.
assertEquals(average1, 1 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
assertEquals(average2, 2 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
assertEquals(average3, 4 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);

// Test: the individual ledgers.
// A budget of N average-sized entries, starting at entry 0 of a ledger, must yield exactly N entries.
long entryCount1 =
ManagedCursorImpl.estimateEntryCountBySize(average1 * 16, PositionImpl.get(ledger1, 0), ml);
assertEquals(entryCount1, 16);
long entryCount2 =
ManagedCursorImpl.estimateEntryCountBySize(average2 * 8, PositionImpl.get(ledger2, 0), ml);
assertEquals(entryCount2, 8);
long entryCount3 =
ManagedCursorImpl.estimateEntryCountBySize(average3 * 4, PositionImpl.get(ledger3, 0), ml);
assertEquals(entryCount3, 4);

// Test: across ledgers.
// The budget covers all 100 entries of the starting ledger(s) plus a few entries of the following one.
long entryCount4 =
ManagedCursorImpl.estimateEntryCountBySize((average1 * 100) + (average2 * 8), PositionImpl.get(ledger1, 0), ml);
assertEquals(entryCount4, 108);
long entryCount5 =
ManagedCursorImpl.estimateEntryCountBySize((average2 * 100) + (average3 * 4), PositionImpl.get(ledger2, 0), ml);
assertEquals(entryCount5, 104);
long entryCount6 =
ManagedCursorImpl.estimateEntryCountBySize((average1 * 100) + (average2 * 100) + (average3 * 4), PositionImpl.get(ledger1, 0), ml);
assertEquals(entryCount6, 204);

// Same as above, but the read starts at entry 80, so only 20 entries remain in the starting ledger.
long entryCount7 =
ManagedCursorImpl.estimateEntryCountBySize((average1 * 20) + (average2 * 8), PositionImpl.get(ledger1, 80), ml);
assertEquals(entryCount7, 28);
long entryCount8 =
ManagedCursorImpl.estimateEntryCountBySize((average2 * 20) + (average3 * 4), PositionImpl.get(ledger2, 80), ml);
assertEquals(entryCount8, 24);
long entryCount9 =
ManagedCursorImpl.estimateEntryCountBySize((average1 * 20) + (average2 * 100) + (average3 * 4), PositionImpl.get(ledger1, 80), ml);
assertEquals(entryCount9, 124);

// Test: read more than entries written.
// Only 300 entries exist, yet 304 is expected: the estimate is not capped at the number of entries written.
long entryCount10 =
ManagedCursorImpl.estimateEntryCountBySize((average1 * 100) + (average2 * 100) + (average3 * 100) + (average3 * 4) , PositionImpl.get(ledger1, 0), ml);
assertEquals(entryCount10, 304);

// cleanup.
ml.delete();
}

private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
}
7 changes: 3 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -259,12 +259,11 @@ flexible messaging model and an intuitive client API.</description>
<failsafe.version>3.3.2</failsafe.version>

<!-- test dependencies -->
<testcontainers.version>1.18.3</testcontainers.version>
<testcontainers.version>1.20.4</testcontainers.version>
<!-- Set docker-java.version to the version of docker-java used in org.testcontainers:testcontainers pom -->
<docker-java.version>3.4.0</docker-java.version>
<hamcrest.version>2.2</hamcrest.version>
<restassured.version>5.4.0</restassured.version>

<!-- Set docker-java.version to the version of docker-java used in Testcontainers -->
<docker-java.version>3.3.0</docker-java.version>
<kerby.version>1.1.1</kerby.version>
<testng.version>7.7.1</testng.version>
<mockito.version>3.12.4</mockito.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, Str
// This operation should be reading from zookeeper and it should be allowed without having admin privileges
CompletableFuture<Void> validateAccessForTenantCf =
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync());
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync());

var checkIfTopicExists = !pulsar().getConfiguration().isAllowAclChangesOnNonExistentTopics();
if (checkIfTopicExists) {
Expand Down
Loading
Loading