Skip to content

Commit

Permalink
Minor refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
dragosvictor committed Jul 11, 2024
1 parent e9599cc commit 28d23a2
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
Expand Down Expand Up @@ -81,7 +80,6 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P
"Counter of currently pending snapshots")
.register();

@Getter
private final OpenTelemetryReplicatedSubscriptionStats stats;

public ReplicatedSubscriptionsController(PersistentTopic topic, String localCluster) {
Expand Down Expand Up @@ -264,20 +262,24 @@ private void cleanupTimedOutSnapshots() {
}

pendingSnapshotsMetric.dec();
stats.recordSnapshotCompleted();
var latencyMillis = entry.getValue().getDurationMillis();
stats.recordSnapshotTimedOut(latencyMillis);
it.remove();
}
}
}

void snapshotCompleted(String snapshotId) {
ReplicatedSubscriptionsSnapshotBuilder snapshot = pendingSnapshots.remove(snapshotId);
pendingSnapshotsMetric.dec();
stats.recordSnapshotCompleted();
lastCompletedSnapshotId = snapshotId;

if (snapshot != null) {
lastCompletedSnapshotStartTime = snapshot.getStartTimeMillis();

pendingSnapshotsMetric.dec();
var latencyMillis = snapshot.getDurationMillis();
ReplicatedSubscriptionsSnapshotBuilder.snapshotMetric.observe(latencyMillis);
stats.recordSnapshotCompleted(latencyMillis);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class ReplicatedSubscriptionsSnapshotBuilder {

@PulsarDeprecatedMetric(newMetricName = OpenTelemetryReplicatedSubscriptionStats.SNAPSHOT_DURATION_METRIC_NAME)
@Deprecated
private static final Summary snapshotMetric = Summary.build("pulsar_replicated_subscriptions_snapshot_ms",
public static final Summary snapshotMetric = Summary.build("pulsar_replicated_subscriptions_snapshot_ms",
"Time taken to create a consistent snapshot across clusters").register();

public ReplicatedSubscriptionsSnapshotBuilder(ReplicatedSubscriptionsController controller,
Expand Down Expand Up @@ -127,9 +127,6 @@ synchronized void receivedSnapshotResponse(Position position, ReplicatedSubscrip
p.getLedgerId(), p.getEntryId(), responses));
controller.snapshotCompleted(snapshotId);

var latencyMillis = clock.millis() - startTimeMillis;
snapshotMetric.observe(latencyMillis);
controller.getStats().recordSnapshotDuration(latencyMillis, TimeUnit.MILLISECONDS);
}

boolean isTimedOut() {
Expand All @@ -139,4 +136,8 @@ boolean isTimedOut() {
long getStartTimeMillis() {
return startTimeMillis;
}

long getDurationMillis() {
return clock.millis() - startTimeMillis;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,26 +28,26 @@

public class OpenTelemetryReplicatedSubscriptionStats {

public static final AttributeKey<String> SNAPSHOT_OPERATION_STATE =
AttributeKey.stringKey("pulsar.replication.subscription.snapshot.operation.state");
public enum SnapshotOperationState {
STARTED,
COMPLETED;
public final Attributes attributes = Attributes.of(SNAPSHOT_OPERATION_STATE, this.name().toLowerCase());
public static final AttributeKey<String> SNAPSHOT_OPERATION_RESULT =
AttributeKey.stringKey("pulsar.replication.subscription.snapshot.operation.result");
public enum SnapshotOperationResult {
SUCCESS,
TIMED_OUT;
private final Attributes attributes = Attributes.of(SNAPSHOT_OPERATION_RESULT, name().toLowerCase());
}

public static final String SNAPSHOT_OPERATION_COUNT_METRIC_NAME =
"pulsar.broker.replication.subscription.snapshot.operation.count";
private final LongCounter snapshotOperationCounter;

public static final String SNAPSHOT_DURATION_METRIC_NAME =
"pulsar.broker.replication.subscription.snapshot.duration";
"pulsar.broker.replication.subscription.snapshot.operation.duration";
private final DoubleHistogram snapshotDuration;

public OpenTelemetryReplicatedSubscriptionStats(PulsarService pulsar) {
var meter = pulsar.getOpenTelemetry().getMeter();
snapshotOperationCounter = meter.counterBuilder(SNAPSHOT_OPERATION_COUNT_METRIC_NAME)
.setDescription("Number of snapshot operations")
.setDescription("Total number of snapshot operations attempted")
.setUnit("{operation}")
.build();
snapshotDuration = meter.histogramBuilder(SNAPSHOT_DURATION_METRIC_NAME)
Expand All @@ -57,14 +57,16 @@ public OpenTelemetryReplicatedSubscriptionStats(PulsarService pulsar) {
}

public void recordSnapshotStarted() {
snapshotOperationCounter.add(1, SnapshotOperationState.STARTED.attributes);
snapshotOperationCounter.add(1);
}

public void recordSnapshotCompleted() {
snapshotOperationCounter.add(1, SnapshotOperationState.COMPLETED.attributes);
public void recordSnapshotTimedOut(long durationMs) {
snapshotDuration.record(MetricsUtil.convertToSeconds(durationMs, TimeUnit.MILLISECONDS),
SnapshotOperationResult.TIMED_OUT.attributes);
}

public void recordSnapshotDuration(long duration, TimeUnit timeUnit) {
snapshotDuration.record(MetricsUtil.convertToSeconds(duration, timeUnit));
public void recordSnapshotCompleted(long durationMs) {
snapshotDuration.record(MetricsUtil.convertToSeconds(durationMs, TimeUnit.MILLISECONDS),
SnapshotOperationResult.SUCCESS.attributes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
Expand All @@ -49,7 +50,6 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController;
import org.apache.pulsar.broker.stats.OpenTelemetryReplicatedSubscriptionStats.SnapshotOperationState;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdminException;
Expand Down Expand Up @@ -177,9 +177,9 @@ public void testReplicatedSubscriptionAcrossTwoRegions() throws Exception {

var metrics1 = metricReader1.collectAllMetrics();
assertMetricLongSumValue(metrics1, SNAPSHOT_OPERATION_COUNT_METRIC_NAME,
SnapshotOperationState.STARTED.attributes, value -> assertThat(value).isPositive());
Attributes.empty(),value -> assertThat(value).isPositive());
assertMetricLongSumValue(metrics1, SNAPSHOT_OPERATION_COUNT_METRIC_NAME,
SnapshotOperationState.COMPLETED.attributes, value -> assertThat(value).isPositive());
Attributes.empty(), value -> assertThat(value).isPositive());
assertThat(metrics1)
.anySatisfy(metric -> OpenTelemetryAssertions.assertThat(metric)
.hasName(SNAPSHOT_DURATION_METRIC_NAME)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

import io.netty.buffer.ByteBuf;
import java.time.Clock;
import java.util.ArrayList;
Expand All @@ -34,7 +33,6 @@
import java.util.List;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.stats.OpenTelemetryReplicatedSubscriptionStats;
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshotRequest;
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshotResponse;
Expand Down Expand Up @@ -73,8 +71,6 @@ public void setup() {
})
.when(controller)
.writeMarker(any(ByteBuf.class));
when(controller.getStats())
.thenReturn(mock(OpenTelemetryReplicatedSubscriptionStats.class));
}

@Test
Expand Down

0 comments on commit 28d23a2

Please sign in to comment.