Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat][broker] PIP-264: Add managed ledger cache metrics #22898

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions managed-ledger/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-opentelemetry</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Expand Down Expand Up @@ -120,6 +126,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,26 +47,51 @@ public interface ManagedLedgerFactoryMXBean {
*/
double getCacheHitsRate();

/**
* Cumulative number of cache hits.
*/
long getCacheHitsTotal();

/**
* Get the number of cache misses per second.
*/
double getCacheMissesRate();

/**
* Cumulative number of cache misses.
*/
long getCacheMissesTotal();

/**
* Get the amount of data is retrieved from the cache in byte/s.
*/
double getCacheHitsThroughput();

/**
* Cumulative amount of data retrieved from the cache in bytes.
*/
long getCacheHitsBytesTotal();

/**
* Get the amount of data is retrieved from the bookkeeper in byte/s.
*/
double getCacheMissesThroughput();

/**
* Cumulative amount of data retrieved from the bookkeeper in bytes.
*/
long getCacheMissesBytesTotal();

/**
* Get the number of cache evictions during the last minute.
*/
long getNumberOfCacheEvictions();

/**
* Cumulative number of cache evictions.
*/
long getNumberOfCacheEvictionsTotal();

/**
* Cumulative number of entries inserted into the cache.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.metrics.BatchCallback;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.cache.PooledByteBufAllocatorStats;
import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl;
import org.apache.pulsar.opentelemetry.Constants;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.CacheEntryStatus;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.CacheOperationStatus;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.PoolArenaType;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.PoolChunkAllocationType;

public class OpenTelemetryManagedLedgerCacheStats implements AutoCloseable {

// Replaces pulsar_ml_count
public static final String MANAGED_LEDGER_COUNTER = "pulsar.broker.managed_ledger.count";
private final ObservableLongMeasurement managedLedgerCounter;

// Replaces pulsar_ml_cache_evictions
public static final String CACHE_EVICTION_OPERATION_COUNTER = "pulsar.broker.managed_ledger.cache.eviction.count";
private final ObservableLongMeasurement cacheEvictionOperationCounter;

// Replaces 'pulsar_ml_cache_entries',
// 'pulsar_ml_cache_inserted_entries_total',
// 'pulsar_ml_cache_evicted_entries_total'
public static final String CACHE_ENTRY_COUNTER = "pulsar.broker.managed_ledger.cache.entry.count";
private final ObservableLongMeasurement cacheEntryCounter;

// Replaces pulsar_ml_cache_used_size
public static final String CACHE_SIZE_COUNTER = "pulsar.broker.managed_ledger.cache.entry.size";
private final ObservableLongMeasurement cacheSizeCounter;

// Replaces pulsar_ml_cache_hits_rate, pulsar_ml_cache_misses_rate
public static final String CACHE_OPERATION_COUNTER = "pulsar.broker.managed_ledger.cache.operation.count";
private final ObservableLongMeasurement cacheOperationCounter;

// Replaces pulsar_ml_cache_hits_throughput, pulsar_ml_cache_misses_throughput
public static final String CACHE_OPERATION_BYTES_COUNTER = "pulsar.broker.managed_ledger.cache.operation.size";
private final ObservableLongMeasurement cacheOperationBytesCounter;

// Replaces 'pulsar_ml_cache_pool_active_allocations',
// 'pulsar_ml_cache_pool_active_allocations_huge',
// 'pulsar_ml_cache_pool_active_allocations_normal',
// 'pulsar_ml_cache_pool_active_allocations_small'
public static final String CACHE_POOL_ACTIVE_ALLOCATION_COUNTER =
"pulsar.broker.managed_ledger.cache.pool.allocation.active.count";
private final ObservableLongMeasurement cachePoolActiveAllocationCounter;

// Replaces ['pulsar_ml_cache_pool_allocated', 'pulsar_ml_cache_pool_used']
public static final String CACHE_POOL_ACTIVE_ALLOCATION_SIZE_COUNTER =
"pulsar.broker.managed_ledger.cache.pool.allocation.size";
private final ObservableLongMeasurement cachePoolActiveAllocationSizeCounter;

private final BatchCallback batchCallback;

public OpenTelemetryManagedLedgerCacheStats(OpenTelemetry openTelemetry, ManagedLedgerFactoryImpl factory) {
var meter = openTelemetry.getMeter(Constants.BROKER_INSTRUMENTATION_SCOPE_NAME);

managedLedgerCounter = meter
.upDownCounterBuilder(MANAGED_LEDGER_COUNTER)
.setUnit("{managed_ledger}")
.setDescription("The total number of managed ledgers.")
.buildObserver();

cacheEvictionOperationCounter = meter
.counterBuilder(CACHE_EVICTION_OPERATION_COUNTER)
.setUnit("{eviction}")
.setDescription("The total number of cache eviction operations.")
.buildObserver();

cacheEntryCounter = meter
.upDownCounterBuilder(CACHE_ENTRY_COUNTER)
.setUnit("{entry}")
.setDescription("The number of entries in the entry cache.")
.buildObserver();

cacheSizeCounter = meter
.upDownCounterBuilder(CACHE_SIZE_COUNTER)
.setUnit("{By}")
.setDescription("The byte amount of entries stored in the entry cache.")
.buildObserver();

cacheOperationCounter = meter
.counterBuilder(CACHE_OPERATION_COUNTER)
.setUnit("{entry}")
.setDescription("The number of cache operations.")
.buildObserver();

cacheOperationBytesCounter = meter
.counterBuilder(CACHE_OPERATION_BYTES_COUNTER)
.setUnit("{By}")
.setDescription("The byte amount of data retrieved from cache operations.")
.buildObserver();

cachePoolActiveAllocationCounter = meter
.upDownCounterBuilder(CACHE_POOL_ACTIVE_ALLOCATION_COUNTER)
.setUnit("{allocation}")
.setDescription("The number of currently active allocations in the direct arena.")
.buildObserver();

cachePoolActiveAllocationSizeCounter = meter
.upDownCounterBuilder(CACHE_POOL_ACTIVE_ALLOCATION_SIZE_COUNTER)
.setUnit("{By}")
.setDescription("The memory allocated in the direct arena.")
.buildObserver();


batchCallback = meter.batchCallback(() -> recordMetrics(factory),
managedLedgerCounter,
cacheEvictionOperationCounter,
cacheEntryCounter,
cacheSizeCounter,
cacheOperationCounter,
cacheOperationBytesCounter,
cachePoolActiveAllocationCounter,
cachePoolActiveAllocationSizeCounter);
}

@Override
public void close() {
batchCallback.close();
}

private void recordMetrics(ManagedLedgerFactoryImpl factory) {
var stats = factory.getCacheStats();

managedLedgerCounter.record(stats.getNumberOfManagedLedgers());
cacheEvictionOperationCounter.record(stats.getNumberOfCacheEvictionsTotal());

var entriesOut = stats.getCacheEvictedEntriesCount();
var entriesIn = stats.getCacheInsertedEntriesCount();
var entriesActive = entriesIn - entriesOut;
cacheEntryCounter.record(entriesActive, CacheEntryStatus.ACTIVE.attributes);
cacheEntryCounter.record(entriesIn, CacheEntryStatus.INSERTED.attributes);
cacheEntryCounter.record(entriesOut, CacheEntryStatus.EVICTED.attributes);
cacheSizeCounter.record(stats.getCacheUsedSize());

cacheOperationCounter.record(stats.getCacheHitsTotal(), CacheOperationStatus.HIT.attributes);
cacheOperationBytesCounter.record(stats.getCacheHitsBytesTotal(), CacheOperationStatus.HIT.attributes);
cacheOperationCounter.record(stats.getCacheMissesTotal(), CacheOperationStatus.MISS.attributes);
cacheOperationBytesCounter.record(stats.getCacheMissesBytesTotal(), CacheOperationStatus.MISS.attributes);

var allocatorStats = new PooledByteBufAllocatorStats(RangeEntryCacheImpl.ALLOCATOR);
cachePoolActiveAllocationCounter.record(allocatorStats.activeAllocationsSmall, PoolArenaType.SMALL.attributes);
cachePoolActiveAllocationCounter.record(allocatorStats.activeAllocationsNormal,
PoolArenaType.NORMAL.attributes);
cachePoolActiveAllocationCounter.record(allocatorStats.activeAllocationsHuge, PoolArenaType.HUGE.attributes);
cachePoolActiveAllocationSizeCounter.record(allocatorStats.totalAllocated,
PoolChunkAllocationType.ALLOCATED.attributes);
cachePoolActiveAllocationSizeCounter.record(allocatorStats.totalUsed, PoolChunkAllocationType.USED.attributes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.base.Predicates;
import com.google.common.collect.Maps;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.opentelemetry.api.OpenTelemetry;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -67,6 +68,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerInfo.MessageRangeInfo;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo.PositionInfo;
import org.apache.bookkeeper.mledger.MetadataCompressionConfig;
import org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.ReadOnlyCursor;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.ManagedLedgerInitializeLedgerCallback;
Expand Down Expand Up @@ -118,6 +120,8 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
private volatile long cacheEvictionTimeThresholdNanos;
private final MetadataStore metadataStore;

private final OpenTelemetryManagedLedgerCacheStats openTelemetryCacheStats;

//indicate whether shutdown() is called.
private volatile boolean closed;

Expand Down Expand Up @@ -149,7 +153,7 @@ public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, ClientConfi
ManagedLedgerFactoryConfig config)
throws Exception {
this(metadataStore, new DefaultBkFactory(bkClientConfiguration),
true /* isBookkeeperManaged */, config, NullStatsLogger.INSTANCE);
true /* isBookkeeperManaged */, config, NullStatsLogger.INSTANCE, OpenTelemetry.noop());
}

public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, BookKeeper bookKeeper)
Expand All @@ -168,21 +172,24 @@ public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
ManagedLedgerFactoryConfig config)
throws Exception {
this(metadataStore, bookKeeperGroupFactory, false /* isBookkeeperManaged */,
config, NullStatsLogger.INSTANCE);
config, NullStatsLogger.INSTANCE, OpenTelemetry.noop());
}

public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory,
ManagedLedgerFactoryConfig config, StatsLogger statsLogger)
ManagedLedgerFactoryConfig config, StatsLogger statsLogger,
OpenTelemetry openTelemetry)
throws Exception {
this(metadataStore, bookKeeperGroupFactory, false /* isBookkeeperManaged */,
config, statsLogger);
config, statsLogger, openTelemetry);
}

private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory,
boolean isBookkeeperManaged,
ManagedLedgerFactoryConfig config, StatsLogger statsLogger) throws Exception {
ManagedLedgerFactoryConfig config,
StatsLogger statsLogger,
OpenTelemetry openTelemetry) throws Exception {
MetadataCompressionConfig compressionConfigForManagedLedgerInfo =
config.getCompressionConfigForManagedLedgerInfo();
MetadataCompressionConfig compressionConfigForManagedCursorInfo =
Expand Down Expand Up @@ -220,6 +227,8 @@ private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
closed = false;

metadataStore.registerSessionListener(this::handleMetadataStoreNotification);

openTelemetryCacheStats = new OpenTelemetryManagedLedgerCacheStats(openTelemetry, this);
}

static class DefaultBkFactory implements BookkeeperFactoryForCustomEnsemblePlacementPolicy {
Expand Down Expand Up @@ -611,6 +620,7 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
}));
}).thenAcceptAsync(__ -> {
//wait for tasks in scheduledExecutor executed.
openTelemetryCacheStats.close();
scheduledExecutor.shutdownNow();
entryCacheManager.clear();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,26 +99,51 @@ public double getCacheHitsRate() {
return cacheHits.getRate();
}

@Override
public long getCacheHitsTotal() {
return cacheHits.getTotalCount();
}

@Override
public double getCacheMissesRate() {
return cacheMisses.getRate();
}

@Override
public long getCacheMissesTotal() {
return cacheMisses.getTotalCount();
}

@Override
public double getCacheHitsThroughput() {
return cacheHits.getValueRate();
}

@Override
public long getCacheHitsBytesTotal() {
return cacheHits.getTotalValue();
}

@Override
public double getCacheMissesThroughput() {
return cacheMisses.getValueRate();
}

@Override
public long getCacheMissesBytesTotal() {
return cacheMisses.getTotalValue();
}

@Override
public long getNumberOfCacheEvictions() {
return cacheEvictions.getCount();
}

@Override
public long getNumberOfCacheEvictionsTotal() {
return cacheEvictions.getTotalCount();
}

public long getCacheInsertedEntriesCount() {
return insertedEntryCount.sum();
}
Expand Down
Loading
Loading