diff --git a/microbench/README.md b/microbench/README.md
index 780e3a5a1d3e8..f50c3036ff494 100644
--- a/microbench/README.md
+++ b/microbench/README.md
@@ -41,3 +41,29 @@ For fast recompiling of the benchmarks (without compiling Pulsar modules) and cr
mvn -Pmicrobench -pl microbench clean package
+### Running specific benchmarks
+Display help:
+java -jar microbench/target/microbenchmarks.jar -h
+Listing all benchmarks:
+java -jar microbench/target/microbenchmarks.jar -l
+Running specific benchmarks:
+java -jar microbench/target/microbenchmarks.jar ".*BenchmarkName.*"
+Checking what benchmarks match the pattern:
+java -jar microbench/target/microbenchmarks.jar ".*BenchmarkName.*" -lp
diff --git a/microbench/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBenchmark.java b/microbench/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBenchmark.java
index 4c069e72ea3ba..1b210258f13d2 100644
--- a/microbench/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBenchmark.java
+++ b/microbench/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBenchmark.java
@@ -33,6 +33,7 @@
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
@@ -59,23 +60,29 @@ public void teardown() {
@Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
@Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
- public void consumeTokensBenchmark001Threads() {
- asyncTokenBucket.consumeTokens(1);
+ public void consumeTokensBenchmark001Threads(Blackhole blackhole) {
+ consumeTokenAndGetTokens(blackhole);
@Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
@Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
- public void consumeTokensBenchmark010Threads() {
- asyncTokenBucket.consumeTokens(1);
+ public void consumeTokensBenchmark010Threads(Blackhole blackhole) {
+ consumeTokenAndGetTokens(blackhole);
@Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
@Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
- public void consumeTokensBenchmark100Threads() {
+ public void consumeTokensBenchmark100Threads(Blackhole blackhole) {
+ consumeTokenAndGetTokens(blackhole);
+ }
+ private void consumeTokenAndGetTokens(Blackhole blackhole) {
+ // blackhole is used to ensure that the compiler doesn't do dead code elimination
+ blackhole.consume(asyncTokenBucket.getTokens());
diff --git a/microbench/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClockBenchmark.java b/microbench/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClockBenchmark.java
new file mode 100644
index 0000000000000..d9054b8fe4be8
--- /dev/null
+++ b/microbench/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClockBenchmark.java
@@ -0,0 +1,102 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.qos;
+import java.util.concurrent.TimeUnit;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+public class DefaultMonotonicSnapshotClockBenchmark {
+ private DefaultMonotonicSnapshotClock monotonicSnapshotClock =
+ new DefaultMonotonicSnapshotClock(TimeUnit.MILLISECONDS.toNanos(1), System::nanoTime);
+ @TearDown(Level.Iteration)
+ public void teardown() {
+ monotonicSnapshotClock.close();
+ }
+ @Threads(1)
+ @Benchmark
+ @Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
+ @Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
+ public void getTickNanos001Threads(Blackhole blackhole) {
+ consumeTokenAndGetTokens(blackhole, false);
+ }
+ @Threads(10)
+ @Benchmark
+ @Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
+ @Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
+ public void getTickNanos010Threads(Blackhole blackhole) {
+ consumeTokenAndGetTokens(blackhole, false);
+ }
+ @Threads(100)
+ @Benchmark
+ @Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
+ @Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
+ public void getTickNanos100Threads(Blackhole blackhole) {
+ consumeTokenAndGetTokens(blackhole, false);
+ }
+ @Threads(1)
+ @Benchmark
+ @Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
+ @Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
+ public void getTickNanosRequestSnapshot001Threads(Blackhole blackhole) {
+ consumeTokenAndGetTokens(blackhole, true);
+ }
+ @Threads(10)
+ @Benchmark
+ @Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
+ @Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
+ public void getTickNanosRequestSnapshot010Threads(Blackhole blackhole) {
+ consumeTokenAndGetTokens(blackhole, true);
+ }
+ @Threads(100)
+ @Benchmark
+ @Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
+ @Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
+ public void getTickNanosRequestSnapshot100Threads(Blackhole blackhole) {
+ consumeTokenAndGetTokens(blackhole, true);
+ }
+ private void consumeTokenAndGetTokens(Blackhole blackhole, boolean requestSnapshot) {
+ // blackhole is used to ensure that the compiler doesn't do dead code elimination
+ blackhole.consume(monotonicSnapshotClock.getTickNanos(requestSnapshot));
+ }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java
index ac9a1f03e592b..8c43fa0a816fa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java
@@ -42,6 +42,10 @@
* connection or client from the throttling queue to unthrottle. Before unthrottling, the application should check
* for available tokens. If tokens are still not available, the application should continue with throttling and
* repeat the throttling loop.
+ *
By default, the AsyncTokenBucket is eventually consistent. This means that the token balance is updated
+ * with added tokens and consumed tokens at most once during each "increment", when time advances more than the
+ * configured resolution. There are settings for configuring consistency, please see {@link AsyncTokenBucketBuilder}
+ * for details.
This class does not produce side effects outside its own scope. It functions similarly to a stateful function,
* akin to a counter function. In essence, it is a sophisticated counter. It can serve as a foundational component for
* constructing higher-level asynchronous rate limiter implementations, which require side effects for throttling.
@@ -119,9 +123,28 @@ public static void resetToDefaultEventualConsistentTokensView() {
private final LongAdder pendingConsumedTokens = new LongAdder();
- protected AsyncTokenBucket(MonotonicSnapshotClock clockSource, long resolutionNanos) {
+ /**
+ * By default, AsyncTokenBucket is eventually consistent. This means that the consumed tokens are subtracted from
+ * the total amount of tokens at most once during each "increment", when time advances more than the configured
+ * resolution. This setting determines if the consumed tokens are subtracted from tokens balance consistently.
+ * For high performance, it is recommended to keep this setting as false.
+ */
+ private final boolean consistentConsumedTokens;
+ /**
+ * By default, AsyncTokenBucket is eventually consistent. This means that the added tokens are calculated based
+ * on elapsed time at most once during each "increment", when time advances more than the configured
+ * resolution. This setting determines if the added tokens are calculated and added to tokens balance consistently.
+ * For high performance, it is recommended to keep this setting as false.
+ */
+ private final boolean consistentAddedTokens;
+ protected AsyncTokenBucket(MonotonicSnapshotClock clockSource, long resolutionNanos,
+ boolean consistentConsumedTokens, boolean consistentAddedTokens) {
this.clockSource = clockSource;
this.resolutionNanos = resolutionNanos;
+ this.lastNanos = Long.MIN_VALUE;
+ this.consistentConsumedTokens = consistentConsumedTokens;
+ this.consistentAddedTokens = consistentAddedTokens;
public static FinalRateAsyncTokenBucketBuilder builder() {
@@ -139,36 +162,46 @@ public static DynamicRateAsyncTokenBucketBuilder builderForDynamicRate() {
* Consumes tokens and possibly updates the tokens balance. New tokens are calculated and added to the current
* tokens balance each time the update takes place. The update takes place once in every interval of the configured
- * resolutionNanos or when the forceUpdateTokens parameter is true.
+ * resolutionNanos or when the forceConsistentTokens parameter is true.
* When the tokens balance isn't updated, the consumed tokens are added to the pendingConsumedTokens LongAdder
* counter which gets flushed the next time the tokens are updated. This makes the tokens balance
* eventually consistent. The reason for this design choice is to optimize performance by preventing CAS loop
* contention which could cause excessive CPU consumption.
* @param consumeTokens number of tokens to consume, can be 0 to update the tokens balance
- * @param forceUpdateTokens if true, the tokens are updated even if the configured resolution hasn't passed
+ * @param forceConsistentTokens if true, the token balance is updated consistently
* @return the current number of tokens in the bucket or Long.MIN_VALUE when the number of tokens is unknown due
* to eventual consistency
- private long consumeTokensAndMaybeUpdateTokensBalance(long consumeTokens, boolean forceUpdateTokens) {
+ private long consumeTokensAndMaybeUpdateTokensBalance(long consumeTokens, boolean forceConsistentTokens) {
if (consumeTokens < 0) {
throw new IllegalArgumentException("consumeTokens must be >= 0");
- long currentNanos = clockSource.getTickNanos(forceUpdateTokens);
+ boolean requestConsistentTickNanosSnapshot =
+ consistentAddedTokens || consistentConsumedTokens || forceConsistentTokens || resolutionNanos == 0;
+ long currentNanos = clockSource.getTickNanos(requestConsistentTickNanosSnapshot);
+ long newTokens = 0;
// check if the tokens should be updated immediately
- if (shouldUpdateTokensImmediately(currentNanos, forceUpdateTokens)) {
+ if (shouldAddTokensImmediately(currentNanos, forceConsistentTokens)) {
// calculate the number of new tokens since the last update
- long newTokens = calculateNewTokensSinceLastUpdate(currentNanos);
- // calculate the total amount of tokens to consume in this update
+ newTokens = calculateNewTokensSinceLastUpdate(currentNanos, forceConsistentTokens);
+ }
+ // update tokens if there are new tokens or if resolutionNanos is set to 0 which is currently used for testing
+ if (newTokens > 0 || resolutionNanos == 0 || consistentConsumedTokens || forceConsistentTokens) {
// flush the pendingConsumedTokens by calling "sumThenReset"
- long totalConsumedTokens = consumeTokens + pendingConsumedTokens.sumThenReset();
- // update the tokens and return the current token value
- return TOKENS_UPDATER.updateAndGet(this,
- currentTokens ->
- // after adding new tokens, limit the tokens to the capacity
- Math.min(currentTokens + newTokens, getCapacity())
- // subtract the consumed tokens
- - totalConsumedTokens);
+ long currentPendingConsumedTokens = pendingConsumedTokens.sumThenReset();
+ // calculate the token delta by subtracting the consumed tokens from the new tokens
+ long tokenDelta = newTokens - currentPendingConsumedTokens;
+ if (tokenDelta != 0 || consumeTokens != 0) {
+ // update the tokens and return the current token value
+ return TOKENS_UPDATER.updateAndGet(this,
+ // limit the tokens to the capacity of the bucket
+ currentTokens -> Math.min(currentTokens + tokenDelta, getCapacity())
+ // subtract the consumed tokens from the capped tokens
+ - consumeTokens);
+ } else {
+ return tokens;
+ }
} else {
// eventual consistent fast path, tokens are not updated immediately
@@ -187,19 +220,19 @@ private long consumeTokensAndMaybeUpdateTokensBalance(long consumeTokens, boolea
* The tokens will be updated once every resolutionNanos nanoseconds.
* This method checks if the configured resolutionNanos has passed since the last update.
- * If the forceUpdateTokens is true, the tokens will be updated immediately.
+ * If the forceConsistentTokens is true, the tokens will be updated immediately.
- * @param currentNanos the current monotonic clock time in nanoseconds
- * @param forceUpdateTokens if true, the tokens will be updated immediately
+ * @param currentNanos the current monotonic clock time in nanoseconds
+ * @param forceConsistentTokens if true, the tokens are added even if the configured resolution hasn't fully passed
* @return true if the tokens should be updated immediately, false otherwise
- private boolean shouldUpdateTokensImmediately(long currentNanos, boolean forceUpdateTokens) {
+ private boolean shouldAddTokensImmediately(long currentNanos, boolean forceConsistentTokens) {
long currentIncrement = resolutionNanos != 0 ? currentNanos / resolutionNanos : 0;
long currentLastIncrement = lastIncrement;
return currentIncrement == 0
|| (currentIncrement > currentLastIncrement
&& LAST_INCREMENT_UPDATER.compareAndSet(this, currentLastIncrement, currentIncrement))
- || forceUpdateTokens;
+ || consistentAddedTokens || forceConsistentTokens;
@@ -209,10 +242,22 @@ private boolean shouldUpdateTokensImmediately(long currentNanos, boolean forceUp
* @param currentNanos the current monotonic clock time in nanoseconds
* @return the number of new tokens to add since the last update
- private long calculateNewTokensSinceLastUpdate(long currentNanos) {
+ private long calculateNewTokensSinceLastUpdate(long currentNanos, boolean forceConsistentTokens) {
+ long previousLastNanos = lastNanos;
+ long newLastNanos;
+ // update lastNanos only if at least resolutionNanos/2 nanoseconds has passed since the last update
+ // unless consistency is needed
+ long minimumIncrementNanos = forceConsistentTokens || consistentAddedTokens ? 0L : resolutionNanos / 2;
+ if (currentNanos > previousLastNanos + minimumIncrementNanos) {
+ newLastNanos = currentNanos;
+ } else {
+ newLastNanos = previousLastNanos;
+ }
long newTokens;
- long previousLastNanos = LAST_NANOS_UPDATER.getAndSet(this, currentNanos);
- if (previousLastNanos == 0) {
+ if (newLastNanos == previousLastNanos
+ // prevent races with a CAS update of lastNanos
+ || !LAST_NANOS_UPDATER.compareAndSet(this, previousLastNanos, newLastNanos)
+ || previousLastNanos == Long.MIN_VALUE) {
newTokens = 0;
} else {
long durationNanos = currentNanos - previousLastNanos + REMAINDER_NANOS_UPDATER.getAndSet(this, 0);
@@ -267,15 +312,14 @@ public boolean consumeTokensAndCheckIfContainsTokens(long consumeTokens) {
- * Returns the current token balance. When forceUpdateTokens is true, the tokens balance is updated before
- * returning. If forceUpdateTokens is false, the tokens balance could be updated if the last updated happened
+ * Returns the current token balance. When forceConsistentTokens is true, the tokens balance is updated before
+ * returning. If forceConsistentTokens is false, the tokens balance could be updated if the last updated happened
* more than resolutionNanos nanoseconds ago.
- * @param forceUpdateTokens if true, the tokens balance is updated before returning
* @return the current token balance
- protected long tokens(boolean forceUpdateTokens) {
- long currentTokens = consumeTokensAndMaybeUpdateTokensBalance(0, forceUpdateTokens);
+ private long tokens() {
+ long currentTokens = consumeTokensAndMaybeUpdateTokensBalance(0, false);
if (currentTokens != Long.MIN_VALUE) {
// when currentTokens isn't Long.MIN_VALUE, the current tokens balance is known
return currentTokens;
@@ -295,7 +339,7 @@ public long calculateThrottlingDuration() {
long currentTokens = consumeTokensAndMaybeUpdateTokensBalance(0, true);
if (currentTokens == Long.MIN_VALUE) {
throw new IllegalArgumentException(
- "Unexpected result from updateAndConsumeTokens with forceUpdateTokens set to true");
+ "Unexpected result from updateAndConsumeTokens with forceConsistentTokens set to true");
if (currentTokens > 0) {
return 0L;
@@ -309,10 +353,11 @@ public long calculateThrottlingDuration() {
* Returns the current number of tokens in the bucket.
- * The token balance is updated if the configured resolutionNanos has passed since the last update.
+ * The token balance is updated if the configured resolutionNanos has passed since the last update unless
+ * consistentConsumedTokens is true.
public final long getTokens() {
- return tokens(false);
+ return tokens();
public abstract long getRate();
@@ -320,25 +365,13 @@ public final long getTokens() {
* Checks if the bucket contains tokens.
* The token balance is updated before the comparison if the configured resolutionNanos has passed since the last
- * update. It's possible that the returned result is not definite since the token balance is eventually consistent.
+ * update. It's possible that the returned result is not definite since the token balance is eventually consistent
+ * if consistentConsumedTokens is false.
* @return true if the bucket contains tokens, false otherwise
public boolean containsTokens() {
- return containsTokens(false);
- }
- /**
- * Checks if the bucket contains tokens.
- * The token balance is updated before the comparison if the configured resolutionNanos has passed since the last
- * update. The token balance is also updated when forceUpdateTokens is true.
- * It's possible that the returned result is not definite since the token balance is eventually consistent.
- *
- * @param forceUpdateTokens if true, the token balance is updated before the comparison
- * @return true if the bucket contains tokens, false otherwise
- */
- public boolean containsTokens(boolean forceUpdateTokens) {
- return tokens(forceUpdateTokens) > 0;
+ return tokens() > 0;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBuilder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBuilder.java
index ee256d5a37d64..1c05f1a213e3a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBuilder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBuilder.java
@@ -23,6 +23,8 @@
public abstract class AsyncTokenBucketBuilder> {
protected MonotonicSnapshotClock clock = AsyncTokenBucket.DEFAULT_SNAPSHOT_CLOCK;
protected long resolutionNanos = AsyncTokenBucket.defaultResolutionNanos;
+ protected boolean consistentConsumedTokens;
+ protected boolean consistentAddedTokens;
protected AsyncTokenBucketBuilder() {
@@ -31,15 +33,47 @@ protected SELF self() {
return (SELF) this;
+ /**
+ * Set the clock source for the token bucket. It's recommended to use the {@link DefaultMonotonicSnapshotClock}
+ * for most use cases.
+ */
public SELF clock(MonotonicSnapshotClock clock) {
this.clock = clock;
return self();
+ /**
+ * By default, AsyncTokenBucket is eventually consistent. This means that the token balance is updated, when time
+ * advances more than the configured resolution. This setting determines the duration of the increment.
+ * Setting this value to 0 will make the token balance fully consistent. There's a performance trade-off
+ * when setting this value to 0.
+ */
public SELF resolutionNanos(long resolutionNanos) {
this.resolutionNanos = resolutionNanos;
return self();
+ /**
+ * By default, AsyncTokenBucket is eventually consistent. This means that the consumed tokens are subtracted from
+ * the total amount of tokens at most once during each "increment", when time advances more than the configured
+ * resolution. This setting determines if the consumed tokens are subtracted from tokens balance consistently.
+ * For high performance, it is recommended to keep this setting as false.
+ */
+ public SELF consistentConsumedTokens(boolean consistentConsumedTokens) {
+ this.consistentConsumedTokens = consistentConsumedTokens;
+ return self();
+ }
+ /**
+ * By default, AsyncTokenBucket is eventually consistent. This means that the added tokens are calculated based
+ * on elapsed time at most once during each "increment", when time advances more than the configured
+ * resolution. This setting determines if the added tokens are calculated and added to tokens balance consistently.
+ * For high performance, it is recommended to keep this setting as false.
+ */
+ public SELF consistentAddedTokens(boolean consistentAddedTokens) {
+ this.consistentAddedTokens = consistentAddedTokens;
+ return self();
+ }
public abstract AsyncTokenBucket build();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClock.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClock.java
index df3843921ed55..23b9359c8042d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClock.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClock.java
@@ -19,71 +19,269 @@
package org.apache.pulsar.broker.qos;
+import java.util.Objects;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
- * Default implementation of {@link MonotonicSnapshotClock}.
+ * Default implementation of {@link MonotonicSnapshotClock} optimized for use with {@link AsyncTokenBucket}.
- * Starts a daemon thread that updates the snapshot value periodically with a configured interval. The close method
- * should be called to stop the thread.
+ *
+ * This class provides a monotonic snapshot value that consistently increases, ensuring reliable behavior
+ * even in environments where the underlying clock source may not be strictly monotonic across all CPUs,
+ * such as certain virtualized platforms.
+ *
+ *
+ *
+ * Upon instantiation, a daemon thread is launched to periodically update the snapshot value at a configured
+ * interval. It is essential to invoke the {@link #close()} method to gracefully terminate this thread when it is
+ * no longer needed.
+ *
+ *
+ *
+ * The {@link AsyncTokenBucket} utilizes this clock to obtain tick values. It does not require a consistent value on
+ * every retrieval. However, when a consistent snapshot is necessary, the {@link #getTickNanos(boolean)} method
+ * is called with the {@code requestSnapshot} parameter set to {@code true}.
+ *
+ *
+ *
+ * By employing a single thread to update the monotonic clock value, this implementation ensures that the snapshot
+ * value remains strictly increasing. This approach mitigates potential inconsistencies that may arise from clock
+ * source discrepancies across different CPUs.
+ *
public class DefaultMonotonicSnapshotClock implements MonotonicSnapshotClock, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(DefaultMonotonicSnapshotClock.class);
- private final long sleepMillis;
- private final int sleepNanos;
- private final LongSupplier clockSource;
- private final Thread thread;
+ private final TickUpdaterThread tickUpdaterThread;
private volatile long snapshotTickNanos;
public DefaultMonotonicSnapshotClock(long snapshotIntervalNanos, LongSupplier clockSource) {
if (snapshotIntervalNanos < TimeUnit.MILLISECONDS.toNanos(1)) {
throw new IllegalArgumentException("snapshotIntervalNanos must be at least 1 millisecond");
- this.sleepMillis = TimeUnit.NANOSECONDS.toMillis(snapshotIntervalNanos);
- this.sleepNanos = (int) (snapshotIntervalNanos - TimeUnit.MILLISECONDS.toNanos(sleepMillis));
- this.clockSource = clockSource;
- updateSnapshotTickNanos();
- thread = new Thread(this::snapshotLoop, getClass().getSimpleName() + "-update-loop");
- thread.setDaemon(true);
- thread.start();
+ tickUpdaterThread = new TickUpdaterThread(snapshotIntervalNanos,
+ Objects.requireNonNull(clockSource, "clockSource must not be null"), this::setSnapshotTickNanos);
+ tickUpdaterThread.start();
+ }
+ private void setSnapshotTickNanos(long snapshotTickNanos) {
+ this.snapshotTickNanos = snapshotTickNanos;
/** {@inheritDoc} */
public long getTickNanos(boolean requestSnapshot) {
if (requestSnapshot) {
- updateSnapshotTickNanos();
+ tickUpdaterThread.requestUpdateAndWait();
return snapshotTickNanos;
- private void updateSnapshotTickNanos() {
- snapshotTickNanos = clockSource.getAsLong();
+ @Override
+ public void close() {
+ tickUpdaterThread.interrupt();
- private void snapshotLoop() {
- try {
- while (!Thread.currentThread().isInterrupted()) {
- updateSnapshotTickNanos();
+ /**
+ * A thread that updates snapshotTickNanos value periodically with a configured interval.
+ * The thread is started when the DefaultMonotonicSnapshotClock is created and runs until the close method is
+ * called.
+ * A single thread is used to read the clock source value since on some hardware of virtualized platforms,
+ * System.nanoTime() isn't strictly monotonic across all CPUs. Reading by a single thread will improve the
+ * stability of the read value since a single thread is scheduled on a single CPU. If the thread is migrated
+ * to another CPU, the clock source value might leap backward or forward, but logic in this class will handle it.
+ */
+ private static class TickUpdaterThread extends Thread {
+ private final Object tickUpdateDelayMonitor = new Object();
+ private final Object tickUpdatedMonitor = new Object();
+ private final MonotonicLeapDetectingTickUpdater tickUpdater;
+ private volatile boolean running;
+ private boolean tickUpdateDelayMonitorNotified;
+ private AtomicLong requestCount = new AtomicLong();
+ private final long sleepMillis;
+ private final int sleepNanos;
+ TickUpdaterThread(long snapshotIntervalNanos, LongSupplier clockSource, LongConsumer setSnapshotTickNanos) {
+ super(DefaultMonotonicSnapshotClock.class.getSimpleName() + "-update-loop");
+ // set as daemon thread so that it doesn't prevent the JVM from exiting
+ setDaemon(true);
+ // set the highest priority
+ setPriority(MAX_PRIORITY);
+ this.sleepMillis = TimeUnit.NANOSECONDS.toMillis(snapshotIntervalNanos);
+ this.sleepNanos = (int) (snapshotIntervalNanos - TimeUnit.MILLISECONDS.toNanos(sleepMillis));
+ tickUpdater = new MonotonicLeapDetectingTickUpdater(clockSource, setSnapshotTickNanos,
+ snapshotIntervalNanos);
+ }
+ @Override
+ public void run() {
+ try {
+ running = true;
+ long updatedForRequestCount = -1;
+ while (!isInterrupted()) {
+ try {
+ // track if the thread has waited for the whole duration of the snapshot interval
+ // before updating the tick value
+ boolean waitedSnapshotInterval = false;
+ // sleep for the configured interval on a monitor that can be notified to stop the sleep
+ // and update the tick value immediately. This is used in requestUpdate method.
+ synchronized (tickUpdateDelayMonitor) {
+ tickUpdateDelayMonitorNotified = false;
+ // only wait if no explicit request has been made since the last update
+ if (requestCount.get() == updatedForRequestCount) {
+ // if no request has been made, sleep for the configured interval
+ tickUpdateDelayMonitor.wait(sleepMillis, sleepNanos);
+ waitedSnapshotInterval = !tickUpdateDelayMonitorNotified;
+ }
+ }
+ updatedForRequestCount = requestCount.get();
+ // update the tick value using the tick updater which will tolerate leaps backward
+ tickUpdater.update(waitedSnapshotInterval);
+ notifyAllTickUpdated();
+ } catch (InterruptedException e) {
+ interrupt();
+ break;
+ }
+ }
+ } catch (Throwable t) {
+ // report unexpected error since this would be a fatal error when the clock doesn't progress anymore
+ // this is very unlikely to happen, but it's better to log it in any case
+ LOG.error("Unexpected fatal error that stopped the clock.", t);
+ } finally {
+ LOG.info("DefaultMonotonicSnapshotClock's TickUpdaterThread stopped. {},tid={}", this, getId());
+ running = false;
+ notifyAllTickUpdated();
+ }
+ }
+ private void notifyAllTickUpdated() {
+ synchronized (tickUpdatedMonitor) {
+ // notify all threads that are waiting for the tick value to be updated
+ tickUpdatedMonitor.notifyAll();
+ }
+ }
+ public void requestUpdateAndWait() {
+ if (!running) {
+ synchronized (tickUpdater) {
+ // thread has stopped running, fallback to update the value directly without optimizations
+ tickUpdater.update(false);
+ }
+ return;
+ }
+ // increment the request count that ensures that the thread will update the tick value after this request
+ // was made also when there's a race condition between the request and the update
+ // this solution doesn't prevent all races, and it's not guaranteed that the tick value is always updated
+ // it will prevent the request having to wait for the delayed update cycle. This is sufficient for the
+ // use case.
+ requestCount.incrementAndGet();
+ synchronized (tickUpdatedMonitor) {
+ // notify the thread to stop waiting and update the tick value
+ synchronized (tickUpdateDelayMonitor) {
+ tickUpdateDelayMonitorNotified = true;
+ tickUpdateDelayMonitor.notify();
+ }
+ // wait until the tick value has been updated
try {
- Thread.sleep(sleepMillis, sleepNanos);
+ tickUpdatedMonitor.wait();
} catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- break;
+ currentThread().interrupt();
+ }
+ }
+ }
+ @Override
+ public synchronized void start() {
+ // wait until the thread is started and the tick value has been updated
+ synchronized (tickUpdatedMonitor) {
+ super.start();
+ try {
+ tickUpdatedMonitor.wait();
+ } catch (InterruptedException e) {
+ currentThread().interrupt();
- } catch (Throwable t) {
- // report unexpected error since this would be a fatal error when the clock doesn't progress anymore
- // this is very unlikely to happen, but it's better to log it in any case
- LOG.error("Unexpected fatal error that stopped the clock.", t);
- @Override
- public void close() {
- thread.interrupt();
+ /**
+ * Handles updating the tick value in a monotonic way so that the value is always increasing,
+ * regardless of leaps backward in the clock source value.
+ */
+ static class MonotonicLeapDetectingTickUpdater {
+ private final LongSupplier clockSource;
+ private final long snapshotInternalNanos;
+ private final long maxDeltaNanosForLeapDetection;
+ private final LongConsumer tickUpdatedCallback;
+ private long referenceClockSourceValue = Long.MIN_VALUE;
+ private long baseSnapshotTickNanos;
+ private long previousSnapshotTickNanos;
+ MonotonicLeapDetectingTickUpdater(LongSupplier clockSource, LongConsumer tickUpdatedCallback,
+ long snapshotInternalNanos) {
+ this.clockSource = clockSource;
+ this.snapshotInternalNanos = snapshotInternalNanos;
+ this.maxDeltaNanosForLeapDetection = 2 * snapshotInternalNanos;
+ this.tickUpdatedCallback = tickUpdatedCallback;
+ }
+ /**
+ * Updates the snapshot tick value. The tickUpdatedCallback is called if the value has changed.
+ * The value is updated in a monotonic way so that the value is always increasing, regardless of leaps backward
+ * in the clock source value.
+ * Leap detection is done by comparing the new value with the previous value and the maximum delta value.
+ *
+ * @param waitedSnapshotInterval if true, the method has waited for the snapshot interval since the previous
+ * call.
+ */
+ public void update(boolean waitedSnapshotInterval) {
+ // get the current clock source value
+ long clockValue = clockSource.getAsLong();
+ // Initialization on first call
+ if (referenceClockSourceValue == Long.MIN_VALUE) {
+ referenceClockSourceValue = clockValue;
+ baseSnapshotTickNanos = clockValue;
+ previousSnapshotTickNanos = clockValue;
+ // update the tick value using the callback
+ tickUpdatedCallback.accept(clockValue);
+ return;
+ }
+ // calculate the duration since the reference clock source value
+ // so that the snapshot value is always increasing and tolerates it when the clock source is not strictly
+ // monotonic across all CPUs and leaps backward
+ long durationSinceReference = clockValue - referenceClockSourceValue;
+ // calculate the new snapshot tick value as a duration since the reference clock source value
+ // and add it to the base snapshot tick value
+ long newSnapshotTickNanos = baseSnapshotTickNanos + durationSinceReference;
+ // reset the reference clock source value if the clock source value leaps backward
+ // more than the maximum delta value
+ if (newSnapshotTickNanos < previousSnapshotTickNanos - maxDeltaNanosForLeapDetection) {
+ // when the clock source value leaps backward, reset the reference value to the new value
+ // for future duration calculations
+ referenceClockSourceValue = clockValue;
+ // if the updater thread has waited for the snapshot interval since the previous call,
+ // increment the base snapshot tick value by the snapshot interval value
+ long incrementWhenLeapDetected = waitedSnapshotInterval ? snapshotInternalNanos : 0;
+ // set the base snapshot tick value to the new value
+ baseSnapshotTickNanos = previousSnapshotTickNanos + incrementWhenLeapDetected;
+ // set the new snapshot tick value to the base value
+ newSnapshotTickNanos = baseSnapshotTickNanos;
+ }
+ // update snapshotTickNanos value if the new value is greater than the previous value
+ if (newSnapshotTickNanos > previousSnapshotTickNanos) {
+ // store the previous value
+ previousSnapshotTickNanos = newSnapshotTickNanos;
+ // update the tick value using the callback
+ tickUpdatedCallback.accept(newSnapshotTickNanos);
+ }
+ }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucket.java
index 8edc73d1f51e3..f2eae8aed8d9c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucket.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucket.java
@@ -34,15 +34,16 @@ public class DynamicRateAsyncTokenBucket extends AsyncTokenBucket {
protected DynamicRateAsyncTokenBucket(double capacityFactor, LongSupplier rateFunction,
MonotonicSnapshotClock clockSource, LongSupplier ratePeriodNanosFunction,
- long resolutionNanos, double initialTokensFactor,
+ long resolutionNanos, boolean consistentConsumedTokens,
+ boolean consistentAddedTokens, double initialTokensFactor,
double targetFillFactorAfterThrottling) {
- super(clockSource, resolutionNanos);
+ super(clockSource, resolutionNanos, consistentConsumedTokens, consistentAddedTokens);
this.capacityFactor = capacityFactor;
this.rateFunction = rateFunction;
this.ratePeriodNanosFunction = ratePeriodNanosFunction;
this.targetFillFactorAfterThrottling = targetFillFactorAfterThrottling;
this.tokens = (long) (rateFunction.getAsLong() * initialTokensFactor);
- tokens(false);
+ getTokens();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucketBuilder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucketBuilder.java
index 22270484c72f0..8aebecddf90c7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucketBuilder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucketBuilder.java
@@ -64,9 +64,7 @@ public DynamicRateAsyncTokenBucketBuilder targetFillFactorAfterThrottling(
public AsyncTokenBucket build() {
return new DynamicRateAsyncTokenBucket(this.capacityFactor, this.rateFunction,
- this.clock,
- this.ratePeriodNanosFunction, this.resolutionNanos,
- this.initialFillFactor,
- targetFillFactorAfterThrottling);
+ this.clock, this.ratePeriodNanosFunction, this.resolutionNanos, this.consistentConsumedTokens,
+ this.consistentAddedTokens, this.initialFillFactor, targetFillFactorAfterThrottling);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucket.java
index 627c5ee1334b2..d83290b723f07 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucket.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucket.java
@@ -30,15 +30,16 @@ class FinalRateAsyncTokenBucket extends AsyncTokenBucket {
private final long targetAmountOfTokensAfterThrottling;
protected FinalRateAsyncTokenBucket(long capacity, long rate, MonotonicSnapshotClock clockSource,
- long ratePeriodNanos, long resolutionNanos, long initialTokens) {
- super(clockSource, resolutionNanos);
+ long ratePeriodNanos, long resolutionNanos, boolean consistentConsumedTokens,
+ boolean consistentAddedTokens, long initialTokens) {
+ super(clockSource, resolutionNanos, consistentConsumedTokens, consistentAddedTokens);
this.capacity = capacity;
this.rate = rate;
this.ratePeriodNanos = ratePeriodNanos != -1 ? ratePeriodNanos : ONE_SECOND_NANOS;
// The target amount of tokens is the amount of tokens made available in the resolution duration
this.targetAmountOfTokensAfterThrottling = Math.max(this.resolutionNanos * rate / ratePeriodNanos, 1);
this.tokens = initialTokens;
- tokens(false);
+ getTokens();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucketBuilder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucketBuilder.java
index ff4ed53c6c7fa..a292000eaa825 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucketBuilder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucketBuilder.java
@@ -55,7 +55,7 @@ public FinalRateAsyncTokenBucketBuilder initialTokens(long initialTokens) {
public AsyncTokenBucket build() {
return new FinalRateAsyncTokenBucket(this.capacity != null ? this.capacity : this.rate, this.rate,
- this.ratePeriodNanos, this.resolutionNanos,
+ this.ratePeriodNanos, this.resolutionNanos, this.consistentConsumedTokens, this.consistentAddedTokens,
this.initialTokens != null ? this.initialTokens : this.rate
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index ddd436b085493..413b1b79d7a45 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2479,6 +2479,12 @@ public long getNumberOfNamespaceBundles() {
private void handleMetadataChanges(Notification n) {
+ if (!pulsar.isRunning()) {
+ // Ignore metadata changes when broker is not running
+ log.info("Ignoring metadata change since broker is not running (id={}, state={}) {}", pulsar.getBrokerId(),
+ pulsar.getState(), n);
+ return;
+ }
if (n.getType() == NotificationType.Modified && NamespaceResources.pathIsFromNamespace(n.getPath())) {
NamespaceName ns = NamespaceResources.namespaceFromPath(n.getPath());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java
index 8255d9b6931ff..90c8de5f97a05 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java
@@ -20,11 +20,11 @@
package org.apache.pulsar.broker.service;
import com.google.common.annotations.VisibleForTesting;
-import io.netty.channel.EventLoopGroup;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.qos.AsyncTokenBucket;
import org.apache.pulsar.broker.qos.MonotonicSnapshotClock;
import org.apache.pulsar.common.policies.data.Policies;
@@ -32,6 +32,7 @@
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.MpscUnboundedArrayQueue;
public class PublishRateLimiterImpl implements PublishRateLimiter {
private volatile AsyncTokenBucket tokenBucketOnMessage;
private volatile AsyncTokenBucket tokenBucketOnByte;
@@ -80,7 +81,7 @@ private void scheduleDecrementThrottleCount(Producer producer) {
// schedule unthrottling when the throttling count is incremented to 1
// this is to avoid scheduling unthrottling multiple times for concurrent producers
if (throttledProducersCount.incrementAndGet() == 1) {
- EventLoopGroup executor = producer.getCnx().getBrokerService().executor();
+ ScheduledExecutorService executor = producer.getCnx().getBrokerService().executor().next();
scheduleUnthrottling(executor, calculateThrottlingDurationNanos());
@@ -134,7 +135,11 @@ private void unthrottleQueuedProducers(ScheduledExecutorService executor) {
// unthrottle as many producers as possible while there are token available
while ((throttlingDuration = calculateThrottlingDurationNanos()) == 0L
&& (producer = unthrottlingQueue.poll()) != null) {
- producer.decrementThrottleCount();
+ try {
+ producer.decrementThrottleCount();
+ } catch (Exception e) {
+ log.error("Failed to unthrottle producer {}", producer, e);
+ }
// if there are still producers to be unthrottled, schedule unthrottling again
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index b29cbcd660db1..f43b134eb122a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -23,6 +23,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.qos.AsyncTokenBucket;
+import org.apache.pulsar.broker.qos.AsyncTokenBucketBuilder;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
@@ -76,7 +77,9 @@ public DispatchRateLimiter(BrokerService brokerService) {
* @return
public long getAvailableDispatchRateLimitOnMsg() {
- return dispatchRateLimiterOnMessage == null ? -1 : Math.max(dispatchRateLimiterOnMessage.getTokens(), 0);
+ AsyncTokenBucket localDispatchRateLimiterOnMessage = dispatchRateLimiterOnMessage;
+ return localDispatchRateLimiterOnMessage == null ? -1 :
+ Math.max(localDispatchRateLimiterOnMessage.getTokens(), 0);
@@ -85,7 +88,8 @@ public long getAvailableDispatchRateLimitOnMsg() {
* @return
public long getAvailableDispatchRateLimitOnByte() {
- return dispatchRateLimiterOnByte == null ? -1 : Math.max(dispatchRateLimiterOnByte.getTokens(), 0);
+ AsyncTokenBucket localDispatchRateLimiterOnByte = dispatchRateLimiterOnByte;
+ return localDispatchRateLimiterOnByte == null ? -1 : Math.max(localDispatchRateLimiterOnByte.getTokens(), 0);
@@ -95,11 +99,13 @@ public long getAvailableDispatchRateLimitOnByte() {
* @param byteSize
public void consumeDispatchQuota(long numberOfMessages, long byteSize) {
- if (numberOfMessages > 0 && dispatchRateLimiterOnMessage != null) {
- dispatchRateLimiterOnMessage.consumeTokens(numberOfMessages);
+ AsyncTokenBucket localDispatchRateLimiterOnMessage = dispatchRateLimiterOnMessage;
+ if (numberOfMessages > 0 && localDispatchRateLimiterOnMessage != null) {
+ localDispatchRateLimiterOnMessage.consumeTokens(numberOfMessages);
- if (byteSize > 0 && dispatchRateLimiterOnByte != null) {
- dispatchRateLimiterOnByte.consumeTokens(byteSize);
+ AsyncTokenBucket localDispatchRateLimiterOnByte = dispatchRateLimiterOnByte;
+ if (byteSize > 0 && localDispatchRateLimiterOnByte != null) {
+ localDispatchRateLimiterOnByte.consumeTokens(byteSize);
@@ -221,13 +227,14 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) {
if (msgRate > 0) {
if (dispatchRate.isRelativeToPublishRate()) {
this.dispatchRateLimiterOnMessage =
- AsyncTokenBucket.builderForDynamicRate()
+ configureAsyncTokenBucket(AsyncTokenBucket.builderForDynamicRate())
.rateFunction(() -> getRelativeDispatchRateInMsg(dispatchRate))
.ratePeriodNanosFunction(() -> ratePeriodNanos)
} else {
this.dispatchRateLimiterOnMessage =
- AsyncTokenBucket.builder().rate(msgRate).ratePeriodNanos(ratePeriodNanos)
+ configureAsyncTokenBucket(AsyncTokenBucket.builder())
+ .rate(msgRate).ratePeriodNanos(ratePeriodNanos)
} else {
@@ -238,13 +245,14 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) {
if (byteRate > 0) {
if (dispatchRate.isRelativeToPublishRate()) {
this.dispatchRateLimiterOnByte =
- AsyncTokenBucket.builderForDynamicRate()
+ configureAsyncTokenBucket(AsyncTokenBucket.builderForDynamicRate())
.rateFunction(() -> getRelativeDispatchRateInByte(dispatchRate))
.ratePeriodNanosFunction(() -> ratePeriodNanos)
} else {
this.dispatchRateLimiterOnByte =
- AsyncTokenBucket.builder().rate(byteRate).ratePeriodNanos(ratePeriodNanos)
+ configureAsyncTokenBucket(AsyncTokenBucket.builder())
+ .rate(byteRate).ratePeriodNanos(ratePeriodNanos)
} else {
@@ -252,6 +260,11 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) {
+ private > T configureAsyncTokenBucket(T builder) {
+ builder.clock(brokerService.getPulsar().getMonotonicSnapshotClock());
+ return builder;
+ }
private long getRelativeDispatchRateInMsg(DispatchRate dispatchRate) {
return (topic != null && dispatchRate != null)
? (long) topic.getLastUpdatedAvgPublishRateInMsg() + dispatchRate.getDispatchThrottlingRateInMsg()
@@ -270,7 +283,8 @@ private long getRelativeDispatchRateInByte(DispatchRate dispatchRate) {
* @return
public long getDispatchRateOnMsg() {
- return dispatchRateLimiterOnMessage != null ? dispatchRateLimiterOnMessage.getRate() : -1;
+ AsyncTokenBucket localDispatchRateLimiterOnMessage = dispatchRateLimiterOnMessage;
+ return localDispatchRateLimiterOnMessage != null ? localDispatchRateLimiterOnMessage.getRate() : -1;
@@ -279,7 +293,8 @@ public long getDispatchRateOnMsg() {
* @return
public long getDispatchRateOnByte() {
- return dispatchRateLimiterOnByte != null ? dispatchRateLimiterOnByte.getRate() : -1;
+ AsyncTokenBucket localDispatchRateLimiterOnByte = dispatchRateLimiterOnByte;
+ return localDispatchRateLimiterOnByte != null ? localDispatchRateLimiterOnByte.getRate() : -1;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
index b1de10e73b76f..0f98ab94142c8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
@@ -70,7 +70,7 @@ public synchronized boolean tryAcquire(ConsumerIdentifier consumerIdentifier) {
if (tokenBucket == null) {
return true;
- if (!tokenBucket.containsTokens(true)) {
+ if (!tokenBucket.containsTokens()) {
return false;
@@ -117,7 +117,11 @@ private synchronized void updateSubscribeRate(ConsumerIdentifier consumerIdentif
// update subscribe-rateLimiter
if (ratePerConsumer > 0) {
AsyncTokenBucket tokenBucket =
- AsyncTokenBucket.builder().rate(ratePerConsumer).ratePeriodNanos(ratePeriodNanos).build();
+ AsyncTokenBucket.builder()
+ .consistentAddedTokens(true)
+ .consistentConsumedTokens(true)
+ .clock(brokerService.getPulsar().getMonotonicSnapshotClock())
+ .rate(ratePerConsumer).ratePeriodNanos(ratePeriodNanos).build();
this.subscribeRateLimiter.put(consumerIdentifier, tokenBucket);
} else {
// subscribe-rate should be disable and close
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 8dd2fc1c3c26d..42e2c00f73acf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -170,20 +170,26 @@ protected final void resetConfig() {
protected final void internalSetup() throws Exception {
- lookupUrl = new URI(brokerUrl.toString());
- if (isTcpLookup) {
- lookupUrl = new URI(pulsar.getBrokerServiceUrl());
+ lookupUrl = resolveLookupUrl();
+ if (isTcpLookup && enableBrokerGateway) {
// setup port forwarding from the advertised port to the listen port
- if (enableBrokerGateway) {
- InetSocketAddress gatewayAddress = new InetSocketAddress(lookupUrl.getHost(), lookupUrl.getPort());
- InetSocketAddress brokerAddress = new InetSocketAddress("", pulsar.getBrokerListenPort().get());
- brokerGateway = new PortForwarder(gatewayAddress, brokerAddress);
- }
+ InetSocketAddress gatewayAddress = new InetSocketAddress(lookupUrl.getHost(), lookupUrl.getPort());
+ InetSocketAddress brokerAddress = new InetSocketAddress("", pulsar.getBrokerListenPort().get());
+ brokerGateway = new PortForwarder(gatewayAddress, brokerAddress);
pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
+ private URI resolveLookupUrl() {
+ if (isTcpLookup) {
+ return URI.create(pulsar.getBrokerServiceUrl());
+ } else {
+ return URI.create(brokerUrl != null
+ ? brokerUrl.toString()
+ : brokerUrlTls.toString());
+ }
+ }
protected final void internalSetup(ServiceConfiguration serviceConfiguration) throws Exception {
this.conf = serviceConfiguration;
@@ -228,11 +234,10 @@ protected PulsarClient replacePulsarClient(ClientBuilder clientBuilder) throws P
protected final void internalSetupForStatsTest() throws Exception {
- String lookupUrl = brokerUrl.toString();
- if (isTcpLookup) {
- lookupUrl = new URI(pulsar.getBrokerServiceUrl()).toString();
+ if (pulsarClient != null) {
+ pulsarClient.shutdown();
- pulsarClient = newPulsarClient(lookupUrl, 1);
+ pulsarClient = newPulsarClient(resolveLookupUrl().toString(), 1);
protected void doInitConf() throws Exception {
@@ -360,6 +365,9 @@ protected void afterPulsarStart(PulsarService pulsar) throws Exception {
protected void restartBroker() throws Exception {
+ if (pulsarClient == null) {
+ pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
+ }
protected void stopBroker() throws Exception {
@@ -384,12 +392,16 @@ protected void startBroker() throws Exception {
brokerUrl = pulsar.getWebServiceAddress() != null ? new URL(pulsar.getWebServiceAddress()) : null;
brokerUrlTls = pulsar.getWebServiceAddressTls() != null ? new URL(pulsar.getWebServiceAddressTls()) : null;
- if (admin != null) {
- admin.close();
- if (MockUtil.isMock(admin)) {
- Mockito.reset(admin);
+ URI newLookupUrl = resolveLookupUrl();
+ if (lookupUrl == null || !newLookupUrl.equals(lookupUrl)) {
+ lookupUrl = newLookupUrl;
+ if (pulsarClient != null) {
+ pulsarClient.shutdown();
+ pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
+ closeAdmin();
PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null
? brokerUrl.toString()
: brokerUrlTls.toString());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/AsyncTokenBucketTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/AsyncTokenBucketTest.java
index b446f9e902f2a..82793f2748d78 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/AsyncTokenBucketTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/AsyncTokenBucketTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.qos;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -50,7 +51,8 @@ private void incrementMillis(long millis) {
void shouldAddTokensWithConfiguredRate() {
asyncTokenBucket =
- AsyncTokenBucket.builder().capacity(100).rate(10).initialTokens(0).clock(clockSource).build();
+ AsyncTokenBucket.builder().consistentConsumedTokens(true)
+ .capacity(100).rate(10).initialTokens(0).clock(clockSource).build();
assertEquals(asyncTokenBucket.getTokens(), 50);
@@ -64,7 +66,7 @@ void shouldAddTokensWithConfiguredRate() {
// Consume all and verify none available and then wait 1 period and check replenished
- assertEquals(asyncTokenBucket.tokens(true), 0);
+ assertEquals(asyncTokenBucket.getTokens(), 0);
assertEquals(asyncTokenBucket.getTokens(), 10);
@@ -91,13 +93,148 @@ void shouldSupportFractionsWhenUpdatingTokens() {
void shouldSupportFractionsAndRetainLeftoverWhenUpdatingTokens() {
asyncTokenBucket =
- AsyncTokenBucket.builder().capacity(100).rate(10).initialTokens(0).clock(clockSource).build();
+ AsyncTokenBucket.builder().capacity(100)
+ .resolutionNanos(TimeUnit.MILLISECONDS.toNanos(1))
+ .rate(10)
+ .initialTokens(0)
+ .clock(clockSource)
+ .build();
for (int i = 0; i < 150; i++) {
assertEquals(asyncTokenBucket.getTokens(), 1);
assertEquals(asyncTokenBucket.getTokens(), 3);
+ incrementMillis(1);
+ assertEquals(asyncTokenBucket.getTokens(), 3);
+ incrementMillis(99);
+ assertEquals(asyncTokenBucket.getTokens(), 4);
+ }
+ @Test
+ void shouldSupportFractionsAndRetainLeftoverWhenUpdatingTokens2() {
+ asyncTokenBucket =
+ AsyncTokenBucket.builder().capacity(100)
+ .resolutionNanos(TimeUnit.MILLISECONDS.toNanos(1))
+ .rate(1)
+ .initialTokens(0)
+ .clock(clockSource)
+ .build();
+ for (int i = 0; i < 150; i++) {
+ incrementMillis(1);
+ assertEquals(asyncTokenBucket.getTokens(), 0);
+ }
+ incrementMillis(150);
+ assertEquals(asyncTokenBucket.getTokens(), 0);
+ incrementMillis(699);
+ assertEquals(asyncTokenBucket.getTokens(), 0);
+ incrementMillis(1);
+ assertEquals(asyncTokenBucket.getTokens(), 1);
+ incrementMillis(1000);
+ assertEquals(asyncTokenBucket.getTokens(), 2);
+ }
+ @Test
+ void shouldHandleNegativeBalanceWithEventuallyConsistentTokenUpdates() {
+ asyncTokenBucket =
+ AsyncTokenBucket.builder()
+ // intentionally pick a coarse resolution
+ .resolutionNanos(TimeUnit.SECONDS.toNanos(51))
+ .capacity(100).rate(10).initialTokens(0).clock(clockSource).build();
+ // assert that the token balance is 0 initially
+ assertThat(asyncTokenBucket.getTokens()).isEqualTo(0);
+ // consume tokens without exceeding the rate
+ for (int i = 0; i < 10000; i++) {
+ asyncTokenBucket.consumeTokens(500);
+ incrementSeconds(50);
+ }
+ // let 9 seconds pass
+ incrementSeconds(9);
+ // there should be 90 tokens available
+ assertThat(asyncTokenBucket.getTokens()).isEqualTo(90);
+ @Test
+ void shouldNotExceedTokenBucketSizeWithNegativeTokens() {
+ asyncTokenBucket =
+ AsyncTokenBucket.builder()
+ // intentionally pick a coarse resolution
+ .resolutionNanos(TimeUnit.SECONDS.toNanos(51))
+ .capacity(100).rate(10).initialTokens(0).clock(clockSource).build();
+ // assert that the token balance is 0 initially
+ assertThat(asyncTokenBucket.getTokens()).isEqualTo(0);
+ // consume tokens without exceeding the rate
+ for (int i = 0; i < 100; i++) {
+ asyncTokenBucket.consumeTokens(600);
+ incrementSeconds(50);
+ // let tokens accumulate back to 0 every 10 seconds
+ if ((i + 1) % 10 == 0) {
+ incrementSeconds(100);
+ }
+ }
+ // let 9 seconds pass
+ incrementSeconds(9);
+ // there should be 90 tokens available
+ assertThat(asyncTokenBucket.getTokens()).isEqualTo(90);
+ }
+ @Test
+ void shouldAccuratelyCalculateTokensWhenTimeIsLaggingBehindInInconsistentUpdates() {
+ clockSource = requestSnapshot -> {
+ if (requestSnapshot) {
+ return manualClockSource.get();
+ } else {
+ // let the clock lag behind
+ return manualClockSource.get() - TimeUnit.SECONDS.toNanos(52);
+ }
+ };
+ incrementSeconds(1);
+ asyncTokenBucket =
+ AsyncTokenBucket.builder().resolutionNanos(TimeUnit.SECONDS.toNanos(51))
+ .capacity(100).rate(10).initialTokens(100).clock(clockSource).build();
+ assertThat(asyncTokenBucket.getTokens()).isEqualTo(100);
+ // consume tokens without exceeding the rate
+ for (int i = 0; i < 10000; i++) {
+ asyncTokenBucket.consumeTokens(500);
+ incrementSeconds(i == 0 ? 40 : 50);
+ }
+ // let 9 seconds pass
+ incrementSeconds(9);
+ // there should be 90 tokens available
+ assertThat(asyncTokenBucket.getTokens()).isEqualTo(90);
+ }
+ @Test
+ void shouldHandleEventualConsistency() {
+ AtomicLong offset = new AtomicLong(0);
+ long resolutionNanos = TimeUnit.MILLISECONDS.toNanos(1);
+ DefaultMonotonicSnapshotClock monotonicSnapshotClock =
+ new DefaultMonotonicSnapshotClock(resolutionNanos,
+ () -> offset.get() + manualClockSource.get());
+ long initialTokens = 500L;
+ asyncTokenBucket =
+ AsyncTokenBucket.builder()
+ .consistentConsumedTokens(true)
+ .resolutionNanos(resolutionNanos)
+ .capacity(100000).rate(1000).initialTokens(initialTokens).clock(monotonicSnapshotClock).build();
+ for (int i = 0; i < 100000; i++) {
+ // increment the clock by 1ms, since rate is 1000 tokens/s, this should make 1 token available
+ incrementMillis(1);
+ // consume 1 token
+ asyncTokenBucket.consumeTokens(1);
+ }
+ assertThat(asyncTokenBucket.getTokens())
+ // since the rate is 1/ms and the test increments the clock by 1ms and consumes 1 token in each
+ // iteration, the tokens should be equal to the initial tokens
+ .isEqualTo(initialTokens);
+ }
\ No newline at end of file
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClockTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClockTest.java
new file mode 100644
index 0000000000000..0820b439915bb
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClockTest.java
@@ -0,0 +1,185 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.qos;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.assertj.core.data.Offset;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+public class DefaultMonotonicSnapshotClockTest {
+ @DataProvider
+ private static Object[] booleanValues() {
+ return new Object[]{ true, false };
+ }
+ @Test(dataProvider = "booleanValues")
+ void testClockHandlesTimeLeapsBackwards(boolean requestSnapshot) throws InterruptedException {
+ long snapshotIntervalMillis = 5;
+ AtomicLong clockValue = new AtomicLong(1);
+ @Cleanup
+ DefaultMonotonicSnapshotClock clock =
+ new DefaultMonotonicSnapshotClock(Duration.ofMillis(snapshotIntervalMillis).toNanos(),
+ clockValue::get);
+ long previousTick = -1;
+ boolean leapDirection = true;
+ for (int i = 0; i < 10000; i++) {
+ clockValue.addAndGet(TimeUnit.MILLISECONDS.toNanos(1));
+ long tick = clock.getTickNanos(requestSnapshot);
+ //log.info("i = {}, tick = {}", i, tick);
+ if ((i + 1) % 5 == 0) {
+ leapDirection = !leapDirection;
+ //log.info("Time leap 5 minutes backwards");
+ clockValue.addAndGet(-Duration.ofMinutes(5).toNanos());
+ }
+ if (previousTick != -1) {
+ assertThat(tick)
+ .describedAs("i = %d, tick = %d, previousTick = %d", i, tick, previousTick)
+ .isGreaterThanOrEqualTo(previousTick)
+ .isCloseTo(previousTick,
+ // then snapshot is requested, the time difference between the two ticks is accurate
+ // otherwise allow time difference at most 4 times the snapshot interval since the
+ // clock is updated periodically by a background thread
+ Offset.offset(TimeUnit.MILLISECONDS.toNanos(
+ requestSnapshot ? 1 : 4 * snapshotIntervalMillis)));
+ }
+ previousTick = tick;
+ }
+ }
+ @Test
+ void testRequestUpdate() throws InterruptedException {
+ @Cleanup
+ DefaultMonotonicSnapshotClock clock =
+ new DefaultMonotonicSnapshotClock(Duration.ofSeconds(5).toNanos(), System::nanoTime);
+ long tick1 = clock.getTickNanos(false);
+ long tick2 = clock.getTickNanos(true);
+ assertThat(tick2).isGreaterThan(tick1);
+ }
+ @Test
+ void testRequestingSnapshotAfterClosed() throws InterruptedException {
+ DefaultMonotonicSnapshotClock clock =
+ new DefaultMonotonicSnapshotClock(Duration.ofSeconds(5).toNanos(), System::nanoTime);
+ clock.close();
+ long tick1 = clock.getTickNanos(true);
+ Thread.sleep(10);
+ long tick2 = clock.getTickNanos(true);
+ assertThat(tick2).isGreaterThan(tick1);
+ }
+ @Test
+ void testConstructorValidation() {
+ assertThatThrownBy(() -> new DefaultMonotonicSnapshotClock(0, System::nanoTime))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("snapshotIntervalNanos must be at least 1 millisecond");
+ assertThatThrownBy(() -> new DefaultMonotonicSnapshotClock(-1, System::nanoTime))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("snapshotIntervalNanos must be at least 1 millisecond");
+ assertThatThrownBy(() -> new DefaultMonotonicSnapshotClock(TimeUnit.MILLISECONDS.toNanos(1), null))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessage("clockSource must not be null");
+ }
+ @Test
+ void testFailureHandlingInClockSource() {
+ @Cleanup
+ DefaultMonotonicSnapshotClock clock =
+ new DefaultMonotonicSnapshotClock(Duration.ofSeconds(5).toNanos(), () -> {
+ throw new RuntimeException("Test clock failure");
+ });
+ // the exception should be propagated
+ assertThatThrownBy(() -> clock.getTickNanos(true))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessage("Test clock failure");
+ }
+ @Test
+ void testLeapDetectionIndependently() {
+ AtomicLong clockValue = new AtomicLong(0);
+ AtomicLong tickValue = new AtomicLong(0);
+ long expectedTickValue = 0;
+ long snapshotIntervalNanos = TimeUnit.MILLISECONDS.toNanos(1);
+ DefaultMonotonicSnapshotClock.MonotonicLeapDetectingTickUpdater updater =
+ new DefaultMonotonicSnapshotClock.MonotonicLeapDetectingTickUpdater(clockValue::get, tickValue::set,
+ snapshotIntervalNanos);
+ updater.update(true);
+ // advance the clock
+ clockValue.addAndGet(snapshotIntervalNanos);
+ expectedTickValue += snapshotIntervalNanos;
+ updater.update(true);
+ assertThat(tickValue.get()).isEqualTo(expectedTickValue);
+ // simulate a leap backwards in time
+ clockValue.addAndGet(-10 * snapshotIntervalNanos);
+ expectedTickValue += snapshotIntervalNanos;
+ updater.update(true);
+ assertThat(tickValue.get()).isEqualTo(expectedTickValue);
+ // advance the clock
+ clockValue.addAndGet(snapshotIntervalNanos);
+ expectedTickValue += snapshotIntervalNanos;
+ updater.update(true);
+ assertThat(tickValue.get()).isEqualTo(expectedTickValue);
+ // simulate a leap backwards in time, without waiting a full snapshot interval
+ clockValue.addAndGet(-10 * snapshotIntervalNanos);
+ updater.update(false);
+ assertThat(tickValue.get()).isEqualTo(expectedTickValue);
+ // advance the clock
+ clockValue.addAndGet(snapshotIntervalNanos);
+ expectedTickValue += snapshotIntervalNanos;
+ updater.update(true);
+ assertThat(tickValue.get()).isEqualTo(expectedTickValue);
+ // simulate a small leap backwards in time which isn't detected, without waiting a full snapshot interval
+ clockValue.addAndGet(-1 * snapshotIntervalNanos);
+ updater.update(false);
+ assertThat(tickValue.get()).isEqualTo(expectedTickValue);
+ // clock doesn't advance for one snapshot interval
+ clockValue.addAndGet(snapshotIntervalNanos);
+ updater.update(false);
+ assertThat(tickValue.get()).isEqualTo(expectedTickValue);
+ // now the clock should advance again
+ clockValue.addAndGet(snapshotIntervalNanos);
+ expectedTickValue += snapshotIntervalNanos;
+ updater.update(false);
+ assertThat(tickValue.get()).isEqualTo(expectedTickValue);
+ // simulate a leap forward
+ clockValue.addAndGet(10 * snapshotIntervalNanos);
+ // no special handling for leap forward
+ expectedTickValue += 10 * snapshotIntervalNanos;
+ updater.update(true);
+ assertThat(tickValue.get()).isEqualTo(expectedTickValue);
+ }
\ No newline at end of file
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
index 392ec0d3ff46f..8343680f9bf7b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
@@ -25,6 +25,7 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.qos.AsyncTokenBucket;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass;
import org.apache.pulsar.broker.resourcegroup.ResourceGroupService.ResourceGroupUsageStatsType;
@@ -58,9 +59,10 @@
@Test(groups = "flaky")
public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase {
- @BeforeClass
+ @BeforeClass(alwaysRun = true)
protected void setup() throws Exception {
+ AsyncTokenBucket.switchToConsistentTokensView();
@@ -91,6 +93,7 @@ public long computeLocalQuota(long confUsage, long myUsage, long[] allUsages) {
protected void cleanup() throws Exception {
+ AsyncTokenBucket.resetToDefaultEventualConsistentTokensView();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java
index 2c44ba7e23004..5c149d4e1e792 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java
@@ -26,6 +26,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
+import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
@@ -73,7 +74,9 @@ public void setup() throws Exception {
EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class);
- doReturn(null).when(eventLoopGroup).schedule(any(Runnable.class), anyLong(), any());
+ EventLoop eventLoop = mock(EventLoop.class);
+ when(eventLoopGroup.next()).thenReturn(eventLoop);
+ doReturn(null).when(eventLoop).schedule(any(Runnable.class), anyLong(), any());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AbstractMessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AbstractMessageDispatchThrottlingTest.java
new file mode 100644
index 0000000000000..31c628b2bc4ca
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AbstractMessageDispatchThrottlingTest.java
@@ -0,0 +1,116 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.qos.AsyncTokenBucket;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+public abstract class AbstractMessageDispatchThrottlingTest extends ProducerConsumerBase {
+ public static T[] merge(T[] first, T[] last) {
+ int totalLength = first.length + last.length;
+ T[] result = Arrays.copyOf(first, totalLength);
+ int offset = first.length;
+ System.arraycopy(last, 0, result, offset, first.length);
+ return result;
+ }
+ @BeforeClass(alwaysRun = true)
+ @Override
+ protected void setup() throws Exception {
+ AsyncTokenBucket.switchToConsistentTokensView();
+ this.conf.setClusterName("test");
+ internalSetup();
+ producerBaseSetup();
+ }
+ @AfterClass(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ internalCleanup();
+ AsyncTokenBucket.resetToDefaultEventualConsistentTokensView();
+ }
+ @AfterMethod(alwaysRun = true)
+ protected void reset() throws Exception {
+ pulsar.getConfiguration().setForceDeleteTenantAllowed(true);
+ pulsar.getConfiguration().setForceDeleteNamespaceAllowed(true);
+ for (String tenant : admin.tenants().getTenants()) {
+ for (String namespace : admin.namespaces().getNamespaces(tenant)) {
+ admin.namespaces().deleteNamespace(namespace, true);
+ }
+ admin.tenants().deleteTenant(tenant, true);
+ }
+ for (String cluster : admin.clusters().getClusters()) {
+ admin.clusters().deleteCluster(cluster);
+ }
+ pulsar.getConfiguration().setForceDeleteTenantAllowed(false);
+ pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false);
+ producerBaseSetup();
+ }
+ @DataProvider(name = "subscriptions")
+ public Object[][] subscriptionsProvider() {
+ return new Object[][]{new Object[]{SubscriptionType.Shared}, {SubscriptionType.Exclusive}};
+ }
+ @DataProvider(name = "dispatchRateType")
+ public Object[][] dispatchRateProvider() {
+ return new Object[][]{{DispatchRateType.messageRate}, {DispatchRateType.byteRate}};
+ }
+ @DataProvider(name = "subscriptionAndDispatchRateType")
+ public Object[][] subDisTypeProvider() {
+ List