Skip to content

Commit 107d6e0

Browse files
committed
don't propagate into headers when not possible
1 parent 4fd1400 commit 107d6e0

File tree

13 files changed

+203
-231
lines changed

13 files changed

+203
-231
lines changed

instrumentation/nats/nats-2.21/javaagent/build.gradle.kts

-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ muzzle {
77
group.set("io.nats")
88
module.set("jnats")
99
versions.set("[2.21.0,)")
10-
assertInverse.set(true)
1110
}
1211
}
1312

instrumentation/nats/nats-2.21/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_21/ConnectionInstrumentation.java

+13-16
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,10 @@
1616
import io.nats.client.Message;
1717
import io.opentelemetry.context.Context;
1818
import io.opentelemetry.context.Scope;
19-
import io.opentelemetry.instrumentation.nats.v2_21.OpenTelemetryMessage;
19+
import io.opentelemetry.instrumentation.nats.v2_21.internal.NatsRequest;
2020
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
2121
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
2222
import net.bytebuddy.asm.Advice;
23-
import net.bytebuddy.asm.Advice.AssignReturned.ToArguments.ToArgument;
2423
import net.bytebuddy.description.type.TypeDescription;
2524
import net.bytebuddy.matcher.ElementMatcher;
2625

@@ -38,31 +37,28 @@ public void transform(TypeTransformer transformer) {
3837
.and(named("publish"))
3938
.and(takesArguments(1))
4039
.and(takesArgument(0, named("io.nats.client.Message"))),
41-
ConnectionInstrumentation.class.getName() + "$PublishAdvice");
40+
ConnectionInstrumentation.class.getName() + "$PublishMessageAdvice");
4241
}
4342

4443
@SuppressWarnings("unused")
45-
public static class PublishAdvice {
44+
public static class PublishMessageAdvice {
4645

4746
@Advice.OnMethodEnter(suppress = Throwable.class)
48-
@Advice.AssignReturned.ToArguments(@ToArgument(0))
49-
public static Message onEnter(
47+
public static void onEnter(
5048
@Advice.This Connection connection,
5149
@Advice.Argument(0) Message message,
5250
@Advice.Local("otelContext") Context otelContext,
53-
@Advice.Local("otelScope") Scope otelScope) {
51+
@Advice.Local("otelScope") Scope otelScope,
52+
@Advice.Local("natsRequest") NatsRequest natsRequest) {
5453
Context parentContext = Context.current();
54+
natsRequest = NatsRequest.create(connection, message);
5555

56-
if (!PRODUCER_INSTRUMENTER.shouldStart(parentContext, message)) {
57-
return message;
56+
if (!PRODUCER_INSTRUMENTER.shouldStart(parentContext, natsRequest)) {
57+
return;
5858
}
5959

60-
OpenTelemetryMessage otelMessage = new OpenTelemetryMessage(connection, message);
61-
62-
otelContext = PRODUCER_INSTRUMENTER.start(parentContext, otelMessage);
60+
otelContext = PRODUCER_INSTRUMENTER.start(parentContext, natsRequest);
6361
otelScope = otelContext.makeCurrent();
64-
65-
return otelMessage;
6662
}
6763

6864
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
@@ -71,13 +67,14 @@ public static void onExit(
7167
@Advice.This Connection connection,
7268
@Advice.Argument(0) Message message,
7369
@Advice.Local("otelContext") Context otelContext,
74-
@Advice.Local("otelScope") Scope otelScope) {
70+
@Advice.Local("otelScope") Scope otelScope,
71+
@Advice.Local("natsRequest") NatsRequest natsRequest) {
7572
if (otelScope == null) {
7673
return;
7774
}
7875

7976
otelScope.close();
80-
PRODUCER_INSTRUMENTER.end(otelContext, message, null, throwable);
77+
PRODUCER_INSTRUMENTER.end(otelContext, natsRequest, null, throwable);
8178
}
8279
}
8380
}

instrumentation/nats/nats-2.21/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_21/NatsSingletons.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@
77

88
import static io.opentelemetry.instrumentation.nats.v2_21.internal.NatsInstrumenterFactory.createProducerInstrumenter;
99

10-
import io.nats.client.Message;
1110
import io.opentelemetry.api.GlobalOpenTelemetry;
1211
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
12+
import io.opentelemetry.instrumentation.nats.v2_21.internal.NatsRequest;
1313

1414
public final class NatsSingletons {
1515

16-
public static final Instrumenter<Message, Void> PRODUCER_INSTRUMENTER =
16+
public static final Instrumenter<NatsRequest, Void> PRODUCER_INSTRUMENTER =
1717
createProducerInstrumenter(GlobalOpenTelemetry.get());
1818

1919
private NatsSingletons() {}

instrumentation/nats/nats-2.21/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/nats/v2_21/NatsInstrumentationTest.java

+35-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import io.nats.client.Message;
1717
import io.nats.client.Nats;
1818
import io.nats.client.Subscription;
19+
import io.nats.client.impl.Headers;
1920
import io.nats.client.impl.NatsMessage;
2021
import io.opentelemetry.api.common.AttributeKey;
2122
import io.opentelemetry.api.trace.SpanKind;
@@ -64,10 +65,43 @@ static void afterAll() throws InterruptedException {
6465
}
6566

6667
@Test
67-
void testPublishMessage() throws InterruptedException {
68+
void testPublishMessageNoHeaders() throws InterruptedException {
6869
// given
70+
int clientId = connection.getServerInfo().getClientId();
6971
NatsMessage message = NatsMessage.builder().subject("sub").data("x").build();
72+
73+
// when
74+
testing.runWithSpan("testPublishMessage", () -> connection.publish(message));
75+
76+
// then
77+
testing.waitAndAssertTraces(
78+
trace ->
79+
trace.hasSpansSatisfyingExactly(
80+
span -> span.hasName("testPublishMessage").hasNoParent(),
81+
span ->
82+
span.hasName("sub publish")
83+
.hasKind(SpanKind.PRODUCER)
84+
.hasParent(trace.getSpan(0))
85+
.hasAttributesSatisfyingExactly(
86+
equalTo(MESSAGING_OPERATION, "publish"),
87+
equalTo(MESSAGING_SYSTEM, "nats"),
88+
equalTo(MESSAGING_DESTINATION_NAME, "sub"),
89+
equalTo(MESSAGING_MESSAGE_BODY_SIZE, 1),
90+
equalTo(
91+
AttributeKey.stringKey("messaging.client_id"),
92+
String.valueOf(clientId)))));
93+
94+
// and
95+
Message published = subscription.nextMessage(Duration.ofSeconds(1));
96+
assertThat(published.getHeaders()).isNull();
97+
}
98+
99+
@Test
100+
void testPublishMessageWithHeaders() throws InterruptedException {
101+
// given
70102
int clientId = connection.getServerInfo().getClientId();
103+
NatsMessage message =
104+
NatsMessage.builder().subject("sub").data("x").headers(new Headers()).build();
71105

72106
// when
73107
testing.runWithSpan("testPublishMessage", () -> connection.publish(message));

instrumentation/nats/nats-2.21/library/build.gradle.kts

+3
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,8 @@ plugins {
55
dependencies {
66
library("io.nats:jnats:2.21.0")
77

8+
compileOnly("com.google.auto.value:auto-value-annotations")
9+
annotationProcessor("com.google.auto.value:auto-value")
10+
811
testImplementation(project(":instrumentation:nats:nats-2.21:testing"))
912
}

instrumentation/nats/nats-2.21/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_21/NatsTelemetry.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66
package io.opentelemetry.instrumentation.nats.v2_21;
77

88
import io.nats.client.Connection;
9-
import io.nats.client.Message;
109
import io.opentelemetry.api.OpenTelemetry;
1110
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
11+
import io.opentelemetry.instrumentation.nats.v2_21.internal.NatsRequest;
1212

1313
public final class NatsTelemetry {
1414

@@ -20,9 +20,9 @@ public static NatsTelemetryBuilder builder(OpenTelemetry openTelemetry) {
2020
return new NatsTelemetryBuilder(openTelemetry);
2121
}
2222

23-
private final Instrumenter<Message, Void> producerInstrumenter;
23+
private final Instrumenter<NatsRequest, Void> producerInstrumenter;
2424

25-
public NatsTelemetry(Instrumenter<Message, Void> producerInstrumenter) {
25+
public NatsTelemetry(Instrumenter<NatsRequest, Void> producerInstrumenter) {
2626
this.producerInstrumenter = producerInstrumenter;
2727
}
2828

instrumentation/nats/nats-2.21/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_21/OpenTelemetryConnection.java

+30-29
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,11 @@
2828
import io.nats.client.Subscription;
2929
import io.nats.client.api.ServerInfo;
3030
import io.nats.client.impl.Headers;
31-
import io.nats.client.impl.NatsMessage;
3231
import io.opentelemetry.api.trace.Span;
3332
import io.opentelemetry.context.Context;
3433
import io.opentelemetry.context.Scope;
3534
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
35+
import io.opentelemetry.instrumentation.nats.v2_21.internal.NatsRequest;
3636
import java.io.IOException;
3737
import java.net.InetAddress;
3838
import java.time.Duration;
@@ -43,58 +43,42 @@
4343
public class OpenTelemetryConnection implements Connection {
4444

4545
private final Connection delegate;
46-
private final Instrumenter<Message, Void> producerInstrumenter;
46+
private final Instrumenter<NatsRequest, Void> producerInstrumenter;
4747

4848
public OpenTelemetryConnection(
49-
Connection connection, Instrumenter<Message, Void> producerInstrumenter) {
49+
Connection connection, Instrumenter<NatsRequest, Void> producerInstrumenter) {
5050
this.delegate = connection;
5151
this.producerInstrumenter = producerInstrumenter;
5252
}
5353

5454
@Override
5555
public void publish(String subject, byte[] body) {
56-
this.publish(NatsMessage.builder().subject(subject).data(body).build());
56+
wrapPublish(NatsRequest.create(this, subject, body), () -> delegate.publish(subject, body));
5757
}
5858

5959
@Override
6060
public void publish(String subject, Headers headers, byte[] body) {
61-
this.publish(NatsMessage.builder().subject(subject).headers(headers).data(body).build());
61+
wrapPublish(
62+
NatsRequest.create(this, subject, headers, body),
63+
() -> delegate.publish(subject, headers, body));
6264
}
6365

6466
@Override
6567
public void publish(String subject, String replyTo, byte[] body) {
66-
this.publish(NatsMessage.builder().subject(subject).replyTo(replyTo).data(body).build());
68+
wrapPublish(
69+
NatsRequest.create(this, subject, body), () -> delegate.publish(subject, replyTo, body));
6770
}
6871

6972
@Override
7073
public void publish(String subject, String replyTo, Headers headers, byte[] body) {
71-
this.publish(
72-
NatsMessage.builder()
73-
.subject(subject)
74-
.replyTo(replyTo)
75-
.headers(headers)
76-
.data(body)
77-
.build());
74+
wrapPublish(
75+
NatsRequest.create(this, subject, headers, body),
76+
() -> delegate.publish(subject, replyTo, headers, body));
7877
}
7978

8079
@Override
8180
public void publish(Message message) {
82-
Context parentContext = Context.current();
83-
84-
if (!Span.fromContext(parentContext).getSpanContext().isValid()
85-
|| !producerInstrumenter.shouldStart(parentContext, message)) {
86-
delegate.publish(message);
87-
return;
88-
}
89-
90-
Message otelMessage = new OpenTelemetryMessage(this, message);
91-
Context context = producerInstrumenter.start(parentContext, otelMessage);
92-
93-
try (Scope ignored = context.makeCurrent()) {
94-
delegate.publish(otelMessage);
95-
} finally {
96-
producerInstrumenter.end(context, otelMessage, null, null);
97-
}
81+
wrapPublish(NatsRequest.create(this, message), () -> delegate.publish(message));
9882
}
9983

10084
@Override
@@ -356,4 +340,21 @@ public ObjectStoreManagement objectStoreManagement(ObjectStoreOptions objectStor
356340
throws IOException {
357341
return delegate.objectStoreManagement(objectStoreOptions);
358342
}
343+
344+
private void wrapPublish(NatsRequest natsRequest, Runnable publish) {
345+
Context parentContext = Context.current();
346+
347+
if (!Span.fromContext(parentContext).getSpanContext().isValid()
348+
|| !producerInstrumenter.shouldStart(parentContext, natsRequest)) {
349+
publish.run();
350+
return;
351+
}
352+
353+
Context context = producerInstrumenter.start(parentContext, natsRequest);
354+
try (Scope ignored = context.makeCurrent()) {
355+
publish.run();
356+
} finally {
357+
producerInstrumenter.end(context, natsRequest, null, null);
358+
}
359+
}
359360
}

0 commit comments

Comments
 (0)