diff --git a/.mvn/extensions.xml b/.mvn/extensions.xml index 8ceede33b9cdc..22eaa6a1779f4 100644 --- a/.mvn/extensions.xml +++ b/.mvn/extensions.xml @@ -24,7 +24,7 @@ com.gradle develocity-maven-extension - 1.22.2 + 1.23.1 com.gradle diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 3ddfc9bdcb57a..f392176e62ae5 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -24,6 +24,7 @@ import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_RETRIES; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException; +import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY; import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; @@ -3710,26 +3711,51 @@ public int applyMaxSizeCap(int maxEntries, long maxSizeBytes) { if (maxSizeBytes == NO_MAX_SIZE_LIMIT) { return maxEntries; } + int maxEntriesBasedOnSize = + Long.valueOf(estimateEntryCountBySize(maxSizeBytes, readPosition, ledger)).intValue(); + return Math.min(maxEntriesBasedOnSize, maxEntries); + } - double avgEntrySize = ledger.getStats().getEntrySizeAverage(); - if (!Double.isFinite(avgEntrySize)) { - // We don't have yet any stats on the topic entries. 
Let's try to use the cursor avg size stats - avgEntrySize = (double) entriesReadSize / (double) entriesReadCount; - } - - if (!Double.isFinite(avgEntrySize)) { - // If we still don't have any information, it means this is the first time we attempt reading - // and there are no writes. Let's start with 1 to avoid any overflow and start the avg stats - return 1; + static long estimateEntryCountBySize(long bytesSize, PositionImpl readPosition, ManagedLedgerImpl ml) { + Position posToRead = readPosition; + if (!ml.isValidPosition(readPosition)) { + posToRead = ml.getNextValidPosition(readPosition); } + long result = 0; + long remainingBytesSize = bytesSize; - int maxEntriesBasedOnSize = (int) (maxSizeBytes / avgEntrySize); - if (maxEntriesBasedOnSize < 1) { - // We need to read at least one entry - return 1; + while (remainingBytesSize > 0) { + // Last ledger. + if (posToRead.getLedgerId() == ml.getCurrentLedger().getId()) { + if (ml.getCurrentLedgerSize() == 0 || ml.getCurrentLedgerEntries() == 0) { + // Only read 1 entry if no entries to read. + return 1; + } + long avg = Math.max(1, ml.getCurrentLedgerSize() / ml.getCurrentLedgerEntries()) + + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY; + result += remainingBytesSize / avg; + break; + } + // Skip empty ledger. + LedgerInfo ledgerInfo = ml.getLedgersInfo().get(posToRead.getLedgerId()); + if (ledgerInfo.getSize() == 0 || ledgerInfo.getEntries() == 0) { + posToRead = ml.getNextValidPosition(PositionImpl.get(posToRead.getLedgerId(), Long.MAX_VALUE)); + continue; + } + // Calculate entries by average of ledgers. + long avg = Math.max(1, ledgerInfo.getSize() / ledgerInfo.getEntries()) + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY; + long remainEntriesOfLedger = ledgerInfo.getEntries() - posToRead.getEntryId(); + if (remainEntriesOfLedger * avg >= remainingBytesSize) { + result += remainingBytesSize / avg; + break; + } else { + // Calculate for the next ledger. 
+ result += remainEntriesOfLedger; + remainingBytesSize -= remainEntriesOfLedger * avg; + posToRead = ml.getNextValidPosition(PositionImpl.get(posToRead.getLedgerId(), Long.MAX_VALUE)); + } } - - return Math.min(maxEntriesBasedOnSize, maxEntries); + return Math.max(result, 1); } @Override diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 61e4068393aab..86733aa060116 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -218,6 +218,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { private final CallbackMutex offloadMutex = new CallbackMutex(); private static final CompletableFuture NULL_OFFLOAD_PROMISE = CompletableFuture .completedFuture(PositionImpl.LATEST); + @VisibleForTesting + @Getter protected volatile LedgerHandle currentLedger; protected volatile long currentLedgerEntries = 0; protected volatile long currentLedgerSize = 0; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java index 3a6bb3cd039c3..c2b9b82b694a5 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java @@ -20,13 +20,13 @@ import com.google.common.annotations.VisibleForTesting; import io.prometheus.client.Gauge; +import java.util.ArrayDeque; import java.util.Optional; import java.util.Queue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import lombok.extern.slf4j.Slf4j; -import 
org.jctools.queues.SpscArrayQueue; @Slf4j public class InflightReadsLimiter { @@ -37,6 +37,7 @@ public class InflightReadsLimiter { .help("Estimated number of bytes retained by data read from storage or cache") .register(); + private final int maxReadsInFlightAcquireQueueSize; private static final Gauge PULSAR_ML_READS_AVAILABLE_BUFFER_SIZE = Gauge .build() .name("pulsar_ml_reads_available_inflight_bytes") @@ -64,9 +65,10 @@ public InflightReadsLimiter(long maxReadsInFlightSize, int maxReadsInFlightAcqui this.remainingBytes = maxReadsInFlightSize; this.acquireTimeoutMillis = acquireTimeoutMillis; this.timeOutExecutor = timeOutExecutor; + this.maxReadsInFlightAcquireQueueSize = maxReadsInFlightAcquireQueueSize; if (maxReadsInFlightSize > 0) { enabled = true; - this.queuedHandles = new SpscArrayQueue<>(maxReadsInFlightAcquireQueueSize); + this.queuedHandles = new ArrayDeque<>(); } else { enabled = false; this.queuedHandles = null; @@ -129,13 +131,14 @@ private synchronized Optional internalAcquire(long permits, Consumer= maxReadsInFlightAcquireQueueSize) { log.warn("Failed to queue handle for acquiring permits: {}, creationTime: {}, remainingBytes:{}", permits, handle.creationTime, remainingBytes); return Optional.of(new Handle(0, handle.creationTime, false)); + } else { + queuedHandles.offer(new QueuedHandle(handle, callback)); + scheduleTimeOutCheck(acquireTimeoutMillis); + return Optional.empty(); } } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java index c8d14cebebc88..02718561705fc 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java @@ -302,7 +302,7 @@ void asyncReadEntriesByPosition(ReadHandle lh, PositionImpl firstPosition, Posit 
doAsyncReadEntriesByPosition(lh, firstPosition, lastPosition, numberOfEntries, shouldCacheEntry, originalCallback, ctx); } else { - long estimatedEntrySize = getEstimatedEntrySize(); + long estimatedEntrySize = getEstimatedEntrySize(lh); long estimatedReadSize = numberOfEntries * estimatedEntrySize; if (log.isDebugEnabled()) { log.debug("Estimated read size: {} bytes for {} entries with {} estimated entry size", @@ -418,12 +418,12 @@ void doAsyncReadEntriesByPosition(ReadHandle lh, PositionImpl firstPosition, Pos } @VisibleForTesting - public long getEstimatedEntrySize() { - long estimatedEntrySize = getAvgEntrySize(); - if (estimatedEntrySize == 0) { - estimatedEntrySize = DEFAULT_ESTIMATED_ENTRY_SIZE; + public long getEstimatedEntrySize(ReadHandle lh) { + if (lh.getLength() == 0 || lh.getLastAddConfirmed() < 0) { + // No entries stored. + return Math.max(getAvgEntrySize(), DEFAULT_ESTIMATED_ENTRY_SIZE) + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY; } - return estimatedEntrySize + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY; + return Math.max(1, lh.getLength() / (lh.getLastAddConfirmed() + 1)) + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY; } private long getAvgEntrySize() { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java index 48f0cf08ddff4..6676baf8b555a 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java @@ -141,10 +141,9 @@ public void testPreciseLimitation(String missingCase) throws Exception { SimpleReadEntriesCallback cb0 = new SimpleReadEntriesCallback(); entryCache.asyncReadEntry(spyCurrentLedger, 125, 125, true, cb0, ctx); cb0.entries.join(); - Long sizePerEntry1 = entryCache.getEstimatedEntrySize(); - 
Assert.assertEquals(sizePerEntry1, 1 + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY); + int sizePerEntry = Long.valueOf(entryCache.getEstimatedEntrySize(ml.currentLedger)).intValue(); Awaitility.await().untilAsserted(() -> { - long remainingBytes =limiter.getRemainingBytes(); + long remainingBytes = limiter.getRemainingBytes(); Assert.assertEquals(remainingBytes, totalCapacity); }); log.info("remainingBytes 0: {}", limiter.getRemainingBytes()); @@ -165,7 +164,7 @@ public void testPreciseLimitation(String missingCase) throws Exception { entryCache.asyncReadEntry(spyCurrentLedger, start2, end2, true, cb2, ctx); }).start(); - long bytesAcquired1 = calculateBytesSizeBeforeFirstReading(readCount1 + readCount2, 1); + long bytesAcquired1 = calculateBytesSizeBeforeFirstReading(readCount1 + readCount2, sizePerEntry); long remainingBytesExpected1 = totalCapacity - bytesAcquired1; log.info("acquired : {}", bytesAcquired1); log.info("remainingBytesExpected 0 : {}", remainingBytesExpected1); @@ -178,9 +177,7 @@ public void testPreciseLimitation(String missingCase) throws Exception { Thread.sleep(3000); readCompleteSignal1.countDown(); cb1.entries.join(); - Long sizePerEntry2 = entryCache.getEstimatedEntrySize(); - Assert.assertEquals(sizePerEntry2, 1 + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY); - long bytesAcquired2 = calculateBytesSizeBeforeFirstReading(readCount2, 1); + long bytesAcquired2 = calculateBytesSizeBeforeFirstReading(readCount2, sizePerEntry); long remainingBytesExpected2 = totalCapacity - bytesAcquired2; log.info("acquired : {}", bytesAcquired2); log.info("remainingBytesExpected 1: {}", remainingBytesExpected2); @@ -191,8 +188,6 @@ public void testPreciseLimitation(String missingCase) throws Exception { readCompleteSignal2.countDown(); cb2.entries.join(); - Long sizePerEntry3 = entryCache.getEstimatedEntrySize(); - Assert.assertEquals(sizePerEntry3, 1 + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY); 
Awaitility.await().untilAsserted(() -> { long remainingBytes = limiter.getRemainingBytes(); log.info("remainingBytes 2: {}", remainingBytes); @@ -204,7 +199,7 @@ public void testPreciseLimitation(String missingCase) throws Exception { } private long calculateBytesSizeBeforeFirstReading(int entriesCount, int perEntrySize) { - return entriesCount * (perEntrySize + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY); + return entriesCount * perEntrySize; } class SimpleReadEntriesCallback implements AsyncCallbacks.ReadEntriesCallback { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index fe484d62c4eff..62c1539648b75 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -18,6 +18,7 @@ */ package org.apache.bookkeeper.mledger.impl; +import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; @@ -681,13 +682,15 @@ void testAsyncReadWithMaxSizeByte() throws Exception { ManagedCursor cursor = ledger.openCursor("c1"); for (int i = 0; i < 100; i++) { - ledger.addEntry(new byte[1024]); + ledger.addEntry(new byte[(int) (1024)]); } - // First time, since we don't have info, we'll get 1 single entry - readAndCheck(cursor, 10, 3 * 1024, 1); + // Since https://github.com/apache/pulsar/pull/23931 improved the performance of delivery, the consumer + // will get more messages than before(it only receives 1 messages at the first delivery), + int avg = (int) (BOOKKEEPER_READ_OVERHEAD_PER_ENTRY + 1024); + readAndCheck(cursor, 10, 3 * avg, 3); // We should only return 3 entries, based on the max size - readAndCheck(cursor, 
20, 3 * 1024, 3); + readAndCheck(cursor, 20, 3 * avg, 3); // If maxSize is < avg, we should get 1 entry readAndCheck(cursor, 10, 500, 1); } @@ -3885,13 +3888,15 @@ public void testReadEntriesOrWaitWithMaxSize() throws Exception { ledger.addEntry(new byte[1024]); } - // First time, since we don't have info, we'll get 1 single entry - List entries = c.readEntriesOrWait(10, 3 * 1024); - assertEquals(entries.size(), 1); + // Since https://github.com/apache/pulsar/pull/23931 improved the performance of delivery, the consumer + // will get more messages than before(it only receives 1 messages at the first delivery), + int avg = (int) (1024 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY); + List entries = c.readEntriesOrWait(10, 3 * avg); + assertEquals(entries.size(), 3); entries.forEach(Entry::release); // We should only return 3 entries, based on the max size - entries = c.readEntriesOrWait(10, 3 * 1024); + entries = c.readEntriesOrWait(10, 3 * avg); assertEquals(entries.size(), 3); entries.forEach(Entry::release); @@ -4798,5 +4803,82 @@ public void operationFailed(ManagedLedgerException exception) { assertEquals(cursor.getReadPosition(), markDeletedPosition.getNext()); } + @Test + public void testEstimateEntryCountBySize() throws Exception { + final String mlName = "ml-" + UUID.randomUUID().toString().replaceAll("-", ""); + ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName); + long entryCount0 = + ManagedCursorImpl.estimateEntryCountBySize(16, PositionImpl.get(ml.getCurrentLedger().getId(), 0), ml); + assertEquals(entryCount0, 1); + // Avoid trimming ledgers. + ml.openCursor("c1"); + + // Build data. 
+ for (int i = 0; i < 100; i++) { + ml.addEntry(new byte[]{1}); + } + long ledger1 = ml.getCurrentLedger().getId(); + ml.getCurrentLedger().close(); + ml.ledgerClosed(ml.getCurrentLedger()); + for (int i = 0; i < 100; i++) { + ml.addEntry(new byte[]{1, 2}); + } + long ledger2 = ml.getCurrentLedger().getId(); + ml.getCurrentLedger().close(); + ml.ledgerClosed(ml.getCurrentLedger()); + for (int i = 0; i < 100; i++) { + ml.addEntry(new byte[]{1, 2, 3, 4}); + } + long ledger3 = ml.getCurrentLedger().getId(); + MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo1 = ml.getLedgersInfo().get(ledger1); + MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo2 = ml.getLedgersInfo().get(ledger2); + long average1 = ledgerInfo1.getSize() / ledgerInfo1.getEntries() + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY; + long average2 = ledgerInfo2.getSize() / ledgerInfo2.getEntries() + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY; + long average3 = ml.getCurrentLedgerSize() / ml.getCurrentLedgerEntries() + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY; + assertEquals(average1, 1 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY); + assertEquals(average2, 2 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY); + assertEquals(average3, 4 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY); + + // Test: the individual ledgers. + long entryCount1 = + ManagedCursorImpl.estimateEntryCountBySize(average1 * 16, PositionImpl.get(ledger1, 0), ml); + assertEquals(entryCount1, 16); + long entryCount2 = + ManagedCursorImpl.estimateEntryCountBySize(average2 * 8, PositionImpl.get(ledger2, 0), ml); + assertEquals(entryCount2, 8); + long entryCount3 = + ManagedCursorImpl.estimateEntryCountBySize(average3 * 4, PositionImpl.get(ledger3, 0), ml); + assertEquals(entryCount3, 4); + + // Test: across ledgers. 
+ long entryCount4 = + ManagedCursorImpl.estimateEntryCountBySize((average1 * 100) + (average2 * 8), PositionImpl.get(ledger1, 0), ml); + assertEquals(entryCount4, 108); + long entryCount5 = + ManagedCursorImpl.estimateEntryCountBySize((average2 * 100) + (average3 * 4), PositionImpl.get(ledger2, 0), ml); + assertEquals(entryCount5, 104); + long entryCount6 = + ManagedCursorImpl.estimateEntryCountBySize((average1 * 100) + (average2 * 100) + (average3 * 4), PositionImpl.get(ledger1, 0), ml); + assertEquals(entryCount6, 204); + + long entryCount7 = + ManagedCursorImpl.estimateEntryCountBySize((average1 * 20) + (average2 * 8), PositionImpl.get(ledger1, 80), ml); + assertEquals(entryCount7, 28); + long entryCount8 = + ManagedCursorImpl.estimateEntryCountBySize((average2 * 20) + (average3 * 4), PositionImpl.get(ledger2, 80), ml); + assertEquals(entryCount8, 24); + long entryCount9 = + ManagedCursorImpl.estimateEntryCountBySize((average1 * 20) + (average2 * 100) + (average3 * 4), PositionImpl.get(ledger1, 80), ml); + assertEquals(entryCount9, 124); + + // Test: read more than entries written. + long entryCount10 = + ManagedCursorImpl.estimateEntryCountBySize((average1 * 100) + (average2 * 100) + (average3 * 100) + (average3 * 4) , PositionImpl.get(ledger1, 0), ml); + assertEquals(entryCount10, 304); + + // cleanup. + ml.delete(); + } + private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class); } diff --git a/pom.xml b/pom.xml index 6864bebdba1d9..3f11c7f174f5c 100644 --- a/pom.xml +++ b/pom.xml @@ -259,12 +259,11 @@ flexible messaging model and an intuitive client API. 
3.3.2 - 1.18.3 + 1.20.4 + + 3.4.0 2.2 5.4.0 - - - 3.3.0 1.1.1 7.7.1 3.12.4 diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 138727a7be9cc..03f1ef64fbb5b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -305,7 +305,7 @@ protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, Str // This operation should be reading from zookeeper and it should be allowed without having admin privileges CompletableFuture validateAccessForTenantCf = validateAdminAccessForTenantAsync(namespaceName.getTenant()) - .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()); + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()); var checkIfTopicExists = !pulsar().getConfiguration().isAllowAclChangesOnNonExistentTopics(); if (checkIfTopicExists) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index c23bc17c30365..65a92dcd8b970 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -754,7 +754,7 @@ private boolean removeIndexBit(long ledgerId, long entryId) { .orElse(false); } - public boolean containsMessage(long ledgerId, long entryId) { + public synchronized boolean containsMessage(long ledgerId, long entryId) { if (lastMutableBucket.containsMessage(ledgerId, entryId)) { return true; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 919771832fdb0..c19d9cfe8f2b9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -177,6 +177,7 @@ public void startProducer() { prepareCreateProducer().thenCompose(ignore -> { ProducerBuilderImpl builderImpl = (ProducerBuilderImpl) producerBuilder; builderImpl.getConf().setNonPartitionedTopicExpected(true); + builderImpl.getConf().setReplProducer(true); return producerBuilder.createAsync().thenAccept(producer -> { setProducerAndTriggerReadEntries(producer); }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 4f88baf6f618b..5dc05bdf10a0b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -120,6 +120,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; import org.apache.pulsar.broker.service.TopicEventsListener.EventStage; import org.apache.pulsar.broker.service.TopicEventsListener.TopicEvent; +import org.apache.pulsar.broker.service.nonpersistent.NonPersistentSystemTopic; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; @@ -1326,7 +1327,11 @@ private CompletableFuture> createNonPersistentTopic(String topic final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); NonPersistentTopic nonPersistentTopic; try { - nonPersistentTopic = newTopic(topic, null, this, NonPersistentTopic.class); + if (isSystemTopic(topic)) { + 
nonPersistentTopic = new NonPersistentSystemTopic(topic, this); + } else { + nonPersistentTopic = newTopic(topic, null, this, NonPersistentTopic.class); + } nonPersistentTopic.setCreateFuture(topicFuture); } catch (Throwable e) { log.warn("Failed to create topic {}", topic, e); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index e2f96c8955e8e..5d667605cf6e9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -20,6 +20,8 @@ import static com.scurrilous.circe.checksum.Crc32cIntChecksum.computeChecksum; import static org.apache.pulsar.broker.service.AbstractReplicator.REPL_PRODUCER_NAME_DELIMITER; +import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_IS_REPL_MARKER; +import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_POSITION; import static org.apache.pulsar.common.protocol.Commands.hasChecksum; import static org.apache.pulsar.common.protocol.Commands.readChecksum; import com.google.common.annotations.VisibleForTesting; @@ -87,6 +89,7 @@ public class Producer { private final PublisherStatsImpl stats; private final boolean isRemote; + private final boolean isRemoteOrShadow; private final String remoteCluster; private final boolean isNonPersistentTopic; private final boolean isShadowTopic; @@ -148,6 +151,7 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN String replicatorPrefix = serviceConf.getReplicatorPrefix() + "."; this.isRemote = producerName.startsWith(replicatorPrefix); + this.isRemoteOrShadow = isRemoteOrShadow(producerName, serviceConf.getReplicatorPrefix()); this.remoteCluster = parseRemoteClusterName(producerName, isRemote, replicatorPrefix); this.isEncrypted = isEncrypted; @@ -160,6 +164,13 @@ public Producer(Topic topic, 
TransportCnx cnx, long producerId, String producerN this.brokerInterceptor = cnx.getBrokerService().getInterceptor(); } + /** + * Difference with "isRemote" is whether the prefix string is end with a dot. + */ + public static boolean isRemoteOrShadow(String producerName, String replicatorPrefix) { + return producerName != null && producerName.startsWith(replicatorPrefix); + } + /** * Producer name for replicator is in format. * "replicatorPrefix.localCluster" (old) @@ -281,11 +292,16 @@ private boolean checkCanProduceTxnOnTopic(long sequenceId, ByteBuf headersAndPay return true; } + private boolean isSupportsReplDedupByLidAndEid() { + // Non-Persistent topic does not have ledger id or entry id, so it does not support. + return cnx.isClientSupportsReplDedupByLidAndEid() && topic.isPersistent(); + } + private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked, boolean isMarker, Position position) { MessagePublishContext messagePublishContext = MessagePublishContext.get(this, sequenceId, msgIn, headersAndPayload.readableBytes(), - batchSize, isChunked, System.nanoTime(), isMarker, position); + batchSize, isChunked, System.nanoTime(), isMarker, position, isSupportsReplDedupByLidAndEid()); if (brokerInterceptor != null) { brokerInterceptor .onMessagePublish(this, headersAndPayload, messagePublishContext); @@ -297,7 +313,7 @@ private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenc long batchSize, boolean isChunked, boolean isMarker, Position position) { MessagePublishContext messagePublishContext = MessagePublishContext.get(this, lowestSequenceId, highestSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize, - isChunked, System.nanoTime(), isMarker, position); + isChunked, System.nanoTime(), isMarker, position, isSupportsReplDedupByLidAndEid()); if (brokerInterceptor != null) { brokerInterceptor .onMessagePublish(this, headersAndPayload, messagePublishContext); @@ -393,6 +409,7 @@ 
private static final class MessagePublishContext implements PublishContext, Runn private long batchSize; private boolean chunked; private boolean isMarker; + private boolean supportsReplDedupByLidAndEid; private long startTimeNs; @@ -483,6 +500,11 @@ public long getOriginalSequenceId() { return originalSequenceId; } + @Override + public boolean supportsReplDedupByLidAndEid() { + return supportsReplDedupByLidAndEid; + } + @Override public void setOriginalHighestSequenceId(long originalHighestSequenceId) { this.originalHighestSequenceId = originalHighestSequenceId; @@ -550,8 +572,12 @@ public void run() { // stats rateIn.recordMultipleEvents(batchSize, msgSize); producer.topic.recordAddLatency(System.nanoTime() - startTimeNs, TimeUnit.NANOSECONDS); - producer.cnx.getCommandSender().sendSendReceiptResponse(producer.producerId, sequenceId, highestSequenceId, - ledgerId, entryId); + if (producer.isRemoteOrShadow && producer.isSupportsReplDedupByLidAndEid()) { + sendSendReceiptResponseRepl(); + } else { + // Repl V1 is the same as normal for this handling. + sendSendReceiptResponseNormal(); + } producer.cnx.completedSendOperation(producer.isNonPersistentTopic, msgSize); if (this.chunked) { producer.chunkedMessageRate.recordEvent(); @@ -564,8 +590,46 @@ public void run() { recycle(); } + private void sendSendReceiptResponseRepl() { + // Case-1: is a repl marker. + boolean isReplMarker = getProperty(MSG_PROP_IS_REPL_MARKER) != null; + if (isReplMarker) { + producer.cnx.getCommandSender().sendSendReceiptResponse(producer.producerId, sequenceId, Long.MIN_VALUE, + ledgerId, entryId); + + return; + } + // Case-2: is a repl message. + Object positionPairObj = getProperty(MSG_PROP_REPL_SOURCE_POSITION); + if (positionPairObj == null || !(positionPairObj instanceof long[]) + || ((long[]) positionPairObj).length < 2) { + log.error("[{}] Message can not determine whether the message is duplicated due to the acquired" + + " messages props were are invalid. producer={}. 
supportsReplDedupByLidAndEid: {}," + + " sequence-id {}, prop-{}: not in expected format", + producer.topic.getName(), producer.producerName, + supportsReplDedupByLidAndEid(), getSequenceId(), + MSG_PROP_REPL_SOURCE_POSITION); + producer.cnx.getCommandSender().sendSendError(producer.producerId, + Math.max(highestSequenceId, sequenceId), + ServerError.PersistenceError, "Message can not determine whether the message is" + + " duplicated due to the acquired messages props were are invalid"); + return; + } + long[] positionPair = (long[]) positionPairObj; + long replSequenceLId = positionPair[0]; + long replSequenceEId = positionPair[1]; + producer.cnx.getCommandSender().sendSendReceiptResponse(producer.producerId, replSequenceLId, + replSequenceEId, ledgerId, entryId); + } + + private void sendSendReceiptResponseNormal() { + producer.cnx.getCommandSender().sendSendReceiptResponse(producer.producerId, sequenceId, highestSequenceId, + ledgerId, entryId); + } + static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn, int msgSize, - long batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position) { + long batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position, + boolean supportsReplDedupByLidAndEid) { MessagePublishContext callback = RECYCLER.get(); callback.producer = producer; callback.sequenceId = sequenceId; @@ -577,6 +641,7 @@ static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn callback.originalSequenceId = -1L; callback.startTimeNs = startTimeNs; callback.isMarker = isMarker; + callback.supportsReplDedupByLidAndEid = supportsReplDedupByLidAndEid; callback.ledgerId = position == null ? -1 : position.getLedgerId(); callback.entryId = position == null ? 
-1 : position.getEntryId(); if (callback.propertyMap != null) { @@ -586,7 +651,8 @@ static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn } static MessagePublishContext get(Producer producer, long lowestSequenceId, long highestSequenceId, Rate rateIn, - int msgSize, long batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position) { + int msgSize, long batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position, + boolean supportsReplDedupByLidAndEid) { MessagePublishContext callback = RECYCLER.get(); callback.producer = producer; callback.sequenceId = lowestSequenceId; @@ -599,6 +665,7 @@ static MessagePublishContext get(Producer producer, long lowestSequenceId, long callback.startTimeNs = startTimeNs; callback.chunked = chunked; callback.isMarker = isMarker; + callback.supportsReplDedupByLidAndEid = supportsReplDedupByLidAndEid; callback.ledgerId = position == null ? -1 : position.getLedgerId(); callback.entryId = position == null ? 
-1 : position.getEntryId(); if (callback.propertyMap != null) { @@ -829,7 +896,8 @@ public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, lon } MessagePublishContext messagePublishContext = MessagePublishContext.get(this, sequenceId, highSequenceId, msgIn, - headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker, null); + headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker, null, + cnx.isClientSupportsReplDedupByLidAndEid()); if (brokerInterceptor != null) { brokerInterceptor .onMessagePublish(this, headersAndPayload, messagePublishContext); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 85a523df3198f..3548243cca45f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -64,6 +64,7 @@ import javax.net.ssl.SSLSession; import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Response; +import lombok.Getter; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedLedgerException; @@ -235,6 +236,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { // Flag to manage throttling-rate by atomically enable/disable read-channel. 
private volatile boolean autoReadDisabledRateLimiting = false; + @Getter private FeatureFlags features; private PulsarCommandSender commandSender; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 2e961b793d8e4..2ec7a6650e4e4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -127,6 +127,10 @@ default long getEntryTimestamp() { default void setEntryTimestamp(long entryTimestamp) { } + + default boolean supportsReplDedupByLidAndEid() { + return false; + } } CompletableFuture initialize(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java index 94f934fec681e..1673c0d89679c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java @@ -23,6 +23,7 @@ import java.net.SocketAddress; import java.util.concurrent.CompletableFuture; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.common.api.proto.FeatureFlags; public interface TransportCnx { @@ -90,4 +91,11 @@ public interface TransportCnx { * is null if the connection liveness check is disabled. 
*/ CompletableFuture checkConnectionLiveness(); + + FeatureFlags getFeatures(); + + default boolean isClientSupportsReplDedupByLidAndEid() { + return getFeatures() != null && getFeatures().hasSupportsReplDedupByLidAndEid() + && getFeatures().isSupportsReplDedupByLidAndEid(); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSystemTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSystemTopic.java new file mode 100644 index 0000000000000..9b867c9a8b3b6 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSystemTopic.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.service.nonpersistent; + +import org.apache.pulsar.broker.service.BrokerService; + +public class NonPersistentSystemTopic extends NonPersistentTopic { + public NonPersistentSystemTopic(String topic, BrokerService brokerService) { + super(topic, brokerService); + } + + @Override + public boolean isSystemTopic() { + return true; + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java index 3390c3a288526..adcaab29cfae2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service.persistent; +import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_POSITION; import io.netty.buffer.ByteBuf; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -195,9 +196,15 @@ protected boolean replicateEntries(List entries) { msg.setSchemaInfoForReplicator(schemaFuture.get()); msg.getMessageBuilder().clearTxnidMostBits(); msg.getMessageBuilder().clearTxnidLeastBits(); + // Add props for sequence checking. 
+ msg.getMessageBuilder().addProperty().setKey(MSG_PROP_REPL_SOURCE_POSITION) + .setValue(String.format("%s:%s", entry.getLedgerId(), entry.getEntryId())); msgOut.recordEvent(headersAndPayload.readableBytes()); // Increment pending messages for messages produced locally PENDING_MESSAGES_UPDATER.incrementAndGet(this); + if (log.isDebugEnabled()) { + log.debug("[{}] Publishing {}:{}", replicatorId, entry.getLedgerId(), entry.getEntryId()); + } producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg)); atLeastOneMessageSentForReplication = true; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index 1715e09dc7ba7..bf482986ec980 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.broker.service.persistent; +import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_IS_REPL_MARKER; +import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_POSITION; import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; import java.util.Iterator; @@ -38,10 +40,14 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.service.Producer; import org.apache.pulsar.broker.service.Topic.PublishContext; +import org.apache.pulsar.common.api.proto.KeyValue; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.protocol.Commands; +import 
org.apache.pulsar.common.protocol.Markers; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -330,27 +336,157 @@ public boolean isEnabled() { * @return true if the message should be published or false if it was recognized as a duplicate */ public MessageDupStatus isDuplicate(PublishContext publishContext, ByteBuf headersAndPayload) { + setContextPropsIfRepl(publishContext, headersAndPayload); if (!isEnabled() || publishContext.isMarkerMessage()) { return MessageDupStatus.NotDup; } + if (Producer.isRemoteOrShadow(publishContext.getProducerName(), replicatorPrefix)) { + if (!publishContext.supportsReplDedupByLidAndEid()){ + return isDuplicateReplV1(publishContext, headersAndPayload); + } else { + return isDuplicateReplV2(publishContext, headersAndPayload); + } + } + return isDuplicateNormal(publishContext, headersAndPayload, false); + } + + public MessageDupStatus isDuplicateReplV1(PublishContext publishContext, ByteBuf headersAndPayload) { + // Message is coming from replication, we need to use the original producer name and sequence id + // for the purpose of deduplication and not rely on the "replicator" name. + int readerIndex = headersAndPayload.readerIndex(); + MessageMetadata md = Commands.parseMessageMetadata(headersAndPayload); + headersAndPayload.readerIndex(readerIndex); + + String producerName = md.getProducerName(); + long sequenceId = md.getSequenceId(); + long highestSequenceId = Math.max(md.getHighestSequenceId(), sequenceId); + publishContext.setOriginalProducerName(producerName); + publishContext.setOriginalSequenceId(sequenceId); + publishContext.setOriginalHighestSequenceId(highestSequenceId); + return isDuplicateNormal(publishContext, headersAndPayload, true); + } + + private void setContextPropsIfRepl(PublishContext publishContext, ByteBuf headersAndPayload) { + // Case-1: is a replication marker. 
+ if (publishContext.isMarkerMessage()) { + // Message is coming from replication, we need to use the replication's producer name, ledger id and entry + // id for the purpose of deduplication. + MessageMetadata md = Commands.peekMessageMetadata(headersAndPayload, "Check-Deduplicate", -1); + if (Markers.isReplicationMarker(md.getMarkerType())) { + publishContext.setProperty(MSG_PROP_IS_REPL_MARKER, ""); + } + return; + } + + // Case-2: is a replicated message. + if (Producer.isRemoteOrShadow(publishContext.getProducerName(), replicatorPrefix)) { + // Message is coming from replication, we need to use the replication's producer name, source cluster's + // ledger id and entry id for the purpose of deduplication. + int readerIndex = headersAndPayload.readerIndex(); + MessageMetadata md = Commands.parseMessageMetadata(headersAndPayload); + headersAndPayload.readerIndex(readerIndex); + + List kvPairList = md.getPropertiesList(); + for (KeyValue kvPair : kvPairList) { + if (kvPair.getKey().equals(MSG_PROP_REPL_SOURCE_POSITION)) { + if (!kvPair.getValue().contains(":")) { + log.warn("[{}] Unexpected {}: {}", publishContext.getProducerName(), + MSG_PROP_REPL_SOURCE_POSITION, + kvPair.getValue()); + break; + } + String[] ledgerIdAndEntryId = kvPair.getValue().split(":"); + if (ledgerIdAndEntryId.length != 2 || !StringUtils.isNumeric(ledgerIdAndEntryId[0]) + || !StringUtils.isNumeric(ledgerIdAndEntryId[1])) { + log.warn("[{}] Unexpected {}: {}", publishContext.getProducerName(), + MSG_PROP_REPL_SOURCE_POSITION, + kvPair.getValue()); + break; + } + long[] positionPair = new long[]{Long.valueOf(ledgerIdAndEntryId[0]).longValue(), + Long.valueOf(ledgerIdAndEntryId[1]).longValue()}; + publishContext.setProperty(MSG_PROP_REPL_SOURCE_POSITION, positionPair); + break; + } + } + } + } + + public MessageDupStatus isDuplicateReplV2(PublishContext publishContext, ByteBuf headersAndPayload) { + Object positionPairObj = publishContext.getProperty(MSG_PROP_REPL_SOURCE_POSITION); + if 
(positionPairObj == null || !(positionPairObj instanceof long[])) { + log.error("[{}] Message can not determine whether the message is duplicated due to the acquired messages" + + " props were are invalid. producer={}. supportsReplDedupByLidAndEid: {}, sequence-id {}," + + " prop-{}: not in expected format", + topic.getName(), publishContext.getProducerName(), + publishContext.supportsReplDedupByLidAndEid(), publishContext.getSequenceId(), + MSG_PROP_REPL_SOURCE_POSITION); + return MessageDupStatus.Unknown; + } + + long[] positionPair = (long[]) positionPairObj; + long replSequenceLId = positionPair[0]; + long replSequenceEId = positionPair[1]; + + String lastSequenceLIdKey = publishContext.getProducerName() + "_LID"; + String lastSequenceEIdKey = publishContext.getProducerName() + "_EID"; + synchronized (highestSequencedPushed) { + Long lastSequenceLIdPushed = highestSequencedPushed.get(lastSequenceLIdKey); + Long lastSequenceEIdPushed = highestSequencedPushed.get(lastSequenceEIdKey); + if (lastSequenceLIdPushed != null && lastSequenceEIdPushed != null + && (replSequenceLId < lastSequenceLIdPushed.longValue() + || (replSequenceLId == lastSequenceLIdPushed.longValue() + && replSequenceEId <= lastSequenceEIdPushed.longValue()))) { + if (log.isDebugEnabled()) { + log.debug("[{}] Message identified as duplicated producer={}. publishing {}:{}, latest publishing" + + " in-progress {}:{}", + topic.getName(), publishContext.getProducerName(), lastSequenceLIdPushed, + lastSequenceEIdPushed, lastSequenceLIdPushed, lastSequenceEIdPushed); + } + + // Also need to check sequence ids that has been persisted. 
+ // If current message's seq id is smaller or equals to the + // "lastSequenceLIdPersisted:lastSequenceEIdPersisted" than its definitely a dup + // If current message's seq id is between "lastSequenceLIdPushed:lastSequenceEIdPushed" and + // "lastSequenceLIdPersisted:lastSequenceEIdPersisted", then we cannot be sure whether the message + // is a dup or not we should return an error to the producer for the latter case so that it can retry + // at a future time + Long lastSequenceLIdPersisted = highestSequencedPersisted.get(lastSequenceLIdKey); + Long lastSequenceEIdPersisted = highestSequencedPersisted.get(lastSequenceEIdKey); + if (log.isDebugEnabled()) { + log.debug("[{}] Message identified as duplicated producer={}. publishing {}:{}, latest" + + " persisted {}:{}", + topic.getName(), publishContext.getProducerName(), replSequenceLId, + replSequenceEId, lastSequenceLIdPersisted, lastSequenceEIdPersisted); + } + if (lastSequenceLIdPersisted != null && lastSequenceEIdPersisted != null + && (replSequenceLId < lastSequenceLIdPersisted.longValue() + || (replSequenceLId == lastSequenceLIdPersisted.longValue() + && replSequenceEId <= lastSequenceEIdPersisted))) { + return MessageDupStatus.Dup; + } else { + return MessageDupStatus.Unknown; + } + } + highestSequencedPushed.put(lastSequenceLIdKey, replSequenceLId); + highestSequencedPushed.put(lastSequenceEIdKey, replSequenceEId); + } + if (log.isDebugEnabled()) { + log.debug("[{}] Message identified as non-duplicated producer={}. 
publishing {}:{}", + topic.getName(), publishContext.getProducerName(), replSequenceLId, replSequenceEId); + } + return MessageDupStatus.NotDup; + } + public MessageDupStatus isDuplicateNormal(PublishContext publishContext, ByteBuf headersAndPayload, + boolean useOriginalProducerName) { String producerName = publishContext.getProducerName(); + if (useOriginalProducerName) { + producerName = publishContext.getOriginalProducerName(); + } long sequenceId = publishContext.getSequenceId(); long highestSequenceId = Math.max(publishContext.getHighestSequenceId(), sequenceId); MessageMetadata md = null; - if (producerName.startsWith(replicatorPrefix)) { - // Message is coming from replication, we need to use the original producer name and sequence id - // for the purpose of deduplication and not rely on the "replicator" name. - int readerIndex = headersAndPayload.readerIndex(); - md = Commands.parseMessageMetadata(headersAndPayload); - producerName = md.getProducerName(); - sequenceId = md.getSequenceId(); - highestSequenceId = Math.max(md.getHighestSequenceId(), sequenceId); - publishContext.setOriginalProducerName(producerName); - publishContext.setOriginalSequenceId(sequenceId); - publishContext.setOriginalHighestSequenceId(highestSequenceId); - headersAndPayload.readerIndex(readerIndex); - } long chunkID = -1; long totalChunk = -1; if (publishContext.isChunked()) { @@ -408,7 +544,37 @@ public void recordMessagePersisted(PublishContext publishContext, PositionImpl p if (!isEnabled() || publishContext.isMarkerMessage()) { return; } + if (publishContext.getProducerName().startsWith(replicatorPrefix) + && publishContext.supportsReplDedupByLidAndEid()) { + recordMessagePersistedRepl(publishContext, position); + } else { + recordMessagePersistedNormal(publishContext, position); + } + } + + public void recordMessagePersistedRepl(PublishContext publishContext, Position position) { + Object positionPairObj = publishContext.getProperty(MSG_PROP_REPL_SOURCE_POSITION); + if 
(positionPairObj == null || !(positionPairObj instanceof long[])) { + log.error("[{}] Can not persist highest sequence-id due to the acquired messages" + + " props are invalid. producer={}. supportsReplDedupByLidAndEid: {}, sequence-id {}," + + " prop-{}: not in expected format", + topic.getName(), publishContext.getProducerName(), + publishContext.supportsReplDedupByLidAndEid(), publishContext.getSequenceId(), + MSG_PROP_REPL_SOURCE_POSITION); + recordMessagePersistedNormal(publishContext, position); + return; + } + long[] positionPair = (long[]) positionPairObj; + long replSequenceLId = positionPair[0]; + long replSequenceEId = positionPair[1]; + String lastSequenceLIdKey = publishContext.getProducerName() + "_LID"; + String lastSequenceEIdKey = publishContext.getProducerName() + "_EID"; + highestSequencedPersisted.put(lastSequenceLIdKey, replSequenceLId); + highestSequencedPersisted.put(lastSequenceEIdKey, replSequenceEId); + increaseSnapshotCounterAndTakeSnapshotIfNeeded(position); + } + public void recordMessagePersistedNormal(PublishContext publishContext, Position position) { String producerName = publishContext.getProducerName(); long sequenceId = publishContext.getSequenceId(); long highestSequenceId = publishContext.getHighestSequenceId(); @@ -422,9 +588,18 @@ public void recordMessagePersisted(PublishContext publishContext, PositionImpl p if (isLastChunk == null || isLastChunk) { highestSequencedPersisted.put(producerName, Math.max(highestSequenceId, sequenceId)); } + increaseSnapshotCounterAndTakeSnapshotIfNeeded(position); + } + + private void increaseSnapshotCounterAndTakeSnapshotIfNeeded(Position position) { if (++snapshotCounter >= snapshotInterval) { snapshotCounter = 0; takeSnapshot(position); + } else { + if (log.isDebugEnabled()) { + log.debug("[{}] Waiting for sequence-id snapshot {}/{}", topic.getName(), snapshotCounter, + snapshotInterval); + } } } diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index ae844b5784456..245ccaf2ee3f8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -1227,7 +1227,7 @@ public boolean checkAndUnblockIfStuck() { return false; } // consider dispatch is stuck if : dispatcher has backlog, available-permits and there is no pending read - if (totalAvailablePermits > 0 && !havePendingReplayRead && !havePendingRead + if (isAtleastOneConsumerAvailable() && !havePendingReplayRead && !havePendingRead && cursor.getNumberOfEntriesInBacklog(false) > 0) { log.warn("{}-{} Dispatcher is stuck and unblocking by issuing reads", topic.getName(), name); readMoreEntries(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 77d580b0389ca..edd2ab6a4bdea 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -615,9 +615,9 @@ public boolean checkAndUnblockIfStuck() { if (consumer == null || cursor.checkAndUpdateReadPositionChanged()) { return false; } - int totalAvailablePermits = consumer.getAvailablePermits(); + boolean isConsumerAvailable = !consumer.isBlocked() && consumer.getAvailablePermits() > 0; // consider dispatch is stuck if : dispatcher has backlog, available-permits and there is no pending read - if 
(totalAvailablePermits > 0 && !havePendingRead && cursor.getNumberOfEntriesInBacklog(false) > 0) { + if (isConsumerAvailable && !havePendingRead && cursor.getNumberOfEntriesInBacklog(false) > 0) { log.warn("{}-{} Dispatcher is stuck and unblocking by issuing reads", topic.getName(), name); readMoreEntries(consumer); return true; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java index b48f748bf5dbf..ab465d1dcc4e2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service.persistent; +import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_POSITION; import io.netty.buffer.ByteBuf; import java.util.List; import lombok.extern.slf4j.Slf4j; @@ -96,6 +97,9 @@ protected boolean replicateEntries(List entries) { msg.setReplicatedFrom(localCluster); msg.setMessageId(new MessageIdImpl(entry.getLedgerId(), entry.getEntryId(), -1)); + // Add props for sequence checking. 
+ msg.getMessageBuilder().addProperty().setKey(MSG_PROP_REPL_SOURCE_POSITION) + .setValue(String.format("%s:%s", entry.getLedgerId(), entry.getEntryId())); headersAndPayload.retain(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerMessageDeduplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerMessageDeduplicationTest.java index 50f8ac9594433..eb1109272216e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerMessageDeduplicationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerMessageDeduplicationTest.java @@ -24,10 +24,14 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.UnpooledByteBufAllocator; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.MessageDeduplication; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.common.api.proto.MarkerType; +import org.apache.pulsar.common.api.proto.MessageMetadata; import org.testng.annotations.Test; public class BrokerMessageDeduplicationTest { @@ -42,7 +46,18 @@ public void markerMessageNotDeduplicated() { doReturn(true).when(deduplication).isEnabled(); Topic.PublishContext context = mock(Topic.PublishContext.class); doReturn(true).when(context).isMarkerMessage(); - MessageDeduplication.MessageDupStatus status = deduplication.isDuplicate(context, null); + + MessageMetadata msgMetadata = new MessageMetadata(); + msgMetadata.setMarkerType(MarkerType.TXN_ABORT_VALUE); + msgMetadata.setProducerName("p1"); + msgMetadata.setSequenceId(0); + msgMetadata.setPublishTime(System.currentTimeMillis()); + byte[] metadataData = msgMetadata.toByteArray(); + ByteBuf byteBuf = UnpooledByteBufAllocator.DEFAULT.heapBuffer(metadataData.length + 4); + 
byteBuf.writeInt(metadataData.length); + byteBuf.writeBytes(metadataData); + + MessageDeduplication.MessageDupStatus status = deduplication.isDuplicate(context, byteBuf); assertEquals(status, MessageDeduplication.MessageDupStatus.NotDup); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 9d5bb75efd6d1..41b7387d2e3a8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -1024,6 +1024,36 @@ public void testRevokePartitionedTopic() { } } + @Test + public void testRevokePartitionedTopicWithReadonlyPolicies() throws Exception { + final String partitionedTopicName = "testRevokePartitionedTopicWithReadonlyPolicies-topic"; + final int numPartitions = 5; + AsyncResponse response = mock(AsyncResponse.class); + ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(Response.class); + persistentTopics.createPartitionedTopic( + response, testTenant, testNamespace, partitionedTopicName, numPartitions, true); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + String role = "role"; + Set expectActions = new HashSet<>(); + expectActions.add(AuthAction.produce); + response = mock(AsyncResponse.class); + responseCaptor = ArgumentCaptor.forClass(Response.class); + persistentTopics.grantPermissionsOnTopic(response, testTenant, testNamespace, partitionedTopicName, role, + expectActions); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + response = mock(AsyncResponse.class); + doReturn(CompletableFuture.failedFuture( + new 
RestException(Response.Status.FORBIDDEN, "Broker is forbidden to do read-write operations")) + ).when(persistentTopics).validatePoliciesReadOnlyAccessAsync(); + persistentTopics.revokePermissionsOnTopic(response, testTenant, testNamespace, partitionedTopicName, role); + ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(RestException.class); + verify(response, timeout(5000).times(1)).resume(exceptionCaptor.capture()); + Assert.assertEquals(exceptionCaptor.getValue().getResponse().getStatus(), + Response.Status.FORBIDDEN.getStatusCode()); + } + @Test public void testTriggerCompactionTopic() { final String partitionTopicName = "test-part"; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 69f5b085d24c1..08c76e119556b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -73,6 +73,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import lombok.Cleanup; +import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.commons.lang3.tuple.Pair; @@ -94,6 +95,7 @@ import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter; import org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter; import org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder; +import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreFactory; import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl; import org.apache.pulsar.broker.lookup.LookupResult; import org.apache.pulsar.broker.namespace.LookupOptions; @@ -101,6 +103,7 @@ 
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.pulsar.broker.service.nonpersistent.NonPersistentSystemTopic; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Producer; @@ -114,6 +117,7 @@ import org.apache.pulsar.common.policies.data.BrokerAssignment; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus; +import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; @@ -121,6 +125,7 @@ import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage; import org.awaitility.Awaitility; import org.mockito.MockedStatic; +import org.testng.Assert; import org.testng.AssertJUnit; import org.testng.annotations.Test; @@ -1476,6 +1481,43 @@ public void compactionScheduleTest() { }); } + @Test + public void testSystemNonPersistentTopicSchemaCompatibility() throws Exception { + String topicName = ExtensibleLoadManagerImpl.BROKER_LOAD_DATA_STORE_TOPIC; + NonPersistentSystemTopic topic = new NonPersistentSystemTopic(topicName, pulsar.getBrokerService()); + Assert.assertEquals(SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE, topic.getSchemaCompatibilityStrategy()); + + var brokerLoadDataStore = LoadDataStoreFactory.create(pulsar, topicName, BrokerLoadDataV1.class); + brokerLoadDataStore.init(); + brokerLoadDataStore.pushAsync("key", new BrokerLoadDataV1()).get(); + Awaitility.await().until(() -> { + var data = brokerLoadDataStore.get("key"); + return data.isPresent(); + }); + brokerLoadDataStore.pushAsync("key", null).get(); + brokerLoadDataStore.close(); + } + + @Data 
+ private static class BrokerLoadDataV1 { + private ResourceUsage cpu; + private ResourceUsage memory; + private ResourceUsage directMemory; + private ResourceUsage bandwidthIn; + private ResourceUsage bandwidthOut; + private double msgThroughputIn; + private double msgThroughputOut; + private double msgRateIn; + private double msgRateOut; + private int bundleCount; + private int topics; + private double maxResourceUsage; + private double weightedMaxEMA; + private double msgThroughputEMA; + private long updatedAt; + private long reportedAt; + } + @Test(timeOut = 10 * 1000) public void unloadTimeoutCheckTest() throws Exception { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java index 52147f74f4a6e..21a7c179b822c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java @@ -85,7 +85,7 @@ public void testBatchMessageAck() { .newConsumer() .topic(topicName) .subscriptionName(subscriptionName) - .receiverQueueSize(10) + .receiverQueueSize(50) .subscriptionType(SubscriptionType.Shared) .enableBatchIndexAcknowledgment(true) .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS) @@ -114,27 +114,29 @@ public void testBatchMessageAck() { consumer.acknowledge(receive1); consumer.acknowledge(receive2); Awaitility.await().untilAsserted(() -> { - assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 18); + // Since https://github.com/apache/pulsar/pull/23931 improved the mechanism of estimate average entry size, + // broker will deliver much messages than before. So edit 18 -> 38 here. 
+ assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 38); }); Message receive3 = consumer.receive(); Message receive4 = consumer.receive(); consumer.acknowledge(receive3); consumer.acknowledge(receive4); Awaitility.await().untilAsserted(() -> { - assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16); + assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 36); }); // Block cmd-flow send until verify finish. see: https://github.com/apache/pulsar/pull/17436. consumer.pause(); Message receive5 = consumer.receive(); consumer.negativeAcknowledge(receive5); Awaitility.await().pollInterval(1, TimeUnit.MILLISECONDS).untilAsserted(() -> { - assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 0); + assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 20); }); // Unblock cmd-flow. consumer.resume(); consumer.receive(); Awaitility.await().untilAsserted(() -> { - assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16); + assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 36); }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorDeduplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorDeduplicationTest.java new file mode 100644 index 0000000000000..a95a8f0ddfa51 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorDeduplicationTest.java @@ -0,0 +1,932 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import static com.google.common.base.Preconditions.checkArgument; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.vertx.core.impl.ConcurrentHashSet; +import java.lang.reflect.Field; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.persistent.MessageDeduplication; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; 
+import org.apache.pulsar.broker.service.schema.SchemaRegistryService; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageIdAdv; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.impl.ClientBuilderImpl; +import org.apache.pulsar.client.impl.ClientCnx; +import org.apache.pulsar.client.impl.ProducerImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.api.AuthData; +import org.apache.pulsar.common.api.proto.BaseCommand; +import org.apache.pulsar.common.api.proto.CommandGetOrCreateSchemaResponse; +import org.apache.pulsar.common.api.proto.CommandGetSchemaResponse; +import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; +import org.apache.pulsar.common.protocol.ByteBufPair; +import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.protocol.schema.SchemaData; +import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.apache.pulsar.zookeeper.ZookeeperServerTest; +import org.awaitility.Awaitility; +import org.awaitility.reflect.WhiteboxImpl; +import org.mockito.stubbing.Answer; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker") +public class OneWayReplicatorDeduplicationTest extends OneWayReplicatorTestBase { + + static final ObjectMapper JACKSON = new ObjectMapper(); + + @Override + @BeforeClass(alwaysRun = true, timeOut = 300000) + public void setup() throws Exception { + 
super.setup(); + waitInternalClientCreated(); + } + + @Override + @AfterClass(alwaysRun = true, timeOut = 300000) + public void cleanup() throws Exception { + super.cleanup(); + } + + @Override + protected void setConfigDefaults(ServiceConfiguration config, String clusterName, + LocalBookkeeperEnsemble bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) { + super.setConfigDefaults(config, clusterName, bookkeeperEnsemble, brokerConfigZk); + // For check whether deduplication snapshot has done. + config.setBrokerDeduplicationEntriesInterval(10); + config.setReplicationStartAt("earliest"); + // To cover more cases, write more than one ledger. + config.setManagedLedgerMaxEntriesPerLedger(100); + config.setManagedLedgerMinLedgerRolloverTimeMinutes(0); + config.setManagedLedgerMaxLedgerRolloverTimeMinutes(1); + } + + protected void waitReplicatorStopped(String topicName) { + Awaitility.await().untilAsserted(() -> { + Optional topicOptional2 = pulsar2.getBrokerService().getTopic(topicName, false).get(); + assertTrue(topicOptional2.isPresent()); + PersistentTopic persistentTopic2 = (PersistentTopic) topicOptional2.get(); + assertTrue(persistentTopic2.getProducers().isEmpty()); // On pulsar2: no producer is attached. + Optional topicOptional1 = pulsar1.getBrokerService().getTopic(topicName, false).get(); + assertTrue(topicOptional1.isPresent()); + PersistentTopic persistentTopic1 = (PersistentTopic) topicOptional1.get(); + assertTrue(persistentTopic1.getReplicators().isEmpty() // On pulsar1: replicator gone or disconnected. + || !persistentTopic1.getReplicators().get(cluster2).isConnected()); + }); + } + + protected void waitInternalClientCreated() throws Exception { + // Wait for the internal client created.
+ final String topicNameTriggerInternalClientCreate = + BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_"); + admin1.topics().createNonPartitionedTopic(topicNameTriggerInternalClientCreate); + waitReplicatorStarted(topicNameTriggerInternalClientCreate); + cleanupTopics(() -> { + admin1.topics().delete(topicNameTriggerInternalClientCreate); + admin2.topics().delete(topicNameTriggerInternalClientCreate); + }); + } + + protected Runnable injectReplicatorClientCnx( + InjectedClientCnxClientBuilder.ClientCnxFactory clientCnxFactory) throws Exception { + String cluster2 = pulsar2.getConfig().getClusterName(); + BrokerService brokerService = pulsar1.getBrokerService(); + ClientBuilderImpl clientBuilder2 = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(url2.toString()); + + // Inject spy client. + final var replicationClients = brokerService.getReplicationClients(); + PulsarClientImpl internalClient = (PulsarClientImpl) replicationClients.get(cluster2); + PulsarClientImpl injectedClient = InjectedClientCnxClientBuilder.create(clientBuilder2, clientCnxFactory); + assertTrue(replicationClients.remove(cluster2, internalClient)); + assertNull(replicationClients.putIfAbsent(cluster2, injectedClient)); + + // Return a cleanup injection task; + return () -> { + assertTrue(replicationClients.remove(cluster2, injectedClient)); + assertNull(replicationClients.putIfAbsent(cluster2, internalClient)); + injectedClient.closeAsync(); + }; + } + + @DataProvider(name = "deduplicationArgs") + public Object[][] deduplicationArgs() { + return new Object[][] { + {true/* inject repeated publishing*/, 1/* repeated messages window */, + true /* supportsReplDedupByLidAndEid */, false/* multi schemas */}, + {true/* inject repeated publishing*/, 2/* repeated messages window */, + true /* supportsReplDedupByLidAndEid */, false/* multi schemas */}, + {true/* inject repeated publishing*/, 3/* repeated messages window */, + true /* supportsReplDedupByLidAndEid */, 
false/* multi schemas */}, + {true/* inject repeated publishing*/, 4/* repeated messages window */, + true /* supportsReplDedupByLidAndEid */, false/* multi schemas */}, + {true/* inject repeated publishing*/, 5/* repeated messages window */, + true /* supportsReplDedupByLidAndEid */, false/* multi schemas */}, + {true/* inject repeated publishing*/, 10/* repeated messages window */, + true /* supportsReplDedupByLidAndEid */, false/* multi schemas */}, + // ===== multi schema + {true/* inject repeated publishing*/, 1/* repeated messages window */, + true /* supportsReplDedupByLidAndEid */, true/* multi schemas */}, + {true/* inject repeated publishing*/, 2/* repeated messages window */, + true /* supportsReplDedupByLidAndEid */, true/* multi schemas */}, + {true/* inject repeated publishing*/, 3/* repeated messages window */, + true /* supportsReplDedupByLidAndEid */, true/* multi schemas */}, + {true/* inject repeated publishing*/, 4/* repeated messages window */, + true /* supportsReplDedupByLidAndEid */, true/* multi schemas */}, + {true/* inject repeated publishing*/, 5/* repeated messages window */, + true /* supportsReplDedupByLidAndEid */, true/* multi schemas */}, + {true/* inject repeated publishing*/, 10/* repeated messages window */, + true /* supportsReplDedupByLidAndEid */, true/* multi schemas */}, + // ===== Compatability "source-cluster: old, target-cluster: new". + {false/* inject repeated publishing*/, 0/* repeated messages window */, + false /* supportsReplDedupByLidAndEid */, false/* multi schemas */}, + {false/* inject repeated publishing*/, 0/* repeated messages window */, + false /* supportsReplDedupByLidAndEid */, true/* multi schemas */}, + {true/* inject repeated publishing*/, 3/* repeated messages window */, + false /* supportsReplDedupByLidAndEid */, true/* multi schemas */}, + }; + } + + // TODO + // - Review the code to confirm that multi source-brokers can work when the source topic switch. 
+ @Test(timeOut = 360 * 1000, dataProvider = "deduplicationArgs") + public void testDeduplication(final boolean injectRepeatedPublish, final int repeatedMessagesWindow, + final boolean supportsReplDedupByLidAndEid, boolean multiSchemas) throws Exception { + // 0. Inject a mechanism that duplicate all Send-Command for the replicator. + final List duplicatedMsgs = new ArrayList<>(); + Runnable taskToClearInjection = injectReplicatorClientCnx( + (conf, eventLoopGroup) -> new ClientCnx(conf, eventLoopGroup) { + + @Override + protected ByteBuf newConnectCommand() throws Exception { + if (supportsReplDedupByLidAndEid) { + return super.newConnectCommand(); + } + authenticationDataProvider = authentication.getAuthData(remoteHostName); + AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA); + BaseCommand cmd = Commands.newConnectWithoutSerialize(authentication.getAuthMethodName(), authData, + this.protocolVersion, clientVersion, proxyToTargetBrokerAddress, null, null, null, null); + cmd.getConnect().getFeatureFlags().setSupportsReplDedupByLidAndEid(false); + return Commands.serializeWithSize(cmd); + } + + @Override + public boolean isBrokerSupportsReplDedupByLidAndEid() { + return supportsReplDedupByLidAndEid; + } + + @Override + public ChannelHandlerContext ctx() { + if (!injectRepeatedPublish) { + return super.ctx(); + } + final ChannelHandlerContext originalCtx = super.ctx; + ChannelHandlerContext spyContext = spy(originalCtx); + Answer injectedAnswer = invocation -> { + // Do not repeat the messages re-sending, and clear the previous cached messages when + // calling re-sending, to avoid publishing outs of order. 
+ for (StackTraceElement stackTraceElement : Thread.currentThread().getStackTrace()) { + if (stackTraceElement.toString().contains("recoverProcessOpSendMsgFrom") + || stackTraceElement.toString().contains("resendMessages")) { + duplicatedMsgs.clear(); + return invocation.callRealMethod(); + } + } + + Object data = invocation.getArguments()[0]; + if (true && !(data instanceof ByteBufPair)) { + return invocation.callRealMethod(); + } + // Repeatedly send every message. + ByteBufPair byteBufPair = (ByteBufPair) data; + ByteBuf buf1 = byteBufPair.getFirst(); + ByteBuf buf2 = byteBufPair.getSecond(); + int bufferIndex1 = buf1.readerIndex(); + int bufferIndex2 = buf2.readerIndex(); + // Skip totalSize. + buf1.readInt(); + int cmdSize = buf1.readInt(); + BaseCommand cmd = new BaseCommand(); + cmd.parseFrom(buf1, cmdSize); + buf1.readerIndex(bufferIndex1); + if (cmd.getType().equals(BaseCommand.Type.SEND)) { + synchronized (duplicatedMsgs) { + if (duplicatedMsgs.size() >= repeatedMessagesWindow) { + for (ByteBufPair bufferPair : duplicatedMsgs) { + originalCtx.channel().write(bufferPair, originalCtx.voidPromise()); + originalCtx.channel().flush(); + } + duplicatedMsgs.clear(); + } + } + ByteBuf newBuffer1 = UnpooledByteBufAllocator.DEFAULT.heapBuffer( + buf1.readableBytes()); + buf1.readBytes(newBuffer1); + buf1.readerIndex(bufferIndex1); + ByteBuf newBuffer2 = UnpooledByteBufAllocator.DEFAULT.heapBuffer( + buf2.readableBytes()); + buf2.readBytes(newBuffer2); + buf2.readerIndex(bufferIndex2); + synchronized (duplicatedMsgs) { + if (newBuffer2.readableBytes() > 0) { + duplicatedMsgs.add(ByteBufPair.get(newBuffer1, newBuffer2)); + } + } + return invocation.callRealMethod(); + } else { + return invocation.callRealMethod(); + } + }; + doAnswer(injectedAnswer).when(spyContext).write(any()); + doAnswer(injectedAnswer).when(spyContext).write(any(), any(ChannelPromise.class)); + doAnswer(injectedAnswer).when(spyContext).writeAndFlush(any()); + 
doAnswer(injectedAnswer).when(spyContext).writeAndFlush(any(), any(ChannelPromise.class)); + return spyContext; + } + }); + + // 1. Create topics and enable deduplication. + final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_"); + admin1.topics().createNonPartitionedTopic(topicName); + admin1.topics().createSubscription(topicName, "s1", MessageId.earliest); + admin2.topics().createNonPartitionedTopic(topicName); + admin2.topics().createSubscription(topicName, "s1", MessageId.earliest); + Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { + PersistentTopic persistentTopic1 = + (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); + PersistentTopic persistentTopic2 = + (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get(); + admin1.topicPolicies().setDeduplicationStatus(topicName, true); + admin1.topicPolicies().setSchemaCompatibilityStrategy(topicName, + SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); + admin2.topicPolicies().setDeduplicationStatus(topicName, true); + admin2.topicPolicies().setSchemaCompatibilityStrategy(topicName, + SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); + MessageDeduplication messageDeduplication1 = persistentTopic1.getMessageDeduplication(); + if (messageDeduplication1 != null) { + int snapshotInterval1 = WhiteboxImpl.getInternalState(messageDeduplication1, "snapshotInterval"); + assertEquals(snapshotInterval1, 10); + } + MessageDeduplication messageDeduplication2 = persistentTopic2.getMessageDeduplication(); + if (messageDeduplication2 != null) { + int snapshotInterval2 = WhiteboxImpl.getInternalState(messageDeduplication2, "snapshotInterval"); + assertEquals(snapshotInterval2, 10); + } + assertEquals(persistentTopic1.getHierarchyTopicPolicies().getDeduplicationEnabled().get(), Boolean.TRUE); + assertEquals(persistentTopic1.getHierarchyTopicPolicies().getSchemaCompatibilityStrategy().get(), + 
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); + assertEquals(persistentTopic2.getHierarchyTopicPolicies().getDeduplicationEnabled().get(), Boolean.TRUE); + assertEquals(persistentTopic2.getHierarchyTopicPolicies().getSchemaCompatibilityStrategy().get(), + SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); + }); + PersistentTopic tp1 = + (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); + PersistentTopic tp2 = + (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get(); + + // 2, Publish messages. + List msgSent = new ArrayList<>(); + Producer p1 = client1.newProducer(Schema.INT32).topic(topicName).create(); + Producer p2 = client1.newProducer(Schema.INT32).topic(topicName).create(); + Producer p3 = client1.newProducer(Schema.STRING).topic(topicName).create(); + Producer p4 = client1.newProducer(Schema.BOOL).topic(topicName).create(); + for (int i = 0; i < 10; i++) { + p1.send(i); + msgSent.add(String.valueOf(i)); + } + for (int i = 10; i < 200; i++) { + int msg1 = i; + int msg2 = 1000 + i; + String msg3 = (2000 + i) + ""; + boolean msg4 = i % 2 == 0; + p1.send(msg1); + p2.send(msg2); + msgSent.add(String.valueOf(msg1)); + msgSent.add(String.valueOf(msg2)); + if (multiSchemas) { + p3.send(msg3); + p4.send(msg4); + msgSent.add(String.valueOf(msg3)); + msgSent.add(String.valueOf(msg4)); + } + } + p1.close(); + p2.close(); + p3.close(); + p4.close(); + + // 3. Enable replication and wait the task to be finished. 
+ admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2)); + waitReplicatorStarted(topicName); + Awaitility.await().atMost(Duration.ofSeconds(60)).untilAsserted(() -> { + for (ManagedCursor cursor : tp1.getManagedLedger().getCursors()) { + if (cursor.getName().equals("pulsar.repl.r2")) { + long replBacklog = cursor.getNumberOfEntriesInBacklog(true); + log.info("repl backlog: {}", replBacklog); + assertEquals(replBacklog, 0); + } + } + }); + + // Verify: all messages were copied correctly. + List msgReceived = new ArrayList<>(); + Consumer consumer = client2.newConsumer(Schema.AUTO_CONSUME()).topic(topicName) + .subscriptionName("s1").subscribe(); + while (true) { + Message msg = consumer.receive(10, TimeUnit.SECONDS); + if (msg == null) { + break; + } + MessageIdAdv messageIdAdv = (MessageIdAdv) msg.getMessageId(); + log.info("received msg. source {}, target {}:{}", StringUtils.join(msg.getProperties().values(), ":"), + messageIdAdv.getLedgerId(), messageIdAdv.getEntryId()); + msgReceived.add(String.valueOf(msg.getValue())); + consumer.acknowledgeAsync(msg); + } + log.info("c1 topic stats-internal: " + + JACKSON.writeValueAsString(admin1.topics().getInternalStats(topicName))); + log.info("c2 topic stats-internal: " + + JACKSON.writeValueAsString(admin2.topics().getInternalStats(topicName))); + log.info("c1 topic stats: " + + JACKSON.writeValueAsString(admin1.topics().getStats(topicName))); + log.info("c2 topic stats: " + + JACKSON.writeValueAsString(admin2.topics().getStats(topicName))); + assertEquals(msgReceived, msgSent); + consumer.close(); + + // Verify: the deduplication cursor has been acked. + // "topic-policy.DeduplicationSnapshotInterval" is "10".
+ Awaitility.await().untilAsserted(() -> { + for (ManagedCursor cursor : tp1.getManagedLedger().getCursors()) { + if (cursor.getName().equals("pulsar.dedup")) { + assertTrue(cursor.getNumberOfEntriesInBacklog(true) < 10); + } + } + for (ManagedCursor cursor : tp2.getManagedLedger().getCursors()) { + if (cursor.getName().equals("pulsar.dedup")) { + assertTrue(cursor.getNumberOfEntriesInBacklog(true) < 10); + } + } + }); + // Remove the injection. + taskToClearInjection.run(); + + log.info("====== Verify: all messages will be replicated after reopening replication ======"); + + // Verify: all messages will be replicated after reopening replication. + // Reopen replication: stop replication. + admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1)); + waitReplicatorStopped(topicName); + admin2.topics().unload(topicName); + admin2.topics().delete(topicName); + // Reopen replication: enable replication. + admin2.topics().createNonPartitionedTopic(topicName); + admin2.topics().createSubscription(topicName, "s1", MessageId.earliest); + Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { + PersistentTopic persistentTopic2 = + (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get(); + admin2.topicPolicies().setDeduplicationStatus(topicName, true); + admin2.topicPolicies().setSchemaCompatibilityStrategy(topicName, + SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); + MessageDeduplication messageDeduplication2 = persistentTopic2.getMessageDeduplication(); + if (messageDeduplication2 != null) { + int snapshotInterval2 = WhiteboxImpl.getInternalState(messageDeduplication2, "snapshotInterval"); + assertEquals(snapshotInterval2, 10); + } + assertEquals(persistentTopic2.getHierarchyTopicPolicies().getDeduplicationEnabled().get(), Boolean.TRUE); + assertEquals(persistentTopic2.getHierarchyTopicPolicies().getSchemaCompatibilityStrategy().get(), + SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); + }); + 
admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2)); + Awaitility.await().atMost(Duration.ofSeconds(60)).untilAsserted(() -> { + for (ManagedCursor cursor : tp1.getManagedLedger().getCursors()) { + if (cursor.getName().equals("pulsar.repl.r2")) { // Same replication cursor as in step 3. + assertEquals(cursor.getNumberOfEntriesInBacklog(true), 0); + } + } + }); + // Reopen replication: consumption. + List msgReceived2 = new ArrayList<>(); + Consumer consumer2 = client2.newConsumer(Schema.AUTO_CONSUME()).topic(topicName) + .subscriptionName("s1").subscribe(); + while (true) { + Message msg = consumer2.receive(10, TimeUnit.SECONDS); + if (msg == null) { + break; + } + MessageIdAdv messageIdAdv = (MessageIdAdv) msg.getMessageId(); + log.info("received msg. source {}, target {}:{}", StringUtils.join(msg.getProperties().values(), ":"), + messageIdAdv.getLedgerId(), messageIdAdv.getEntryId()); + msgReceived2.add(String.valueOf(msg.getValue())); + consumer2.acknowledgeAsync(msg); + } + // Verify: all messages were copied correctly. + log.info("c1 topic stats-internal: " + + JACKSON.writeValueAsString(admin1.topics().getInternalStats(topicName))); + log.info("c2 topic stats-internal: " + + JACKSON.writeValueAsString(admin2.topics().getInternalStats(topicName))); + log.info("c1 topic stats: " + + JACKSON.writeValueAsString(admin1.topics().getStats(topicName))); + log.info("c2 topic stats: " + + JACKSON.writeValueAsString(admin2.topics().getStats(topicName))); + assertEquals(msgReceived2, msgSent); + consumer2.close(); + + // cleanup.
+ admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1)); + waitReplicatorStopped(topicName); + Awaitility.await().until(() -> { + for (ManagedCursor cursor : tp1.getManagedLedger().getCursors()) { + if (cursor.getName().equals("pulsar.repl.r2")) { + return false; + } + } + return true; + }); + admin1.topics().delete(topicName); + admin2.topics().delete(topicName); + } + + @DataProvider(name = "enabledDeduplication") + public Object[][] enabledDeduplication() { + return new Object[][] { + {true}, + {false} + }; + } + + /*** + * To reproduce the issue that replication loss message if enabled deduplication + * 1. Publishing in the source cluster + * 1-1. Producer-1 send 2 messages: M1, M2 + * 1-2. Producer-2 send 2 messages: M3, M4 + * 2. Replicate messages to the remote cluster + * 2-1. Copies M1 and M2 + * 2-2. Repeatedly copies M1 and M2. and copies M3 and M4. + * 2-2-1. After repeatedly copies M1 and M2, the network broke. + * 3. After a topic unloading. + * 3-1. The replicator will start after the topic is loaded up. + * 3-2. The client will create a new connection. + * 4. Verify: All 4 messages are copied to the remote cluster. + */ + @Test(timeOut = 360 * 1000, dataProvider = "enabledDeduplication") + public void testDeduplicationNotLostMessage(boolean enabledDeduplication) throws Exception { + waitInternalClientCreated(); + + /** + * step-2: Inject a mechanism that makes the client connect broke after repeatedly copied M1 and M2. 
+ */ + final List duplicatedMsgs = new ArrayList<>(); + final int repeatMsgIndex = 2; + AtomicInteger msgSent = new AtomicInteger(0); + ConcurrentHashSet injectedChannel = new ConcurrentHashSet<>(); + Runnable taskToClearInjection = injectReplicatorClientCnx( + (conf, eventLoopGroup) -> new ClientCnx(conf, eventLoopGroup) { + + @Override + public ChannelHandlerContext ctx() { + final ChannelHandlerContext originalCtx = super.ctx; + ChannelHandlerContext spyContext = spy(originalCtx); + Answer injectedAnswer = invocation -> { + // Do not repeat the messages re-sending, and clear the previous cached messages when + // calling re-sending, to avoid publishing outs of order. + for (StackTraceElement stackTraceElement : Thread.currentThread().getStackTrace()) { + if (stackTraceElement.toString().contains("recoverProcessOpSendMsgFrom") + || stackTraceElement.toString().contains("resendMessages")) { + duplicatedMsgs.clear(); + return invocation.callRealMethod(); + } + } + + Object data = invocation.getArguments()[0]; + if (true && !(data instanceof ByteBufPair)) { + return invocation.callRealMethod(); + } + // Repeatedly send every message. + ByteBufPair byteBufPair = (ByteBufPair) data; + ByteBuf buf1 = byteBufPair.getFirst(); + ByteBuf buf2 = byteBufPair.getSecond(); + int bufferIndex1 = buf1.readerIndex(); + int bufferIndex2 = buf2.readerIndex(); + // Skip totalSize. 
+ buf1.readInt(); + int cmdSize = buf1.readInt(); + BaseCommand cmd = new BaseCommand(); + cmd.parseFrom(buf1, cmdSize); + buf1.readerIndex(bufferIndex1); + if (cmd.getType().equals(BaseCommand.Type.SEND)) { + synchronized (duplicatedMsgs) { + if (duplicatedMsgs.isEmpty() && msgSent.get() == repeatMsgIndex) { + return null; + } + if (msgSent.get() == repeatMsgIndex) { + for (ByteBufPair bufferPair : duplicatedMsgs) { + originalCtx.channel().write(bufferPair, originalCtx.voidPromise()); + originalCtx.channel().flush(); + } + duplicatedMsgs.clear(); + return null; + } + } + ByteBuf newBuffer1 = UnpooledByteBufAllocator.DEFAULT.heapBuffer( + buf1.readableBytes()); + buf1.readBytes(newBuffer1); + buf1.readerIndex(bufferIndex1); + ByteBuf newBuffer2 = UnpooledByteBufAllocator.DEFAULT.heapBuffer( + buf2.readableBytes()); + buf2.readBytes(newBuffer2); + buf2.readerIndex(bufferIndex2); + synchronized (duplicatedMsgs) { + if (newBuffer2.readableBytes() > 0 && msgSent.incrementAndGet() <= repeatMsgIndex) { + duplicatedMsgs.add(ByteBufPair.get(newBuffer1, newBuffer2)); + } + } + return invocation.callRealMethod(); + } else { + return invocation.callRealMethod(); + } + }; + doAnswer(injectedAnswer).when(spyContext).write(any()); + doAnswer(injectedAnswer).when(spyContext).write(any(), any(ChannelPromise.class)); + doAnswer(injectedAnswer).when(spyContext).writeAndFlush(any()); + doAnswer(injectedAnswer).when(spyContext).writeAndFlush(any(), any(ChannelPromise.class)); + injectedChannel.add(originalCtx.channel()); + return spyContext; + } + }); + + // Create topics and enable deduplication. 
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_"); + admin1.topics().createNonPartitionedTopic(topicName); + admin1.topics().createSubscription(topicName, "s1", MessageId.earliest); + admin2.topics().createNonPartitionedTopic(topicName); + admin2.topics().createSubscription(topicName, "s1", MessageId.earliest); + PersistentTopic tp1 = + (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); + PersistentTopic tp2 = + (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get(); + ManagedLedgerImpl ml2 = (ManagedLedgerImpl) tp2.getManagedLedger(); + if (enabledDeduplication) { + Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { + PersistentTopic persistentTopic1 = + (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); + PersistentTopic persistentTopic2 = + (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get(); + admin1.topicPolicies().setDeduplicationStatus(topicName, true); + admin2.topicPolicies().setDeduplicationStatus(topicName, true); + assertEquals(persistentTopic1.getHierarchyTopicPolicies().getDeduplicationEnabled().get(), + Boolean.TRUE); + assertEquals(persistentTopic2.getHierarchyTopicPolicies().getDeduplicationEnabled().get(), + Boolean.TRUE); + }); + } + // Let broker persist messages one by one, in other words, it starts to persist the next message after the + // previous has been written into BKs. + PersistentTopic spyTp2 = spy(tp2); + doAnswer(invocation -> { + try { + Awaitility.await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> { + assertEquals(ml2.getPendingAddEntriesCount(), 0); + }); + } catch (Throwable throwable) { + // Ignore this timeout error. 
+ } + return invocation.callRealMethod(); + }).when(spyTp2).publishMessage(any(ByteBuf.class), any(Topic.PublishContext.class)); + CompletableFuture> originalTp2 = pulsar2.getBrokerService().getTopics().put(tp2.getName(), + CompletableFuture.completedFuture(Optional.of(spyTp2))); + + /** + * Step-1: Publishes messages in the source cluster and start replication, + */ + ProducerImpl p1 = (ProducerImpl) client1.newProducer().topic(topicName).producerName("p1").create(); + ProducerImpl p2 = (ProducerImpl) client1.newProducer().topic(topicName).producerName("p2").create(); + p1.send("1".toString().getBytes(StandardCharsets.UTF_8)); + p1.send("2".toString().getBytes(StandardCharsets.UTF_8)); + p2.send("3".toString().getBytes(StandardCharsets.UTF_8)); + p2.send("4".toString().getBytes(StandardCharsets.UTF_8)); + + // Enable replication and wait the task to be finished, it should not finish if no bug. + admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2)); + waitReplicatorStarted(topicName); + try { + Awaitility.await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> { + for (ManagedCursor cursor : tp1.getManagedLedger().getCursors()) { + if (cursor.getName().equals("pulsar.repl.r2")) { + long replBacklog = cursor.getNumberOfEntriesInBacklog(true); + log.info("repl backlog: {}", replBacklog); + assertEquals(replBacklog, 0); + } + } + }); + } catch (Throwable t) { + // Ignore the error. + } + + /** + * Step-3: remove the injections, unload topics and rebuild connections of the replicator. 
+ */ + taskToClearInjection.run(); + pulsar2.getBrokerService().getTopics().put(tp2.getName(), originalTp2); + admin1.topics().unload(topicName); + admin2.topics().unload(topicName); + for (Channel channel : injectedChannel) { + channel.close(); + } + waitReplicatorStarted(topicName); + PersistentTopic tp12 = + (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); + PersistentTopic tp22 = + (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get(); + Awaitility.await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> { + for (ManagedCursor cursor : tp12.getManagedLedger().getCursors()) { + if (cursor.getName().equals("pulsar.repl.r2")) { + long replBacklog = cursor.getNumberOfEntriesInBacklog(true); + log.info("repl backlog: {}", replBacklog); + assertEquals(replBacklog, 0); + } + } + }); + + /** + * Verify: All 4 messages are copied to the remote cluster. + */ + List msgReceived = new ArrayList<>(); + Consumer consumer = client2.newConsumer().topic(topicName) + .subscriptionName("s1").subscribe(); + while (true) { + Message msg = consumer.receive(10, TimeUnit.SECONDS); + if (msg == null) { + break; + } + MessageIdAdv messageIdAdv = (MessageIdAdv) msg.getMessageId(); + log.info("received msg. source {}, target {}:{}", StringUtils.join(msg.getProperties().values(), ":"), + messageIdAdv.getLedgerId(), messageIdAdv.getEntryId()); + msgReceived.add(new String(msg.getData(), StandardCharsets.UTF_8)); + consumer.acknowledgeAsync(msg); + } + + log.info("received msgs: {}", msgReceived); + assertTrue(msgReceived.contains("1")); + assertTrue(msgReceived.contains("2")); + assertTrue(msgReceived.contains("3")); + assertTrue(msgReceived.contains("4")); + if (enabledDeduplication) { + assertEquals(msgReceived, Arrays.asList("1", "2", "3", "4")); + } + + // cleanup. 
+ consumer.close(); + p1.close(); + p2.close(); + admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1)); + waitReplicatorStopped(topicName); + Awaitility.await().until(() -> { + for (ManagedCursor cursor : tp12.getManagedLedger().getCursors()) { + if (cursor.getName().equals("pulsar.repl.r2")) { + return false; + } + } + return true; + }); + admin1.topics().delete(topicName); + admin2.topics().delete(topicName); + } + + @Test(timeOut = 360 * 1000, dataProvider = "enabledDeduplication") + public void testReplicationLoadSchemaTimeout(boolean enabledDeduplication) throws Exception { + waitInternalClientCreated(); + + /** + * Inject a timeout error for Get Schema. + */ + Field filedSchemaRegistryService = PulsarService.class.getDeclaredField("schemaRegistryService"); + filedSchemaRegistryService.setAccessible(true); + SchemaRegistryService originalSchemaRegistryService = + (SchemaRegistryService) filedSchemaRegistryService.get(pulsar2); + SchemaRegistryService spySchemaRegistryService = spy(originalSchemaRegistryService); + AtomicBoolean getSchemaSuccess = new AtomicBoolean(false); + doAnswer(invocation -> { + if (getSchemaSuccess.get()) { + getSchemaSuccess.set(false); + return invocation.callRealMethod(); + } else { + getSchemaSuccess.set(true); + } + Thread.sleep(60 * 1000); + return invocation.callRealMethod(); + }).when(spySchemaRegistryService).findSchemaVersion(any(String.class), any(SchemaData.class)); + filedSchemaRegistryService.set(pulsar2, spySchemaRegistryService); + Runnable taskToClearInjection = injectReplicatorClientCnx( + (conf, eventLoopGroup) -> new ClientCnx(conf, eventLoopGroup) { + @Override + protected void handleGetSchemaResponse(CommandGetSchemaResponse commandGetSchemaResponse) { + if (getSchemaSuccess.get()) { + getSchemaSuccess.set(false); + super.handleGetSchemaResponse(commandGetSchemaResponse); + return; + } else { + getSchemaSuccess.set(true); + } + checkArgument(state == State.Ready); + long requestId = 
commandGetSchemaResponse.getRequestId(); + CompletableFuture future = + (CompletableFuture) pendingRequests.remove(requestId); + if (future == null) { + duplicatedResponseCounter.incrementAndGet(); + log.warn("{} Received unknown request id from server: {}", ctx.channel(), requestId); + return; + } + future.completeExceptionally(new PulsarClientException.TimeoutException("Mocked timeout")); + } + + @Override + protected void handleGetOrCreateSchemaResponse(CommandGetOrCreateSchemaResponse + commandGetOrCreateSchemaResponse) { + + if (getSchemaSuccess.get()) { + getSchemaSuccess.set(false); + super.handleGetOrCreateSchemaResponse(commandGetOrCreateSchemaResponse); + return; + } else { + getSchemaSuccess.set(true); + } + + checkArgument(state == State.Ready); + long requestId = commandGetOrCreateSchemaResponse.getRequestId(); + CompletableFuture future = + (CompletableFuture) pendingRequests.remove(requestId); + if (future == null) { + duplicatedResponseCounter.incrementAndGet(); + log.warn("{} Received unknown request id from server: {}", ctx.channel(), requestId); + return; + } + future.completeExceptionally(new PulsarClientException.TimeoutException("Mocked timeout")); + } + }); + + // Create topics and enable deduplication. 
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_"); + admin1.topics().createNonPartitionedTopic(topicName); + admin1.topics().createSubscription(topicName, "s1", MessageId.earliest); + admin2.topics().createNonPartitionedTopic(topicName); + admin2.topics().createSubscription(topicName, "s1", MessageId.earliest); + PersistentTopic tp1 = + (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); + PersistentTopic tp2 = + (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get(); + if (enabledDeduplication) { + Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { + PersistentTopic persistentTopic1 = + (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); + PersistentTopic persistentTopic2 = + (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get(); + admin1.topicPolicies().setDeduplicationStatus(topicName, true); + admin2.topicPolicies().setDeduplicationStatus(topicName, true); + assertEquals(persistentTopic1.getHierarchyTopicPolicies().getDeduplicationEnabled().get(), + Boolean.TRUE); + assertEquals(persistentTopic2.getHierarchyTopicPolicies().getDeduplicationEnabled().get(), + Boolean.TRUE); + }); + } + Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { + PersistentTopic persistentTopic1 = + (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); + PersistentTopic persistentTopic2 = + (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get(); + admin1.topicPolicies().setSchemaCompatibilityStrategy(topicName, + SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); + admin2.topicPolicies().setSchemaCompatibilityStrategy(topicName, + SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); + assertEquals(persistentTopic1.getHierarchyTopicPolicies().getSchemaCompatibilityStrategy().get(), + 
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); + assertEquals(persistentTopic2.getHierarchyTopicPolicies().getSchemaCompatibilityStrategy().get(), + SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); + }); + + // Publishes messages in the source cluster. + Producer p1 = client1.newProducer().topic(topicName).producerName("p1").create(); + Producer p2 = client1.newProducer().topic(topicName).producerName("p2").create(); + Producer p3 = client1.newProducer(Schema.STRING).topic(topicName).producerName("p3").create(); + p1.send("1".toString().getBytes(StandardCharsets.UTF_8)); + p1.send("2".toString().getBytes(StandardCharsets.UTF_8)); + p3.send("2-1"); + p3.send("2-2"); + p2.send("3".toString().getBytes(StandardCharsets.UTF_8)); + p2.send("4".toString().getBytes(StandardCharsets.UTF_8)); + + // Enable replication and wait the task to be finished, it should not finish if no bug. + admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2)); + waitReplicatorStarted(topicName); + Awaitility.await().atMost(Duration.ofSeconds(180)).pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> { + for (ManagedCursor cursor : tp1.getManagedLedger().getCursors()) { + if (cursor.getName().equals("pulsar.repl.r2")) { + long replBacklog = cursor.getNumberOfEntriesInBacklog(true); + log.info("repl backlog: {}", replBacklog); + assertEquals(replBacklog, 0); + } + } + }); + + // Verify: All messages are copied to the remote cluster. + List msgReceived = new ArrayList<>(); + Consumer consumer = client2.newConsumer().topic(topicName) + .subscriptionName("s1").subscribe(); + while (true) { + Message msg = consumer.receive(10, TimeUnit.SECONDS); + if (msg == null) { + break; + } + MessageIdAdv messageIdAdv = (MessageIdAdv) msg.getMessageId(); + log.info("received msg. 
source {}, target {}:{}", StringUtils.join(msg.getProperties().values(), ":"), + messageIdAdv.getLedgerId(), messageIdAdv.getEntryId()); + msgReceived.add(new String(msg.getData(), StandardCharsets.UTF_8)); + consumer.acknowledgeAsync(msg); + } + log.info("received msgs: {}", msgReceived); + assertTrue(msgReceived.contains("1")); + assertTrue(msgReceived.contains("2")); + assertTrue(msgReceived.contains("2-1")); + assertTrue(msgReceived.contains("2-2")); + assertTrue(msgReceived.contains("3")); + assertTrue(msgReceived.contains("4")); + if (enabledDeduplication) { + assertEquals(msgReceived, Arrays.asList("1", "2", "2-1", "2-2", "3", "4")); + } + + // cleanup. + taskToClearInjection.run(); + filedSchemaRegistryService.set(pulsar2, originalSchemaRegistryService); + consumer.close(); + p1.close(); + p2.close(); + p3.close(); + admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1)); + waitReplicatorStopped(topicName); + Awaitility.await().until(() -> { + for (ManagedCursor cursor : tp1.getManagedLedger().getCursors()) { + if (cursor.getName().equals("pulsar.repl.r2")) { + return false; + } + } + return true; + }); + admin1.topics().delete(topicName); + admin2.topics().delete(topicName); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java index ef29376368cac..f7738c2207bf2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java @@ -156,6 +156,10 @@ public void testConfigReplicationStartAt() throws Exception { p1.close(); admin1.namespaces().setNamespaceReplicationClusters(ns1, new HashSet<>(Arrays.asList(cluster1, cluster2))); + Awaitility.await().untilAsserted(() -> { + 
assertTrue(admin2.topics().getList(ns1).contains(topic1)); + }); + admin2.topics().createSubscription(topic1, subscription1, MessageId.earliest); org.apache.pulsar.client.api.Consumer c1 = client2.newConsumer(Schema.STRING).topic(topic1) .subscriptionName(subscription1).subscribe(); Message msg2 = c1.receive(2, TimeUnit.SECONDS); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 75ff51055fc7e..ec37ffd3e1fb1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -403,9 +403,9 @@ public void testReplicationWithSchema() throws Exception { int lastId = -1; for (int i = 0; i < totalMessages; i++) { - Message msg1 = consumer1.receive(); - Message msg2 = consumer2.receive(); - Message msg3 = consumer3.receive(); + Message msg1 = consumer1.receive(10, TimeUnit.SECONDS); + Message msg2 = consumer2.receive(10, TimeUnit.SECONDS); + Message msg3 = consumer3.receive(10, TimeUnit.SECONDS); assertTrue(msg1 != null && msg2 != null && msg3 != null); GenericRecord record1 = msg1.getValue(); GenericRecord record2 = msg2.getValue(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index 243a5ccadb369..6bbd598b454b2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -18,7 +18,8 @@ */ package org.apache.pulsar.broker.service.persistent; -import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; 
import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; @@ -35,8 +36,6 @@ import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; -import com.google.common.collect.Multimap; -import com.google.common.collect.Sets; import java.io.ByteArrayOutputStream; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; @@ -74,6 +73,7 @@ import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.TopicPoliciesService; +import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -102,6 +102,7 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Test; + @Slf4j @Test(groups = "broker") public class PersistentTopicTest extends BrokerTestBase { @@ -216,6 +217,13 @@ public void testUnblockStuckSubscription() throws Exception { assertNotNull(msg); msg = consumer2.receive(5, TimeUnit.SECONDS); assertNotNull(msg); + + org.apache.pulsar.broker.service.Consumer sharedConsumer = sharedDispatcher.getConsumers().get(0); + Field blockField = org.apache.pulsar.broker.service.Consumer.class.getDeclaredField("blockedConsumerOnUnackedMsgs"); + blockField.setAccessible(true); + blockField.set(sharedConsumer, true); + producer.newMessage().value("test").eventTime(5).send(); + assertFalse(sharedSub.checkAndUnblockIfStuck()); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java index 5aeed40107d5d..423c4daa291b8 100644 --- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java @@ -388,8 +388,13 @@ public void testAvgMessagesPerEntry() throws Exception { .batchingMaxPublishDelay(5, TimeUnit.SECONDS) .batchingMaxBytes(Integer.MAX_VALUE) .create(); - - producer.send("first-message"); + // The first messages deliver: 20 msgs. + // Average of "messages per batch" is "1". + for (int i = 0; i < 20; i++) { + producer.send("first-message"); + } + // The second messages deliver: 20 msgs. + // Average of "messages per batch" is "Math.round(1 * 0.9 + 20 * 0.1) = 2.9 ~ 3". List> futures = new ArrayList<>(); for (int i = 0; i < 20; i++) { futures.add(producer.sendAsync("message")); @@ -423,6 +428,7 @@ public void testAvgMessagesPerEntry() throws Exception { metadataConsumer.put("matchValueReschedule", "producer2"); @Cleanup Consumer consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).properties(metadataConsumer) + .receiverQueueSize(20) .subscriptionName(subName).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe(); int counter = 0; @@ -437,14 +443,17 @@ public void testAvgMessagesPerEntry() throws Exception { } } - assertEquals(21, counter); + assertEquals(40, counter); ConsumerStats consumerStats = admin.topics().getStats(topic).getSubscriptions().get(subName).getConsumers().get(0); - assertEquals(21, consumerStats.getMsgOutCounter()); + assertEquals(40, consumerStats.getMsgOutCounter()); - // Math.round(1 * 0.9 + 0.1 * (20 / 1)) + // The first messages deliver: 20 msgs. + // Average of "messages per batch" is "1". + // The second messages deliver: 20 msgs. + // Average of "messages per batch" is "Math.round(1 * 0.9 + 20 * 0.1) = 2.9 ~ 3". 
int avgMessagesPerEntry = consumerStats.getAvgMessagesPerEntry(); assertEquals(3, avgMessagesPerEntry); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java index d3cb1d60d37ed..0c1de5708bb2f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java @@ -589,9 +589,11 @@ public Void call() throws Exception { restartBroker(); // The available permits should be 10 and num messages in the queue should be 90 - Awaitility.await().untilAsserted(() -> - Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads)); - Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize - numConsumersThreads); + Awaitility.await().untilAsserted(() -> { + Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads); + Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize - numConsumersThreads); + }); + consumer.close(); } diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java index 39ede3bb7aef1..7eaf0ad9bd6f9 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java @@ -632,6 +632,19 @@ public void testListFunctions() throws Exception { verify(functions, times(1)).getFunctions(eq(TENANT), eq(NAMESPACE)); } + @Test + public void testListFunctionsWithDefaultValue() throws Exception { + cmd.run(new String[] { + "list", + }); + + ListFunctions lister = cmd.getLister(); + assertEquals("public", lister.getTenant()); + assertEquals("default", lister.getNamespace()); + + verify(functions, 
times(1)).getFunctions(eq("public"), eq("default")); + } + @Test public void testStateGetter() throws Exception { String key = TEST_NAME + "-key"; diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java index 3c1662d00a034..752594f21d193 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java @@ -19,6 +19,7 @@ package org.apache.pulsar.admin.cli; import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.pulsar.admin.cli.CmdTopics.printMessages; import com.beust.jcommander.Parameter; import com.beust.jcommander.ParameterException; import com.beust.jcommander.Parameters; @@ -39,8 +40,6 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.cli.NoSplitter; -import org.apache.pulsar.client.impl.BatchMessageIdImpl; -import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.util.RelativeTimeUtil; @Parameters(commandDescription = "Operations on persistent topics. 
The persistent-topics " @@ -606,26 +605,7 @@ private class PeekMessages extends CliCommand { void run() throws PulsarAdminException { String persistentTopic = validatePersistentTopic(params); List> messages = getPersistentTopics().peekMessages(persistentTopic, subName, numMessages); - int position = 0; - for (Message msg : messages) { - if (++position != 1) { - System.out.println("-------------------------------------------------------------------------\n"); - } - if (msg.getMessageId() instanceof BatchMessageIdImpl) { - BatchMessageIdImpl msgId = (BatchMessageIdImpl) msg.getMessageId(); - System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":" - + msgId.getBatchIndex()); - } else { - MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId(); - System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId()); - } - if (msg.getProperties().size() > 0) { - System.out.println("Properties:"); - print(msg.getProperties()); - } - ByteBuf data = Unpooled.wrappedBuffer(msg.getData()); - System.out.println(ByteBufUtil.prettyHexDump(data)); - } + printMessages(messages, false, this); } } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index d0b04198e13fc..5d93ca9a4c8c4 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -1246,50 +1246,7 @@ void run() throws PulsarAdminException { String persistentTopic = validatePersistentTopic(params); List> messages = getTopics().peekMessages(persistentTopic, subName, numMessages, showServerMarker, transactionIsolationLevel); - int position = 0; - for (Message msg : messages) { - MessageImpl message = (MessageImpl) msg; - if (++position != 1) { - System.out.println("-------------------------------------------------------------------------\n"); 
- } - if (message.getMessageId() instanceof BatchMessageIdImpl) { - BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId(); - System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":" - + msgId.getBatchIndex()); - } else { - MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId(); - System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId()); - } - - System.out.println("Publish time: " + message.getPublishTime()); - System.out.println("Event time: " + message.getEventTime()); - - if (message.getDeliverAtTime() != 0) { - System.out.println("Deliver at time: " + message.getDeliverAtTime()); - } - MessageMetadata msgMetaData = message.getMessageBuilder(); - if (showServerMarker && msgMetaData.hasMarkerType()) { - System.out.println("Marker Type: " + MarkerType.valueOf(msgMetaData.getMarkerType())); - } - - if (message.getBrokerEntryMetadata() != null) { - if (message.getBrokerEntryMetadata().hasBrokerTimestamp()) { - System.out.println("Broker entry metadata timestamp: " - + message.getBrokerEntryMetadata().getBrokerTimestamp()); - } - if (message.getBrokerEntryMetadata().hasIndex()) { - System.out.println("Broker entry metadata index: " - + message.getBrokerEntryMetadata().getIndex()); - } - } - - if (message.getProperties().size() > 0) { - System.out.println("Properties:"); - print(msg.getProperties()); - } - ByteBuf data = Unpooled.wrappedBuffer(msg.getData()); - System.out.println(ByteBufUtil.prettyHexDump(data)); - } + printMessages(messages, showServerMarker, this); } } @@ -1508,6 +1465,55 @@ static MessageId findFirstLedgerWithinThreshold(List> messages, boolean showServerMarker, CliCommand cli) { + if (messages == null) { + return; + } + int position = 0; + for (Message msg : messages) { + MessageImpl message = (MessageImpl) msg; + if (++position != 1) { + System.out.println("-------------------------------------------------------------------------\n"); + } + if 
(message.getMessageId() instanceof BatchMessageIdImpl) { + BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId(); + System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":" + + msgId.getBatchIndex()); + } else { + MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId(); + System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId()); + } + + System.out.println("Publish time: " + message.getPublishTime()); + System.out.println("Event time: " + message.getEventTime()); + + if (message.getDeliverAtTime() != 0) { + System.out.println("Deliver at time: " + message.getDeliverAtTime()); + } + MessageMetadata msgMetaData = message.getMessageBuilder(); + if (showServerMarker && msgMetaData.hasMarkerType()) { + System.out.println("Marker Type: " + MarkerType.valueOf(msgMetaData.getMarkerType())); + } + + if (message.getBrokerEntryMetadata() != null) { + if (message.getBrokerEntryMetadata().hasBrokerTimestamp()) { + System.out.println("Broker entry metadata timestamp: " + + message.getBrokerEntryMetadata().getBrokerTimestamp()); + } + if (message.getBrokerEntryMetadata().hasIndex()) { + System.out.println("Broker entry metadata index: " + message.getBrokerEntryMetadata().getIndex()); + } + } + + if (message.getProperties().size() > 0) { + System.out.println("Properties:"); + cli.print(msg.getProperties()); + } + ByteBuf data = Unpooled.wrappedBuffer(msg.getData()); + System.out.println(ByteBufUtil.prettyHexDump(data)); + } + } + @Parameters(commandDescription = "Trigger offload of data from a topic to long-term storage (e.g. 
Amazon S3)") private class Offload extends CliCommand { @Parameter(names = { "-s", "--size-threshold" }, diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 675fa2063473f..89bc2e237a273 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -117,10 +117,12 @@ public class ClientCnx extends PulsarHandler { protected final Authentication authentication; protected State state; - private AtomicLong duplicatedResponseCounter = new AtomicLong(0); + @VisibleForTesting + protected AtomicLong duplicatedResponseCounter = new AtomicLong(0); + @VisibleForTesting @Getter - private final ConcurrentLongHashMap> pendingRequests = + protected final ConcurrentLongHashMap> pendingRequests = ConcurrentLongHashMap.>newBuilder() .expectedItems(16) .concurrencyLevel(1) @@ -188,6 +190,8 @@ public class ClientCnx extends PulsarHandler { private boolean supportsTopicWatchers; @Getter private boolean supportsGetPartitionedMetadataWithoutAutoCreation; + @Getter + private boolean brokerSupportsReplDedupByLidAndEid; /** Idle stat. 
**/ @Getter @@ -196,7 +200,7 @@ public class ClientCnx extends PulsarHandler { @Getter private long lastDisconnectedTimestamp; - private final String clientVersion; + protected final String clientVersion; protected enum State { None, SentConnectFrame, Ready, Failed, Connecting @@ -387,6 +391,8 @@ protected void handleConnected(CommandConnected connected) { supportsGetPartitionedMetadataWithoutAutoCreation = connected.hasFeatureFlags() && connected.getFeatureFlags().isSupportsGetPartitionedMetadataWithoutAutoCreation(); + brokerSupportsReplDedupByLidAndEid = + connected.hasFeatureFlags() && connected.getFeatureFlags().isSupportsReplDedupByLidAndEid(); // set remote protocol version to the correct version before we complete the connection future setRemoteEndpointProtocolVersion(connected.getProtocolVersion()); @@ -456,18 +462,19 @@ protected void handleSendReceipt(CommandSendReceipt sendReceipt) { ledgerId = sendReceipt.getMessageId().getLedgerId(); entryId = sendReceipt.getMessageId().getEntryId(); } - + ProducerImpl producer = producers.get(producerId); if (ledgerId == -1 && entryId == -1) { - log.warn("{} Message with sequence-id {} published by producer {} has been dropped", ctx.channel(), - sequenceId, producerId); - } - - if (log.isDebugEnabled()) { - log.debug("{} Got receipt for producer: {} -- msg: {} -- id: {}:{}", ctx.channel(), producerId, sequenceId, - ledgerId, entryId); + log.warn("{} Message with sequence-id {}-{} published by producer [id:{}, name:{}] has been dropped", + ctx.channel(), sequenceId, highestSequenceId, producerId, + producer != null ? 
producer.getProducerName() : "null"); + } else { + if (log.isDebugEnabled()) { + log.debug("{} Got receipt for producer: [id:{}, name:{}] -- sequence-id: {}-{} -- entry-id: {}:{}", + ctx.channel(), producerId, producer.getProducerName(), sequenceId, highestSequenceId, + ledgerId, entryId); + } } - ProducerImpl producer = producers.get(producerId); if (producer != null) { producer.ackReceived(this, sequenceId, highestSequenceId, ledgerId, entryId); } else { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java new file mode 100644 index 0000000000000..3201dde748f34 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.impl; + +import io.netty.util.ReferenceCountUtil; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; +import org.apache.pulsar.common.api.proto.KeyValue; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.protocol.Markers; + +@Slf4j +public class GeoReplicationProducerImpl extends ProducerImpl{ + + public static final String MSG_PROP_REPL_SOURCE_POSITION = "__MSG_PROP_REPL_SOURCE_POSITION"; + public static final String MSG_PROP_IS_REPL_MARKER = "__MSG_PROP_IS_REPL_MARKER"; + + private long lastPersistedSourceLedgerId; + private long lastPersistedSourceEntryId; + + private final boolean isPersistentTopic; + + public GeoReplicationProducerImpl(PulsarClientImpl client, String topic, + ProducerConfigurationData conf, + CompletableFuture producerCreatedFuture, int partitionIndex, + Schema schema, ProducerInterceptors interceptors, + Optional overrideProducerName) { + super(client, topic, conf, producerCreatedFuture, partitionIndex, schema, interceptors, overrideProducerName); + isPersistentTopic = TopicName.get(topic).isPersistent(); + } + + private boolean isBrokerSupportsReplDedupByLidAndEid(ClientCnx cnx) { + // Non-persistent topics have no ledger id or entry id, so they cannot use this deduplication. + return cnx.isBrokerSupportsReplDedupByLidAndEid() && isPersistentTopic; + } + + @Override + protected void ackReceived(ClientCnx cnx, long seq, long highSeq, long ledgerId, long entryId) { + if (!isBrokerSupportsReplDedupByLidAndEid(cnx)) { + // Repl V1 handles send receipts the same way as a normal producer. 
+ super.ackReceived(cnx, seq, highSeq, ledgerId, entryId); + return; + } + synchronized (this) { + OpSendMsg op = pendingMessages.peek(); + if (op == null) { + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Got ack for timed out msg {}:{}", topic, producerName, seq, highSeq); + } + return; + } + // The Replicator also sends markers; the receipt's highest sequence id tells whether it is for a marker. + if (isReplicationMarker(highSeq)) { + ackReceivedReplMarker(cnx, op, seq, highSeq, ledgerId, entryId); + return; + } + ackReceivedReplicatedMsg(cnx, op, seq, highSeq, ledgerId, entryId); + } + } + + private void ackReceivedReplicatedMsg(ClientCnx cnx, OpSendMsg op, long sourceLId, long sourceEId, + long targetLId, long targetEid) { + // Parse the source cluster's entry position from the pending message's properties. + Long pendingLId = null; + Long pendingEId = null; + List kvPairList = op.msg.getMessageBuilder().getPropertiesList(); + for (KeyValue kvPair : kvPairList) { + if (kvPair.getKey().equals(MSG_PROP_REPL_SOURCE_POSITION)) { + if (!kvPair.getValue().contains(":")) { + break; + } + String[] ledgerIdAndEntryId = kvPair.getValue().split(":"); + if (ledgerIdAndEntryId.length != 2 || !StringUtils.isNumeric(ledgerIdAndEntryId[0]) + || !StringUtils.isNumeric(ledgerIdAndEntryId[1])) { + break; + } + pendingLId = Long.valueOf(ledgerIdAndEntryId[0]); + pendingEId = Long.valueOf(ledgerIdAndEntryId[1]); + break; + } + } + + // Case-1: repeated publish. The source message was resent by the Replicator after a cursor rewind. + // - The first time: Replicator --M1--> producer --> ... + // - Cursor rewind. + // - The second time: Replicator --M1--> producer --> ... 
+ if (pendingLId != null && pendingEId != null + && (pendingLId < lastPersistedSourceLedgerId || (pendingLId.longValue() == lastPersistedSourceLedgerId + && pendingEId.longValue() <= lastPersistedSourceEntryId))) { + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Received an msg send receipt[pending send is repeated due to repl cursor rewind]:" + + " source entry {}:{}, pending send: {}:{}, latest persisted: {}:{}", + topic, producerName, sourceLId, sourceEId, pendingLId, pendingEId, + lastPersistedSourceLedgerId, lastPersistedSourceEntryId); + } + removeAndApplyCallback(op, sourceLId, sourceEId, targetLId, targetEid, false); + ackReceived(cnx, sourceLId, sourceEId, targetLId, targetEid); + return; + } + + // Case-2: repeatedly publish. Send command was executed again by the producer after a reconnect. + // - Replicator --M1--> producer --> ... + // - The first time: producer call Send-Command-1. + // - Producer reconnect. + // - The second time: producer call Send-Command-1. + if (sourceLId < lastPersistedSourceLedgerId + || (sourceLId == lastPersistedSourceLedgerId && sourceEId <= lastPersistedSourceEntryId)) { + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Received an msg send receipt[repeated]: source entry {}:{}, latest persisted:" + + " {}:{}", + topic, producerName, sourceLId, sourceEId, + lastPersistedSourceLedgerId, lastPersistedSourceEntryId); + } + return; + } + + // Case-3, which is expected. 
+ if (pendingLId != null && pendingEId != null && sourceLId == pendingLId.longValue() + && sourceEId == pendingEId.longValue()) { + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Received an msg send receipt[expected]: source entry {}:{}, target entry:" + + " {}:{}", + topic, producerName, sourceLId, sourceEId, + targetLId, targetEid); + } + lastPersistedSourceLedgerId = sourceLId; + lastPersistedSourceEntryId = sourceEId; + removeAndApplyCallback(op, sourceLId, sourceEId, targetLId, targetEid, false); + return; + } + + // Case-4: Unexpected + // 4-1: the pending message carries no parsable source entry position, which is unexpected. + // 4-2: unknown error, which is unexpected. + log.error("[{}] [{}] Received an msg send receipt[error]: source entry {}:{}, target entry: {}:{}," + + " pending send: {}:{}, latest persisted: {}:{}, queue-size: {}", + topic, producerName, sourceLId, sourceEId, targetLId, targetEid, pendingLId, pendingEId, + lastPersistedSourceLedgerId, lastPersistedSourceEntryId, pendingMessages.messagesCount()); + cnx.channel().close(); + } + + protected void ackReceivedReplMarker(ClientCnx cnx, OpSendMsg op, long seq, long isSourceMarker, + long ledgerId, long entryId) { + // Case-1: the repl marker was published repeatedly. + long lastSeqPersisted = LAST_SEQ_ID_PUBLISHED_UPDATER.get(this); + if (lastSeqPersisted != 0 && seq <= lastSeqPersisted) { + // Ignore the receipt: its sequence id is not newer than the last persisted one. + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Received an repl marker send receipt[repeated]. seq: {}, seqPersisted: {}," + + " isSourceMarker: {}, target entry: {}:{}", + topic, producerName, seq, lastSeqPersisted, isSourceMarker, ledgerId, entryId); + } + return; + } + + // Case-2, which is expected: + // condition: the broker's SendReceipt is for a repl marker. + // and condition: the current pending msg is also a marker. 
+ boolean pendingMsgIsReplMarker = isReplicationMarker(op); + if (pendingMsgIsReplMarker && seq == op.sequenceId) { + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Received an repl marker send receipt[expected]. seq: {}, seqPersisted: {}," + + " isReplMarker: {}, target entry: {}:{}", + topic, producerName, seq, lastSeqPersisted, isSourceMarker, ledgerId, entryId); + } + long calculatedSeq = getHighestSequenceId(op); + LAST_SEQ_ID_PUBLISHED_UPDATER.getAndUpdate(this, last -> Math.max(last, calculatedSeq)); + removeAndApplyCallback(op, seq, isSourceMarker, ledgerId, entryId, true); + return; + } + + // Case-3, unexpected. + // 3-1: if "lastSeqPersisted < seq <= lastInProgressSend", rather than going here, it should be a SendError. + // 3-2: unknown error. + long lastInProgressSend = LAST_SEQ_ID_PUSHED_UPDATER.get(this); + String logText = String.format("[%s] [%s] Received an repl marker send receipt[error]. seq: %s, seqPending: %s." + + " sequenceIdPersisted: %s, lastInProgressSend: %s," + + " isSourceMarker: %s, target entry: %s:%s, queue-size: %s", + topic, producerName, seq, pendingMsgIsReplMarker ? op.sequenceId : "unknown", + lastSeqPersisted, lastInProgressSend, + isSourceMarker, ledgerId, entryId, pendingMessages.messagesCount() + ); + if (seq < lastInProgressSend) { + log.warn(logText); + } else { + log.error(logText); + } + // Force connection closing so that messages can be re-transmitted in a new connection. + cnx.channel().close(); + } + + private void removeAndApplyCallback(OpSendMsg op, long lIdSent, long eIdSent, long ledgerId, long entryId, + boolean isMarker) { + pendingMessages.remove(); + releaseSemaphoreForSendOp(op); + // Since Geo-Replicator will not send batched message, skip to update the field + // "LAST_SEQ_ID_PUBLISHED_UPDATER". 
+ op.setMessageId(ledgerId, entryId, partitionIndex); + try { + // Need to protect ourselves from any exception being thrown in the future handler from the + // application + op.sendComplete(null); + } catch (Throwable t) { + log.warn("[{}] [{}] Got exception while completing the callback for -- source-message: {}:{} --" + + " target-msg: {}:{} -- isMarker: {}", + topic, producerName, lIdSent, eIdSent, ledgerId, entryId, isMarker, t); + } + ReferenceCountUtil.safeRelease(op.cmd); + op.recycle(); + } + + private boolean isReplicationMarker(OpSendMsg op) { + return op.msg != null && op.msg.getMessageBuilder().hasMarkerType() + && Markers.isReplicationMarker(op.msg.getMessageBuilder().getMarkerType()); + } + + private boolean isReplicationMarker(long highestSeq) { + return Long.MIN_VALUE == highestSeq; + } + + @Override + protected void updateLastSeqPushed(OpSendMsg op) { + // Only update the value for repl marker. + if (isReplicationMarker(op)) { + super.updateLastSeqPushed(op); + } + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 6fd47e072f017..56055dd16314d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -109,7 +109,7 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne // Variable is updated in a synchronized block private volatile long msgIdGenerator; - private final OpSendMsgQueue pendingMessages; + protected final OpSendMsgQueue pendingMessages; private final Optional semaphore; private volatile Timeout sendTimeout = null; private final long lookupDeadline; @@ -126,12 +126,12 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne private LastSendFutureWrapper lastSendFutureWrapper = LastSendFutureWrapper.create(lastSendFuture); // Globally unique producer name - 
private String producerName; + protected String producerName; private final boolean userProvidedProducerName; private String connectionId; private String connectedSince; - private final int partitionIndex; + protected final int partitionIndex; private final ProducerStatsRecorder stats; @@ -1199,7 +1199,7 @@ public void terminated(ClientCnx cnx) { } } - void ackReceived(ClientCnx cnx, long sequenceId, long highestSequenceId, long ledgerId, long entryId) { + protected void ackReceived(ClientCnx cnx, long sequenceId, long highestSequenceId, long ledgerId, long entryId) { OpSendMsg op = null; synchronized (this) { op = pendingMessages.peek(); @@ -1274,11 +1274,11 @@ void ackReceived(ClientCnx cnx, long sequenceId, long highestSequenceId, long le op.recycle(); } - private long getHighestSequenceId(OpSendMsg op) { + protected long getHighestSequenceId(OpSendMsg op) { return Math.max(op.highestSequenceId, op.sequenceId); } - private void releaseSemaphoreForSendOp(OpSendMsg op) { + protected void releaseSemaphoreForSendOp(OpSendMsg op) { semaphoreRelease(isBatchMessagingEnabled() ? op.numMessagesInBatch : 1); @@ -2272,10 +2272,7 @@ protected synchronized void processOpSendMsg(OpSendMsg op) { return; } pendingMessages.add(op); - if (op.msg != null) { - LAST_SEQ_ID_PUSHED_UPDATER.getAndUpdate(this, - last -> Math.max(last, getHighestSequenceId(op))); - } + updateLastSeqPushed(op); final ClientCnx cnx = getCnxIfReady(); if (cnx != null) { @@ -2301,6 +2298,13 @@ protected synchronized void processOpSendMsg(OpSendMsg op) { } } + protected void updateLastSeqPushed(OpSendMsg op) { + if (op.msg != null) { + LAST_SEQ_ID_PUSHED_UPDATER.getAndUpdate(this, + last -> Math.max(last, getHighestSequenceId(op))); + } + } + // Must acquire a lock on ProducerImpl.this before calling method. 
private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from, long expectedEpoch) { if (expectedEpoch != this.connectionHandler.getEpoch() || cnx() == null) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index c5ec93bcdd2a8..cd9e6021c7e40 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -504,6 +504,10 @@ protected ProducerImpl newProducerImpl(String topic, int partitionIndex, ProducerInterceptors interceptors, CompletableFuture> producerCreatedFuture, Optional overrideProducerName) { + if (conf.isReplProducer()) { + return new GeoReplicationProducerImpl(PulsarClientImpl.this, topic, conf, producerCreatedFuture, + partitionIndex, schema, interceptors, overrideProducerName); + } return new ProducerImpl<>(PulsarClientImpl.this, topic, conf, producerCreatedFuture, partitionIndex, schema, interceptors, overrideProducerName); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java index 6ec738bbf4c8d..93261a3b7f68a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java @@ -206,6 +206,8 @@ public class ProducerConfigurationData implements Serializable, Cloneable { private boolean isNonPartitionedTopicExpected; + private boolean isReplProducer; + @ApiModelProperty( name = "initialSubscriptionName", value = "Use this configuration to automatically create an initial subscription when creating a topic." 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 9fae9d1160e1c..c3e5fb8cb049d 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -189,6 +189,7 @@ private static void setFeatureFlags(FeatureFlags flags) { flags.setSupportsBrokerEntryMetadata(true); flags.setSupportsPartialProducer(true); flags.setSupportsGetPartitionedMetadataWithoutAutoCreation(true); + flags.setSupportsReplDedupByLidAndEid(true); } public static ByteBuf newConnect(String authMethodName, String authData, int protocolVersion, String libVersion, @@ -242,6 +243,15 @@ public static ByteBuf newConnect(String authMethodName, AuthData authData, int p public static ByteBuf newConnect(String authMethodName, AuthData authData, int protocolVersion, String libVersion, String targetBroker, String originalPrincipal, AuthData originalAuthData, String originalAuthMethod, String proxyVersion) { + BaseCommand cmd = newConnectWithoutSerialize(authMethodName, authData, protocolVersion, libVersion, + targetBroker, originalPrincipal, originalAuthData, originalAuthMethod, proxyVersion); + return serializeWithSize(cmd); + } + + public static BaseCommand newConnectWithoutSerialize(String authMethodName, AuthData authData, + int protocolVersion, String libVersion, + String targetBroker, String originalPrincipal, AuthData originalAuthData, + String originalAuthMethod, String proxyVersion) { BaseCommand cmd = localCmd(Type.CONNECT); CommandConnect connect = cmd.setConnect() .setClientVersion(libVersion != null ? 
libVersion : "Pulsar Client") @@ -274,7 +284,7 @@ public static ByteBuf newConnect(String authMethodName, AuthData authData, int p connect.setProtocolVersion(protocolVersion); setFeatureFlags(connect.setFeatureFlags()); - return serializeWithSize(cmd); + return cmd; } public static ByteBuf newConnected(int clientProtocoVersion, boolean supportsTopicWatchers) { @@ -300,6 +310,7 @@ public static BaseCommand newConnectedCommand(int clientProtocolVersion, int max connected.setFeatureFlags().setSupportsTopicWatchers(supportsTopicWatchers); connected.setFeatureFlags().setSupportsGetPartitionedMetadataWithoutAutoCreation(true); + connected.setFeatureFlags().setSupportsReplDedupByLidAndEid(true); return cmd; } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java index 2291aee781f60..de19b777ff015 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java @@ -18,6 +18,10 @@ */ package org.apache.pulsar.common.protocol; +import static org.apache.pulsar.common.api.proto.MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT; +import static org.apache.pulsar.common.api.proto.MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_REQUEST; +import static org.apache.pulsar.common.api.proto.MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_RESPONSE; +import static org.apache.pulsar.common.api.proto.MarkerType.REPLICATED_SUBSCRIPTION_UPDATE; import io.netty.buffer.ByteBuf; import io.netty.util.concurrent.FastThreadLocal; import java.io.IOException; @@ -98,6 +102,13 @@ public static boolean isServerOnlyMarker(MessageMetadata msgMetadata) { return msgMetadata.hasMarkerType(); } + public static boolean isReplicationMarker(int markerType) { + return markerType == REPLICATED_SUBSCRIPTION_SNAPSHOT_REQUEST.getValue() + || markerType == REPLICATED_SUBSCRIPTION_SNAPSHOT_RESPONSE.getValue() + || 
markerType == REPLICATED_SUBSCRIPTION_SNAPSHOT.getValue() + || markerType == REPLICATED_SUBSCRIPTION_UPDATE.getValue(); + } + public static boolean isReplicatedSubscriptionSnapshotMarker(MessageMetadata msgMetadata) { return msgMetadata != null && msgMetadata.hasMarkerType() diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index fa2181b484f41..eeddb4cdc652e 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -302,6 +302,7 @@ message FeatureFlags { optional bool supports_partial_producer = 3 [default = false]; optional bool supports_topic_watchers = 4 [default = false]; optional bool supports_get_partitioned_metadata_without_auto_creation = 5 [default = false]; + optional bool supports_repl_dedup_by_lid_and_eid = 6 [default = false]; } message CommandConnected { diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java index c4bd1959accf8..5282507d5c4bc 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java @@ -25,6 +25,7 @@ import java.io.File; import java.net.URI; import java.util.Arrays; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletionException; @@ -128,11 +129,11 @@ private Object[][] allImplementations() { // The new connection string won't be available to the test method unless a // Supplier lambda is used for providing the value. 
return new Object[][]{ - {"ZooKeeper", stringSupplier(() -> zksConnectionString)}, - {"Memory", stringSupplier(() -> memoryConnectionString)}, - {"RocksDB", stringSupplier(() -> rocksdbConnectionString)}, - {"Etcd", stringSupplier(() -> "etcd:" + getEtcdClusterConnectString())}, - {"MockZooKeeper", stringSupplier(() -> mockZkUrl)}, + {"ZooKeeper", providerUrlSupplier(() -> zksConnectionString)}, + {"Memory", providerUrlSupplier(() -> memoryConnectionString)}, + {"RocksDB", providerUrlSupplier(() -> rocksdbConnectionString)}, + {"Etcd", providerUrlSupplier(() -> "etcd:" + getEtcdClusterConnectString(), "etcd:...")}, + {"MockZooKeeper", providerUrlSupplier(() -> mockZkUrl)}, }; } @@ -165,16 +166,29 @@ private synchronized String getEtcdClusterConnectString() { return etcdCluster.clientEndpoints().stream().map(URI::toString).collect(Collectors.joining(",")); } - public static Supplier stringSupplier(Supplier supplier) { - return new StringSupplier(supplier); + private static Supplier providerUrlSupplier(Supplier supplier) { + return new ProviderUrlSupplier(supplier); + } + + // Use this method to provide a custom toString value for the Supplier. Use this when Testcontainers is used + // so that a toString call doesn't eagerly trigger container initialization which could cause a deadlock + // with Gradle Develocity Maven Extension. 
+ private static Supplier providerUrlSupplier(Supplier supplier, String toStringValue) { + return new ProviderUrlSupplier(supplier, Optional.ofNullable(toStringValue)); } // Implements toString() so that the test name is more descriptive - private static class StringSupplier implements Supplier { + private static class ProviderUrlSupplier implements Supplier { private final Supplier supplier; + private final Optional toStringValue; + + ProviderUrlSupplier(Supplier supplier) { + this(supplier, Optional.empty()); + } - public StringSupplier(Supplier supplier) { + ProviderUrlSupplier(Supplier supplier, Optional toStringValue) { this.supplier = supplier; + this.toStringValue = toStringValue; } @Override @@ -184,7 +198,9 @@ public String get() { @Override public String toString() { - return get(); + // toStringValue is used to prevent deadlocks which could occur if toString method call eagerly triggers + // Testcontainers initialization. This is the case when Gradle Develocity Maven Extension is used. + return toStringValue.orElseGet(this::get); } }