From f8e4c11b5cd94382a3493b3e129e46bfc6a0621d Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Sat, 8 Feb 2025 19:01:07 +0200 Subject: [PATCH] [fix][broker] Fix rate limiter token bucket and clock consistency issues causing excessive throttling and connection timeouts (#23930) --- microbench/README.md | 26 ++ .../broker/qos/AsyncTokenBucketBenchmark.java | 17 +- ...efaultMonotonicSnapshotClockBenchmark.java | 102 +++++++ .../pulsar/broker/qos/AsyncTokenBucket.java | 127 +++++---- .../broker/qos/AsyncTokenBucketBuilder.java | 34 +++ .../qos/DefaultMonotonicSnapshotClock.java | 260 +++++++++++++++--- .../qos/DynamicRateAsyncTokenBucket.java | 7 +- .../DynamicRateAsyncTokenBucketBuilder.java | 6 +- .../broker/qos/FinalRateAsyncTokenBucket.java | 7 +- .../qos/FinalRateAsyncTokenBucketBuilder.java | 2 +- .../pulsar/broker/service/BrokerService.java | 6 + .../service/PublishRateLimiterImpl.java | 11 +- .../persistent/DispatchRateLimiter.java | 39 ++- .../persistent/SubscribeRateLimiter.java | 8 +- .../auth/MockedPulsarServiceBaseTest.java | 46 ++-- .../broker/qos/AsyncTokenBucketTest.java | 143 +++++++++- .../DefaultMonotonicSnapshotClockTest.java | 185 +++++++++++++ .../RGUsageMTAggrWaitForAllMsgsTest.java | 5 +- .../service/PublishRateLimiterTest.java | 5 +- ...AbstractMessageDispatchThrottlingTest.java | 116 ++++++++ .../api/MessageDispatchThrottlingTest.java | 166 +++-------- ...criptionMessageDispatchThrottlingTest.java | 57 ++-- .../impl/MessagePublishThrottlingTest.java | 2 +- 23 files changed, 1090 insertions(+), 287 deletions(-) create mode 100644 microbench/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClockBenchmark.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClockTest.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/api/AbstractMessageDispatchThrottlingTest.java diff --git a/microbench/README.md b/microbench/README.md index 780e3a5a1d3e8..f50c3036ff494 100644 --- a/microbench/README.md +++ b/microbench/README.md @@ -41,3 +41,29 @@ For fast recompiling of the benchmarks (without compiling Pulsar modules) and cr mvn -Pmicrobench -pl microbench clean package ``` +### Running specific benchmarks + +Display help: + +```shell +java -jar microbench/target/microbenchmarks.jar -h +``` + +Listing all benchmarks: + +```shell +java -jar microbench/target/microbenchmarks.jar -l +``` + +Running specific benchmarks: + +```shell +java -jar microbench/target/microbenchmarks.jar ".*BenchmarkName.*" +``` + +Checking what benchmarks match the pattern: + +```shell +java -jar microbench/target/microbenchmarks.jar ".*BenchmarkName.*" -lp +``` + diff --git a/microbench/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBenchmark.java b/microbench/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBenchmark.java index 4c069e72ea3ba..1b210258f13d2 100644 --- a/microbench/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBenchmark.java +++ b/microbench/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBenchmark.java @@ -33,6 +33,7 @@ import org.openjdk.jmh.annotations.TearDown; import org.openjdk.jmh.annotations.Threads; import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; @Fork(3) @BenchmarkMode(Mode.Throughput) @@ -59,23 +60,29 @@ public void teardown() { @Benchmark @Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1) @Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1) - public void consumeTokensBenchmark001Threads() { - asyncTokenBucket.consumeTokens(1); + public void consumeTokensBenchmark001Threads(Blackhole blackhole) { + consumeTokenAndGetTokens(blackhole); } @Threads(10) @Benchmark @Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1) @Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1) - public void consumeTokensBenchmark010Threads() { - asyncTokenBucket.consumeTokens(1); + public void consumeTokensBenchmark010Threads(Blackhole blackhole) { + consumeTokenAndGetTokens(blackhole); } @Threads(100) @Benchmark @Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1) @Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1) - public void consumeTokensBenchmark100Threads() { + public void consumeTokensBenchmark100Threads(Blackhole blackhole) { + consumeTokenAndGetTokens(blackhole); + } + + private void consumeTokenAndGetTokens(Blackhole blackhole) { asyncTokenBucket.consumeTokens(1); + // blackhole is used to ensure that the compiler doesn't do dead code elimination + blackhole.consume(asyncTokenBucket.getTokens()); } } diff --git a/microbench/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClockBenchmark.java b/microbench/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClockBenchmark.java new file mode 100644 index 0000000000000..d9054b8fe4be8 --- /dev/null +++ b/microbench/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClockBenchmark.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.broker.qos; + +import java.util.concurrent.TimeUnit; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +@Fork(3) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@State(Scope.Thread) +public class DefaultMonotonicSnapshotClockBenchmark { + private DefaultMonotonicSnapshotClock monotonicSnapshotClock = + new DefaultMonotonicSnapshotClock(TimeUnit.MILLISECONDS.toNanos(1), System::nanoTime); + + @TearDown(Level.Iteration) + public void teardown() { + monotonicSnapshotClock.close(); + } + + @Threads(1) + @Benchmark + @Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1) + @Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1) + public void getTickNanos001Threads(Blackhole blackhole) { + consumeTokenAndGetTokens(blackhole, false); + } + + @Threads(10) + @Benchmark + @Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1) + @Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1) + public void getTickNanos010Threads(Blackhole blackhole) { + consumeTokenAndGetTokens(blackhole, false); + } + + @Threads(100) + @Benchmark + @Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1) + @Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1) + public void getTickNanos100Threads(Blackhole blackhole) { + consumeTokenAndGetTokens(blackhole, false); + } + + @Threads(1) + @Benchmark + @Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1) + @Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1) + public void getTickNanosRequestSnapshot001Threads(Blackhole blackhole) { + consumeTokenAndGetTokens(blackhole, true); + } + + @Threads(10) + @Benchmark + @Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1) + @Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1) + public void getTickNanosRequestSnapshot010Threads(Blackhole blackhole) { + consumeTokenAndGetTokens(blackhole, true); + } + + @Threads(100) + @Benchmark + @Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1) + @Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1) + public void getTickNanosRequestSnapshot100Threads(Blackhole blackhole) { + consumeTokenAndGetTokens(blackhole, true); + } + + private void consumeTokenAndGetTokens(Blackhole blackhole, boolean requestSnapshot) { + // blackhole is used to ensure that the compiler doesn't do dead code elimination + blackhole.consume(monotonicSnapshotClock.getTickNanos(requestSnapshot)); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java index ac9a1f03e592b..8c43fa0a816fa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java @@ -42,6 +42,10 @@ * connection or client from the throttling queue to unthrottle. Before unthrottling, the application should check * for available tokens. If tokens are still not available, the application should continue with throttling and * repeat the throttling loop. + *

By default, the AsyncTokenBucket is eventually consistent. This means that the token balance is updated + * with added tokens and consumed tokens at most once during each "increment", when time advances more than the + * configured resolution. There are settings for configuring consistency, please see {@link AsyncTokenBucketBuilder} + * for details. *

This class does not produce side effects outside its own scope. It functions similarly to a stateful function, * akin to a counter function. In essence, it is a sophisticated counter. It can serve as a foundational component for * constructing higher-level asynchronous rate limiter implementations, which require side effects for throttling. @@ -119,9 +123,28 @@ public static void resetToDefaultEventualConsistentTokensView() { */ private final LongAdder pendingConsumedTokens = new LongAdder(); - protected AsyncTokenBucket(MonotonicSnapshotClock clockSource, long resolutionNanos) { + /** + * By default, AsyncTokenBucket is eventually consistent. This means that the consumed tokens are subtracted from + * the total amount of tokens at most once during each "increment", when time advances more than the configured + * resolution. This setting determines if the consumed tokens are subtracted from tokens balance consistently. + * For high performance, it is recommended to keep this setting as false. + */ + private final boolean consistentConsumedTokens; + /** + * By default, AsyncTokenBucket is eventually consistent. This means that the added tokens are calculated based + * on elapsed time at most once during each "increment", when time advances more than the configured + * resolution. This setting determines if the added tokens are calculated and added to tokens balance consistently. + * For high performance, it is recommended to keep this setting as false. + */ + private final boolean consistentAddedTokens; + + protected AsyncTokenBucket(MonotonicSnapshotClock clockSource, long resolutionNanos, + boolean consistentConsumedTokens, boolean consistentAddedTokens) { this.clockSource = clockSource; this.resolutionNanos = resolutionNanos; + this.lastNanos = Long.MIN_VALUE; + this.consistentConsumedTokens = consistentConsumedTokens; + this.consistentAddedTokens = consistentAddedTokens; } public static FinalRateAsyncTokenBucketBuilder builder() { @@ -139,36 +162,46 @@ public static DynamicRateAsyncTokenBucketBuilder builderForDynamicRate() { /** * Consumes tokens and possibly updates the tokens balance. New tokens are calculated and added to the current * tokens balance each time the update takes place. The update takes place once in every interval of the configured - * resolutionNanos or when the forceUpdateTokens parameter is true. + * resolutionNanos or when the forceConsistentTokens parameter is true. * When the tokens balance isn't updated, the consumed tokens are added to the pendingConsumedTokens LongAdder * counter which gets flushed the next time the tokens are updated. This makes the tokens balance * eventually consistent. The reason for this design choice is to optimize performance by preventing CAS loop * contention which could cause excessive CPU consumption. * * @param consumeTokens number of tokens to consume, can be 0 to update the tokens balance - * @param forceUpdateTokens if true, the tokens are updated even if the configured resolution hasn't passed + * @param forceConsistentTokens if true, the token balance is updated consistently * @return the current number of tokens in the bucket or Long.MIN_VALUE when the number of tokens is unknown due * to eventual consistency */ - private long consumeTokensAndMaybeUpdateTokensBalance(long consumeTokens, boolean forceUpdateTokens) { + private long consumeTokensAndMaybeUpdateTokensBalance(long consumeTokens, boolean forceConsistentTokens) { if (consumeTokens < 0) { throw new IllegalArgumentException("consumeTokens must be >= 0"); } - long currentNanos = clockSource.getTickNanos(forceUpdateTokens); + boolean requestConsistentTickNanosSnapshot = + consistentAddedTokens || consistentConsumedTokens || forceConsistentTokens || resolutionNanos == 0; + long currentNanos = clockSource.getTickNanos(requestConsistentTickNanosSnapshot); + long newTokens = 0; // check if the tokens should be updated immediately - if (shouldUpdateTokensImmediately(currentNanos, forceUpdateTokens)) { + if (shouldAddTokensImmediately(currentNanos, forceConsistentTokens)) { // calculate the number of new tokens since the last update - long newTokens = calculateNewTokensSinceLastUpdate(currentNanos); - // calculate the total amount of tokens to consume in this update + newTokens = calculateNewTokensSinceLastUpdate(currentNanos, forceConsistentTokens); + } + // update tokens if there are new tokens or if resolutionNanos is set to 0 which is currently used for testing + if (newTokens > 0 || resolutionNanos == 0 || consistentConsumedTokens || forceConsistentTokens) { // flush the pendingConsumedTokens by calling "sumThenReset" - long totalConsumedTokens = consumeTokens + pendingConsumedTokens.sumThenReset(); - // update the tokens and return the current token value - return TOKENS_UPDATER.updateAndGet(this, - currentTokens -> - // after adding new tokens, limit the tokens to the capacity - Math.min(currentTokens + newTokens, getCapacity()) - // subtract the consumed tokens - - totalConsumedTokens); + long currentPendingConsumedTokens = pendingConsumedTokens.sumThenReset(); + // calculate the token delta by subtracting the consumed tokens from the new tokens + long tokenDelta = newTokens - currentPendingConsumedTokens; + if (tokenDelta != 0 || consumeTokens != 0) { + // update the tokens and return the current token value + return TOKENS_UPDATER.updateAndGet(this, + // limit the tokens to the capacity of the bucket + currentTokens -> Math.min(currentTokens + tokenDelta, getCapacity()) + // subtract the consumed tokens from the capped tokens + - consumeTokens); + } else { + return tokens; + } } else { // eventual consistent fast path, tokens are not updated immediately @@ -187,19 +220,19 @@ private long consumeTokensAndMaybeUpdateTokensBalance(long consumeTokens, boolea * * The tokens will be updated once every resolutionNanos nanoseconds. * This method checks if the configured resolutionNanos has passed since the last update. - * If the forceUpdateTokens is true, the tokens will be updated immediately. + * If the forceConsistentTokens is true, the tokens will be updated immediately. * - * @param currentNanos the current monotonic clock time in nanoseconds - * @param forceUpdateTokens if true, the tokens will be updated immediately + * @param currentNanos the current monotonic clock time in nanoseconds + * @param forceConsistentTokens if true, the tokens are added even if the configured resolution hasn't fully passed * @return true if the tokens should be updated immediately, false otherwise */ - private boolean shouldUpdateTokensImmediately(long currentNanos, boolean forceUpdateTokens) { + private boolean shouldAddTokensImmediately(long currentNanos, boolean forceConsistentTokens) { long currentIncrement = resolutionNanos != 0 ? currentNanos / resolutionNanos : 0; long currentLastIncrement = lastIncrement; return currentIncrement == 0 || (currentIncrement > currentLastIncrement && LAST_INCREMENT_UPDATER.compareAndSet(this, currentLastIncrement, currentIncrement)) - || forceUpdateTokens; + || consistentAddedTokens || forceConsistentTokens; } /** @@ -209,10 +242,22 @@ private boolean shouldUpdateTokensImmediately(long currentNanos, boolean forceUp * @param currentNanos the current monotonic clock time in nanoseconds * @return the number of new tokens to add since the last update */ - private long calculateNewTokensSinceLastUpdate(long currentNanos) { + private long calculateNewTokensSinceLastUpdate(long currentNanos, boolean forceConsistentTokens) { + long previousLastNanos = lastNanos; + long newLastNanos; + // update lastNanos only if at least resolutionNanos/2 nanoseconds has passed since the last update + // unless consistency is needed + long minimumIncrementNanos = forceConsistentTokens || consistentAddedTokens ? 0L : resolutionNanos / 2; + if (currentNanos > previousLastNanos + minimumIncrementNanos) { + newLastNanos = currentNanos; + } else { + newLastNanos = previousLastNanos; + } long newTokens; - long previousLastNanos = LAST_NANOS_UPDATER.getAndSet(this, currentNanos); - if (previousLastNanos == 0) { + if (newLastNanos == previousLastNanos + // prevent races with a CAS update of lastNanos + || !LAST_NANOS_UPDATER.compareAndSet(this, previousLastNanos, newLastNanos) + || previousLastNanos == Long.MIN_VALUE) { newTokens = 0; } else { long durationNanos = currentNanos - previousLastNanos + REMAINDER_NANOS_UPDATER.getAndSet(this, 0); @@ -267,15 +312,14 @@ public boolean consumeTokensAndCheckIfContainsTokens(long consumeTokens) { } /** - * Returns the current token balance. When forceUpdateTokens is true, the tokens balance is updated before - * returning. If forceUpdateTokens is false, the tokens balance could be updated if the last updated happened + * Returns the current token balance. When forceConsistentTokens is true, the tokens balance is updated before + * returning. If forceConsistentTokens is false, the tokens balance could be updated if the last updated happened * more than resolutionNanos nanoseconds ago. * - * @param forceUpdateTokens if true, the tokens balance is updated before returning * @return the current token balance */ - protected long tokens(boolean forceUpdateTokens) { - long currentTokens = consumeTokensAndMaybeUpdateTokensBalance(0, forceUpdateTokens); + private long tokens() { + long currentTokens = consumeTokensAndMaybeUpdateTokensBalance(0, false); if (currentTokens != Long.MIN_VALUE) { // when currentTokens isn't Long.MIN_VALUE, the current tokens balance is known return currentTokens; @@ -295,7 +339,7 @@ public long calculateThrottlingDuration() { long currentTokens = consumeTokensAndMaybeUpdateTokensBalance(0, true); if (currentTokens == Long.MIN_VALUE) { throw new IllegalArgumentException( - "Unexpected result from updateAndConsumeTokens with forceUpdateTokens set to true"); + "Unexpected result from updateAndConsumeTokens with forceConsistentTokens set to true"); } if (currentTokens > 0) { return 0L; @@ -309,10 +353,11 @@ public long calculateThrottlingDuration() { /** * Returns the current number of tokens in the bucket. - * The token balance is updated if the configured resolutionNanos has passed since the last update. + * The token balance is updated if the configured resolutionNanos has passed since the last update unless + * consistentConsumedTokens is true. */ public final long getTokens() { - return tokens(false); + return tokens(); } public abstract long getRate(); @@ -320,25 +365,13 @@ public final long getTokens() { /** * Checks if the bucket contains tokens. * The token balance is updated before the comparison if the configured resolutionNanos has passed since the last - * update. It's possible that the returned result is not definite since the token balance is eventually consistent. + * update. It's possible that the returned result is not definite since the token balance is eventually consistent + * if consistentConsumedTokens is false. * * @return true if the bucket contains tokens, false otherwise */ public boolean containsTokens() { - return containsTokens(false); - } - - /** - * Checks if the bucket contains tokens. - * The token balance is updated before the comparison if the configured resolutionNanos has passed since the last - * update. The token balance is also updated when forceUpdateTokens is true. - * It's possible that the returned result is not definite since the token balance is eventually consistent. - * - * @param forceUpdateTokens if true, the token balance is updated before the comparison - * @return true if the bucket contains tokens, false otherwise - */ - public boolean containsTokens(boolean forceUpdateTokens) { - return tokens(forceUpdateTokens) > 0; + return tokens() > 0; } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBuilder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBuilder.java index ee256d5a37d64..1c05f1a213e3a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBuilder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBuilder.java @@ -23,6 +23,8 @@ public abstract class AsyncTokenBucketBuilder> { protected MonotonicSnapshotClock clock = AsyncTokenBucket.DEFAULT_SNAPSHOT_CLOCK; protected long resolutionNanos = AsyncTokenBucket.defaultResolutionNanos; + protected boolean consistentConsumedTokens; + protected boolean consistentAddedTokens; protected AsyncTokenBucketBuilder() { } @@ -31,15 +33,47 @@ protected SELF self() { return (SELF) this; } + /** + * Set the clock source for the token bucket. It's recommended to use the {@link DefaultMonotonicSnapshotClock} + * for most use cases. + */ public SELF clock(MonotonicSnapshotClock clock) { this.clock = clock; return self(); } + /** + * By default, AsyncTokenBucket is eventually consistent. This means that the token balance is updated, when time + * advances more than the configured resolution. This setting determines the duration of the increment. + * Setting this value to 0 will make the token balance fully consistent. There's a performance trade-off + * when setting this value to 0. + */ public SELF resolutionNanos(long resolutionNanos) { this.resolutionNanos = resolutionNanos; return self(); } + /** + * By default, AsyncTokenBucket is eventually consistent. This means that the consumed tokens are subtracted from + * the total amount of tokens at most once during each "increment", when time advances more than the configured + * resolution. This setting determines if the consumed tokens are subtracted from tokens balance consistently. + * For high performance, it is recommended to keep this setting as false. + */ + public SELF consistentConsumedTokens(boolean consistentConsumedTokens) { + this.consistentConsumedTokens = consistentConsumedTokens; + return self(); + } + + /** + * By default, AsyncTokenBucket is eventually consistent. This means that the added tokens are calculated based + * on elapsed time at most once during each "increment", when time advances more than the configured + * resolution. This setting determines if the added tokens are calculated and added to tokens balance consistently. + * For high performance, it is recommended to keep this setting as false. + */ + public SELF consistentAddedTokens(boolean consistentAddedTokens) { + this.consistentAddedTokens = consistentAddedTokens; + return self(); + } + public abstract AsyncTokenBucket build(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClock.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClock.java index df3843921ed55..23b9359c8042d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClock.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClock.java @@ -19,71 +19,269 @@ package org.apache.pulsar.broker.qos; +import java.util.Objects; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.LongConsumer; import java.util.function.LongSupplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Default implementation of {@link MonotonicSnapshotClock}. + * Default implementation of {@link MonotonicSnapshotClock} optimized for use with {@link AsyncTokenBucket}. * - * Starts a daemon thread that updates the snapshot value periodically with a configured interval. The close method - * should be called to stop the thread. + *

+ * This class provides a monotonic snapshot value that consistently increases, ensuring reliable behavior + * even in environments where the underlying clock source may not be strictly monotonic across all CPUs, + * such as certain virtualized platforms. + *

+ * + *

+ * Upon instantiation, a daemon thread is launched to periodically update the snapshot value at a configured + * interval. It is essential to invoke the {@link #close()} method to gracefully terminate this thread when it is + * no longer needed. + *

+ * + *

+ * The {@link AsyncTokenBucket} utilizes this clock to obtain tick values. It does not require a consistent value on + * every retrieval. However, when a consistent snapshot is necessary, the {@link #getTickNanos(boolean)} method + * is called with the {@code requestSnapshot} parameter set to {@code true}. + *

+ * + *

+ * By employing a single thread to update the monotonic clock value, this implementation ensures that the snapshot + * value remains strictly increasing. This approach mitigates potential inconsistencies that may arise from clock + * source discrepancies across different CPUs. + *

*/ public class DefaultMonotonicSnapshotClock implements MonotonicSnapshotClock, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(DefaultMonotonicSnapshotClock.class); - private final long sleepMillis; - private final int sleepNanos; - private final LongSupplier clockSource; - private final Thread thread; + private final TickUpdaterThread tickUpdaterThread; private volatile long snapshotTickNanos; public DefaultMonotonicSnapshotClock(long snapshotIntervalNanos, LongSupplier clockSource) { if (snapshotIntervalNanos < TimeUnit.MILLISECONDS.toNanos(1)) { throw new IllegalArgumentException("snapshotIntervalNanos must be at least 1 millisecond"); } - this.sleepMillis = TimeUnit.NANOSECONDS.toMillis(snapshotIntervalNanos); - this.sleepNanos = (int) (snapshotIntervalNanos - TimeUnit.MILLISECONDS.toNanos(sleepMillis)); - this.clockSource = clockSource; - updateSnapshotTickNanos(); - thread = new Thread(this::snapshotLoop, getClass().getSimpleName() + "-update-loop"); - thread.setDaemon(true); - thread.start(); + tickUpdaterThread = new TickUpdaterThread(snapshotIntervalNanos, + Objects.requireNonNull(clockSource, "clockSource must not be null"), this::setSnapshotTickNanos); + tickUpdaterThread.start(); + } + + private void setSnapshotTickNanos(long snapshotTickNanos) { + this.snapshotTickNanos = snapshotTickNanos; } /** {@inheritDoc} */ @Override public long getTickNanos(boolean requestSnapshot) { if (requestSnapshot) { - updateSnapshotTickNanos(); + tickUpdaterThread.requestUpdateAndWait(); } return snapshotTickNanos; } - private void updateSnapshotTickNanos() { - snapshotTickNanos = clockSource.getAsLong(); + @Override + public void close() { + tickUpdaterThread.interrupt(); } - private void snapshotLoop() { - try { - while (!Thread.currentThread().isInterrupted()) { - updateSnapshotTickNanos(); + /** + * A thread that updates snapshotTickNanos value periodically with a configured interval. + * The thread is started when the DefaultMonotonicSnapshotClock is created and runs until the close method is + * called. + * A single thread is used to read the clock source value since on some hardware of virtualized platforms, + * System.nanoTime() isn't strictly monotonic across all CPUs. Reading by a single thread will improve the + * stability of the read value since a single thread is scheduled on a single CPU. If the thread is migrated + * to another CPU, the clock source value might leap backward or forward, but logic in this class will handle it. + */ + private static class TickUpdaterThread extends Thread { + private final Object tickUpdateDelayMonitor = new Object(); + private final Object tickUpdatedMonitor = new Object(); + private final MonotonicLeapDetectingTickUpdater tickUpdater; + private volatile boolean running; + private boolean tickUpdateDelayMonitorNotified; + private AtomicLong requestCount = new AtomicLong(); + private final long sleepMillis; + private final int sleepNanos; + + TickUpdaterThread(long snapshotIntervalNanos, LongSupplier clockSource, LongConsumer setSnapshotTickNanos) { + super(DefaultMonotonicSnapshotClock.class.getSimpleName() + "-update-loop"); + // set as daemon thread so that it doesn't prevent the JVM from exiting + setDaemon(true); + // set the highest priority + setPriority(MAX_PRIORITY); + this.sleepMillis = TimeUnit.NANOSECONDS.toMillis(snapshotIntervalNanos); + this.sleepNanos = (int) (snapshotIntervalNanos - TimeUnit.MILLISECONDS.toNanos(sleepMillis)); + tickUpdater = new MonotonicLeapDetectingTickUpdater(clockSource, setSnapshotTickNanos, + snapshotIntervalNanos); + } + + @Override + public void run() { + try { + running = true; + long updatedForRequestCount = -1; + while (!isInterrupted()) { + try { + // track if the thread has waited for the whole duration of the snapshot interval + // before updating the tick value + boolean waitedSnapshotInterval = false; + // sleep for the configured interval on a monitor that can be notified to stop the sleep + // and update the tick value immediately. This is used in requestUpdate method. + synchronized (tickUpdateDelayMonitor) { + tickUpdateDelayMonitorNotified = false; + // only wait if no explicit request has been made since the last update + if (requestCount.get() == updatedForRequestCount) { + // if no request has been made, sleep for the configured interval + tickUpdateDelayMonitor.wait(sleepMillis, sleepNanos); + waitedSnapshotInterval = !tickUpdateDelayMonitorNotified; + } + } + updatedForRequestCount = requestCount.get(); + // update the tick value using the tick updater which will tolerate leaps backward + tickUpdater.update(waitedSnapshotInterval); + notifyAllTickUpdated(); + } catch (InterruptedException e) { + interrupt(); + break; + } + } + } catch (Throwable t) { + // report unexpected error since this would be a fatal error when the clock doesn't progress anymore + // this is very unlikely to happen, but it's better to log it in any case + LOG.error("Unexpected fatal error that stopped the clock.", t); + } finally { + LOG.info("DefaultMonotonicSnapshotClock's TickUpdaterThread stopped. {},tid={}", this, getId()); + running = false; + notifyAllTickUpdated(); + } + } + + private void notifyAllTickUpdated() { + synchronized (tickUpdatedMonitor) { + // notify all threads that are waiting for the tick value to be updated + tickUpdatedMonitor.notifyAll(); + } + } + + public void requestUpdateAndWait() { + if (!running) { + synchronized (tickUpdater) { + // thread has stopped running, fallback to update the value directly without optimizations + tickUpdater.update(false); + } + return; + } + // increment the request count that ensures that the thread will update the tick value after this request + // was made also when there's a race condition between the request and the update + // this solution doesn't prevent all races, and it's not guaranteed that the tick value is always updated + // it will prevent the request having to wait for the delayed update cycle. This is sufficient for the + // use case. + requestCount.incrementAndGet(); + synchronized (tickUpdatedMonitor) { + // notify the thread to stop waiting and update the tick value + synchronized (tickUpdateDelayMonitor) { + tickUpdateDelayMonitorNotified = true; + tickUpdateDelayMonitor.notify(); + } + // wait until the tick value has been updated try { - Thread.sleep(sleepMillis, sleepNanos); + tickUpdatedMonitor.wait(); } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - break; + currentThread().interrupt(); + } + } + } + + @Override + public synchronized void start() { + // wait until the thread is started and the tick value has been updated + synchronized (tickUpdatedMonitor) { + super.start(); + try { + tickUpdatedMonitor.wait(); + } catch (InterruptedException e) { + currentThread().interrupt(); } } - } catch (Throwable t) { - // report unexpected error since this would be a fatal error when the clock doesn't progress anymore - // this is very unlikely to happen, but it's better to log it in any case - LOG.error("Unexpected fatal error that stopped the clock.", t); } } - @Override - public void close() { - thread.interrupt(); + /** + * Handles updating the tick value in a monotonic way so that the value is always increasing, + * regardless of leaps backward in the clock source value. + */ + static class MonotonicLeapDetectingTickUpdater { + private final LongSupplier clockSource; + private final long snapshotInternalNanos; + private final long maxDeltaNanosForLeapDetection; + private final LongConsumer tickUpdatedCallback; + private long referenceClockSourceValue = Long.MIN_VALUE; + private long baseSnapshotTickNanos; + private long previousSnapshotTickNanos; + + MonotonicLeapDetectingTickUpdater(LongSupplier clockSource, LongConsumer tickUpdatedCallback, + long snapshotInternalNanos) { + this.clockSource = clockSource; + this.snapshotInternalNanos = snapshotInternalNanos; + this.maxDeltaNanosForLeapDetection = 2 * snapshotInternalNanos; + this.tickUpdatedCallback = tickUpdatedCallback; + } + + /** + * Updates the snapshot tick value. The tickUpdatedCallback is called if the value has changed. + * The value is updated in a monotonic way so that the value is always increasing, regardless of leaps backward + * in the clock source value. + * Leap detection is done by comparing the new value with the previous value and the maximum delta value. + * + * @param waitedSnapshotInterval if true, the method has waited for the snapshot interval since the previous + * call. + */ + public void update(boolean waitedSnapshotInterval) { + // get the current clock source value + long clockValue = clockSource.getAsLong(); + + // Initialization on first call + if (referenceClockSourceValue == Long.MIN_VALUE) { + referenceClockSourceValue = clockValue; + baseSnapshotTickNanos = clockValue; + previousSnapshotTickNanos = clockValue; + // update the tick value using the callback + tickUpdatedCallback.accept(clockValue); + return; + } + + // calculate the duration since the reference clock source value + // so that the snapshot value is always increasing and tolerates it when the clock source is not strictly + // monotonic across all CPUs and leaps backward + long durationSinceReference = clockValue - referenceClockSourceValue; + // calculate the new snapshot tick value as a duration since the reference clock source value + // and add it to the base snapshot tick value + long newSnapshotTickNanos = baseSnapshotTickNanos + durationSinceReference; + + // reset the reference clock source value if the clock source value leaps backward + // more than the maximum delta value + if (newSnapshotTickNanos < previousSnapshotTickNanos - maxDeltaNanosForLeapDetection) { + // when the clock source value leaps backward, reset the reference value to the new value + // for future duration calculations + referenceClockSourceValue = clockValue; + // if the updater thread has waited for the snapshot interval since the previous call, + // increment the base snapshot tick value by the snapshot interval value + long incrementWhenLeapDetected = waitedSnapshotInterval ? snapshotInternalNanos : 0; + // set the base snapshot tick value to the new value + baseSnapshotTickNanos = previousSnapshotTickNanos + incrementWhenLeapDetected; + // set the new snapshot tick value to the base value + newSnapshotTickNanos = baseSnapshotTickNanos; + } + + // update snapshotTickNanos value if the new value is greater than the previous value + if (newSnapshotTickNanos > previousSnapshotTickNanos) { + // store the previous value + previousSnapshotTickNanos = newSnapshotTickNanos; + // update the tick value using the callback + tickUpdatedCallback.accept(newSnapshotTickNanos); + } + } } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucket.java index 8edc73d1f51e3..f2eae8aed8d9c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucket.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucket.java @@ -34,15 +34,16 @@ public class DynamicRateAsyncTokenBucket extends AsyncTokenBucket { protected DynamicRateAsyncTokenBucket(double capacityFactor, LongSupplier rateFunction, MonotonicSnapshotClock clockSource, LongSupplier ratePeriodNanosFunction, - long resolutionNanos, double initialTokensFactor, + long resolutionNanos, boolean consistentConsumedTokens, + boolean consistentAddedTokens, double initialTokensFactor, double targetFillFactorAfterThrottling) { - super(clockSource, resolutionNanos); + super(clockSource, resolutionNanos, consistentConsumedTokens, consistentAddedTokens); this.capacityFactor = capacityFactor; this.rateFunction = rateFunction; this.ratePeriodNanosFunction = ratePeriodNanosFunction; this.targetFillFactorAfterThrottling = targetFillFactorAfterThrottling; this.tokens = (long) (rateFunction.getAsLong() * initialTokensFactor); - tokens(false); + getTokens(); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucketBuilder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucketBuilder.java index 22270484c72f0..8aebecddf90c7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucketBuilder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucketBuilder.java @@ -64,9 +64,7 @@ public DynamicRateAsyncTokenBucketBuilder targetFillFactorAfterThrottling( @Override public AsyncTokenBucket build() { return new DynamicRateAsyncTokenBucket(this.capacityFactor, this.rateFunction, - this.clock, - this.ratePeriodNanosFunction, this.resolutionNanos, - this.initialFillFactor, - targetFillFactorAfterThrottling); + this.clock, this.ratePeriodNanosFunction, this.resolutionNanos, this.consistentConsumedTokens, + this.consistentAddedTokens, this.initialFillFactor, targetFillFactorAfterThrottling); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucket.java index 627c5ee1334b2..d83290b723f07 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucket.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucket.java @@ -30,15 +30,16 @@ class FinalRateAsyncTokenBucket extends AsyncTokenBucket { private final long targetAmountOfTokensAfterThrottling; protected FinalRateAsyncTokenBucket(long capacity, long rate, MonotonicSnapshotClock clockSource, - long ratePeriodNanos, long resolutionNanos, long initialTokens) { - super(clockSource, resolutionNanos); + long ratePeriodNanos, long resolutionNanos, boolean consistentConsumedTokens, + boolean consistentAddedTokens, long initialTokens) { + super(clockSource, resolutionNanos, consistentConsumedTokens, consistentAddedTokens); this.capacity = capacity; this.rate = rate; this.ratePeriodNanos = ratePeriodNanos != -1 ? ratePeriodNanos : ONE_SECOND_NANOS; // The target amount of tokens is the amount of tokens made available in the resolution duration this.targetAmountOfTokensAfterThrottling = Math.max(this.resolutionNanos * rate / ratePeriodNanos, 1); this.tokens = initialTokens; - tokens(false); + getTokens(); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucketBuilder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucketBuilder.java index ff4ed53c6c7fa..a292000eaa825 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucketBuilder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucketBuilder.java @@ -55,7 +55,7 @@ public FinalRateAsyncTokenBucketBuilder initialTokens(long initialTokens) { public AsyncTokenBucket build() { return new FinalRateAsyncTokenBucket(this.capacity != null ? this.capacity : this.rate, this.rate, this.clock, - this.ratePeriodNanos, this.resolutionNanos, + this.ratePeriodNanos, this.resolutionNanos, this.consistentConsumedTokens, this.consistentAddedTokens, this.initialTokens != null ? this.initialTokens : this.rate ); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index ddd436b085493..413b1b79d7a45 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2479,6 +2479,12 @@ public long getNumberOfNamespaceBundles() { private void handleMetadataChanges(Notification n) { + if (!pulsar.isRunning()) { + // Ignore metadata changes when broker is not running + log.info("Ignoring metadata change since broker is not running (id={}, state={}) {}", pulsar.getBrokerId(), + pulsar.getState(), n); + return; + } if (n.getType() == NotificationType.Modified && NamespaceResources.pathIsFromNamespace(n.getPath())) { NamespaceName ns = NamespaceResources.namespaceFromPath(n.getPath()); handlePoliciesUpdates(ns); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java index 8255d9b6931ff..90c8de5f97a05 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java @@ -20,11 +20,11 @@ package org.apache.pulsar.broker.service; import com.google.common.annotations.VisibleForTesting; -import io.netty.channel.EventLoopGroup; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.qos.AsyncTokenBucket; import org.apache.pulsar.broker.qos.MonotonicSnapshotClock; import org.apache.pulsar.common.policies.data.Policies; @@ -32,6 +32,7 @@ import org.jctools.queues.MessagePassingQueue; import org.jctools.queues.MpscUnboundedArrayQueue; +@Slf4j public class PublishRateLimiterImpl implements PublishRateLimiter { private volatile AsyncTokenBucket tokenBucketOnMessage; private volatile AsyncTokenBucket tokenBucketOnByte; @@ -80,7 +81,7 @@ private void scheduleDecrementThrottleCount(Producer producer) { // schedule unthrottling when the throttling count is incremented to 1 // this is to avoid scheduling unthrottling multiple times for concurrent producers if (throttledProducersCount.incrementAndGet() == 1) { - EventLoopGroup executor = producer.getCnx().getBrokerService().executor(); + ScheduledExecutorService executor = producer.getCnx().getBrokerService().executor().next(); scheduleUnthrottling(executor, calculateThrottlingDurationNanos()); } } @@ -134,7 +135,11 @@ private void unthrottleQueuedProducers(ScheduledExecutorService executor) { // unthrottle as many producers as possible while there are token available while ((throttlingDuration = calculateThrottlingDurationNanos()) == 0L && (producer = unthrottlingQueue.poll()) != null) { - producer.decrementThrottleCount(); + try { + producer.decrementThrottleCount(); + } catch (Exception e) { + log.error("Failed to unthrottle producer {}", producer, e); + } throttledProducersCount.decrementAndGet(); } // if there are still producers to be unthrottled, schedule unthrottling again diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java index b29cbcd660db1..f43b134eb122a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.qos.AsyncTokenBucket; +import org.apache.pulsar.broker.qos.AsyncTokenBucketBuilder; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; @@ -76,7 +77,9 @@ public DispatchRateLimiter(BrokerService brokerService) { * @return */ public long getAvailableDispatchRateLimitOnMsg() { - return dispatchRateLimiterOnMessage == null ? -1 : Math.max(dispatchRateLimiterOnMessage.getTokens(), 0); + AsyncTokenBucket localDispatchRateLimiterOnMessage = dispatchRateLimiterOnMessage; + return localDispatchRateLimiterOnMessage == null ? -1 : + Math.max(localDispatchRateLimiterOnMessage.getTokens(), 0); } /** @@ -85,7 +88,8 @@ public long getAvailableDispatchRateLimitOnMsg() { * @return */ public long getAvailableDispatchRateLimitOnByte() { - return dispatchRateLimiterOnByte == null ? -1 : Math.max(dispatchRateLimiterOnByte.getTokens(), 0); + AsyncTokenBucket localDispatchRateLimiterOnByte = dispatchRateLimiterOnByte; + return localDispatchRateLimiterOnByte == null ? -1 : Math.max(localDispatchRateLimiterOnByte.getTokens(), 0); } /** @@ -95,11 +99,13 @@ public long getAvailableDispatchRateLimitOnByte() { * @param byteSize */ public void consumeDispatchQuota(long numberOfMessages, long byteSize) { - if (numberOfMessages > 0 && dispatchRateLimiterOnMessage != null) { - dispatchRateLimiterOnMessage.consumeTokens(numberOfMessages); + AsyncTokenBucket localDispatchRateLimiterOnMessage = dispatchRateLimiterOnMessage; + if (numberOfMessages > 0 && localDispatchRateLimiterOnMessage != null) { + localDispatchRateLimiterOnMessage.consumeTokens(numberOfMessages); } - if (byteSize > 0 && dispatchRateLimiterOnByte != null) { - dispatchRateLimiterOnByte.consumeTokens(byteSize); + AsyncTokenBucket localDispatchRateLimiterOnByte = dispatchRateLimiterOnByte; + if (byteSize > 0 && localDispatchRateLimiterOnByte != null) { + localDispatchRateLimiterOnByte.consumeTokens(byteSize); } } @@ -221,13 +227,14 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) { if (msgRate > 0) { if (dispatchRate.isRelativeToPublishRate()) { this.dispatchRateLimiterOnMessage = - AsyncTokenBucket.builderForDynamicRate() + configureAsyncTokenBucket(AsyncTokenBucket.builderForDynamicRate()) .rateFunction(() -> getRelativeDispatchRateInMsg(dispatchRate)) .ratePeriodNanosFunction(() -> ratePeriodNanos) .build(); } else { this.dispatchRateLimiterOnMessage = - AsyncTokenBucket.builder().rate(msgRate).ratePeriodNanos(ratePeriodNanos) + configureAsyncTokenBucket(AsyncTokenBucket.builder()) + .rate(msgRate).ratePeriodNanos(ratePeriodNanos) .build(); } } else { @@ -238,13 +245,14 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) { if (byteRate > 0) { if (dispatchRate.isRelativeToPublishRate()) { this.dispatchRateLimiterOnByte = - AsyncTokenBucket.builderForDynamicRate() + configureAsyncTokenBucket(AsyncTokenBucket.builderForDynamicRate()) .rateFunction(() -> getRelativeDispatchRateInByte(dispatchRate)) .ratePeriodNanosFunction(() -> ratePeriodNanos) .build(); } else { this.dispatchRateLimiterOnByte = - AsyncTokenBucket.builder().rate(byteRate).ratePeriodNanos(ratePeriodNanos) + configureAsyncTokenBucket(AsyncTokenBucket.builder()) + .rate(byteRate).ratePeriodNanos(ratePeriodNanos) .build(); } } else { @@ -252,6 +260,11 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) { } } + private > T configureAsyncTokenBucket(T builder) { + builder.clock(brokerService.getPulsar().getMonotonicSnapshotClock()); + return builder; + } + private long getRelativeDispatchRateInMsg(DispatchRate dispatchRate) { return (topic != null && dispatchRate != null) ? (long) topic.getLastUpdatedAvgPublishRateInMsg() + dispatchRate.getDispatchThrottlingRateInMsg() @@ -270,7 +283,8 @@ private long getRelativeDispatchRateInByte(DispatchRate dispatchRate) { * @return */ public long getDispatchRateOnMsg() { - return dispatchRateLimiterOnMessage != null ? dispatchRateLimiterOnMessage.getRate() : -1; + AsyncTokenBucket localDispatchRateLimiterOnMessage = dispatchRateLimiterOnMessage; + return localDispatchRateLimiterOnMessage != null ? localDispatchRateLimiterOnMessage.getRate() : -1; } /** @@ -279,7 +293,8 @@ public long getDispatchRateOnMsg() { * @return */ public long getDispatchRateOnByte() { - return dispatchRateLimiterOnByte != null ? dispatchRateLimiterOnByte.getRate() : -1; + AsyncTokenBucket localDispatchRateLimiterOnByte = dispatchRateLimiterOnByte; + return localDispatchRateLimiterOnByte != null ? localDispatchRateLimiterOnByte.getRate() : -1; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java index b1de10e73b76f..0f98ab94142c8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java @@ -70,7 +70,7 @@ public synchronized boolean tryAcquire(ConsumerIdentifier consumerIdentifier) { if (tokenBucket == null) { return true; } - if (!tokenBucket.containsTokens(true)) { + if (!tokenBucket.containsTokens()) { return false; } tokenBucket.consumeTokens(1); @@ -117,7 +117,11 @@ private synchronized void updateSubscribeRate(ConsumerIdentifier consumerIdentif // update subscribe-rateLimiter if (ratePerConsumer > 0) { AsyncTokenBucket tokenBucket = - AsyncTokenBucket.builder().rate(ratePerConsumer).ratePeriodNanos(ratePeriodNanos).build(); + AsyncTokenBucket.builder() + .consistentAddedTokens(true) + .consistentConsumedTokens(true) + .clock(brokerService.getPulsar().getMonotonicSnapshotClock()) + .rate(ratePerConsumer).ratePeriodNanos(ratePeriodNanos).build(); this.subscribeRateLimiter.put(consumerIdentifier, tokenBucket); } else { // subscribe-rate should be disable and close diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index 8dd2fc1c3c26d..42e2c00f73acf 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -170,20 +170,26 @@ protected final void resetConfig() { protected final void internalSetup() throws Exception { init(); - lookupUrl = new URI(brokerUrl.toString()); - if (isTcpLookup) { - lookupUrl = new URI(pulsar.getBrokerServiceUrl()); - + lookupUrl = resolveLookupUrl(); + if (isTcpLookup && enableBrokerGateway) { // setup port forwarding from the advertised port to the listen port - if (enableBrokerGateway) { - InetSocketAddress gatewayAddress = new InetSocketAddress(lookupUrl.getHost(), lookupUrl.getPort()); - InetSocketAddress brokerAddress = new InetSocketAddress("127.0.0.1", pulsar.getBrokerListenPort().get()); - brokerGateway = new PortForwarder(gatewayAddress, brokerAddress); - } + InetSocketAddress gatewayAddress = new InetSocketAddress(lookupUrl.getHost(), lookupUrl.getPort()); + InetSocketAddress brokerAddress = new InetSocketAddress("127.0.0.1", pulsar.getBrokerListenPort().get()); + brokerGateway = new PortForwarder(gatewayAddress, brokerAddress); } pulsarClient = newPulsarClient(lookupUrl.toString(), 0); } + private URI resolveLookupUrl() { + if (isTcpLookup) { + return URI.create(pulsar.getBrokerServiceUrl()); + } else { + return URI.create(brokerUrl != null + ? brokerUrl.toString() + : brokerUrlTls.toString()); + } + } + protected final void internalSetup(ServiceConfiguration serviceConfiguration) throws Exception { this.conf = serviceConfiguration; internalSetup(); @@ -228,11 +234,10 @@ protected PulsarClient replacePulsarClient(ClientBuilder clientBuilder) throws P protected final void internalSetupForStatsTest() throws Exception { init(); - String lookupUrl = brokerUrl.toString(); - if (isTcpLookup) { - lookupUrl = new URI(pulsar.getBrokerServiceUrl()).toString(); + if (pulsarClient != null) { + pulsarClient.shutdown(); } - pulsarClient = newPulsarClient(lookupUrl, 1); + pulsarClient = newPulsarClient(resolveLookupUrl().toString(), 1); } protected void doInitConf() throws Exception { @@ -360,6 +365,9 @@ protected void afterPulsarStart(PulsarService pulsar) throws Exception { protected void restartBroker() throws Exception { stopBroker(); startBroker(); + if (pulsarClient == null) { + pulsarClient = newPulsarClient(lookupUrl.toString(), 0); + } } protected void stopBroker() throws Exception { @@ -384,12 +392,16 @@ protected void startBroker() throws Exception { brokerUrl = pulsar.getWebServiceAddress() != null ? new URL(pulsar.getWebServiceAddress()) : null; brokerUrlTls = pulsar.getWebServiceAddressTls() != null ? new URL(pulsar.getWebServiceAddressTls()) : null; - if (admin != null) { - admin.close(); - if (MockUtil.isMock(admin)) { - Mockito.reset(admin); + URI newLookupUrl = resolveLookupUrl(); + if (lookupUrl == null || !newLookupUrl.equals(lookupUrl)) { + lookupUrl = newLookupUrl; + if (pulsarClient != null) { + pulsarClient.shutdown(); + pulsarClient = newPulsarClient(lookupUrl.toString(), 0); } } + + closeAdmin(); PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/AsyncTokenBucketTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/AsyncTokenBucketTest.java index b446f9e902f2a..82793f2748d78 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/AsyncTokenBucketTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/AsyncTokenBucketTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.qos; +import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -50,7 +51,8 @@ private void incrementMillis(long millis) { @Test void shouldAddTokensWithConfiguredRate() { asyncTokenBucket = - AsyncTokenBucket.builder().capacity(100).rate(10).initialTokens(0).clock(clockSource).build(); + AsyncTokenBucket.builder().consistentConsumedTokens(true) + .capacity(100).rate(10).initialTokens(0).clock(clockSource).build(); incrementSeconds(5); assertEquals(asyncTokenBucket.getTokens(), 50); incrementSeconds(1); @@ -64,7 +66,7 @@ void shouldAddTokensWithConfiguredRate() { // Consume all and verify none available and then wait 1 period and check replenished asyncTokenBucket.consumeTokens(100); - assertEquals(asyncTokenBucket.tokens(true), 0); + assertEquals(asyncTokenBucket.getTokens(), 0); incrementSeconds(1); assertEquals(asyncTokenBucket.getTokens(), 10); } @@ -91,13 +93,148 @@ void shouldSupportFractionsWhenUpdatingTokens() { @Test void shouldSupportFractionsAndRetainLeftoverWhenUpdatingTokens() { asyncTokenBucket = - AsyncTokenBucket.builder().capacity(100).rate(10).initialTokens(0).clock(clockSource).build(); + AsyncTokenBucket.builder().capacity(100) + .resolutionNanos(TimeUnit.MILLISECONDS.toNanos(1)) + .rate(10) + .initialTokens(0) + .clock(clockSource) + .build(); for (int i = 0; i < 150; i++) { incrementMillis(1); } assertEquals(asyncTokenBucket.getTokens(), 1); incrementMillis(150); assertEquals(asyncTokenBucket.getTokens(), 3); + incrementMillis(1); + assertEquals(asyncTokenBucket.getTokens(), 3); + incrementMillis(99); + assertEquals(asyncTokenBucket.getTokens(), 4); + } + + @Test + void shouldSupportFractionsAndRetainLeftoverWhenUpdatingTokens2() { + asyncTokenBucket = + AsyncTokenBucket.builder().capacity(100) + .resolutionNanos(TimeUnit.MILLISECONDS.toNanos(1)) + .rate(1) + .initialTokens(0) + .clock(clockSource) + .build(); + for (int i = 0; i < 150; i++) { + incrementMillis(1); + assertEquals(asyncTokenBucket.getTokens(), 0); + } + incrementMillis(150); + assertEquals(asyncTokenBucket.getTokens(), 0); + incrementMillis(699); + assertEquals(asyncTokenBucket.getTokens(), 0); + incrementMillis(1); + assertEquals(asyncTokenBucket.getTokens(), 1); + incrementMillis(1000); + assertEquals(asyncTokenBucket.getTokens(), 2); + } + + @Test + void shouldHandleNegativeBalanceWithEventuallyConsistentTokenUpdates() { + asyncTokenBucket = + AsyncTokenBucket.builder() + // intentionally pick a coarse resolution + .resolutionNanos(TimeUnit.SECONDS.toNanos(51)) + .capacity(100).rate(10).initialTokens(0).clock(clockSource).build(); + // assert that the token balance is 0 initially + assertThat(asyncTokenBucket.getTokens()).isEqualTo(0); + + // consume tokens without exceeding the rate + for (int i = 0; i < 10000; i++) { + asyncTokenBucket.consumeTokens(500); + incrementSeconds(50); + } + + // let 9 seconds pass + incrementSeconds(9); + + // there should be 90 tokens available + assertThat(asyncTokenBucket.getTokens()).isEqualTo(90); } + @Test + void shouldNotExceedTokenBucketSizeWithNegativeTokens() { + asyncTokenBucket = + AsyncTokenBucket.builder() + // intentionally pick a coarse resolution + .resolutionNanos(TimeUnit.SECONDS.toNanos(51)) + .capacity(100).rate(10).initialTokens(0).clock(clockSource).build(); + // assert that the token balance is 0 initially + assertThat(asyncTokenBucket.getTokens()).isEqualTo(0); + + // consume tokens without exceeding the rate + for (int i = 0; i < 100; i++) { + asyncTokenBucket.consumeTokens(600); + incrementSeconds(50); + // let tokens accumulate back to 0 every 10 seconds + if ((i + 1) % 10 == 0) { + incrementSeconds(100); + } + } + + // let 9 seconds pass + incrementSeconds(9); + + // there should be 90 tokens available + assertThat(asyncTokenBucket.getTokens()).isEqualTo(90); + } + + @Test + void shouldAccuratelyCalculateTokensWhenTimeIsLaggingBehindInInconsistentUpdates() { + clockSource = requestSnapshot -> { + if (requestSnapshot) { + return manualClockSource.get(); + } else { + // let the clock lag behind + return manualClockSource.get() - TimeUnit.SECONDS.toNanos(52); + } + }; + incrementSeconds(1); + asyncTokenBucket = + AsyncTokenBucket.builder().resolutionNanos(TimeUnit.SECONDS.toNanos(51)) + .capacity(100).rate(10).initialTokens(100).clock(clockSource).build(); + assertThat(asyncTokenBucket.getTokens()).isEqualTo(100); + + // consume tokens without exceeding the rate + for (int i = 0; i < 10000; i++) { + asyncTokenBucket.consumeTokens(500); + incrementSeconds(i == 0 ? 40 : 50); + } + + // let 9 seconds pass + incrementSeconds(9); + + // there should be 90 tokens available + assertThat(asyncTokenBucket.getTokens()).isEqualTo(90); + } + + @Test + void shouldHandleEventualConsistency() { + AtomicLong offset = new AtomicLong(0); + long resolutionNanos = TimeUnit.MILLISECONDS.toNanos(1); + DefaultMonotonicSnapshotClock monotonicSnapshotClock = + new DefaultMonotonicSnapshotClock(resolutionNanos, + () -> offset.get() + manualClockSource.get()); + long initialTokens = 500L; + asyncTokenBucket = + AsyncTokenBucket.builder() + .consistentConsumedTokens(true) + .resolutionNanos(resolutionNanos) + .capacity(100000).rate(1000).initialTokens(initialTokens).clock(monotonicSnapshotClock).build(); + for (int i = 0; i < 100000; i++) { + // increment the clock by 1ms, since rate is 1000 tokens/s, this should make 1 token available + incrementMillis(1); + // consume 1 token + asyncTokenBucket.consumeTokens(1); + } + assertThat(asyncTokenBucket.getTokens()) + // since the rate is 1/ms and the test increments the clock by 1ms and consumes 1 token in each + // iteration, the tokens should be equal to the initial tokens + .isEqualTo(initialTokens); + } } \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClockTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClockTest.java new file mode 100644 index 0000000000000..0820b439915bb --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClockTest.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.broker.qos; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.assertj.core.data.Offset; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Slf4j +public class DefaultMonotonicSnapshotClockTest { + @DataProvider + private static Object[] booleanValues() { + return new Object[]{ true, false }; + } + + @Test(dataProvider = "booleanValues") + void testClockHandlesTimeLeapsBackwards(boolean requestSnapshot) throws InterruptedException { + long snapshotIntervalMillis = 5; + AtomicLong clockValue = new AtomicLong(1); + @Cleanup + DefaultMonotonicSnapshotClock clock = + new DefaultMonotonicSnapshotClock(Duration.ofMillis(snapshotIntervalMillis).toNanos(), + clockValue::get); + + + long previousTick = -1; + boolean leapDirection = true; + for (int i = 0; i < 10000; i++) { + clockValue.addAndGet(TimeUnit.MILLISECONDS.toNanos(1)); + long tick = clock.getTickNanos(requestSnapshot); + //log.info("i = {}, tick = {}", i, tick); + if ((i + 1) % 5 == 0) { + leapDirection = !leapDirection; + //log.info("Time leap 5 minutes backwards"); + clockValue.addAndGet(-Duration.ofMinutes(5).toNanos()); + } + if (previousTick != -1) { + assertThat(tick) + .describedAs("i = %d, tick = %d, previousTick = %d", i, tick, previousTick) + .isGreaterThanOrEqualTo(previousTick) + .isCloseTo(previousTick, + // then snapshot is requested, the time difference between the two ticks is accurate + // otherwise allow time difference at most 4 times the snapshot interval since the + // clock is updated periodically by a background thread + Offset.offset(TimeUnit.MILLISECONDS.toNanos( + requestSnapshot ? 1 : 4 * snapshotIntervalMillis))); + } + previousTick = tick; + } + } + + @Test + void testRequestUpdate() throws InterruptedException { + @Cleanup + DefaultMonotonicSnapshotClock clock = + new DefaultMonotonicSnapshotClock(Duration.ofSeconds(5).toNanos(), System::nanoTime); + long tick1 = clock.getTickNanos(false); + long tick2 = clock.getTickNanos(true); + assertThat(tick2).isGreaterThan(tick1); + } + + @Test + void testRequestingSnapshotAfterClosed() throws InterruptedException { + DefaultMonotonicSnapshotClock clock = + new DefaultMonotonicSnapshotClock(Duration.ofSeconds(5).toNanos(), System::nanoTime); + clock.close(); + long tick1 = clock.getTickNanos(true); + Thread.sleep(10); + long tick2 = clock.getTickNanos(true); + assertThat(tick2).isGreaterThan(tick1); + } + + @Test + void testConstructorValidation() { + assertThatThrownBy(() -> new DefaultMonotonicSnapshotClock(0, System::nanoTime)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("snapshotIntervalNanos must be at least 1 millisecond"); + assertThatThrownBy(() -> new DefaultMonotonicSnapshotClock(-1, System::nanoTime)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("snapshotIntervalNanos must be at least 1 millisecond"); + assertThatThrownBy(() -> new DefaultMonotonicSnapshotClock(TimeUnit.MILLISECONDS.toNanos(1), null)) + .isInstanceOf(NullPointerException.class) + .hasMessage("clockSource must not be null"); + } + + @Test + void testFailureHandlingInClockSource() { + @Cleanup + DefaultMonotonicSnapshotClock clock = + new DefaultMonotonicSnapshotClock(Duration.ofSeconds(5).toNanos(), () -> { + throw new RuntimeException("Test clock failure"); + }); + // the exception should be propagated + assertThatThrownBy(() -> clock.getTickNanos(true)) + .isInstanceOf(RuntimeException.class) + .hasMessage("Test clock failure"); + } + + @Test + void testLeapDetectionIndependently() { + AtomicLong clockValue = new AtomicLong(0); + AtomicLong tickValue = new AtomicLong(0); + long expectedTickValue = 0; + long snapshotIntervalNanos = TimeUnit.MILLISECONDS.toNanos(1); + DefaultMonotonicSnapshotClock.MonotonicLeapDetectingTickUpdater updater = + new DefaultMonotonicSnapshotClock.MonotonicLeapDetectingTickUpdater(clockValue::get, tickValue::set, + snapshotIntervalNanos); + + updater.update(true); + + // advance the clock + clockValue.addAndGet(snapshotIntervalNanos); + expectedTickValue += snapshotIntervalNanos; + updater.update(true); + assertThat(tickValue.get()).isEqualTo(expectedTickValue); + + // simulate a leap backwards in time + clockValue.addAndGet(-10 * snapshotIntervalNanos); + expectedTickValue += snapshotIntervalNanos; + updater.update(true); + assertThat(tickValue.get()).isEqualTo(expectedTickValue); + + // advance the clock + clockValue.addAndGet(snapshotIntervalNanos); + expectedTickValue += snapshotIntervalNanos; + updater.update(true); + assertThat(tickValue.get()).isEqualTo(expectedTickValue); + + // simulate a leap backwards in time, without waiting a full snapshot interval + clockValue.addAndGet(-10 * snapshotIntervalNanos); + updater.update(false); + assertThat(tickValue.get()).isEqualTo(expectedTickValue); + + // advance the clock + clockValue.addAndGet(snapshotIntervalNanos); + expectedTickValue += snapshotIntervalNanos; + updater.update(true); + assertThat(tickValue.get()).isEqualTo(expectedTickValue); + + // simulate a small leap backwards in time which isn't detected, without waiting a full snapshot interval + clockValue.addAndGet(-1 * snapshotIntervalNanos); + updater.update(false); + assertThat(tickValue.get()).isEqualTo(expectedTickValue); + // clock doesn't advance for one snapshot interval + clockValue.addAndGet(snapshotIntervalNanos); + updater.update(false); + assertThat(tickValue.get()).isEqualTo(expectedTickValue); + // now the clock should advance again + clockValue.addAndGet(snapshotIntervalNanos); + expectedTickValue += snapshotIntervalNanos; + updater.update(false); + assertThat(tickValue.get()).isEqualTo(expectedTickValue); + + // simulate a leap forward + clockValue.addAndGet(10 * snapshotIntervalNanos); + // no special handling for leap forward + expectedTickValue += 10 * snapshotIntervalNanos; + updater.update(true); + assertThat(tickValue.get()).isEqualTo(expectedTickValue); + } +} \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java index 392ec0d3ff46f..8343680f9bf7b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.qos.AsyncTokenBucket; import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount; import org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass; import org.apache.pulsar.broker.resourcegroup.ResourceGroupService.ResourceGroupUsageStatsType; @@ -58,9 +59,10 @@ @Slf4j @Test(groups = "flaky") public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase { - @BeforeClass + @BeforeClass(alwaysRun = true) @Override protected void setup() throws Exception { + AsyncTokenBucket.switchToConsistentTokensView(); super.internalSetup(); this.prepareForOps(); @@ -91,6 +93,7 @@ public long computeLocalQuota(long confUsage, long myUsage, long[] allUsages) { @Override protected void cleanup() throws Exception { super.internalCleanup(); + AsyncTokenBucket.resetToDefaultEventualConsistentTokensView(); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java index 2c44ba7e23004..5c149d4e1e792 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java @@ -26,6 +26,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import java.util.HashMap; import java.util.concurrent.TimeUnit; @@ -73,7 +74,9 @@ public void setup() throws Exception { when(transportCnx.getBrokerService()).thenReturn(brokerService); EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class); when(brokerService.executor()).thenReturn(eventLoopGroup); - doReturn(null).when(eventLoopGroup).schedule(any(Runnable.class), anyLong(), any()); + EventLoop eventLoop = mock(EventLoop.class); + when(eventLoopGroup.next()).thenReturn(eventLoop); + doReturn(null).when(eventLoop).schedule(any(Runnable.class), anyLong(), any()); incrementSeconds(1); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AbstractMessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AbstractMessageDispatchThrottlingTest.java new file mode 100644 index 0000000000000..31c628b2bc4ca --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AbstractMessageDispatchThrottlingTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.pulsar.broker.qos.AsyncTokenBucket; +import org.apache.pulsar.broker.service.BrokerService; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; + +public abstract class AbstractMessageDispatchThrottlingTest extends ProducerConsumerBase { + public static T[] merge(T[] first, T[] last) { + int totalLength = first.length + last.length; + T[] result = Arrays.copyOf(first, totalLength); + int offset = first.length; + System.arraycopy(last, 0, result, offset, first.length); + return result; + } + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + AsyncTokenBucket.switchToConsistentTokensView(); + this.conf.setClusterName("test"); + internalSetup(); + producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + internalCleanup(); + AsyncTokenBucket.resetToDefaultEventualConsistentTokensView(); + } + + @AfterMethod(alwaysRun = true) + protected void reset() throws Exception { + pulsar.getConfiguration().setForceDeleteTenantAllowed(true); + pulsar.getConfiguration().setForceDeleteNamespaceAllowed(true); + + for (String tenant : admin.tenants().getTenants()) { + for (String namespace : admin.namespaces().getNamespaces(tenant)) { + admin.namespaces().deleteNamespace(namespace, true); + } + admin.tenants().deleteTenant(tenant, true); + } + + for (String cluster : admin.clusters().getClusters()) { + admin.clusters().deleteCluster(cluster); + } + + pulsar.getConfiguration().setForceDeleteTenantAllowed(false); + pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false); + + producerBaseSetup(); + } + + @DataProvider(name = "subscriptions") + public Object[][] subscriptionsProvider() { + return new Object[][]{new Object[]{SubscriptionType.Shared}, {SubscriptionType.Exclusive}}; + } + + @DataProvider(name = "dispatchRateType") + public Object[][] dispatchRateProvider() { + return new Object[][]{{DispatchRateType.messageRate}, {DispatchRateType.byteRate}}; + } + + @DataProvider(name = "subscriptionAndDispatchRateType") + public Object[][] subDisTypeProvider() { + List mergeList = new LinkedList<>(); + for (Object[] sub : subscriptionsProvider()) { + for (Object[] dispatch : dispatchRateProvider()) { + mergeList.add(AbstractMessageDispatchThrottlingTest.merge(sub, dispatch)); + } + } + return mergeList.toArray(new Object[0][0]); + } + + protected void deactiveCursors(ManagedLedgerImpl ledger) throws Exception { + Field statsUpdaterField = BrokerService.class.getDeclaredField("statsUpdater"); + statsUpdaterField.setAccessible(true); + ScheduledExecutorService statsUpdater = (ScheduledExecutorService) statsUpdaterField + .get(pulsar.getBrokerService()); + statsUpdater.shutdownNow(); + ledger.getCursors().forEach(cursor -> { + ledger.deactivateCursor(cursor); + }); + } + + enum DispatchRateType { + messageRate, byteRate; + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java index a544c7e13bc83..5d6f0c519abc6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.api; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.spy; @@ -27,15 +28,11 @@ import static org.testng.Assert.fail; import com.google.common.collect.Sets; import java.lang.reflect.Field; -import java.util.Arrays; import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import lombok.Cleanup; @@ -43,7 +40,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.cache.PendingReadsManager; import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl; -import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.common.policies.data.ClusterData; @@ -52,93 +49,17 @@ import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl; -import org.apache.pulsar.broker.qos.AsyncTokenBucket; +import org.assertj.core.data.Offset; import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -@Test(groups = "flaky") -public class MessageDispatchThrottlingTest extends ProducerConsumerBase { +@Test(groups = "broker-api") +public class MessageDispatchThrottlingTest extends AbstractMessageDispatchThrottlingTest { private static final Logger log = LoggerFactory.getLogger(MessageDispatchThrottlingTest.class); - @BeforeClass - @Override - protected void setup() throws Exception { - AsyncTokenBucket.switchToConsistentTokensView(); - this.conf.setClusterName("test"); - super.internalSetup(); - super.producerBaseSetup(); - } - - @AfterClass(alwaysRun = true) - @Override - protected void cleanup() throws Exception { - super.internalCleanup(); - AsyncTokenBucket.resetToDefaultEventualConsistentTokensView(); - } - - @AfterMethod(alwaysRun = true) - protected void reset() throws Exception { - pulsar.getConfiguration().setForceDeleteTenantAllowed(true); - pulsar.getConfiguration().setForceDeleteNamespaceAllowed(true); - - for (String tenant : admin.tenants().getTenants()) { - for (String namespace : admin.namespaces().getNamespaces(tenant)) { - admin.namespaces().deleteNamespace(namespace, true); - } - admin.tenants().deleteTenant(tenant, true); - } - - for (String cluster : admin.clusters().getClusters()) { - admin.clusters().deleteCluster(cluster); - } - - pulsar.getConfiguration().setForceDeleteTenantAllowed(false); - pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false); - - super.producerBaseSetup(); - } - - - @DataProvider(name = "subscriptions") - public Object[][] subscriptionsProvider() { - return new Object[][] { new Object[] { SubscriptionType.Shared }, { SubscriptionType.Exclusive } }; - } - - @DataProvider(name = "dispatchRateType") - public Object[][] dispatchRateProvider() { - return new Object[][] { { DispatchRateType.messageRate }, { DispatchRateType.byteRate } }; - } - - @DataProvider(name = "subscriptionAndDispatchRateType") - public Object[][] subDisTypeProvider() { - List mergeList = new LinkedList<>(); - for (Object[] sub : subscriptionsProvider()) { - for (Object[] dispatch : dispatchRateProvider()) { - mergeList.add(merge(sub, dispatch)); - } - } - return mergeList.toArray(new Object[0][0]); - } - - public static T[] merge(T[] first, T[] last) { - int totalLength = first.length + last.length; - T[] result = Arrays.copyOf(first, totalLength); - int offset = first.length; - System.arraycopy(last, 0, result, offset, first.length); - return result; - } - - enum DispatchRateType { - messageRate, byteRate; - } - /** * verifies: message-rate change gets reflected immediately into topic at runtime * @@ -150,7 +71,7 @@ public void testMessageRateDynamicallyChange() throws Exception { log.info("-- Starting {} test --", methodName); - final String namespace = "my-property/throttling_ns"; + final String namespace = BrokerTestUtil.newUniqueName("my-property/throttling_ns"); final String topicName = "persistent://" + namespace + "/throttlingBlock"; admin.namespaces().createNamespace(namespace, Sets.newHashSet("test")); @@ -220,7 +141,7 @@ public void testMessageRateDynamicallyChange() throws Exception { @SuppressWarnings("deprecation") @Test public void testSystemTopicDeliveryNonBlock() throws Exception { - final String namespace = "my-property/throttling_ns"; + final String namespace = BrokerTestUtil.newUniqueName("my-property/throttling_ns"); admin.namespaces().createNamespace(namespace, Sets.newHashSet("test")); final String topicName = "persistent://" + namespace + "/" + UUID.randomUUID().toString().replaceAll("-", ""); admin.topics().createNonPartitionedTopic(topicName); @@ -264,7 +185,7 @@ public void testMessageRateLimitingNotReceiveAllMessages(SubscriptionType subscr DispatchRateType dispatchRateType) throws Exception { log.info("-- Starting {} test --", methodName); - final String namespace = "my-property/throttling_ns"; + final String namespace = BrokerTestUtil.newUniqueName("my-property/throttling_ns"); final String topicName = "persistent://" + namespace + "/throttlingBlock"; final int messageRate = 100; @@ -332,7 +253,7 @@ public void testMessageRateLimitingNotReceiveAllMessages(SubscriptionType subscr public void testClusterMsgByteRateLimitingClusterConfig() throws Exception { log.info("-- Starting {} test --", methodName); - final String namespace = "my-property/throttling_ns"; + final String namespace = BrokerTestUtil.newUniqueName("my-property/throttling_ns"); final String topicName = "persistent://" + namespace + "/throttlingBlock"; final int messageRate = 5; final long byteRate = 1024 * 1024;// 1MB rate enough to let all msg to be delivered @@ -407,7 +328,7 @@ public void testMessageRateLimitingReceiveAllMessagesAfterThrottling(Subscriptio throws Exception { log.info("-- Starting {} test --", methodName); - final String namespace = "my-property/throttling_ns"; + final String namespace = BrokerTestUtil.newUniqueName("my-property/throttling_ns"); final String topicName = "persistent://" + namespace + "/throttlingAll"; final int messageRate = 10; @@ -475,7 +396,7 @@ public void testBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionT conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true); log.info("-- Starting {} test --", methodName); - final String namespace = "my-property/throttling_ns"; + final String namespace = BrokerTestUtil.newUniqueName("my-property/throttling_ns"); final String topicName = "persistent://" + namespace + "/throttlingAll"; final String subscriptionName = "my-subscriber-name"; @@ -528,8 +449,9 @@ public void testBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionT public void testRateLimitingMultipleConsumers() throws Exception { log.info("-- Starting {} test --", methodName); - final String namespace = "my-property/throttling_ns"; + final String namespace = BrokerTestUtil.newUniqueName("my-property/throttling_ns"); final String topicName = "persistent://" + namespace + "/throttlingMultipleConsumers"; + conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true); final int messageRate = 5; DispatchRate dispatchRate = DispatchRate.builder() @@ -540,7 +462,8 @@ public void testRateLimitingMultipleConsumers() throws Exception { admin.namespaces().createNamespace(namespace, Sets.newHashSet("test")); admin.namespaces().setDispatchRate(namespace, dispatchRate); // create producer and topic - Producer producer = pulsarClient.newProducer().topic(topicName).create(); + @Cleanup + Producer producer = pulsarClient.newProducer().enableBatching(false).topic(topicName).create(); PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); Awaitility.await() @@ -566,10 +489,15 @@ public void testRateLimitingMultipleConsumers() throws Exception { throw new RuntimeException(e); } }); + @Cleanup Consumer consumer1 = consumerBuilder.subscribe(); + @Cleanup Consumer consumer2 = consumerBuilder.subscribe(); + @Cleanup Consumer consumer3 = consumerBuilder.subscribe(); + @Cleanup Consumer consumer4 = consumerBuilder.subscribe(); + @Cleanup Consumer consumer5 = consumerBuilder.subscribe(); // deactive cursors @@ -585,15 +513,10 @@ public void testRateLimitingMultipleConsumers() throws Exception { Thread.sleep(1000); // rate limiter should have limited messages with at least 10% accuracy (or 2 messages if messageRate is low) - Assert.assertEquals(totalReceived.get(), messageRate, Math.max(messageRate / 10, 2)); + assertThat(totalReceived.get()).isCloseTo(messageRate, Offset.offset(Math.max(messageRate / 10, 2))); - consumer1.close(); - consumer2.close(); - consumer3.close(); - consumer4.close(); - consumer5.close(); - producer.close(); log.info("-- Exiting {} test --", methodName); + conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(false); } @Test @@ -602,7 +525,7 @@ public void testRateLimitingWithBatchMsgEnabled() throws Exception { conf.setDispatchThrottlingOnBatchMessageEnabled(true); - final String namespace = "my-property/throttling_ns"; + final String namespace = BrokerTestUtil.newUniqueName("my-property/throttling_ns"); final String topicName = "persistent://" + namespace + "/throttlingMultipleConsumers"; final int messageRate = 5; @@ -614,6 +537,7 @@ public void testRateLimitingWithBatchMsgEnabled() throws Exception { final int messagesPerBatch = 100; final int numProducedMessages = messageRate * messagesPerBatch; // create producer and topic + @Cleanup Producer producer = pulsarClient.newProducer().topic(topicName).enableBatching(true) .batchingMaxPublishDelay(1, TimeUnit.SECONDS).batchingMaxMessages(messagesPerBatch).create(); PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); @@ -634,10 +558,15 @@ public void testRateLimitingWithBatchMsgEnabled() throws Exception { log.debug("Received message [{}] in the listener", receivedMessage); totalReceived.incrementAndGet(); }); + @Cleanup Consumer consumer1 = consumerBuilder.subscribe(); + @Cleanup Consumer consumer2 = consumerBuilder.subscribe(); + @Cleanup Consumer consumer3 = consumerBuilder.subscribe(); + @Cleanup Consumer consumer4 = consumerBuilder.subscribe(); + @Cleanup Consumer consumer5 = consumerBuilder.subscribe(); // deactive cursors @@ -657,12 +586,6 @@ public void testRateLimitingWithBatchMsgEnabled() throws Exception { // consumer should not have received all published message due to message-rate throttling Assert.assertEquals(totalReceived.get(), numProducedMessages); - consumer1.close(); - consumer2.close(); - consumer3.close(); - consumer4.close(); - consumer5.close(); - producer.close(); log.info("-- Exiting {} test --", methodName); } @@ -670,7 +593,7 @@ public void testRateLimitingWithBatchMsgEnabled() throws Exception { public void testClusterRateLimitingConfiguration(SubscriptionType subscription) throws Exception { log.info("-- Starting {} test --", methodName); - final String namespace = "my-property/throttling_ns"; + final String namespace = BrokerTestUtil.newUniqueName("my-property/throttling_ns"); final String topicName = "persistent://" + namespace + "/throttlingBlock"; final int messageRate = 5; @@ -688,12 +611,14 @@ public void testClusterRateLimitingConfiguration(SubscriptionType subscription) admin.namespaces().createNamespace(namespace, Sets.newHashSet("test")); // create producer and topic + @Cleanup Producer producer = pulsarClient.newProducer().topic(topicName).create(); PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); int numMessages = 500; final AtomicInteger totalReceived = new AtomicInteger(0); + @Cleanup Consumer consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name") .subscriptionType(subscription).messageListener((c1, msg) -> { Assert.assertNotNull(msg, "Message cannot be null"); @@ -716,8 +641,6 @@ public void testClusterRateLimitingConfiguration(SubscriptionType subscription) // consumer should not have received all published message due to message-rate throttling Assert.assertNotEquals(totalReceived.get(), numMessages); - consumer.close(); - producer.close(); admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerTopicInMsg", Integer.toString(initValue)); log.info("-- Exiting {} test --", methodName); @@ -733,7 +656,7 @@ public void testClusterRateLimitingConfiguration(SubscriptionType subscription) public void testMessageByteRateThrottlingCombined(SubscriptionType subscription) throws Exception { log.info("-- Starting {} test --", methodName); - final String namespace = "my-property/throttling_ns"; + final String namespace = BrokerTestUtil.newUniqueName("my-property/throttling_ns"); final String topicName = "persistent://" + namespace + "/throttlingAll"; final int messageRate = 5; // 5 msgs per second @@ -803,7 +726,7 @@ public void testMessageByteRateThrottlingCombined(SubscriptionType subscription) public void testGlobalNamespaceThrottling() throws Exception { log.info("-- Starting {} test --", methodName); - final String namespace = "my-property/throttling_ns"; + final String namespace = BrokerTestUtil.newUniqueName("my-property/throttling_ns"); final String topicName = "persistent://" + namespace + "/throttlingBlock"; final int messageRate = 5; @@ -869,7 +792,7 @@ public void testGlobalNamespaceThrottling() throws Exception { public void testNonBacklogConsumerWithThrottlingEnabled(SubscriptionType subscription) throws Exception { log.info("-- Starting {} test --", methodName); - final String namespace = "my-property/throttling_ns"; + final String namespace = BrokerTestUtil.newUniqueName("my-property/throttling_ns"); final String topicName = "persistent://" + namespace + "/throttlingBlock"; final int messageRate = 10; @@ -948,7 +871,7 @@ public void testNonBacklogConsumerWithThrottlingEnabled(SubscriptionType subscri public void testClusterPolicyOverrideConfiguration() throws Exception { log.info("-- Starting {} test --", methodName); - final String namespace = "my-property/throttling_ns"; + final String namespace = BrokerTestUtil.newUniqueName("my-property/throttling_ns"); final String topicName1 = "persistent://" + namespace + "/throttlingOverride1"; final String topicName2 = "persistent://" + namespace + "/throttlingOverride2"; final int clusterMessageRate = 100; @@ -1018,7 +941,7 @@ public void testClusterPolicyOverrideConfiguration() throws Exception { public void testClosingRateLimiter(SubscriptionType subscription) throws Exception { log.info("-- Starting {} test --", methodName); - final String namespace = "my-property/throttling_ns"; + final String namespace = BrokerTestUtil.newUniqueName("my-property/throttling_ns"); final String topicName = "persistent://" + namespace + "/closingRateLimiter" + subscription.name(); final String subName = "mySubscription" + subscription.name(); @@ -1066,7 +989,7 @@ public void testClosingRateLimiter(SubscriptionType subscription) throws Excepti @SuppressWarnings("deprecation") @Test public void testDispatchRateCompatibility2() throws Exception { - final String namespace = "my-property/dispatch-rate-compatibility"; + final String namespace = BrokerTestUtil.newUniqueName("my-property/dispatch-rate-compatibility"); final String topicName = "persistent://" + namespace + "/t1"; final String cluster = "test"; admin.namespaces().createNamespace(namespace, Sets.newHashSet(cluster)); @@ -1112,17 +1035,6 @@ public void testDispatchRateCompatibility2() throws Exception { topic.close().get(); } - protected void deactiveCursors(ManagedLedgerImpl ledger) throws Exception { - Field statsUpdaterField = BrokerService.class.getDeclaredField("statsUpdater"); - statsUpdaterField.setAccessible(true); - ScheduledExecutorService statsUpdater = (ScheduledExecutorService) statsUpdaterField - .get(pulsar.getBrokerService()); - statsUpdater.shutdownNow(); - ledger.getCursors().forEach(cursor -> { - ledger.deactivateCursor(cursor); - }); - } - /** * It verifies that relative throttling at least dispatch messages as publish-rate. * @@ -1133,7 +1045,7 @@ protected void deactiveCursors(ManagedLedgerImpl ledger) throws Exception { public void testRelativeMessageRateLimitingThrottling(SubscriptionType subscription) throws Exception { log.info("-- Starting {} test --", methodName); - final String namespace = "my-property/relative_throttling_ns"; + final String namespace = BrokerTestUtil.newUniqueName("my-property/relative_throttling_ns"); final String topicName = "persistent://" + namespace + "/relative-throttle" + subscription; final int messageRate = 1; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java index ce554ab2d9c00..db40ec644e9ca 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java @@ -18,8 +18,10 @@ */ package org.apache.pulsar.client.api; +import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; import com.google.common.collect.Sets; +import java.time.Duration; import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; @@ -30,6 +32,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.policies.data.DispatchRate; import org.awaitility.Awaitility; import org.slf4j.Logger; @@ -37,8 +40,8 @@ import org.testng.Assert; import org.testng.annotations.Test; -@Test(groups = "flaky") -public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchThrottlingTest { +@Test(groups = "broker-api") +public class SubscriptionMessageDispatchThrottlingTest extends AbstractMessageDispatchThrottlingTest { private static final Logger log = LoggerFactory.getLogger(SubscriptionMessageDispatchThrottlingTest.class); /** @@ -241,7 +244,7 @@ private void testMessageNotDuplicated(SubscriptionType subscription) throws Exce admin.namespaces().setSubscriptionDispatchRate(namespace, subscriptionDispatchRate); admin.namespaces().setDispatchRate(namespace, topicDispatchRate); long initBytes = pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInByte(); - admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + brokerRate); + updateBrokerDispatchThrottlingRateInBytes(brokerRate); final int numProducedMessages = 30; final CountDownLatch latch = new CountDownLatch(numProducedMessages); @@ -272,10 +275,11 @@ private void testMessageNotDuplicated(SubscriptionType subscription) throws Exce Assert.fail("Should only have PersistentDispatcher in this test"); } final DispatchRateLimiter subDispatchRateLimiter = subRateLimiter; - Awaitility.await().untilAsserted(() -> { + Awaitility.await().atMost(Duration.ofSeconds(15)).untilAsserted(() -> { DispatchRateLimiter brokerDispatchRateLimiter = pulsar.getBrokerService().getBrokerDispatchRateLimiter(); - Assert.assertTrue(brokerDispatchRateLimiter != null - && brokerDispatchRateLimiter.getDispatchRateOnByte() > 0); + assertThat(brokerDispatchRateLimiter) + .isNotNull() + .satisfies(l -> assertThat(l.getDispatchRateOnByte()).isEqualTo(brokerRate)); DispatchRateLimiter topicDispatchRateLimiter = topic.getDispatchRateLimiter().orElse(null); Assert.assertTrue(topicDispatchRateLimiter != null && topicDispatchRateLimiter.getDispatchRateOnByte() > 0); @@ -301,10 +305,7 @@ private void testMessageNotDuplicated(SubscriptionType subscription) throws Exce consumer.close(); producer.close(); - admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", Long.toString(initBytes)); - - admin.topics().delete(topicName, true); - admin.namespaces().deleteNamespace(namespace); + updateBrokerDispatchThrottlingRateInBytes(initBytes); } /** @@ -401,7 +402,7 @@ public void testBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionT private void testDispatchRate(SubscriptionType subscription, int brokerRate, int topicRate, int subRate, int expectRate) throws Exception { - final String namespace = "my-property/throttling_ns"; + final String namespace = BrokerTestUtil.newUniqueName("my-property/throttling_ns"); final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/throttlingAll"); final String subName = "my-subscriber-name-" + subscription; @@ -419,7 +420,7 @@ private void testDispatchRate(SubscriptionType subscription, admin.namespaces().setSubscriptionDispatchRate(namespace, subscriptionDispatchRate); admin.namespaces().setDispatchRate(namespace, topicDispatchRate); long initBytes = pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInByte(); - admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + brokerRate); + updateBrokerDispatchThrottlingRateInBytes(brokerRate); final int numProducedMessages = 30; final CountDownLatch latch = new CountDownLatch(numProducedMessages); @@ -450,10 +451,11 @@ private void testDispatchRate(SubscriptionType subscription, Assert.fail("Should only have PersistentDispatcher in this test"); } final DispatchRateLimiter subDispatchRateLimiter = subRateLimiter; - Awaitility.await().untilAsserted(() -> { + Awaitility.await().atMost(Duration.ofSeconds(15)).untilAsserted(() -> { DispatchRateLimiter brokerDispatchRateLimiter = pulsar.getBrokerService().getBrokerDispatchRateLimiter(); - Assert.assertTrue(brokerDispatchRateLimiter != null - && brokerDispatchRateLimiter.getDispatchRateOnByte() > 0); + assertThat(brokerDispatchRateLimiter) + .isNotNull() + .satisfies(l -> assertThat(l.getDispatchRateOnByte()).isEqualTo(brokerRate)); DispatchRateLimiter topicDispatchRateLimiter = topic.getDispatchRateLimiter().orElse(null); Assert.assertTrue(topicDispatchRateLimiter != null && topicDispatchRateLimiter.getDispatchRateOnByte() > 0); @@ -482,9 +484,18 @@ private void testDispatchRate(SubscriptionType subscription, consumer.close(); producer.close(); - admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", Long.toString(initBytes)); - admin.topics().delete(topicName, true); - admin.namespaces().deleteNamespace(namespace); + updateBrokerDispatchThrottlingRateInBytes(initBytes); + } + + private void updateBrokerDispatchThrottlingRateInBytes(long bytes) throws PulsarAdminException { + admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", Long.toString(bytes)); + long expectedBytes = bytes > 0L ? bytes : -1L; + await().untilAsserted(() -> { + DispatchRateLimiter brokerDispatchRateLimiter = pulsar.getBrokerService().getBrokerDispatchRateLimiter(); + assertThat(brokerDispatchRateLimiter) + .isNotNull() + .satisfies(l -> assertThat(l.getDispatchRateOnByte()).isEqualTo(expectedBytes)); + }); } /** @@ -537,7 +548,7 @@ public void testBrokerBytesRateLimitingReceiveAllMessagesAfterThrottling(Subscri long initBytes = pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInByte(); final int byteRate = 1000; - admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + byteRate); + updateBrokerDispatchThrottlingRateInBytes(byteRate); Awaitility.await().untilAsserted(() -> { Assert.assertEquals(pulsar.getConfiguration().getDispatchThrottlingRateInByte(), byteRate); @@ -576,12 +587,6 @@ public void testBrokerBytesRateLimitingReceiveAllMessagesAfterThrottling(Subscri Producer producer1 = pulsarClient.newProducer().topic(topicName1).create(); Producer producer2 = pulsarClient.newProducer().topic(topicName2).create(); - Awaitility.await().untilAsserted(() -> { - DispatchRateLimiter rateLimiter = pulsar.getBrokerService().getBrokerDispatchRateLimiter(); - Assert.assertTrue(rateLimiter != null - && rateLimiter.getDispatchRateOnByte() > 0); - }); - long start = System.currentTimeMillis(); // Asynchronously produce messages for (int i = 0; i < numProducedMessagesEachTopic; i++) { @@ -600,7 +605,7 @@ public void testBrokerBytesRateLimitingReceiveAllMessagesAfterThrottling(Subscri consumer2.close(); producer1.close(); producer2.close(); - admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", Long.toString(initBytes)); + updateBrokerDispatchThrottlingRateInBytes(initBytes); log.info("-- Exiting {} test --", methodName); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessagePublishThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessagePublishThrottlingTest.java index 1c0ae5547d53b..a848d68f37f63 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessagePublishThrottlingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessagePublishThrottlingTest.java @@ -41,7 +41,7 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -@Test +@Test(groups = "broker-api") public class MessagePublishThrottlingTest extends ProducerConsumerBase { private static final Logger log = LoggerFactory.getLogger(MessagePublishThrottlingTest.class);