Skip to content

FEAT: Add Operation throughput and latency metrics by mbean. #841

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/main/java/net/spy/memcached/MemcachedConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import net.spy.memcached.compat.SpyObject;
import net.spy.memcached.compat.log.LoggerFactory;
import net.spy.memcached.internal.ReconnDelay;
import net.spy.memcached.metrics.OpLatencyMonitor;
import net.spy.memcached.metrics.OpThroughputMonitor;
import net.spy.memcached.ops.KeyedOperation;
import net.spy.memcached.ops.MultiOperationCallback;
import net.spy.memcached.ops.Operation;
Expand Down Expand Up @@ -990,6 +992,7 @@ private void handleReads(MemcachedNode qa)
throw new IllegalStateException("No read operation.");
}
currentOp.readFromBuffer(rbuf);
OpLatencyMonitor.getInstance().recordLatency(currentOp.getStartTime());
if (currentOp.getState() == OperationState.COMPLETE) {
getLogger().debug("Completed read op: %s and giving the next %d bytes",
currentOp, rbuf.remaining());
Expand Down Expand Up @@ -1519,6 +1522,7 @@ public String toString() {
* @param op
*/
public static void opTimedOut(Operation op) {
OpThroughputMonitor.getInstance().addTimeOutedOpCount(1);
MemcachedConnection.setTimeout(op, true);
}

Expand All @@ -1528,6 +1532,7 @@ public static void opTimedOut(Operation op) {
* @param ops
*/
public static void opsTimedOut(Collection<Operation> ops) {
OpThroughputMonitor.getInstance().addTimeOutedOpCount(ops.size());
Collection<String> timedOutNodes = new HashSet<>();
for (Operation op : ops) {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package net.spy.memcached.metrics;

class LatencyMetricsSnapShot {
private static final LatencyMetricsSnapShot EMPTY = new LatencyMetricsSnapShot(0, 0, 0, 0, 0, 0);

private final long avgLatency;
private final long minLatency;
private final long maxLatency;
private final long p25Latency;
private final long p50Latency;
private final long p75Latency;
private final long timestamp; // μΊμ‹œ 생성 μ‹œκ°„

LatencyMetricsSnapShot(long avg, long min, long max, long p25, long p50, long p75) {
this.avgLatency = avg;
this.minLatency = min;
this.maxLatency = max;
this.p25Latency = p25;
this.p50Latency = p50;
this.p75Latency = p75;
this.timestamp = System.currentTimeMillis();
}

public static LatencyMetricsSnapShot empty() {
return EMPTY;
}

public long getAvgLatency() {
return avgLatency;
}

public long getMinLatency() {
return minLatency;
}

public long getMaxLatency() {
return maxLatency;
}

public long getP25Latency() {
return p25Latency;
}

public long getP50Latency() {
return p50Latency;
}

public long getP75Latency() {
return p75Latency;
}

public long getTimestamp() {
return timestamp;
}
}
149 changes: 149 additions & 0 deletions src/main/java/net/spy/memcached/metrics/OpLatencyMonitor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package net.spy.memcached.metrics;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;

import net.spy.memcached.ArcusMBeanServer;

public final class OpLatencyMonitor implements OpLatencyMonitorMBean {

private static final OpLatencyMonitor INSTANCE = new OpLatencyMonitor();
private static final long CACHE_DURATION = 2000; // 2초 μΊμ‹œ
private static final int WINDOW_SIZE = 10_000;

private final AtomicReferenceArray<Long> latencies = new AtomicReferenceArray<>(WINDOW_SIZE);
private final AtomicInteger currentIndex = new AtomicInteger(0);
private final AtomicInteger count = new AtomicInteger(0);
private final AtomicReference<LatencyMetricsSnapShot> cachedMetrics
= new AtomicReference<>(LatencyMetricsSnapShot.empty());
private final boolean enabled;

private OpLatencyMonitor() {
if (System.getProperty("arcus.mbean", "false").toLowerCase().equals("false")) {
enabled = false;
return;
}
enabled = true;
for (int i = 0; i < WINDOW_SIZE; i++) {
latencies.set(i, 0L);
}

try {
ArcusMBeanServer mbs = ArcusMBeanServer.getInstance();
mbs.registMBean(this, this.getClass().getPackage().getName()
+ ":type=" + this.getClass().getSimpleName());
} catch (Exception e) {
throw new RuntimeException("Failed to register MBean", e);
}
}

public static OpLatencyMonitor getInstance() {
return INSTANCE;
}

public void recordLatency(long startNanos) {
if (!enabled) {
return;
}
long latencyMicros = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startNanos);
int index = currentIndex.getAndUpdate(i -> (i + 1) % WINDOW_SIZE);
latencies.lazySet(index, latencyMicros);

if (count.get() < WINDOW_SIZE) {
count.incrementAndGet();
}
}

// λͺ¨λ“  λ©”νŠΈλ¦­μ„ ν•œ λ²ˆμ— κ³„μ‚°ν•˜κ³  μΊμ‹œν•˜λŠ” λ©”μ„œλ“œ
private LatencyMetricsSnapShot computeMetrics() {
int currentCount = count.get();
if (currentCount == 0) {
return LatencyMetricsSnapShot.empty();
}

// ν˜„μž¬ 데이터λ₯Ό λ°°μ—΄λ‘œ 볡사
List<Long> sortedLatencies = new ArrayList<>(currentCount);
int startIndex = currentIndex.get();

for (int i = 0; i < currentCount; i++) {
int idx = (startIndex - i + WINDOW_SIZE) % WINDOW_SIZE;
long value = latencies.get(idx);
if (value > 0) {
sortedLatencies.add(value);
}
}

if (sortedLatencies.isEmpty()) {
return LatencyMetricsSnapShot.empty();
}

sortedLatencies.sort(Long::compareTo);

// λͺ¨λ“  λ©”νŠΈλ¦­μ„ ν•œ λ²ˆμ— 계산
long avg = sortedLatencies.stream().mapToLong(Long::longValue).sum() / sortedLatencies.size();
long min = sortedLatencies.get(0);
long max = sortedLatencies.get(sortedLatencies.size() - 1);
long p25 = sortedLatencies.get((int) Math.ceil((sortedLatencies.size() * 25.0) / 100.0) - 1);
long p50 = sortedLatencies.get((int) Math.ceil((sortedLatencies.size() * 50.0) / 100.0) - 1);
long p75 = sortedLatencies.get((int) Math.ceil((sortedLatencies.size() * 75.0) / 100.0) - 1);

return new LatencyMetricsSnapShot(avg, min, max, p25, p50, p75);
}

// μΊμ‹œλœ λ©”νŠΈλ¦­μ„ κ°€μ Έμ˜€κ±°λ‚˜ ν•„μš”μ‹œ μƒˆλ‘œ 계산
private LatencyMetricsSnapShot getMetricsSnapshot() {
LatencyMetricsSnapShot current = cachedMetrics.get();
long now = System.currentTimeMillis();

// μΊμ‹œκ°€ μœ νš¨ν•œμ§€ 확인
if (now - current.getTimestamp() < CACHE_DURATION) {
return current;
}

// μƒˆλ‘œμš΄ λ©”νŠΈλ¦­ 계산 및 μΊμ‹œ μ—…λ°μ΄νŠΈ
LatencyMetricsSnapShot newMetrics = computeMetrics();
cachedMetrics.set(newMetrics);
return newMetrics;
}

@Override
public long getAverageLatencyMicros() {
return getMetricsSnapshot().getAvgLatency();
}

@Override
public long getMinLatencyMicros() {
return getMetricsSnapshot().getMinLatency();
}

@Override
public long getMaxLatencyMicros() {
return getMetricsSnapshot().getMaxLatency();
}

@Override
public long get25thPercentileLatencyMicros() {
return getMetricsSnapshot().getP25Latency();
}

@Override
public long get50thPercentileLatencyMicros() {
return getMetricsSnapshot().getP50Latency();
}

@Override
public long get75thPercentileLatencyMicros() {
return getMetricsSnapshot().getP75Latency();
}

@Override
public void resetStatistics() {
count.set(0);
currentIndex.set(0);
cachedMetrics.set(LatencyMetricsSnapShot.empty());
}
}
17 changes: 17 additions & 0 deletions src/main/java/net/spy/memcached/metrics/OpLatencyMonitorMBean.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package net.spy.memcached.metrics;

public interface OpLatencyMonitorMBean {
long getAverageLatencyMicros();

long getMaxLatencyMicros();

long getMinLatencyMicros();

long get25thPercentileLatencyMicros();

long get50thPercentileLatencyMicros();

long get75thPercentileLatencyMicros();

void resetStatistics();
}
76 changes: 76 additions & 0 deletions src/main/java/net/spy/memcached/metrics/OpThroughputMonitor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package net.spy.memcached.metrics;

import java.util.concurrent.atomic.LongAdder;

import net.spy.memcached.ArcusMBeanServer;

public final class OpThroughputMonitor implements OpThroughputMonitorMBean {
private static final OpThroughputMonitor INSTANCE = new OpThroughputMonitor();

private final LongAdder completeOps = new LongAdder();
private final LongAdder cancelOps = new LongAdder();
private final LongAdder timeOutOps = new LongAdder();
private final boolean enabled;

private OpThroughputMonitor() {
if (System.getProperty("arcus.mbean", "false").toLowerCase().equals("false")) {
enabled = false;
return;
}
enabled = true;
try {
ArcusMBeanServer mbs = ArcusMBeanServer.getInstance();
mbs.registMBean(this, this.getClass().getPackage().getName()
+ ":type=" + this.getClass().getSimpleName());
} catch (Exception e) {
throw new RuntimeException("Failed to register Throughput MBean", e);
}
}

public static OpThroughputMonitor getInstance() {
return INSTANCE;
}

public void addCompletedOpCount() {
if (!enabled) {
return;
}
completeOps.increment();
}

public void addCanceledOpCount() {
if (!enabled) {
return;
}
cancelOps.increment();
}

public void addTimeOutedOpCount(int count) {
if (!enabled) {
return;
}
timeOutOps.add(count);
}

@Override
public long getCompletedOps() {
return completeOps.sum();
}

@Override
public long getCanceledOps() {
return cancelOps.sum();
}

@Override
public long getTimeoutOps() {
return timeOutOps.sum();
}

@Override
public void resetStatistics() {
completeOps.reset();
cancelOps.reset();
timeOutOps.reset();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package net.spy.memcached.metrics;

public interface OpThroughputMonitorMBean {
long getCompletedOps();

long getCanceledOps();

long getTimeoutOps();

void resetStatistics();
}
4 changes: 4 additions & 0 deletions src/main/java/net/spy/memcached/ops/Operation.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,8 @@ public interface Operation {
/* ENABLE_MIGRATION end */

APIType getAPIType();

void setStartTime(long startTime);

long getStartTime();
}
13 changes: 13 additions & 0 deletions src/main/java/net/spy/memcached/protocol/BaseOperationImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import net.spy.memcached.MemcachedReplicaGroup;
import net.spy.memcached.RedirectHandler;
import net.spy.memcached.compat.SpyObject;
import net.spy.memcached.metrics.OpThroughputMonitor;
import net.spy.memcached.ops.APIType;
import net.spy.memcached.ops.CancelledOperationStatus;
import net.spy.memcached.ops.OperationCallback;
Expand Down Expand Up @@ -58,6 +59,16 @@ public abstract class BaseOperationImpl extends SpyObject {
private OperationType opType = OperationType.UNDEFINED;
private APIType apiType = APIType.UNDEFINED;

private long startTime;

public void setStartTime(long startTime) {
this.startTime = startTime;
}

public long getStartTime() {
return startTime;
}

/* ENABLE_MIGRATION if */
private RedirectHandler redirectHandler = null;
/* ENABLE_MIGRATION end */
Expand Down Expand Up @@ -95,6 +106,7 @@ public final OperationException getException() {
public final boolean cancel(String cause) {
if (callbacked.compareAndSet(false, true)) {
cancelled = true;
OpThroughputMonitor.getInstance().addCanceledOpCount();
if (handlingNode != null) {
cause += " @ " + handlingNode.getNodeName();
}
Expand Down Expand Up @@ -222,6 +234,7 @@ protected final void transitionState(OperationState newState) {
}
if (state == OperationState.COMPLETE &&
callbacked.compareAndSet(false, true)) {
OpThroughputMonitor.getInstance().addCompletedOpCount();
callback.complete();
}
}
Expand Down
Loading