feat: add kafkajs instrumentation #3786

Merged
68 commits merged on Mar 12, 2024
Commits (68)
55c2755
feat: add kafkajs consumer instrumentation
david-luna Nov 24, 2023
d320cf2
chore: add kafkajs trace sample
david-luna Nov 24, 2023
1e68b8e
chore(kafkajs): add trace sample for kafkajs
david-luna Nov 27, 2023
7066b2d
chore(kafkajs): first try on instrumentation
david-luna Nov 27, 2023
aee6303
chore(kafkajs): add typings
david-luna Nov 28, 2023
a09186d
chore(kafkajs): add age for message transactions
david-luna Nov 28, 2023
2fcbb1a
chore(kafkajs): remove noop early return
david-luna Nov 28, 2023
8846ed9
chore: add send instrumentation
david-luna Nov 30, 2023
289e2f0
Merge branch 'main' into dluna/2905-feat-kafkajs-instrumentation
david-luna Dec 4, 2023
018b65f
chore: added kafkajs test fixture script
david-luna Dec 4, 2023
7f95acf
chore: merge main
david-luna Dec 5, 2023
a05ef1f
Merge branch 'main' into dluna/2905-feat-kafkajs-instrumentation
david-luna Dec 7, 2023
22ff0ea
chore: fix consumers subscription
david-luna Dec 7, 2023
82c1e63
chore: single consumer working
david-luna Dec 7, 2023
ebcb8db
chore: separate fixtures for single and batch consumers
david-luna Dec 7, 2023
ebd0ff9
chore: add links on batch processing
david-luna Dec 7, 2023
ba761ed
chore: add tests skeleton
david-luna Dec 11, 2023
989a44b
chore: proper headers capturing
david-luna Dec 12, 2023
05f5622
chore: add batch tests
david-luna Dec 12, 2023
644f2e9
chore: add ignore message queues
david-luna Dec 12, 2023
ed9c6d1
chore: finish batch handling
david-luna Dec 12, 2023
270a8e4
chore: update .tav.yml
david-luna Dec 12, 2023
1d1cb06
Merge branch 'main' into dluna/2905-feat-kafkajs-instrumentation
david-luna Dec 12, 2023
2950918
chore: update tav.json and test script
david-luna Dec 12, 2023
d58f327
chore: update test action
david-luna Dec 12, 2023
92ba969
chore: log stdout in kafkajs tests
david-luna Dec 13, 2023
8b09802
chore: add node version in topic names
david-luna Dec 13, 2023
c00cb4a
chore: add node version in group id
david-luna Dec 13, 2023
1608746
chore: add checkScriptResult
david-luna Dec 13, 2023
b8bde4e
chore: add waitForLeaders in topic creation
david-luna Dec 13, 2023
9443a6c
chore: add more logs
david-luna Dec 13, 2023
e020d16
chore: remove custom check
david-luna Dec 13, 2023
c369dd3
chore: add err logs
david-luna Dec 13, 2023
1e1a31e
chore: add disconnect logs
david-luna Dec 13, 2023
da8e789
chore: enable delete topics
david-luna Dec 13, 2023
80cb347
chore: remove comments
david-luna Dec 13, 2023
cbd2bc4
chore: add node version to test
david-luna Dec 13, 2023
8c093ab
chore: add logs
david-luna Dec 13, 2023
6f9e036
chore: remove logs
david-luna Dec 13, 2023
ff93284
chore: remove try/catch
david-luna Dec 13, 2023
455fa1c
chore: increase test timeout
david-luna Dec 13, 2023
a5ad188
chore: add test matrix back
david-luna Dec 13, 2023
10f28c9
chore: add kafka cntainer healthcheck
david-luna Dec 13, 2023
996226a
chore: remove kafka logger
david-luna Dec 13, 2023
2c09adb
chore: fix kafa docker compose
david-luna Dec 13, 2023
760e2d8
chore: remove some comments and typos
david-luna Dec 13, 2023
9f52563
chore: update docs
david-luna Dec 13, 2023
e99d189
chore: update comments
david-luna Dec 14, 2023
641a343
chore: config TAV tests
david-luna Dec 14, 2023
f9d9f19
chore: add cpature body tests
david-luna Dec 14, 2023
27aed57
chore: change trace context propagation
david-luna Dec 18, 2023
abaff1d
Merge branch 'main' into dluna/2905-feat-kafkajs-instrumentation
david-luna Dec 18, 2023
3adf8c4
chore: add target to destination context
david-luna Dec 18, 2023
29c603b
Merge branch 'main' into dluna/2905-feat-kafkajs-instrumentation
david-luna Dec 27, 2023
86df305
chore: remove some comments
david-luna Dec 27, 2023
0a12b99
chore: add trace propagation tests
david-luna Jan 3, 2024
cb6b08e
chore: fix span links
david-luna Feb 14, 2024
9ffb260
chore: honor captureHeaders config option
david-luna Feb 14, 2024
6437bfe
chore: tune propagetion context tests
david-luna Feb 14, 2024
0c1cc95
Merge branch 'main' into dluna/2905-feat-kafkajs-instrumentation
david-luna Feb 14, 2024
69b81b0
chore: make nits neat
david-luna Feb 15, 2024
2112877
chore: merge main
david-luna Feb 15, 2024
2b238e9
chore: check for single topic in sendBatch instrumentation
david-luna Feb 23, 2024
0772a66
Update test/instrumentation/modules/kafkajs/kafkajs.test.js
david-luna Mar 7, 2024
c2f1d18
Update lib/instrumentation/modules/kafkajs.js
david-luna Mar 7, 2024
1da35b6
Merge branch 'main' into dluna/2905-feat-kafkajs-instrumentation
david-luna Mar 7, 2024
eaef6f8
chore: fix naming
david-luna Mar 8, 2024
7712286
Merge branch 'main' into dluna/2905-feat-kafkajs-instrumentation
david-luna Mar 12, 2024
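
For context on the commit history above, a minimal sketch of the kafkajs surface this PR instruments: producer.send() / producer.sendBatch() on the publishing side and the eachMessage / eachBatch consumer handlers. This is not code from the PR; the broker address, topic, and group id are made up.

```js
const { Kafka } = require('kafkajs');

// Hypothetical client setup; clientId, broker, topic and group id are placeholders.
const kafka = new Kafka({ clientId: 'demo-app', brokers: ['localhost:9092'] });

async function main() {
  const producer = kafka.producer();
  await producer.connect();
  // producer.send() (and producer.sendBatch()) are the calls the new
  // instrumentation wraps to create exit spans and inject trace headers.
  await producer.send({
    topic: 'demo-topic',
    messages: [{ value: 'hello', headers: { foo: 'bar' } }],
  });
  await producer.disconnect();

  const consumer = kafka.consumer({ groupId: 'demo-group' });
  await consumer.connect();
  await consumer.subscribe({ topics: ['demo-topic'], fromBeginning: true });
  // The eachMessage / eachBatch handlers are wrapped to start transactions.
  await consumer.run({
    eachMessage: async ({ topic, message }) => {
      console.log(`${topic}: ${message.value.toString()}`);
    },
  });
}

main().catch(console.error);
```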
Files changed (showing changes from 4 commits)
4 changes: 2 additions & 2 deletions .ci/tav.json
@@ -24,6 +24,7 @@
{ "name": "generic-pool", "minMajorVersion": 8 },
{ "name": "graphql", "minMajorVersion": 8 },
{ "name": "ioredis", "minMajorVersion": 8 },
{ "name": "kafkajs", "minMajorVersion": 14 },
{ "name": "knex", "minMajorVersion": 8 },
{ "name": "memcached", "minMajorVersion": 8 },
{ "name": "mongodb", "minMajorVersion": 8 },
@@ -39,7 +40,6 @@
{ "name": "ws", "minMajorVersion": 8 },
{ "name": "@koa/router,koa-router", "minMajorVersion": 8 },
{ "name": "handlebars,pug", "minMajorVersion": 8 },
{ "name": "bluebird,got", "minMajorVersion": 8 },
{ "name": "kafkajs", "minMajorVersion": 14 }
{ "name": "bluebird,got", "minMajorVersion": 8 }
]
}
65 changes: 39 additions & 26 deletions lib/instrumentation/modules/kafkajs.js
@@ -12,6 +12,7 @@ const semver = require('semver');

const constants = require('../../constants');
const shimmer = require('../shimmer');
const { redactKeysFromObject } = require('../../filters/sanitize-field-names');

const NAME = 'Kafka';
const TYPE = 'messaging';
@@ -103,6 +104,10 @@ module.exports = function (mod, agent, { version, enabled }) {
return origEachMessage.apply(this, arguments);
}

// For distributed tracing this instrumentation checks the headers
// defined by OpenTelemetry and ignores the proprietary
// `elasticapmtraceparent` binary header
// https://github.com/elastic/apm/blob/main/specs/agents/tracing-distributed-tracing.md#binary-fields
const traceparent = message.headers && message.headers.traceparent;
const tracestate = message.headers && message.headers.tracestate;
const opts = {};
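
As the added comment notes, the trace is continued from the W3C traceparent / tracestate headers. kafkajs delivers header values to the handler as Buffers, so they have to be stringified before use. A small illustrative sketch, not from the PR; the header values are made up.

```js
// Illustrative only: headers as a kafkajs consumer handler would receive them.
const headers = {
  traceparent: Buffer.from(
    '00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01',
  ),
  tracestate: Buffer.from('es=s:1'),
};

// Same stringify-if-present pattern as the instrumentation above.
const traceparent = headers.traceparent && headers.traceparent.toString();
const tracestate = headers.tracestate && headers.tracestate.toString();
console.log(traceparent, tracestate);
```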
@@ -136,19 +141,19 @@
messageCtx.body = message.value.toString();
}

if (message.headers) {
messageCtx.headers = Object.keys(message.headers).reduce(
(acc, name) => {
const value = message.headers[name];
if (value instanceof Buffer) {
acc[name] = value.toString('utf-8');
} else {
acc[name] = value;
}
return acc;
},
{},
if (message.headers && config.captureHeaders) {
// Make sure there is no sensitive data
// and transform non-redacted buffers
messageCtx.headers = redactKeysFromObject(
message.headers,
config.sanitizeFieldNamesRegExp,
);
Object.keys(messageCtx.headers).forEach((key) => {
const value = messageCtx.headers[key];
if (value instanceof Buffer) {
messageCtx.headers[key] = value.toString('utf-8');
}
});
}

if (message.timestamp) {
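
A standalone sketch of the header-capture behaviour introduced above: sensitive header names are redacted and Buffer values are decoded to strings. `redactHeaders` is a stand-in for the agent's internal redactKeysFromObject helper, and the regular expressions only mimic a subset of the default sanitizeFieldNames patterns.

```js
const REDACTED = '[REDACTED]';
// Illustrative subset of patterns; the real list comes from the
// sanitizeFieldNames config option.
const sanitizeRegExps = [/^authorization$/i, /^auth$/i, /token/i, /passw(or)?d/i];

function redactHeaders(headers) {
  const out = {};
  for (const [name, value] of Object.entries(headers)) {
    if (sanitizeRegExps.some((re) => re.test(name))) {
      out[name] = REDACTED; // matching header names are redacted
    } else if (Buffer.isBuffer(value)) {
      out[name] = value.toString('utf-8'); // kafkajs header values are Buffers
    } else {
      out[name] = value;
    }
  }
  return out;
}

// The `auth: 'secret_token'` header added in the test fixture below ends up redacted:
console.log(redactHeaders({ foo: Buffer.from('buffer'), auth: 'secret_token' }));
// => { foo: 'buffer', auth: '[REDACTED]' }
```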
@@ -192,10 +197,10 @@
return origEachBatch.apply(this, arguments);
}

const trans = ins.startTransaction(`${NAME} RECEIVE from batch`, TYPE);

// TODO: not sure if this should be here but we got batches for only one topic
// https://github.com/elastic/apm/blob/main/specs/agents/tracing-instrumentation-messaging.md#context-fields
const trans = ins.startTransaction(
`${NAME} RECEIVE from ${batch.topic}`,
TYPE,
);
const messageCtx = { queue: { name: batch.topic } };
trans.setMessageContext(messageCtx);

@@ -210,6 +215,7 @@
const messages = batch && batch.messages;

if (messages) {
const traceparentsSeen = new Set();
const links = [];
const limit = Math.min(
messages.length,
@@ -218,8 +224,14 @@

for (let i = 0; i < messages.length; i++) {
const msg = messages[i];
if (msg.headers && msg.headers.traceparent) {
links.push({ context: msg.headers.traceparent.toString() });
const traceparent =
msg.headers &&
msg.headers.traceparent &&
msg.headers.traceparent.toString();

if (traceparent && !traceparentsSeen.has(traceparent)) {
links.push({ context: traceparent });
traceparentsSeen.add(traceparent);

if (links.length >= limit) {
break;
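
The loop above collects at most one span link per distinct traceparent seen in the batch, capped at a limit. Extracted as a standalone sketch; the function name and the max-links value passed in the usage line are illustrative.

```js
function collectSpanLinks(messages, maxLinks) {
  const seen = new Set();
  const links = [];
  for (const msg of messages) {
    const traceparent =
      msg.headers &&
      msg.headers.traceparent &&
      msg.headers.traceparent.toString();
    if (!traceparent || seen.has(traceparent)) {
      continue; // no trace context, or a duplicate producer trace
    }
    seen.add(traceparent);
    links.push({ context: traceparent });
    if (links.length >= maxLinks) {
      break; // respect the span-link limit
    }
  }
  return links;
}

// e.g. collectSpanLinks(batch.messages, 128) on an eachBatch invocation
```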
@@ -299,7 +311,6 @@
parentSpan.propagateTraceContextHeaders(
newHeaders,
function (carrier, name, value) {
// TODO: why doing it everywhere???
if (name.startsWith('elastic-')) {
return;
}
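
The carrier callback above copies the trace context headers onto the outgoing Kafka message but skips the agent's legacy 'elastic-' prefixed headers, so only the W3C headers are sent. A sketch of that filter in isolation; the helper name and example values are illustrative.

```js
function setOutgoingHeader(carrier, name, value) {
  if (name.startsWith('elastic-')) {
    return; // drop legacy 'elastic-apm-traceparent'; only W3C headers go on the wire
  }
  carrier[name] = value;
}

const newHeaders = {};
setOutgoingHeader(newHeaders, 'elastic-apm-traceparent', '00-...-01'); // skipped
setOutgoingHeader(
  newHeaders,
  'traceparent',
  '00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01',
);
setOutgoingHeader(newHeaders, 'tracestate', 'es=s:1');
console.log(Object.keys(newHeaders)); // => ['traceparent', 'tracestate']
```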
@@ -356,13 +367,15 @@
*/
function wrapProducerSendBatch(origSendBatch) {
return async function (batch) {
// TODO: discuss this. if all topics are ignored should the agent avoid
// creating a span?
let span;
const topics =
batch.topicMessages && batch.topicMessages.map((tm) => tm.topic);
const shouldIgnoreBatch =
topics && topics.every((t) => shouldIgnoreTopic(t, config));
let shouldIgnoreBatch = true;
const messages = batch.topicMessages || [];
for (const msg of messages) {
if (!shouldIgnoreTopic(msg.topic, config)) {
shouldIgnoreBatch = false;
break;
}
}

if (!shouldIgnoreBatch) {
span = ins.createSpan(
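
The rewritten check above skips creating a producer span only when every topic in the batch is on the ignore list, exiting early as soon as one non-ignored topic is found. A standalone sketch; the wildcard-to-regexp conversion is an assumption about how ignoreMessageQueues patterns are compiled.

```js
// Stand-in for the config's compiled ignore patterns; '*-ignore' from the test
// fixture is assumed to compile to something like /^.*-ignore$/.
const ignoreRegExps = [/^.*-ignore$/];

function shouldIgnoreBatch(topicMessages) {
  for (const { topic } of topicMessages || []) {
    if (!ignoreRegExps.some((re) => re.test(topic))) {
      return false; // at least one topic is traced, so a span is created
    }
  }
  return true; // every topic (or an empty batch) is ignored
}

console.log(shouldIgnoreBatch([{ topic: 'orders' }, { topic: 'orders-ignore' }])); // false
console.log(shouldIgnoreBatch([{ topic: 'orders-ignore' }])); // true
```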
@@ -386,7 +399,6 @@
parentSpan.propagateTraceContextHeaders(
newHeaders,
function (carrier, name, value) {
// TODO: why doing it everywhere???
if (name.startsWith('elastic-')) {
return;
}
@@ -405,6 +417,7 @@
const destCtx = {
// `service.name` and `service.resource` cannot be set since `producer.sendBatch`
// is meant to send to several topics at a time
// TODO: check what the Java agent is doing
service: { type: SUBTYPE },
target: { type: SUBTYPE },
};
1 change: 1 addition & 0 deletions test/docker-compose.yml
@@ -144,6 +144,7 @@ services:

kafka:
# https://hub.docker.com/r/bitnami/kafka/tags
# Config ref: https://github.com/bitnami/containers/tree/main/bitnami/kafka#how-to-use-this-image
image: bitnami/kafka:3.3.2
ports:
- "9093:9093"
(kafkajs test fixture script; file name not shown in this view)
@@ -74,7 +74,7 @@ async function useKafkajsClient(kafkaClient, options) {
await producer.disconnect();
log.info('messages sent');
} else if (mode === 'consume') {
// On this mode we consumen the already sent messsages. This time they are
// On this mode we consume the already sent messsages. This time they are
// instrumented (not ignored) and trace context should be added to transactions
// this must be executed 2nd
consumer = kafkaClient.consumer({ groupId });
(kafkajs test fixture script; file name not shown in this view)
@@ -15,8 +15,6 @@ const apm = require('../../../../..').start({
stackTraceLimit: 4, // get it smaller for reviewing output
logLevel: 'info',
ignoreMessageQueues: ['*-ignore'],
captureBody: process.env.TEST_CAPTURE_BODY || 'off',
captureHeaders: process.env.TEST_CAPTURE_HEADERS || false,
});

const { Buffer } = require('buffer');
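
For reference, the options removed from this fixture's inline config (captureBody, captureHeaders) and the ignoreMessageQueues option it keeps are regular agent config options. A sketch of how an application might set them, not taken from the PR; the serviceName is made up.

```js
// Hypothetical application bootstrap; option names are real agent config
// options, the values are only illustrative.
const apm = require('elastic-apm-node').start({
  serviceName: 'kafka-demo-service',
  captureBody: 'all', // 'off' | 'errors' | 'transactions' | 'all'; also applies to Kafka message bodies
  captureHeaders: true, // capture (redacted) Kafka message headers
  ignoreMessageQueues: ['*-ignore'], // topics matching these patterns are not instrumented
});
```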
@@ -80,7 +78,7 @@ async function useKafkajsClient(kafkaClient, options) {
messages: [
{ value: 'each message 1', headers: { foo: 'string' } },
{ value: 'each message 2', headers: { foo: Buffer.from('buffer') } },
{ value: 'each message 3' },
{ value: 'each message 3', headers: { auth: 'secret_token' } },
],
});
log.info({ data }, 'messages sent');