Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add kafkajs instrumentation #3786

Merged
merged 68 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from 67 commits
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
55c2755
feat: add kafkajs consumer instrumentation
david-luna Nov 24, 2023
d320cf2
chore: add kafkajs trace sample
david-luna Nov 24, 2023
1e68b8e
chore(kafkajs): add trace sample for kafkajs
david-luna Nov 27, 2023
7066b2d
chore(kafkajs): first try on instrumentation
david-luna Nov 27, 2023
aee6303
chore(kafkajs): add typings
david-luna Nov 28, 2023
a09186d
chore(kafkajs): add age for message transactions
david-luna Nov 28, 2023
2fcbb1a
chore(kafkajs): remove noop early return
david-luna Nov 28, 2023
8846ed9
chore: add send instrumentation
david-luna Nov 30, 2023
289e2f0
Merge branch 'main' into dluna/2905-feat-kafkajs-instrumentation
david-luna Dec 4, 2023
018b65f
chore: added kafkajs test fixture script
david-luna Dec 4, 2023
7f95acf
chore: merge main
david-luna Dec 5, 2023
a05ef1f
Merge branch 'main' into dluna/2905-feat-kafkajs-instrumentation
david-luna Dec 7, 2023
22ff0ea
chore: fix consumers subscription
david-luna Dec 7, 2023
82c1e63
chore: single consumer working
david-luna Dec 7, 2023
ebcb8db
chore: separate fixtures for single and batch consumers
david-luna Dec 7, 2023
ebd0ff9
chore: add links on batch processing
david-luna Dec 7, 2023
ba761ed
chore: add tests skeleton
david-luna Dec 11, 2023
989a44b
chore: proper headers capturing
david-luna Dec 12, 2023
05f5622
chore: add batch tests
david-luna Dec 12, 2023
644f2e9
chore: add ignore message queues
david-luna Dec 12, 2023
ed9c6d1
chore: finish batch handling
david-luna Dec 12, 2023
270a8e4
chore: update .tav.yml
david-luna Dec 12, 2023
1d1cb06
Merge branch 'main' into dluna/2905-feat-kafkajs-instrumentation
david-luna Dec 12, 2023
2950918
chore: update tav.json and test script
david-luna Dec 12, 2023
d58f327
chore: update test action
david-luna Dec 12, 2023
92ba969
chore: log stdout in kafkajs tests
david-luna Dec 13, 2023
8b09802
chore: add node version in topic names
david-luna Dec 13, 2023
c00cb4a
chore: add node version in group id
david-luna Dec 13, 2023
1608746
chore: add checkScriptResult
david-luna Dec 13, 2023
b8bde4e
chore: add waitForLeaders in topic creation
david-luna Dec 13, 2023
9443a6c
chore: add more logs
david-luna Dec 13, 2023
e020d16
chore: remove custom check
david-luna Dec 13, 2023
c369dd3
chore: add err logs
david-luna Dec 13, 2023
1e1a31e
chore: add disconnect logs
david-luna Dec 13, 2023
da8e789
chore: enable delete topics
david-luna Dec 13, 2023
80cb347
chore: remove comments
david-luna Dec 13, 2023
cbd2bc4
chore: add node version to test
david-luna Dec 13, 2023
8c093ab
chore: add logs
david-luna Dec 13, 2023
6f9e036
chore: remove logs
david-luna Dec 13, 2023
ff93284
chore: remove try/catch
david-luna Dec 13, 2023
455fa1c
chore: increase test timeout
david-luna Dec 13, 2023
a5ad188
chore: add test matrix back
david-luna Dec 13, 2023
10f28c9
chore: add kafka cntainer healthcheck
david-luna Dec 13, 2023
996226a
chore: remove kafka logger
david-luna Dec 13, 2023
2c09adb
chore: fix kafa docker compose
david-luna Dec 13, 2023
760e2d8
chore: remove some comments and typos
david-luna Dec 13, 2023
9f52563
chore: update docs
david-luna Dec 13, 2023
e99d189
chore: update comments
david-luna Dec 14, 2023
641a343
chore: config TAV tests
david-luna Dec 14, 2023
f9d9f19
chore: add cpature body tests
david-luna Dec 14, 2023
27aed57
chore: change trace context propagation
david-luna Dec 18, 2023
abaff1d
Merge branch 'main' into dluna/2905-feat-kafkajs-instrumentation
david-luna Dec 18, 2023
3adf8c4
chore: add target to destination context
david-luna Dec 18, 2023
29c603b
Merge branch 'main' into dluna/2905-feat-kafkajs-instrumentation
david-luna Dec 27, 2023
86df305
chore: remove some comments
david-luna Dec 27, 2023
0a12b99
chore: add trace propagation tests
david-luna Jan 3, 2024
cb6b08e
chore: fix span links
david-luna Feb 14, 2024
9ffb260
chore: honor captureHeaders config option
david-luna Feb 14, 2024
6437bfe
chore: tune propagetion context tests
david-luna Feb 14, 2024
0c1cc95
Merge branch 'main' into dluna/2905-feat-kafkajs-instrumentation
david-luna Feb 14, 2024
69b81b0
chore: make nits neat
david-luna Feb 15, 2024
2112877
chore: merge main
david-luna Feb 15, 2024
2b238e9
chore: check for single topic in sendBatch instrumentation
david-luna Feb 23, 2024
0772a66
Update test/instrumentation/modules/kafkajs/kafkajs.test.js
david-luna Mar 7, 2024
c2f1d18
Update lib/instrumentation/modules/kafkajs.js
david-luna Mar 7, 2024
1da35b6
Merge branch 'main' into dluna/2905-feat-kafkajs-instrumentation
david-luna Mar 7, 2024
eaef6f8
chore: fix naming
david-luna Mar 8, 2024
7712286
Merge branch 'main' into dluna/2905-feat-kafkajs-instrumentation
david-luna Mar 12, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .ci/docker/docker-compose-all.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ services:
extends:
file: docker-compose.yml
service: localstack
kafka:
extends:
file: docker-compose.yml
service: kafka
node_tests:
extends:
file: docker-compose-node-test.yml
Expand All @@ -60,6 +64,8 @@ services:
condition: service_healthy
localstack:
condition: service_healthy
kafka:
condition: service_healthy

volumes:
nodepgdata:
Expand All @@ -76,3 +82,5 @@ volumes:
driver: local
nodelocalstackdata:
driver: local
nodekafkadata:
driver: local
8 changes: 8 additions & 0 deletions .ci/docker/docker-compose-edge.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ services:
extends:
file: docker-compose.yml
service: redis
kafka:
extends:
file: docker-compose.yml
service: kafka
node_tests:
extends:
file: docker-compose-node-edge-test.yml
Expand All @@ -60,6 +64,8 @@ services:
condition: service_healthy
redis:
condition: service_healthy
kafka:
condition: service_healthy

volumes:
nodepgdata:
Expand All @@ -76,3 +82,5 @@ volumes:
driver: local
nodecassandradata:
driver: local
nodekafkadata:
driver: local
28 changes: 28 additions & 0 deletions .ci/docker/docker-compose-kafka.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
version: '2.1'

services:
zookeeper:
extends:
file: docker-compose.yml
service: zookeeper
kafka:
extends:
file: docker-compose.yml
service: kafka
depends_on:
- zookeeper
node_tests:
extends:
file: docker-compose-node-test.yml
service: node_tests
depends_on:
- kafka
# TODO: uncomment this if health_check is necessary
# kafka:
# condition: service_healthy

volumes:
nodekafkadata:
driver: local
nodezookeeperdata:
driver: local
1 change: 1 addition & 0 deletions .ci/docker/docker-compose-node-edge-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ services:
PGUSER: 'postgres'
MEMCACHED_HOST: 'memcached'
LOCALSTACK_HOST: 'localstack:4566'
KAFKA_HOST: 'kafka:9093'
NODE_VERSION: ${NODE_VERSION}
NODE_FULL_VERSION: ${NODE_FULL_VERSION}
NVM_NODEJS_ORG_MIRROR: ${NVM_NODEJS_ORG_MIRROR}
Expand Down
1 change: 1 addition & 0 deletions .ci/docker/docker-compose-node-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ services:
PGUSER: 'postgres'
MEMCACHED_HOST: 'memcached'
LOCALSTACK_HOST: 'localstack:4566'
KAFKA_HOST: 'kafka:9093'
NODE_VERSION: ${NODE_VERSION}
TAV: ${TAV_MODULE}
ELASTIC_APM_CONTEXT_MANAGER: ${ELASTIC_APM_CONTEXT_MANAGER}
Expand Down
41 changes: 41 additions & 0 deletions .ci/docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,43 @@ services:
volumes:
- nodelocalstackdata:/var/lib/localstack

zookeeper:
# https://hub.docker.com/r/bitnami/zookeeper/tags
image: bitnami/zookeeper:3.9.1
ports:
- "2181:2181"
volumes:
- nodezookeeperdata:/bitnami
environment:
- ALLOW_ANONYMOUS_LOGIN=yes

kafka:
# https://hub.docker.com/r/bitnami/kafka/tags
image: bitnami/kafka:3.3.2
ports:
- "9093:9093"
volumes:
- nodekafkadata:/var/lib/kafka/data
environment:
- KAFKA_BROKER_ID=1
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://kafka:9093
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
- KAFKA_CFG_DELETE_TOPIC_ENABLE=true
depends_on:
- zookeeper
# TODO: maybe not necessary but figure out how to do this
healthcheck:
# use netcat to check tcp connection available
# test: nc -z localhost 9093 || exit -1
# start_period: 15s
# interval: 5s
# timeout: 10s
# retries: 5

volumes:
nodepgdata:
driver: local
Expand All @@ -155,3 +192,7 @@ volumes:
driver: local
nodelocalstackdata:
driver: local
nodekafkadata:
driver: local
nodezookeeperdata:
driver: local
3 changes: 3 additions & 0 deletions .ci/scripts/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,9 @@ elif [[ -n "${TAV_MODULE}" ]]; then
aws-sdk|@aws-sdk/client-s3|@aws-sdk/client-dynamodb|@aws-sdk/client-sns|@aws-sdk/client-sqs)
DOCKER_COMPOSE_FILE=docker-compose-localstack.yml
;;
kafkajs)
DOCKER_COMPOSE_FILE=docker-compose-kafka.yml
;;
*)
# Just the "node_tests" container. No additional services needed for testing.
DOCKER_COMPOSE_FILE=docker-compose-node-test.yml
Expand Down
1 change: 1 addition & 0 deletions .ci/tav.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
{ "name": "generic-pool", "minMajorVersion": 8 },
{ "name": "graphql", "minMajorVersion": 8 },
{ "name": "ioredis", "minMajorVersion": 8 },
{ "name": "kafkajs", "minMajorVersion": 14 },
{ "name": "knex", "minMajorVersion": 8 },
{ "name": "memcached", "minMajorVersion": 8 },
{ "name": "mongodb", "minMajorVersion": 8 },
Expand Down
25 changes: 25 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,31 @@ jobs:
volumes:
- nodelocalstackdata:/var/lib/localstack

zookeeper:
image: bitnami/zookeeper:3.9.1
env:
ALLOW_ANONYMOUS_LOGIN: 'yes'
ports:
- "2181:2181"
volumes:
- nodezookeeperdata:/var/lib/zookeeper/data

kafka:
image: bitnami/kafka:3.3.2
ports:
- "9093:9093"
volumes:
- nodekafkadata:/var/lib/kafka/data
env:
KAFKA_BROKER_ID: '1'
KAFKA_CFG_ZOOKEEPER_CONNECT: 'zookeeper:2181'
ALLOW_PLAINTEXT_LISTENER: 'yes'
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: 'CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT'
KAFKA_CFG_LISTENERS: 'CLIENT://:9092,EXTERNAL://:9093'
KAFKA_CFG_ADVERTISED_LISTENERS: 'CLIENT://kafka:9092,EXTERNAL://localhost:9093'
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: 'CLIENT'
KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true'

strategy:
fail-fast: false
matrix:
Expand Down
6 changes: 6 additions & 0 deletions .tav.yml
Original file line number Diff line number Diff line change
Expand Up @@ -483,3 +483,9 @@ undici:
mode: max-7
include: '>=4.7.1 <6'
commands: node test/instrumentation/modules/undici/undici.test.js

kafkajs:
versions:
mode: latest-minors
include: '>=2 <3'
commands: node test/instrumentation/modules/kafkajs/kafkajs.test.js
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ See the <<upgrade-to-v4>> guide.

* Update <<opentelemetry-bridge>> support to `@opentelemetry/api` version 1.8.0.
* Add support for `tedious` version v17. ({pull}3901[#3901])
* Add support for `kafkajs` version v2. ({issues}2905[#2905])

[float]
===== Bug fixes
Expand Down
1 change: 1 addition & 0 deletions docs/supported-technologies.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ so those should be supported as well.
|https://www.npmjs.com/package/tedious[tedious] |>=1.9 <18.0.0 | (Excluding v4.0.0.) Will instrument all queries
|https://www.npmjs.com/package/undici[undici] | >=4.7.1 <6 | Will instrument undici HTTP requests, except HTTP CONNECT. Requires node v14.17.0 or later, or the user to have installed the https://www.npmjs.com/package/diagnostics_channel['diagnostics_channel' polyfill].
|https://www.npmjs.com/package/ws[ws] |>=1.0.0 <8.0.0 |Will instrument outgoing WebSocket messages
|https://www.npmjs.com/package/kafkajs[kafkajs] |>=2.0.0 <3.0.0 |Will instrument all send methods for producers and message and batch processing for consumers.
|=======================================================================

[float]
Expand Down
89 changes: 89 additions & 0 deletions examples/trace-kafka.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#!/usr/bin/env node --unhandled-rejections=strict

/*
* Copyright Elasticsearch B.V. and other contributors where applicable.
* Licensed under the BSD 2-Clause License; you may not use this file except in
* compliance with the BSD 2-Clause License.
*/

// A small example showing Elastic APM tracing the 'kadfkajs' package.
//
// This assumes a Kafka server running on localhost. You can use:
// npm run docker:start kafka
// to start a Kafka container. Then `npm run docker:stop` to stop it.

// eslint-disable-next-line no-unused-vars
const apm = require('../').start({
serviceName: 'example-trace-kafka',
});

const { Buffer } = require('buffer');
const { TextEncoder } = require('util');

const { Kafka } = require('kafkajs');

const topic = 'trace-kafka-topic';
const kafka = new Kafka({ clientId: 'my-app', brokers: ['localhost:9093'] });
const admin = kafka.admin();

const headerStr = 'value inside buffer';
const headerEnc = new TextEncoder().encode(headerStr);
const headerBuf = Buffer.from(headerEnc);

let producer, consumer;
let messagesConsumed = 0;

async function run() {
await admin.connect();
await admin.createTopics({ topics: [{ topic }] });

consumer = kafka.consumer({ groupId: 'trace-group' });
producer = kafka.producer();

await producer.connect();
await producer.send({
topic,
messages: [
{ value: 'message 1', headers: { foo: 'bar' } },
{ value: 'message 2', headers: { foo: headerBuf } },
{ value: 'message 3' },
],
});

await consumer.connect();
await consumer.subscribe({ topic, fromBeginning: true });
await consumer.run({
eachMessage: async function ({ topic, partition, message }) {
console.log(`message from topic(${topic}): ${message.value.toString()}`);
console.log(`message header ${message.headers.foo}`);
messagesConsumed++;
},
});

await new Promise((resolve, reject) => {
let count = 0;
const id = setInterval(() => {
count++;
if (messagesConsumed === 3) {
clearInterval(id);
resolve();
} else if (count > 10) {
// set a limit of 10s/retries
clearInterval(id);
reject(new Error('not receiving all messages after 10s'));
}
}, 1000);
});
}

run()
.catch((err) => {
console.warn('run err:', err);
})
.finally(async () => {
console.log('disconnecting Kafkajs client');
await producer.disconnect();
await consumer.disconnect();
await admin.deleteTopics({ topics: [topic] });
await admin.disconnect();
});
1 change: 1 addition & 0 deletions lib/instrumentation/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ var MODULE_PATCHERS = [
{ modPath: 'http2' },
{ modPath: 'ioredis' },
{ modPath: 'jade' },
{ modPath: 'kafkajs' },
{ modPath: 'knex' },
{ modPath: 'koa' },
{ modPath: 'koa-router' },
Expand Down
Loading
Loading