From b4f82f0680d0fa65d9ef6e03698f5c2a9359b8e9 Mon Sep 17 00:00:00 2001 From: "gaoran_10@126.com" Date: Wed, 5 Mar 2025 15:22:08 +0800 Subject: [PATCH 1/4] avoid negative estimate entry count --- .../mledger/impl/ManagedCursorImpl.java | 9 +++--- .../mledger/impl/ManagedCursorTest.java | 30 ++++++++++++------- 2 files changed, 23 insertions(+), 16 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 32d46ff1c3c40..b2ec5bbe554b8 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -3811,12 +3811,10 @@ public int applyMaxSizeCap(int maxEntries, long maxSizeBytes) { if (maxSizeBytes == NO_MAX_SIZE_LIMIT) { return maxEntries; } - int maxEntriesBasedOnSize = - Long.valueOf(estimateEntryCountBySize(maxSizeBytes, readPosition, ledger)).intValue(); - return Math.min(maxEntriesBasedOnSize, maxEntries); + return Math.min(estimateEntryCountBySize(maxSizeBytes, readPosition, ledger), maxEntries); } - static long estimateEntryCountBySize(long bytesSize, Position readPosition, ManagedLedgerImpl ml) { + static int estimateEntryCountBySize(long bytesSize, Position readPosition, ManagedLedgerImpl ml) { Position posToRead = readPosition; if (!ml.isValidPosition(readPosition)) { posToRead = ml.getNextValidPosition(readPosition); @@ -3855,7 +3853,8 @@ static long estimateEntryCountBySize(long bytesSize, Position readPosition, Mana posToRead = ml.getNextValidPosition(PositionFactory.create(posToRead.getLedgerId(), Long.MAX_VALUE)); } } - return Math.max(result, 1); + int resultInt = Long.valueOf(result).intValue(); + return Math.max(resultInt < 0 ? Integer.MAX_VALUE : resultInt, 1); } @Override diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 1cb09d995393c..40bdf28181b3f 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -5173,7 +5173,7 @@ public void findEntryFailed(ManagedLedgerException exception, Optional public void testEstimateEntryCountBySize() throws Exception { final String mlName = "ml-" + UUID.randomUUID().toString().replaceAll("-", ""); ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName); - long entryCount0 = + int entryCount0 = ManagedCursorImpl.estimateEntryCountBySize(16, PositionFactory.create(ml.getCurrentLedger().getId(), 0), ml); assertEquals(entryCount0, 1); // Avoid trimming ledgers. @@ -5206,44 +5206,52 @@ public void testEstimateEntryCountBySize() throws Exception { assertEquals(average3, 4 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY); // Test: the individual ledgers. - long entryCount1 = + int entryCount1 = ManagedCursorImpl.estimateEntryCountBySize(average1 * 16, PositionFactory.create(ledger1, 0), ml); assertEquals(entryCount1, 16); - long entryCount2 = + int entryCount2 = ManagedCursorImpl.estimateEntryCountBySize(average2 * 8, PositionFactory.create(ledger2, 0), ml); assertEquals(entryCount2, 8); - long entryCount3 = + int entryCount3 = ManagedCursorImpl.estimateEntryCountBySize(average3 * 4, PositionFactory.create(ledger3, 0), ml); assertEquals(entryCount3, 4); // Test: across ledgers. - long entryCount4 = + int entryCount4 = ManagedCursorImpl.estimateEntryCountBySize((average1 * 100) + (average2 * 8), PositionFactory.create(ledger1, 0), ml); assertEquals(entryCount4, 108); - long entryCount5 = + int entryCount5 = ManagedCursorImpl.estimateEntryCountBySize((average2 * 100) + (average3 * 4), PositionFactory.create(ledger2, 0), ml); assertEquals(entryCount5, 104); - long entryCount6 = + int entryCount6 = ManagedCursorImpl.estimateEntryCountBySize((average1 * 100) + (average2 * 100) + (average3 * 4), PositionFactory.create(ledger1, 0), ml); assertEquals(entryCount6, 204); - long entryCount7 = + int entryCount7 = ManagedCursorImpl.estimateEntryCountBySize((average1 * 20) + (average2 * 8), PositionFactory.create(ledger1, 80), ml); assertEquals(entryCount7, 28); - long entryCount8 = + int entryCount8 = ManagedCursorImpl.estimateEntryCountBySize((average2 * 20) + (average3 * 4), PositionFactory.create(ledger2, 80), ml); assertEquals(entryCount8, 24); - long entryCount9 = + int entryCount9 = ManagedCursorImpl.estimateEntryCountBySize((average1 * 20) + (average2 * 100) + (average3 * 4), PositionFactory.create(ledger1, 80), ml); assertEquals(entryCount9, 124); // Test: read more than entries written. - long entryCount10 = + int entryCount10 = ManagedCursorImpl.estimateEntryCountBySize((average1 * 100) + (average2 * 100) + (average3 * 100) + (average3 * 4) , PositionFactory.create(ledger1, 0), ml); assertEquals(entryCount10, 304); // cleanup. ml.delete(); + + // test estimate long value convert to int value + ml = (ManagedLedgerImpl) factory.open(mlName); + ml.addEntry(new byte[1000]); + int entryCount11 = ManagedCursorImpl.estimateEntryCountBySize( + Long.MAX_VALUE, PositionFactory.create(ml.getCurrentLedger().getId(), 0), ml); + assertTrue(entryCount11 > 1, "entryCount11 is " + entryCount11); + ml.delete(); } @Test From 5b7fbc91bb3313bc079b76e6ed3cc5c3f46370de Mon Sep 17 00:00:00 2001 From: "gaoran_10@126.com" Date: Wed, 5 Mar 2025 15:30:04 +0800 Subject: [PATCH 2/4] add comment --- .../org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 1 + .../org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index b2ec5bbe554b8..2fb45e0c352a5 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -3814,6 +3814,7 @@ public int applyMaxSizeCap(int maxEntries, long maxSizeBytes) { return Math.min(estimateEntryCountBySize(maxSizeBytes, readPosition, ledger), maxEntries); } + // The minimum value is 1 static int estimateEntryCountBySize(long bytesSize, Position readPosition, ManagedLedgerImpl ml) { Position posToRead = readPosition; if (!ml.isValidPosition(readPosition)) { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 40bdf28181b3f..c010190c90dbd 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -5245,7 +5245,7 @@ public void testEstimateEntryCountBySize() throws Exception { // cleanup. ml.delete(); - // test estimate long value convert to int value + // test estimated long value convert to an int value ml = (ManagedLedgerImpl) factory.open(mlName); ml.addEntry(new byte[1000]); int entryCount11 = ManagedCursorImpl.estimateEntryCountBySize( From 2d3b5ad3382cbbe90d21fb60af02858af2c47032 Mon Sep 17 00:00:00 2001 From: "gaoran_10@126.com" Date: Thu, 6 Mar 2025 11:11:57 +0800 Subject: [PATCH 3/4] address comment --- .../org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 2fb45e0c352a5..562cf834dc38c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -3854,8 +3854,8 @@ static int estimateEntryCountBySize(long bytesSize, Position readPosition, Manag posToRead = ml.getNextValidPosition(PositionFactory.create(posToRead.getLedgerId(), Long.MAX_VALUE)); } } - int resultInt = Long.valueOf(result).intValue(); - return Math.max(resultInt < 0 ? Integer.MAX_VALUE : resultInt, 1); + int safeInt = result > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) result; + return Math.max(safeInt, 1); } @Override From 475af0bf086504fa19595c7c54af56e549bf7eee Mon Sep 17 00:00:00 2001 From: "gaoran_10@126.com" Date: Thu, 6 Mar 2025 13:54:31 +0800 Subject: [PATCH 4/4] address comment --- .../mledger/impl/ManagedCursorImpl.java | 12 ++++--- .../mledger/impl/ManagedCursorTest.java | 34 ++++++++++--------- 2 files changed, 25 insertions(+), 21 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 562cf834dc38c..c05fd4908246a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -3811,11 +3811,14 @@ public int applyMaxSizeCap(int maxEntries, long maxSizeBytes) { if (maxSizeBytes == NO_MAX_SIZE_LIMIT) { return maxEntries; } - return Math.min(estimateEntryCountBySize(maxSizeBytes, readPosition, ledger), maxEntries); + long estimatedEntryCount = estimateEntryCountBySize(maxSizeBytes, readPosition, ledger); + if (estimatedEntryCount > Integer.MAX_VALUE) { + return maxEntries; + } + return Math.min((int) estimatedEntryCount, maxEntries); } - // The minimum value is 1 - static int estimateEntryCountBySize(long bytesSize, Position readPosition, ManagedLedgerImpl ml) { + static long estimateEntryCountBySize(long bytesSize, Position readPosition, ManagedLedgerImpl ml) { Position posToRead = readPosition; if (!ml.isValidPosition(readPosition)) { posToRead = ml.getNextValidPosition(readPosition); @@ -3854,8 +3857,7 @@ static int estimateEntryCountBySize(long bytesSize, Position readPosition, Manag posToRead = ml.getNextValidPosition(PositionFactory.create(posToRead.getLedgerId(), Long.MAX_VALUE)); } } - int safeInt = result > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) result; - return Math.max(safeInt, 1); + return Math.max(result, 1); } @Override diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index c010190c90dbd..90a5dadbef06e 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -5173,7 +5173,7 @@ public void findEntryFailed(ManagedLedgerException exception, Optional public void testEstimateEntryCountBySize() throws Exception { final String mlName = "ml-" + UUID.randomUUID().toString().replaceAll("-", ""); ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName); - int entryCount0 = + long entryCount0 = ManagedCursorImpl.estimateEntryCountBySize(16, PositionFactory.create(ml.getCurrentLedger().getId(), 0), ml); assertEquals(entryCount0, 1); // Avoid trimming ledgers. @@ -5206,51 +5206,53 @@ public void testEstimateEntryCountBySize() throws Exception { assertEquals(average3, 4 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY); // Test: the individual ledgers. - int entryCount1 = + long entryCount1 = ManagedCursorImpl.estimateEntryCountBySize(average1 * 16, PositionFactory.create(ledger1, 0), ml); assertEquals(entryCount1, 16); - int entryCount2 = + long entryCount2 = ManagedCursorImpl.estimateEntryCountBySize(average2 * 8, PositionFactory.create(ledger2, 0), ml); assertEquals(entryCount2, 8); - int entryCount3 = + long entryCount3 = ManagedCursorImpl.estimateEntryCountBySize(average3 * 4, PositionFactory.create(ledger3, 0), ml); assertEquals(entryCount3, 4); // Test: across ledgers. - int entryCount4 = + long entryCount4 = ManagedCursorImpl.estimateEntryCountBySize((average1 * 100) + (average2 * 8), PositionFactory.create(ledger1, 0), ml); assertEquals(entryCount4, 108); - int entryCount5 = + long entryCount5 = ManagedCursorImpl.estimateEntryCountBySize((average2 * 100) + (average3 * 4), PositionFactory.create(ledger2, 0), ml); assertEquals(entryCount5, 104); - int entryCount6 = + long entryCount6 = ManagedCursorImpl.estimateEntryCountBySize((average1 * 100) + (average2 * 100) + (average3 * 4), PositionFactory.create(ledger1, 0), ml); assertEquals(entryCount6, 204); - int entryCount7 = + long entryCount7 = ManagedCursorImpl.estimateEntryCountBySize((average1 * 20) + (average2 * 8), PositionFactory.create(ledger1, 80), ml); assertEquals(entryCount7, 28); - int entryCount8 = + long entryCount8 = ManagedCursorImpl.estimateEntryCountBySize((average2 * 20) + (average3 * 4), PositionFactory.create(ledger2, 80), ml); assertEquals(entryCount8, 24); - int entryCount9 = + long entryCount9 = ManagedCursorImpl.estimateEntryCountBySize((average1 * 20) + (average2 * 100) + (average3 * 4), PositionFactory.create(ledger1, 80), ml); assertEquals(entryCount9, 124); // Test: read more than entries written. - int entryCount10 = + long entryCount10 = ManagedCursorImpl.estimateEntryCountBySize((average1 * 100) + (average2 * 100) + (average3 * 100) + (average3 * 4) , PositionFactory.create(ledger1, 0), ml); assertEquals(entryCount10, 304); // cleanup. ml.delete(); + } - // test estimated long value convert to an int value - ml = (ManagedLedgerImpl) factory.open(mlName); + @Test + public void testApplyMaxSizeCap() throws Exception { + var ml = factory.open("testApplyMaxSizeCap"); + var cursor = ml.openCursor("c1"); ml.addEntry(new byte[1000]); - int entryCount11 = ManagedCursorImpl.estimateEntryCountBySize( - Long.MAX_VALUE, PositionFactory.create(ml.getCurrentLedger().getId(), 0), ml); - assertTrue(entryCount11 > 1, "entryCount11 is " + entryCount11); + assertEquals(cursor.applyMaxSizeCap(200, Long.MAX_VALUE), 200); + ml.deleteCursor("c1"); ml.delete(); }