Skip to content

Commit

Permalink
[fix][broker] Make InflightReadsLimiter asynchronous and apply it for…
Browse files Browse the repository at this point in the history
… replay queue reads (#23901)
  • Loading branch information
lhotari authored Jan 29, 2025
1 parent ed5dbb5 commit c5173d5
Show file tree
Hide file tree
Showing 12 changed files with 984 additions and 447 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,18 @@ public class ManagedLedgerFactoryConfig {
*/
private long managedLedgerMaxReadsInFlightSize = 0;

/**
* Maximum time to wait for acquiring permits for max reads in flight when managedLedgerMaxReadsInFlightSizeInMB is
* set (>0) and the limit is reached.
*/
private long managedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis = 60000;

/**
* Maximum number of reads that can be queued for acquiring permits for max reads in flight when
* managedLedgerMaxReadsInFlightSizeInMB is set (>0) and the limit is reached.
*/
private int managedLedgerMaxReadsInFlightPermitsAcquireQueueSize = 10000;

/**
* Whether trace managed ledger task execution time.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
compressionConfigForManagedCursorInfo);
this.config = config;
this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
this.entryCacheManager = new RangeEntryCacheManagerImpl(this, openTelemetry);
this.entryCacheManager = new RangeEntryCacheManagerImpl(this, scheduledExecutor, openTelemetry);
this.statsTask = scheduledExecutor.scheduleWithFixedDelay(catchingAndLoggingThrowables(this::refreshStats),
0, config.getStatsPeriodSeconds(), TimeUnit.SECONDS);
this.flushCursorsTask = scheduledExecutor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::flushCursors),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,16 @@
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.metrics.ObservableLongCounter;
import io.prometheus.client.Gauge;
import lombok.AllArgsConstructor;
import lombok.ToString;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.opentelemetry.Constants;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.InflightReadLimiterUtilization;
import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;
import org.jctools.queues.SpscArrayQueue;

@Slf4j
public class InflightReadsLimiter implements AutoCloseable {
Expand Down Expand Up @@ -58,16 +62,36 @@ public class InflightReadsLimiter implements AutoCloseable {

private final long maxReadsInFlightSize;
private long remainingBytes;
private final long acquireTimeoutMillis;
private final ScheduledExecutorService timeOutExecutor;
private final boolean enabled;

public InflightReadsLimiter(long maxReadsInFlightSize, OpenTelemetry openTelemetry) {
if (maxReadsInFlightSize <= 0) {
record Handle(long permits, long creationTime, boolean success) {
}

record QueuedHandle(Handle handle, Consumer<Handle> callback) {
}

private final Queue<QueuedHandle> queuedHandles;
private boolean timeoutCheckRunning = false;

public InflightReadsLimiter(long maxReadsInFlightSize, int maxReadsInFlightAcquireQueueSize,
long acquireTimeoutMillis, ScheduledExecutorService timeOutExecutor,
OpenTelemetry openTelemetry) {
this.maxReadsInFlightSize = maxReadsInFlightSize;
this.remainingBytes = maxReadsInFlightSize;
this.acquireTimeoutMillis = acquireTimeoutMillis;
this.timeOutExecutor = timeOutExecutor;
if (maxReadsInFlightSize > 0) {
enabled = true;
this.queuedHandles = new SpscArrayQueue<>(maxReadsInFlightAcquireQueueSize);
} else {
enabled = false;
this.queuedHandles = null;
// set it to -1 in order to show in the metrics that the metric is not available
PULSAR_ML_READS_BUFFER_SIZE.set(-1);
PULSAR_ML_READS_AVAILABLE_BUFFER_SIZE.set(-1);
}
this.maxReadsInFlightSize = maxReadsInFlightSize;
this.remainingBytes = maxReadsInFlightSize;

var meter = openTelemetry.getMeter(Constants.BROKER_INSTRUMENTATION_SCOPE_NAME);
inflightReadsLimitCounter = meter.counterBuilder(INFLIGHT_READS_LIMITER_LIMIT_METRIC_NAME)
.setDescription("Maximum number of bytes that can be retained by managed ledger data read from storage "
Expand Down Expand Up @@ -102,70 +126,178 @@ public void close() {
inflightReadsUsageCounter.close();
}

@AllArgsConstructor
@ToString
static class Handle {
final long acquiredPermits;
final boolean success;
final int trials;
private static final Handle DISABLED = new Handle(0, 0, true);
private static final Optional<Handle> DISABLED_OPTIONAL = Optional.of(DISABLED);

final long creationTime;
/**
* Acquires permits from the limiter. If the limiter is disabled, it will immediately return a successful handle.
* If permits are available, it will return a handle with the acquired permits. If no permits are available,
* it will return an empty optional and the callback will be called when permits become available or when the
* acquire timeout is reached. The success field in the handle passed to the callback will be false if the acquire
* operation times out. The callback should be non-blocking and run on a desired executor handled within the
* callback itself.
*
* A successful handle will have the success field set to true, and the caller must call release with the handle
* when the permits are no longer needed.
*
* If an unsuccessful handle is returned immediately, it means that the queue limit has been reached and the
* callback will not be called. The caller should fail the read operation in this case to apply backpressure.
*
* @param permits the number of permits to acquire
* @param callback the callback to be called when the permits are acquired or timed out
* @return an optional handle that contains the permits if acquired, otherwise an empty optional
*/
public Optional<Handle> acquire(long permits, Consumer<Handle> callback) {
if (isDisabled()) {
return DISABLED_OPTIONAL;
}
return internalAcquire(permits, callback);
}

private static final Handle DISABLED = new Handle(0, true, 0, -1);
private synchronized Optional<Handle> internalAcquire(long permits, Consumer<Handle> callback) {
Handle handle = new Handle(permits, System.currentTimeMillis(), true);
if (remainingBytes >= permits) {
remainingBytes -= permits;
if (log.isDebugEnabled()) {
log.debug("acquired permits: {}, creationTime: {}, remainingBytes:{}", permits, handle.creationTime,
remainingBytes);
}
updateMetrics();
return Optional.of(handle);
} else if (permits > maxReadsInFlightSize && remainingBytes == maxReadsInFlightSize) {
remainingBytes = 0;
if (log.isInfoEnabled()) {
log.info("Requested permits {} exceeded maxReadsInFlightSize {}, creationTime: {}, remainingBytes:{}. "
+ "Allowing request with permits set to maxReadsInFlightSize.",
permits, maxReadsInFlightSize, handle.creationTime, remainingBytes);
}
updateMetrics();
return Optional.of(new Handle(maxReadsInFlightSize, handle.creationTime, true));
} else {
if (queuedHandles.offer(new QueuedHandle(handle, callback))) {
scheduleTimeOutCheck(acquireTimeoutMillis);
return Optional.empty();
} else {
log.warn("Failed to queue handle for acquiring permits: {}, creationTime: {}, remainingBytes:{}",
permits, handle.creationTime, remainingBytes);
return Optional.of(new Handle(0, handle.creationTime, false));
}
}
}

Handle acquire(long permits, Handle current) {
if (maxReadsInFlightSize <= 0) {
// feature is disabled
return DISABLED;
private synchronized void scheduleTimeOutCheck(long delayMillis) {
if (acquireTimeoutMillis <= 0) {
return;
}
synchronized (this) {
try {
if (current == null) {
if (remainingBytes == 0) {
return new Handle(0, false, 1, System.currentTimeMillis());
}
if (remainingBytes >= permits) {
remainingBytes -= permits;
return new Handle(permits, true, 1, System.currentTimeMillis());
} else {
long possible = remainingBytes;
remainingBytes = 0;
return new Handle(possible, false, 1, System.currentTimeMillis());
}
if (!timeoutCheckRunning) {
timeoutCheckRunning = true;
timeOutExecutor.schedule(this::timeoutCheck, delayMillis, TimeUnit.MILLISECONDS);
}
}

private synchronized void timeoutCheck() {
timeoutCheckRunning = false;
long delay = 0;
while (true) {
QueuedHandle queuedHandle = queuedHandles.peek();
if (queuedHandle != null) {
long age = System.currentTimeMillis() - queuedHandle.handle.creationTime;
if (age >= acquireTimeoutMillis) {
// remove the peeked handle from the queue
queuedHandles.poll();
handleTimeout(queuedHandle);
} else {
if (current.trials >= 4 && current.acquiredPermits > 0) {
remainingBytes += current.acquiredPermits;
return new Handle(0, false, 1, current.creationTime);
}
if (remainingBytes == 0) {
return new Handle(current.acquiredPermits, false, current.trials + 1,
current.creationTime);
}
long needed = permits - current.acquiredPermits;
if (remainingBytes >= needed) {
remainingBytes -= needed;
return new Handle(permits, true, current.trials + 1, current.creationTime);
} else {
long possible = remainingBytes;
remainingBytes = 0;
return new Handle(current.acquiredPermits + possible, false,
current.trials + 1, current.creationTime);
}
delay = acquireTimeoutMillis - age;
break;
}
} finally {
updateMetrics();
} else {
break;
}
}
if (delay > 0) {
scheduleTimeOutCheck(delay);
}
}

private void handleTimeout(QueuedHandle queuedHandle) {
if (log.isDebugEnabled()) {
log.debug("timed out queued permits: {}, creationTime: {}, remainingBytes:{}",
queuedHandle.handle.permits, queuedHandle.handle.creationTime, remainingBytes);
}
try {
queuedHandle.callback.accept(new Handle(0, queuedHandle.handle.creationTime, false));
} catch (Exception e) {
log.error("Error in callback of timed out queued permits: {}, creationTime: {}, remainingBytes:{}",
queuedHandle.handle.permits, queuedHandle.handle.creationTime, remainingBytes, e);
}
}

void release(Handle handle) {
/**
* Releases permits back to the limiter. If the handle is disabled, this method will be a no-op.
*
* @param handle the handle containing the permits to release
*/
public void release(Handle handle) {
if (handle == DISABLED) {
return;
}
synchronized (this) {
remainingBytes += handle.acquiredPermits;
updateMetrics();
internalRelease(handle);
}

private synchronized void internalRelease(Handle handle) {
if (log.isDebugEnabled()) {
log.debug("release permits: {}, creationTime: {}, remainingBytes:{}", handle.permits,
handle.creationTime, getRemainingBytes());
}
remainingBytes += handle.permits;
while (true) {
QueuedHandle queuedHandle = queuedHandles.peek();
if (queuedHandle != null) {
boolean timedOut = acquireTimeoutMillis > 0
&& System.currentTimeMillis() - queuedHandle.handle.creationTime > acquireTimeoutMillis;
if (timedOut) {
// remove the peeked handle from the queue
queuedHandles.poll();
handleTimeout(queuedHandle);
} else if (remainingBytes >= queuedHandle.handle.permits
|| queuedHandle.handle.permits > maxReadsInFlightSize
&& remainingBytes == maxReadsInFlightSize) {
// remove the peeked handle from the queue
queuedHandles.poll();
handleQueuedHandle(queuedHandle);
} else {
break;
}
} else {
break;
}
}
updateMetrics();
}

private void handleQueuedHandle(QueuedHandle queuedHandle) {
long permits = queuedHandle.handle.permits;
Handle handleForCallback = queuedHandle.handle;
if (permits > maxReadsInFlightSize && remainingBytes == maxReadsInFlightSize) {
remainingBytes = 0;
if (log.isInfoEnabled()) {
log.info("Requested permits {} exceeded maxReadsInFlightSize {}, creationTime: {}, remainingBytes:{}. "
+ "Allowing request with permits set to maxReadsInFlightSize.",
permits, maxReadsInFlightSize, queuedHandle.handle.creationTime, remainingBytes);
}
handleForCallback = new Handle(maxReadsInFlightSize, queuedHandle.handle.creationTime, true);
} else {
remainingBytes -= permits;
if (log.isDebugEnabled()) {
log.debug("acquired queued permits: {}, creationTime: {}, remainingBytes:{}",
permits, queuedHandle.handle.creationTime, remainingBytes);
}
}
try {
queuedHandle.callback.accept(handleForCallback);
} catch (Exception e) {
log.error("Error in callback of acquired queued permits: {}, creationTime: {}, remainingBytes:{}",
handleForCallback.permits, handleForCallback.creationTime, remainingBytes, e);
}
}

Expand All @@ -175,8 +307,6 @@ private synchronized void updateMetrics() {
}

public boolean isDisabled() {
return maxReadsInFlightSize <= 0;
return !enabled;
}


}
}
Loading

0 comments on commit c5173d5

Please sign in to comment.