From 08a2f1a6fc68e138c8a51d2331d7e1827c4de602 Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Tue, 11 Jun 2024 10:22:19 -0700 Subject: [PATCH 1/6] Add ML cache metrics draft --- .../mledger/ManagedLedgerFactoryMXBean.java | 25 +++ .../impl/ManagedLedgerFactoryMBeanImpl.java | 25 +++ .../cache/PooledByteBufAllocatorStats.java | 68 ++++++ .../OpenTelemetryManagedLedgerCacheStats.java | 207 ++++++++++++++++++ .../metrics/ManagedLedgerCacheMetrics.java | 43 +--- ...nTelemetryManagedLedgerCacheStatsTest.java | 128 +++++++++++ .../org/apache/pulsar/common/stats/Rate.java | 19 +- 7 files changed, 472 insertions(+), 43 deletions(-) create mode 100644 managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PooledByteBufAllocatorStats.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStats.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStatsTest.java diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryMXBean.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryMXBean.java index 35c26c5dfdb89..43e8196daa9ae 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryMXBean.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryMXBean.java @@ -47,26 +47,51 @@ public interface ManagedLedgerFactoryMXBean { */ double getCacheHitsRate(); + /** + * Cumulative number of cache hits. + */ + long getCacheHitsTotal(); + /** * Get the number of cache misses per second. */ double getCacheMissesRate(); + /** + * Cumulative number of cache misses. + */ + long getCacheMissesTotal(); + /** * Get the amount of data is retrieved from the cache in byte/s. */ double getCacheHitsThroughput(); + /** + * Cumulative amount of data retrieved from the cache in bytes. + */ + long getCacheHitsBytesTotal(); + /** * Get the amount of data is retrieved from the bookkeeper in byte/s. */ double getCacheMissesThroughput(); + /** + * Cumulative amount of data retrieved from the bookkeeper in bytes. + */ + long getCacheMissesBytesTotal(); + /** * Get the number of cache evictions during the last minute. */ long getNumberOfCacheEvictions(); + /** + * Cumulative number of cache evictions. + */ + long getNumberOfCacheEvictionsTotal(); + /** * Cumulative number of entries inserted into the cache. */ diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java index cf3d7142d617e..a3038a0e7ff76 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java @@ -99,26 +99,51 @@ public double getCacheHitsRate() { return cacheHits.getRate(); } + @Override + public long getCacheHitsTotal() { + return cacheHits.getTotalCount(); + } + @Override public double getCacheMissesRate() { return cacheMisses.getRate(); } + @Override + public long getCacheMissesTotal() { + return cacheMisses.getTotalCount(); + } + @Override public double getCacheHitsThroughput() { return cacheHits.getValueRate(); } + @Override + public long getCacheHitsBytesTotal() { + return cacheHits.getTotalValue(); + } + @Override public double getCacheMissesThroughput() { return cacheMisses.getValueRate(); } + @Override + public long getCacheMissesBytesTotal() { + return cacheMisses.getTotalValue(); + } + @Override public long getNumberOfCacheEvictions() { return cacheEvictions.getCount(); } + @Override + public long getNumberOfCacheEvictionsTotal() { + return cacheEvictions.getTotalCount(); + } + public long getCacheInsertedEntriesCount() { return insertedEntryCount.sum(); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PooledByteBufAllocatorStats.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PooledByteBufAllocatorStats.java new file mode 100644 index 0000000000000..4f6a18cb5d934 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PooledByteBufAllocatorStats.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl.cache; + +import io.netty.buffer.PooledByteBufAllocator; +import lombok.Value; + +@Value +public class PooledByteBufAllocatorStats { + + public long activeAllocations; + public long activeAllocationsSmall; + public long activeAllocationsNormal; + public long activeAllocationsHuge; + + public long totalAllocated; + public long totalUsed; + + public PooledByteBufAllocatorStats(PooledByteBufAllocator allocator) { + long activeAllocations = 0; + long activeAllocationsSmall = 0; + long activeAllocationsNormal = 0; + long activeAllocationsHuge = 0; + long totalAllocated = 0; + long totalUsed = 0; + + for (var arena : allocator.metric().directArenas()) { + activeAllocations += arena.numActiveAllocations(); + activeAllocationsSmall += arena.numActiveSmallAllocations(); + activeAllocationsNormal += arena.numActiveNormalAllocations(); + activeAllocationsHuge += arena.numActiveHugeAllocations(); + + for (var list : arena.chunkLists()) { + for (var chunk : list) { + int size = chunk.chunkSize(); + int used = size - chunk.freeBytes(); + + totalAllocated += size; + totalUsed += used; + } + } + } + + this.activeAllocations = activeAllocations; + this.activeAllocationsSmall = activeAllocationsSmall; + this.activeAllocationsNormal = activeAllocationsNormal; + this.activeAllocationsHuge = activeAllocationsHuge; + + this.totalAllocated = totalAllocated; + this.totalUsed = totalUsed; + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStats.java new file mode 100644 index 0000000000000..04ad1d7ba7939 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStats.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.BatchCallback; +import io.opentelemetry.api.metrics.ObservableLongMeasurement; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; +import org.apache.bookkeeper.mledger.impl.cache.PooledByteBufAllocatorStats; +import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl; +import org.apache.pulsar.broker.PulsarService; + +public class OpenTelemetryManagedLedgerCacheStats implements AutoCloseable { + + public static final AttributeKey POOL_ARENA_TYPE = + AttributeKey.stringKey("pulsar.managed_ledger.pool.arena.type"); + public enum PoolArenaType { + SMALL, + NORMAL, + HUGE; + public final Attributes attributes = Attributes.of(POOL_ARENA_TYPE, name().toLowerCase()); + } + + public static final AttributeKey POOL_CHUNK_ALLOCATION_TYPE = + AttributeKey.stringKey("pulsar.managed_ledger.pool.chunk.allocation.type"); + public enum PoolChunkAllocationType { + ALLOCATED, + USED; + public final Attributes attributes = Attributes.of(POOL_CHUNK_ALLOCATION_TYPE, name().toLowerCase()); + } + + public static final AttributeKey CACHE_ENTRY_STATUS = + AttributeKey.stringKey("pulsar.managed_ledger.entry.status"); + public enum CacheEntryStatus { + ACTIVE, + EVICTED, + INSERTED; + public final Attributes attributes = Attributes.of(CACHE_ENTRY_STATUS, name().toLowerCase()); + } + + public static final AttributeKey CACHE_OPERATION_STATUS = + AttributeKey.stringKey("pulsar.managed_ledger.cache.operation.status"); + public enum CacheOperationStatus { + HIT, + MISS; + public final Attributes attributes = Attributes.of(CACHE_OPERATION_STATUS, name().toLowerCase()); + } + + // Replaces pulsar_ml_count + public static final String MANAGED_LEDGER_COUNTER = "pulsar.broker.managed_ledger.count"; + private final ObservableLongMeasurement managedLedgerCounter; + + // Replaces pulsar_ml_cache_evictions + public static final String CACHE_EVICTION_OPERATION_COUNTER = "pulsar.broker.managed_ledger.cache.eviction.count"; + private final ObservableLongMeasurement cacheEvictionOperationCounter; + + // Replaces 'pulsar_ml_cache_entries', + // 'pulsar_ml_cache_inserted_entries_total', + // 'pulsar_ml_cache_evicted_entries_total' + public static final String CACHE_ENTRY_COUNTER = "pulsar.broker.managed_ledger.cache.entry.count"; + private final ObservableLongMeasurement cacheEntryCounter; + + // Replaces pulsar_ml_cache_used_size + public static final String CACHE_SIZE_COUNTER = "pulsar.broker.managed_ledger.cache.entry.size"; + private final ObservableLongMeasurement cacheSizeCounter; + + // Replaces pulsar_ml_cache_hits_rate, pulsar_ml_cache_misses_rate + public static final String CACHE_OPERATION_COUNTER = "pulsar.broker.managed_ledger.cache.operation.count"; + private final ObservableLongMeasurement cacheOperationCounter; + + // Replaces pulsar_ml_cache_hits_throughput, pulsar_ml_cache_misses_throughput + public static final String CACHE_OPERATION_BYTES_COUNTER = "pulsar.broker.managed_ledger.cache.operation.size"; + private final ObservableLongMeasurement cacheOperationBytesCounter; + + // Replaces 'pulsar_ml_cache_pool_active_allocations', + // 'pulsar_ml_cache_pool_active_allocations_huge', + // 'pulsar_ml_cache_pool_active_allocations_normal', + // 'pulsar_ml_cache_pool_active_allocations_small' + public static final String CACHE_POOL_ACTIVE_ALLOCATION_COUNTER = + "pulsar.broker.managed_ledger.cache.pool.allocation.active.count"; + private final ObservableLongMeasurement cachePoolActiveAllocationCounter; + + // Replaces ['pulsar_ml_cache_pool_allocated', 'pulsar_ml_cache_pool_used'] + public static final String CACHE_POOL_ACTIVE_ALLOCATION_SIZE_COUNTER = + "pulsar.broker.managed_ledger.cache.pool.allocation.size"; + private final ObservableLongMeasurement cachePoolActiveAllocationSizeCounter; + + private final BatchCallback batchCallback; + + public OpenTelemetryManagedLedgerCacheStats(PulsarService pulsar) { + var meter = pulsar.getOpenTelemetry().getMeter(); + + managedLedgerCounter = meter + .upDownCounterBuilder(MANAGED_LEDGER_COUNTER) + .setUnit("{managed_ledger}") + .setDescription("The total number of managed ledgers.") + .buildObserver(); + + cacheEvictionOperationCounter = meter + .counterBuilder(CACHE_EVICTION_OPERATION_COUNTER) + .setUnit("{eviction}") + .setDescription("The total number of cache eviction operations.") + .buildObserver(); + + cacheEntryCounter = meter + .upDownCounterBuilder(CACHE_ENTRY_COUNTER) + .setUnit("{entry}") + .setDescription("The number of entries in the entry cache.") + .buildObserver(); + + cacheSizeCounter = meter + .upDownCounterBuilder(CACHE_SIZE_COUNTER) + .setUnit("{By}") + .setDescription("The byte amount of entries stored in the entry cache.") + .buildObserver(); + + cacheOperationCounter = meter + .counterBuilder(CACHE_OPERATION_COUNTER) + .setUnit("{entry}") + .setDescription("The number of cache operations.") + .buildObserver(); + + cacheOperationBytesCounter = meter + .counterBuilder(CACHE_OPERATION_BYTES_COUNTER) + .setUnit("{By}") + .setDescription("The byte amount of data retrieved from cache operations.") + .buildObserver(); + + cachePoolActiveAllocationCounter = meter + .upDownCounterBuilder(CACHE_POOL_ACTIVE_ALLOCATION_COUNTER) + .setUnit("{allocation}") + .setDescription("The number of currently active allocations in the direct arena.") + .buildObserver(); + + cachePoolActiveAllocationSizeCounter = meter + .upDownCounterBuilder(CACHE_POOL_ACTIVE_ALLOCATION_SIZE_COUNTER) + .setUnit("{By}") + .setDescription("The memory allocated in the direct arena.") + .buildObserver(); + + + batchCallback = meter.batchCallback(() -> { + if (pulsar.getManagedLedgerFactory() instanceof ManagedLedgerFactoryImpl managedLedgerFactoryImpl) { + recordMetrics(managedLedgerFactoryImpl); + } + }, + managedLedgerCounter, + cacheEvictionOperationCounter, + cacheEntryCounter, + cacheSizeCounter, + cacheOperationCounter, + cacheOperationBytesCounter, + cachePoolActiveAllocationCounter, + cachePoolActiveAllocationSizeCounter); + } + + @Override + public void close() { + batchCallback.close(); + } + + private void recordMetrics(ManagedLedgerFactoryImpl factory) { + var stats = factory.getCacheStats(); + + managedLedgerCounter.record(stats.getNumberOfManagedLedgers()); + cacheEvictionOperationCounter.record(stats.getNumberOfCacheEvictionsTotal()); + + var entriesOut = stats.getCacheEvictedEntriesCount(); + var entriesIn = stats.getCacheInsertedEntriesCount(); + var entriesActive = entriesIn - entriesOut; + cacheEntryCounter.record(entriesActive, CacheEntryStatus.ACTIVE.attributes); + cacheEntryCounter.record(entriesIn, CacheEntryStatus.INSERTED.attributes); + cacheEntryCounter.record(entriesOut, CacheEntryStatus.EVICTED.attributes); + cacheSizeCounter.record(stats.getCacheUsedSize()); + + cacheOperationCounter.record(stats.getCacheHitsTotal(), CacheOperationStatus.HIT.attributes); + cacheOperationBytesCounter.record(stats.getCacheHitsBytesTotal(), CacheOperationStatus.HIT.attributes); + cacheOperationCounter.record(stats.getCacheMissesTotal(), CacheOperationStatus.MISS.attributes); + cacheOperationBytesCounter.record(stats.getCacheMissesBytesTotal(), CacheOperationStatus.MISS.attributes); + + var allocatorStats = new PooledByteBufAllocatorStats(RangeEntryCacheImpl.ALLOCATOR); + cachePoolActiveAllocationCounter.record(allocatorStats.activeAllocationsSmall, PoolArenaType.SMALL.attributes); + cachePoolActiveAllocationCounter.record(allocatorStats.activeAllocationsNormal, + PoolArenaType.NORMAL.attributes); + cachePoolActiveAllocationCounter.record(allocatorStats.activeAllocationsHuge, PoolArenaType.HUGE.attributes); + cachePoolActiveAllocationSizeCounter.record(allocatorStats.totalAllocated, + PoolChunkAllocationType.ALLOCATED.attributes); + cachePoolActiveAllocationSizeCounter.record(allocatorStats.totalUsed, PoolChunkAllocationType.USED.attributes); + } +} \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java index 890a37aa2d877..9eb4beb72fbf2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java @@ -18,13 +18,10 @@ */ package org.apache.pulsar.broker.stats.metrics; -import io.netty.buffer.PoolArenaMetric; -import io.netty.buffer.PoolChunkListMetric; -import io.netty.buffer.PoolChunkMetric; -import io.netty.buffer.PooledByteBufAllocator; import java.util.ArrayList; import java.util.List; import org.apache.bookkeeper.mledger.ManagedLedgerFactoryMXBean; +import org.apache.bookkeeper.mledger.impl.cache.PooledByteBufAllocatorStats; import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.common.stats.Metrics; @@ -57,37 +54,13 @@ public synchronized List generate() { m.put("brk_ml_cache_hits_throughput", mlCacheStats.getCacheHitsThroughput()); m.put("brk_ml_cache_misses_throughput", mlCacheStats.getCacheMissesThroughput()); - PooledByteBufAllocator allocator = RangeEntryCacheImpl.ALLOCATOR; - long activeAllocations = 0; - long activeAllocationsSmall = 0; - long activeAllocationsNormal = 0; - long activeAllocationsHuge = 0; - long totalAllocated = 0; - long totalUsed = 0; - - for (PoolArenaMetric arena : allocator.metric().directArenas()) { - activeAllocations += arena.numActiveAllocations(); - activeAllocationsSmall += arena.numActiveSmallAllocations(); - activeAllocationsNormal += arena.numActiveNormalAllocations(); - activeAllocationsHuge += arena.numActiveHugeAllocations(); - - for (PoolChunkListMetric list : arena.chunkLists()) { - for (PoolChunkMetric chunk : list) { - int size = chunk.chunkSize(); - int used = size - chunk.freeBytes(); - - totalAllocated += size; - totalUsed += used; - } - } - } - - m.put("brk_ml_cache_pool_allocated", totalAllocated); - m.put("brk_ml_cache_pool_used", totalUsed); - m.put("brk_ml_cache_pool_active_allocations", activeAllocations); - m.put("brk_ml_cache_pool_active_allocations_small", activeAllocationsSmall); - m.put("brk_ml_cache_pool_active_allocations_normal", activeAllocationsNormal); - m.put("brk_ml_cache_pool_active_allocations_huge", activeAllocationsHuge); + var allocatorStats = new PooledByteBufAllocatorStats(RangeEntryCacheImpl.ALLOCATOR); + m.put("brk_ml_cache_pool_allocated", allocatorStats.totalAllocated); + m.put("brk_ml_cache_pool_used", allocatorStats.totalUsed); + m.put("brk_ml_cache_pool_active_allocations", allocatorStats.activeAllocations); + m.put("brk_ml_cache_pool_active_allocations_small", allocatorStats.activeAllocationsSmall); + m.put("brk_ml_cache_pool_active_allocations_normal", allocatorStats.activeAllocationsNormal); + m.put("brk_ml_cache_pool_active_allocations_huge", allocatorStats.activeAllocationsHuge); metrics.clear(); metrics.add(m); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStatsTest.java new file mode 100644 index 0000000000000..83a891ce23e9b --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStatsTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats; + +import io.opentelemetry.api.common.Attributes; +import lombok.Cleanup; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.CacheEntryStatus; +import org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.CacheOperationStatus; +import org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.PoolArenaType; +import org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.PoolChunkAllocationType; +import org.apache.pulsar.broker.testcontext.PulsarTestContext; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue; +import static org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.CACHE_ENTRY_COUNTER; +import static org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.CACHE_EVICTION_OPERATION_COUNTER; +import static org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.CACHE_OPERATION_BYTES_COUNTER; +import static org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.CACHE_OPERATION_COUNTER; +import static org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.CACHE_POOL_ACTIVE_ALLOCATION_COUNTER; +import static org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.CACHE_POOL_ACTIVE_ALLOCATION_SIZE_COUNTER; +import static org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.CACHE_SIZE_COUNTER; +import static org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.MANAGED_LEDGER_COUNTER; +import static org.assertj.core.api.Assertions.assertThat; + +public class OpenTelemetryManagedLedgerCacheStatsTest extends BrokerTestBase { + + @BeforeMethod(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.baseSetup(); + } + + @AfterMethod(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) { + super.customizeMainPulsarTestContextBuilder(builder); + builder.enableOpenTelemetry(true); + } + + @Test + public void testManagedLedgerCacheStats() throws Exception { + var topicName = BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/testManagedLedgerCacheStats"); + + @Cleanup + var producer = pulsarClient.newProducer().topic(topicName).create(); + + @Cleanup + var consumer1 = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscriptionName(BrokerTestUtil.newUniqueName("sub")) + .subscribe(); + + @Cleanup + var consumer2 = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscriptionName(BrokerTestUtil.newUniqueName("sub")) + .subscribe(); + + producer.send("test".getBytes()); + consumer1.receive(); + + Awaitility.await().untilAsserted(() -> { + var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + assertMetricLongSumValue(metrics, CACHE_ENTRY_COUNTER, CacheEntryStatus.ACTIVE.attributes, + value -> assertThat(value).isNotNegative()); + assertMetricLongSumValue(metrics, CACHE_ENTRY_COUNTER, CacheEntryStatus.INSERTED.attributes, + value -> assertThat(value).isPositive()); + assertMetricLongSumValue(metrics, CACHE_ENTRY_COUNTER, CacheEntryStatus.EVICTED.attributes, + value -> assertThat(value).isPositive()); + assertMetricLongSumValue(metrics, CACHE_SIZE_COUNTER, Attributes.empty(), + value -> assertThat(value).isNotNegative()); + }); + + var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + + assertMetricLongSumValue(metrics, MANAGED_LEDGER_COUNTER, Attributes.empty(), 2); + assertMetricLongSumValue(metrics, CACHE_EVICTION_OPERATION_COUNTER, Attributes.empty(), 0); + + assertMetricLongSumValue(metrics, CACHE_OPERATION_COUNTER, CacheOperationStatus.HIT.attributes, + value -> assertThat(value).isPositive()); + assertMetricLongSumValue(metrics, CACHE_OPERATION_BYTES_COUNTER, CacheOperationStatus.HIT.attributes, + value -> assertThat(value).isPositive()); + assertMetricLongSumValue(metrics, CACHE_OPERATION_COUNTER, CacheOperationStatus.MISS.attributes, + value -> assertThat(value).isNotNegative()); + assertMetricLongSumValue(metrics, CACHE_OPERATION_BYTES_COUNTER, CacheOperationStatus.MISS.attributes, + value -> assertThat(value).isNotNegative()); + + assertMetricLongSumValue(metrics, CACHE_POOL_ACTIVE_ALLOCATION_COUNTER, PoolArenaType.SMALL.attributes, + value -> assertThat(value).isNotNegative()); + assertMetricLongSumValue(metrics, CACHE_POOL_ACTIVE_ALLOCATION_COUNTER, PoolArenaType.NORMAL.attributes, + value -> assertThat(value).isNotNegative()); + assertMetricLongSumValue(metrics, CACHE_POOL_ACTIVE_ALLOCATION_COUNTER, PoolArenaType.HUGE.attributes, + value -> assertThat(value).isNotNegative()); + assertMetricLongSumValue(metrics, CACHE_POOL_ACTIVE_ALLOCATION_SIZE_COUNTER, + PoolChunkAllocationType.ALLOCATED.attributes, value -> assertThat(value).isNotNegative()); + assertMetricLongSumValue(metrics, CACHE_POOL_ACTIVE_ALLOCATION_SIZE_COUNTER, + PoolChunkAllocationType.USED.attributes, value -> assertThat(value).isNotNegative()); + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Rate.java b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Rate.java index 886e31ab71216..41402985ceb37 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Rate.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Rate.java @@ -28,6 +28,7 @@ public class Rate { private final LongAdder valueAdder = new LongAdder(); private final LongAdder countAdder = new LongAdder(); private final LongAdder totalCountAdder = new LongAdder(); + private final LongAdder totalValueAdder = new LongAdder(); // Computed stats private long count = 0L; @@ -37,20 +38,18 @@ public class Rate { private long lastCalculatedTime = System.nanoTime(); public void recordEvent() { - countAdder.increment(); - totalCountAdder.increment(); + recordMultipleEvents(1, 0); } public void recordEvent(long value) { - valueAdder.add(value); - countAdder.increment(); - totalCountAdder.increment(); + recordMultipleEvents(1, value); } - public void recordMultipleEvents(long events, long totalValue) { + public void recordMultipleEvents(long totalCount, long totalValue) { valueAdder.add(totalValue); - countAdder.add(events); - totalCountAdder.add(events); + totalValueAdder.add(totalValue); + countAdder.add(totalCount); + totalCountAdder.add(totalCount); } public void calculateRate() { @@ -88,4 +87,8 @@ public double getValueRate() { public long getTotalCount() { return this.totalCountAdder.longValue(); } + + public long getTotalValue() { + return totalValueAdder.sum(); + } } From 252ee096f2bf27934f489aa918ef69820793102e Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Tue, 11 Jun 2024 10:48:25 -0700 Subject: [PATCH 2/6] Pass OpenTelemetry instance to ManagedLedgerClientFactory --- managed-ledger/pom.xml | 12 ++++++++ .../impl/ManagedLedgerFactoryImpl.java | 14 ++++++---- .../broker/ManagedLedgerClientFactory.java | 8 ++++-- .../apache/pulsar/broker/PulsarService.java | 2 +- .../stats/PulsarBrokerOpenTelemetry.java | 5 +++- .../broker/storage/ManagedLedgerStorage.java | 9 ++++-- .../broker/testcontext/PulsarTestContext.java | 6 ++-- .../client/impl/SequenceIdWithErrorTest.java | 3 +- .../pulsar/opentelemetry/Constants.java | 28 +++++++++++++++++++ 9 files changed, 71 insertions(+), 16 deletions(-) create mode 100644 pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/Constants.java diff --git a/managed-ledger/pom.xml b/managed-ledger/pom.xml index d8b31220d51be..60a4edab95b77 100644 --- a/managed-ledger/pom.xml +++ b/managed-ledger/pom.xml @@ -71,6 +71,12 @@ ${project.version} + + ${project.groupId} + pulsar-opentelemetry + ${project.version} + + com.google.guava guava @@ -120,6 +126,12 @@ test + + io.opentelemetry + opentelemetry-sdk-testing + test + + org.slf4j slf4j-api diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index ed803a81462e1..c8afd226156a0 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -25,6 +25,7 @@ import com.google.common.base.Predicates; import com.google.common.collect.Maps; import io.netty.util.concurrent.DefaultThreadFactory; +import io.opentelemetry.api.OpenTelemetry; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -150,7 +151,7 @@ public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, ClientConfi ManagedLedgerFactoryConfig config) throws Exception { this(metadataStore, new DefaultBkFactory(bkClientConfiguration), - true /* isBookkeeperManaged */, config, NullStatsLogger.INSTANCE); + true /* isBookkeeperManaged */, config, NullStatsLogger.INSTANCE, OpenTelemetry.noop()); } public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, BookKeeper bookKeeper) @@ -169,21 +170,24 @@ public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, ManagedLedgerFactoryConfig config) throws Exception { this(metadataStore, bookKeeperGroupFactory, false /* isBookkeeperManaged */, - config, NullStatsLogger.INSTANCE); + config, NullStatsLogger.INSTANCE, OpenTelemetry.noop()); } public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory, - ManagedLedgerFactoryConfig config, StatsLogger statsLogger) + ManagedLedgerFactoryConfig config, StatsLogger statsLogger, + OpenTelemetry openTelemetry) throws Exception { this(metadataStore, bookKeeperGroupFactory, false /* isBookkeeperManaged */, - config, statsLogger); + config, statsLogger, openTelemetry); } private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory, boolean isBookkeeperManaged, - ManagedLedgerFactoryConfig config, StatsLogger statsLogger) throws Exception { + ManagedLedgerFactoryConfig config, + StatsLogger statsLogger, + OpenTelemetry openTelemetry) throws Exception { MetadataCompressionConfig compressionConfigForManagedLedgerInfo = config.getCompressionConfigForManagedLedgerInfo(); MetadataCompressionConfig compressionConfigForManagedCursorInfo = diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java index 6ed95f167a15a..9bbc2857863ff 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java @@ -22,6 +22,7 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.annotations.VisibleForTesting; import io.netty.channel.EventLoopGroup; +import io.opentelemetry.api.OpenTelemetry; import java.io.IOException; import java.util.Map; import java.util.Optional; @@ -54,9 +55,11 @@ public class ManagedLedgerClientFactory implements ManagedLedgerStorage { bkEnsemblePolicyToBkClientMap = Caffeine.newBuilder().buildAsync(); private StatsProvider statsProvider = new NullStatsProvider(); + @Override public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadataStore, BookKeeperClientFactory bookkeeperProvider, - EventLoopGroup eventLoopGroup) throws Exception { + EventLoopGroup eventLoopGroup, + OpenTelemetry openTelemetry) throws Exception { ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig(); managedLedgerFactoryConfig.setMaxCacheSize(conf.getManagedLedgerCacheSizeMB() * 1024L * 1024L); managedLedgerFactoryConfig.setCacheEvictionWatermark(conf.getManagedLedgerCacheEvictionWatermark()); @@ -109,7 +112,8 @@ public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadata try { this.managedLedgerFactory = - new ManagedLedgerFactoryImpl(metadataStore, bkFactory, managedLedgerFactoryConfig, statsLogger); + new ManagedLedgerFactoryImpl(metadataStore, bkFactory, managedLedgerFactoryConfig, statsLogger, + openTelemetry); } catch (Exception e) { statsProvider.stop(); defaultBkClient.close(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 2e9f9dc6b0105..05104444dc769 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -1050,7 +1050,7 @@ protected OrderedExecutor newOrderedExecutor() { protected ManagedLedgerStorage newManagedLedgerClientFactory() throws Exception { return ManagedLedgerStorage.create( config, localMetadataStore, - bkClientFactory, ioEventLoopGroup + bkClientFactory, ioEventLoopGroup, openTelemetry.getOpenTelemetryService().getOpenTelemetry() ); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java index 01ca65d2cc537..178da8b84983f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java @@ -26,11 +26,14 @@ import lombok.Getter; import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.opentelemetry.Constants; import org.apache.pulsar.opentelemetry.OpenTelemetryService; public class PulsarBrokerOpenTelemetry implements Closeable { public static final String SERVICE_NAME = "pulsar-broker"; + + @Getter private final OpenTelemetryService openTelemetryService; @Getter @@ -44,7 +47,7 @@ public PulsarBrokerOpenTelemetry(ServiceConfiguration config, .serviceVersion(PulsarVersion.getVersion()) .builderCustomizer(builderCustomizer) .build(); - meter = openTelemetryService.getOpenTelemetry().getMeter("org.apache.pulsar.broker"); + meter = openTelemetryService.getOpenTelemetry().getMeter(Constants.BROKER_INSTRUMENTATION_SCOPE_NAME); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java index 0b5a102eed1e0..944d2badf75f2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.storage; import io.netty.channel.EventLoopGroup; +import io.opentelemetry.api.OpenTelemetry; import java.io.IOException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; @@ -47,7 +48,8 @@ public interface ManagedLedgerStorage extends AutoCloseable { void initialize(ServiceConfiguration conf, MetadataStoreExtended metadataStore, BookKeeperClientFactory bookkeeperProvider, - EventLoopGroup eventLoopGroup) throws Exception; + EventLoopGroup eventLoopGroup, + OpenTelemetry openTelemetry) throws Exception; /** * Return the factory to create {@link ManagedLedgerFactory}. @@ -87,11 +89,12 @@ void initialize(ServiceConfiguration conf, static ManagedLedgerStorage create(ServiceConfiguration conf, MetadataStoreExtended metadataStore, BookKeeperClientFactory bkProvider, - EventLoopGroup eventLoopGroup) throws Exception { + EventLoopGroup eventLoopGroup, + OpenTelemetry openTelemetry) throws Exception { ManagedLedgerStorage storage = Reflections.createInstance(conf.getManagedLedgerStorageClassName(), ManagedLedgerStorage.class, Thread.currentThread().getContextClassLoader()); - storage.initialize(conf, metadataStore, bkProvider, eventLoopGroup); + storage.initialize(conf, metadataStore, bkProvider, eventLoopGroup, openTelemetry); return storage; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java index 09cd4f7cb1a93..3d79a17a90f50 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java @@ -21,6 +21,7 @@ import com.google.common.util.concurrent.MoreExecutors; import io.netty.channel.EventLoopGroup; +import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder; import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; import java.io.IOException; @@ -843,9 +844,8 @@ private static ManagedLedgerStorage createManagedLedgerClientFactory(BookKeeper @Override public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadataStore, - BookKeeperClientFactory bookkeeperProvider, EventLoopGroup eventLoopGroup) - throws Exception { - + BookKeeperClientFactory bookkeeperProvider, EventLoopGroup eventLoopGroup, + OpenTelemetry openTelemetry) { } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java index 7d330bb82addd..1395424b14123 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java @@ -21,6 +21,7 @@ import static org.testng.Assert.assertEquals; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; +import io.opentelemetry.api.OpenTelemetry; import java.util.Collections; import lombok.Cleanup; import org.apache.bookkeeper.mledger.ManagedLedger; @@ -60,7 +61,7 @@ public void testCheckSequenceId() throws Exception { EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1); ManagedLedgerClientFactory clientFactory = new ManagedLedgerClientFactory(); clientFactory.initialize(pulsar.getConfiguration(), pulsar.getLocalMetadataStore(), - pulsar.getBookKeeperClientFactory(), eventLoopGroup); + pulsar.getBookKeeperClientFactory(), eventLoopGroup, OpenTelemetry.noop()); ManagedLedgerFactory mlFactory = clientFactory.getManagedLedgerFactory(); ManagedLedger ml = mlFactory.open(TopicName.get(topicName).getPersistenceNamingEncoding()); ml.close(); diff --git a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/Constants.java b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/Constants.java new file mode 100644 index 0000000000000..6d61cafb5a01a --- /dev/null +++ b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/Constants.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.opentelemetry; + +/** + * Common OpenTelemetry constants to be used by Pulsar components. + */ +public interface Constants { + + String BROKER_INSTRUMENTATION_SCOPE_NAME = "org.apache.pulsar.broker"; + +} From b0fb621646b87d51b1aa96a4ccb72cd5f7ae41ff Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Tue, 11 Jun 2024 11:03:23 -0700 Subject: [PATCH 3/6] Relocate stats class to ML package --- .../OpenTelemetryManagedLedgerCacheStats.java | 15 +++++------- .../impl/ManagedLedgerFactoryImpl.java | 6 +++++ ...nTelemetryManagedLedgerCacheStatsTest.java | 24 +++++++++---------- 3 files changed, 24 insertions(+), 21 deletions(-) rename {pulsar-broker/src/main/java/org/apache/pulsar/broker/stats => managed-ledger/src/main/java/org/apache/bookkeeper/mledger}/OpenTelemetryManagedLedgerCacheStats.java (95%) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStats.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OpenTelemetryManagedLedgerCacheStats.java similarity index 95% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStats.java rename to managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OpenTelemetryManagedLedgerCacheStats.java index 04ad1d7ba7939..d936870507ead 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStats.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OpenTelemetryManagedLedgerCacheStats.java @@ -16,8 +16,9 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.stats; +package org.apache.bookkeeper.mledger; +import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.metrics.BatchCallback; @@ -25,7 +26,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.cache.PooledByteBufAllocatorStats; import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl; -import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.opentelemetry.Constants; public class OpenTelemetryManagedLedgerCacheStats implements AutoCloseable { @@ -104,8 +105,8 @@ public enum CacheOperationStatus { private final BatchCallback batchCallback; - public OpenTelemetryManagedLedgerCacheStats(PulsarService pulsar) { - var meter = pulsar.getOpenTelemetry().getMeter(); + public OpenTelemetryManagedLedgerCacheStats(OpenTelemetry openTelemetry, ManagedLedgerFactoryImpl factory) { + var meter = openTelemetry.getMeter(Constants.BROKER_INSTRUMENTATION_SCOPE_NAME); managedLedgerCounter = meter .upDownCounterBuilder(MANAGED_LEDGER_COUNTER) @@ -156,11 +157,7 @@ public OpenTelemetryManagedLedgerCacheStats(PulsarService pulsar) { .buildObserver(); - batchCallback = meter.batchCallback(() -> { - if (pulsar.getManagedLedgerFactory() instanceof ManagedLedgerFactoryImpl managedLedgerFactoryImpl) { - recordMetrics(managedLedgerFactoryImpl); - } - }, + batchCallback = meter.batchCallback(() -> recordMetrics(factory), managedLedgerCounter, cacheEvictionOperationCounter, cacheEntryCounter, diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index c8afd226156a0..7b25783483cf2 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -85,6 +85,7 @@ import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.commons.lang3.tuple.Pair; +import org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats; import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.FutureUtil; @@ -120,6 +121,8 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { private volatile long cacheEvictionTimeThresholdNanos; private final MetadataStore metadataStore; + private final OpenTelemetryManagedLedgerCacheStats openTelemetryCacheStats; + //indicate whether shutdown() is called. private volatile boolean closed; @@ -225,6 +228,8 @@ private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, closed = false; metadataStore.registerSessionListener(this::handleMetadataStoreNotification); + + openTelemetryCacheStats = new OpenTelemetryManagedLedgerCacheStats(openTelemetry, this); } static class DefaultBkFactory implements BookkeeperFactoryForCustomEnsemblePlacementPolicy { @@ -617,6 +622,7 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) { })); }).thenAcceptAsync(__ -> { //wait for tasks in scheduledExecutor executed. + openTelemetryCacheStats.close(); scheduledExecutor.shutdownNow(); entryCacheManager.clear(); }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStatsTest.java index 83a891ce23e9b..22e7fceb9a833 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStatsTest.java @@ -22,10 +22,10 @@ import lombok.Cleanup; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.BrokerTestBase; -import org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.CacheEntryStatus; -import org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.CacheOperationStatus; -import org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.PoolArenaType; -import org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.PoolChunkAllocationType; +import org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CacheEntryStatus; +import org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CacheOperationStatus; +import org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.PoolArenaType; +import org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.PoolChunkAllocationType; import org.apache.pulsar.broker.testcontext.PulsarTestContext; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.awaitility.Awaitility; @@ -34,14 +34,14 @@ import org.testng.annotations.Test; import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue; -import static org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.CACHE_ENTRY_COUNTER; -import static org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.CACHE_EVICTION_OPERATION_COUNTER; -import static org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.CACHE_OPERATION_BYTES_COUNTER; -import static org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.CACHE_OPERATION_COUNTER; -import static org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.CACHE_POOL_ACTIVE_ALLOCATION_COUNTER; -import static org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.CACHE_POOL_ACTIVE_ALLOCATION_SIZE_COUNTER; -import static org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.CACHE_SIZE_COUNTER; -import static org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.MANAGED_LEDGER_COUNTER; +import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_ENTRY_COUNTER; +import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_EVICTION_OPERATION_COUNTER; +import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_OPERATION_BYTES_COUNTER; +import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_OPERATION_COUNTER; +import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_POOL_ACTIVE_ALLOCATION_COUNTER; +import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_POOL_ACTIVE_ALLOCATION_SIZE_COUNTER; +import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_SIZE_COUNTER; +import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.MANAGED_LEDGER_COUNTER; import static org.assertj.core.api.Assertions.assertThat; public class OpenTelemetryManagedLedgerCacheStatsTest extends BrokerTestBase { From 7d46f560ef60ed55b7c6eda06435d431192a4665 Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Tue, 11 Jun 2024 11:21:06 -0700 Subject: [PATCH 4/6] Relocate attributes --- .../OpenTelemetryManagedLedgerCacheStats.java | 40 ++----------------- .../impl/ManagedLedgerFactoryImpl.java | 2 +- ...nTelemetryManagedLedgerCacheStatsTest.java | 29 +++++++------- .../OpenTelemetryAttributes.java | 33 +++++++++++++++ 4 files changed, 52 insertions(+), 52 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OpenTelemetryManagedLedgerCacheStats.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OpenTelemetryManagedLedgerCacheStats.java index d936870507ead..13e7ed6ac6799 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OpenTelemetryManagedLedgerCacheStats.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OpenTelemetryManagedLedgerCacheStats.java @@ -19,51 +19,19 @@ package org.apache.bookkeeper.mledger; import io.opentelemetry.api.OpenTelemetry; -import io.opentelemetry.api.common.AttributeKey; -import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.metrics.BatchCallback; import io.opentelemetry.api.metrics.ObservableLongMeasurement; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.cache.PooledByteBufAllocatorStats; import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl; import org.apache.pulsar.opentelemetry.Constants; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.CacheEntryStatus; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.CacheOperationStatus; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.PoolArenaType; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.PoolChunkAllocationType; public class OpenTelemetryManagedLedgerCacheStats implements AutoCloseable { - public static final AttributeKey POOL_ARENA_TYPE = - AttributeKey.stringKey("pulsar.managed_ledger.pool.arena.type"); - public enum PoolArenaType { - SMALL, - NORMAL, - HUGE; - public final Attributes attributes = Attributes.of(POOL_ARENA_TYPE, name().toLowerCase()); - } - - public static final AttributeKey POOL_CHUNK_ALLOCATION_TYPE = - AttributeKey.stringKey("pulsar.managed_ledger.pool.chunk.allocation.type"); - public enum PoolChunkAllocationType { - ALLOCATED, - USED; - public final Attributes attributes = Attributes.of(POOL_CHUNK_ALLOCATION_TYPE, name().toLowerCase()); - } - - public static final AttributeKey CACHE_ENTRY_STATUS = - AttributeKey.stringKey("pulsar.managed_ledger.entry.status"); - public enum CacheEntryStatus { - ACTIVE, - EVICTED, - INSERTED; - public final Attributes attributes = Attributes.of(CACHE_ENTRY_STATUS, name().toLowerCase()); - } - - public static final AttributeKey CACHE_OPERATION_STATUS = - AttributeKey.stringKey("pulsar.managed_ledger.cache.operation.status"); - public enum CacheOperationStatus { - HIT, - MISS; - public final Attributes attributes = Attributes.of(CACHE_OPERATION_STATUS, name().toLowerCase()); - } - // Replaces pulsar_ml_count public static final String MANAGED_LEDGER_COUNTER = "pulsar.broker.managed_ledger.count"; private final ObservableLongMeasurement managedLedgerCounter; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index 7b25783483cf2..973ae364d60d0 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -69,6 +69,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerInfo.MessageRangeInfo; import org.apache.bookkeeper.mledger.ManagedLedgerInfo.PositionInfo; import org.apache.bookkeeper.mledger.MetadataCompressionConfig; +import org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.ReadOnlyCursor; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.ManagedLedgerInitializeLedgerCallback; @@ -85,7 +86,6 @@ import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.commons.lang3.tuple.Pair; -import org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats; import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.FutureUtil; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStatsTest.java index 22e7fceb9a833..c3a4a2e054ef3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStatsTest.java @@ -18,32 +18,31 @@ */ package org.apache.pulsar.broker.stats; +import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_ENTRY_COUNTER; +import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_EVICTION_OPERATION_COUNTER; +import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_OPERATION_BYTES_COUNTER; +import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_OPERATION_COUNTER; +import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_POOL_ACTIVE_ALLOCATION_COUNTER; +import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_POOL_ACTIVE_ALLOCATION_SIZE_COUNTER; +import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_SIZE_COUNTER; +import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.MANAGED_LEDGER_COUNTER; +import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue; +import static org.assertj.core.api.Assertions.assertThat; import io.opentelemetry.api.common.Attributes; import lombok.Cleanup; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.BrokerTestBase; -import org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CacheEntryStatus; -import org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CacheOperationStatus; -import org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.PoolArenaType; -import org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.PoolChunkAllocationType; import org.apache.pulsar.broker.testcontext.PulsarTestContext; import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.CacheEntryStatus; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.CacheOperationStatus; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.PoolArenaType; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.PoolChunkAllocationType; import org.awaitility.Awaitility; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue; -import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_ENTRY_COUNTER; -import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_EVICTION_OPERATION_COUNTER; -import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_OPERATION_BYTES_COUNTER; -import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_OPERATION_COUNTER; -import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_POOL_ACTIVE_ALLOCATION_COUNTER; -import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_POOL_ACTIVE_ALLOCATION_SIZE_COUNTER; -import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_SIZE_COUNTER; -import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.MANAGED_LEDGER_COUNTER; -import static org.assertj.core.api.Assertions.assertThat; - public class OpenTelemetryManagedLedgerCacheStatsTest extends BrokerTestBase { @BeforeMethod(alwaysRun = true) diff --git a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java index 9783f0e754f63..5120864020043 100644 --- a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java +++ b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java @@ -127,4 +127,37 @@ enum BacklogQuotaType { TIME; public final Attributes attributes = Attributes.of(PULSAR_BACKLOG_QUOTA_TYPE, name().toLowerCase()); } + + // Managed Ledger Attributes + AttributeKey ML_POOL_ARENA_TYPE = AttributeKey.stringKey("pulsar.managed_ledger.pool.arena.type"); + enum PoolArenaType { + SMALL, + NORMAL, + HUGE; + public final Attributes attributes = Attributes.of(ML_POOL_ARENA_TYPE, name().toLowerCase()); + } + + AttributeKey ML_POOL_CHUNK_ALLOCATION_TYPE = + AttributeKey.stringKey("pulsar.managed_ledger.pool.chunk.allocation.type"); + enum PoolChunkAllocationType { + ALLOCATED, + USED; + public final Attributes attributes = Attributes.of(ML_POOL_CHUNK_ALLOCATION_TYPE, name().toLowerCase()); + } + + AttributeKey ML_CACHE_ENTRY_STATUS = AttributeKey.stringKey("pulsar.managed_ledger.cache.entry.status"); + enum CacheEntryStatus { + ACTIVE, + EVICTED, + INSERTED; + public final Attributes attributes = Attributes.of(ML_CACHE_ENTRY_STATUS, name().toLowerCase()); + } + + AttributeKey ML_CACHE_OPERATION_STATUS = + AttributeKey.stringKey("pulsar.managed_ledger.cache.operation.status"); + enum CacheOperationStatus { + HIT, + MISS; + public final Attributes attributes = Attributes.of(ML_CACHE_OPERATION_STATUS, name().toLowerCase()); + } } From 5651c600894fea1f325abb5285a80914e5d966e2 Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Tue, 11 Jun 2024 11:36:05 -0700 Subject: [PATCH 5/6] Cleanup Rate --- .../main/java/org/apache/pulsar/common/stats/Rate.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Rate.java b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Rate.java index 41402985ceb37..faac9d7fb31d2 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Rate.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Rate.java @@ -38,11 +38,15 @@ public class Rate { private long lastCalculatedTime = System.nanoTime(); public void recordEvent() { - recordMultipleEvents(1, 0); + countAdder.increment(); + totalCountAdder.increment(); } public void recordEvent(long value) { - recordMultipleEvents(1, value); + valueAdder.add(value); + totalValueAdder.add(value); + countAdder.increment(); + totalCountAdder.increment(); } public void recordMultipleEvents(long totalCount, long totalValue) { From 17ff477f8df832085de0fd8d1fc82496e9878421 Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Wed, 12 Jun 2024 11:01:45 -0700 Subject: [PATCH 6/6] Cosmetic fixes --- .../opentelemetry/OpenTelemetryAttributes.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java index 5120864020043..0cc7cf245f11a 100644 --- a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java +++ b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java @@ -129,6 +129,10 @@ enum BacklogQuotaType { } // Managed Ledger Attributes + + /** + * The type of the pool arena. + */ AttributeKey ML_POOL_ARENA_TYPE = AttributeKey.stringKey("pulsar.managed_ledger.pool.arena.type"); enum PoolArenaType { SMALL, @@ -137,6 +141,9 @@ enum PoolArenaType { public final Attributes attributes = Attributes.of(ML_POOL_ARENA_TYPE, name().toLowerCase()); } + /** + * The type of the pool chunk allocation. + */ AttributeKey ML_POOL_CHUNK_ALLOCATION_TYPE = AttributeKey.stringKey("pulsar.managed_ledger.pool.chunk.allocation.type"); enum PoolChunkAllocationType { @@ -145,6 +152,9 @@ enum PoolChunkAllocationType { public final Attributes attributes = Attributes.of(ML_POOL_CHUNK_ALLOCATION_TYPE, name().toLowerCase()); } + /** + * The status of the cache entry. + */ AttributeKey ML_CACHE_ENTRY_STATUS = AttributeKey.stringKey("pulsar.managed_ledger.cache.entry.status"); enum CacheEntryStatus { ACTIVE, @@ -153,6 +163,9 @@ enum CacheEntryStatus { public final Attributes attributes = Attributes.of(ML_CACHE_ENTRY_STATUS, name().toLowerCase()); } + /** + * The result of the cache operation. + */ AttributeKey ML_CACHE_OPERATION_STATUS = AttributeKey.stringKey("pulsar.managed_ledger.cache.operation.status"); enum CacheOperationStatus {