From 9ffb260f871c39c19b213aa99dd2e3627a5b1c2b Mon Sep 17 00:00:00 2001 From: David Luna Date: Wed, 14 Feb 2024 16:43:02 +0100 Subject: [PATCH] chore: honor captureHeaders config option --- lib/instrumentation/modules/kafkajs.js | 2 +- .../fixtures/use-kafkajs-each-message.js | 2 - .../modules/kafkajs/kafkajs.test.js | 37 ++++++------------- 3 files changed, 12 insertions(+), 29 deletions(-) diff --git a/lib/instrumentation/modules/kafkajs.js b/lib/instrumentation/modules/kafkajs.js index 3be2cd7cd10..410b18eb380 100644 --- a/lib/instrumentation/modules/kafkajs.js +++ b/lib/instrumentation/modules/kafkajs.js @@ -141,7 +141,7 @@ module.exports = function (mod, agent, { version, enabled }) { messageCtx.body = message.value.toString(); } - if (message.headers) { + if (message.headers && config.captureHeaders) { // Make sure there is no sensitive data // and transform non-redacted buffers messageCtx.headers = redactKeysFromObject( diff --git a/test/instrumentation/modules/kafkajs/fixtures/use-kafkajs-each-message.js b/test/instrumentation/modules/kafkajs/fixtures/use-kafkajs-each-message.js index 4753fe7f0d7..d3ede96c11c 100644 --- a/test/instrumentation/modules/kafkajs/fixtures/use-kafkajs-each-message.js +++ b/test/instrumentation/modules/kafkajs/fixtures/use-kafkajs-each-message.js @@ -15,8 +15,6 @@ const apm = require('../../../../..').start({ stackTraceLimit: 4, // get it smaller for reviewing output logLevel: 'info', ignoreMessageQueues: ['*-ignore'], - captureBody: process.env.TEST_CAPTURE_BODY || 'off', - captureHeaders: process.env.TEST_CAPTURE_HEADERS || false, }); const { Buffer } = require('buffer'); diff --git a/test/instrumentation/modules/kafkajs/kafkajs.test.js b/test/instrumentation/modules/kafkajs/kafkajs.test.js index 51923d296cb..2ca5a1056ff 100644 --- a/test/instrumentation/modules/kafkajs/kafkajs.test.js +++ b/test/instrumentation/modules/kafkajs/kafkajs.test.js @@ -158,7 +158,9 @@ const testFixtures = [ delete t.context.message.age; }); - // Check message handling transactions + // Check message handling transactions. + // Headers should be captured by default and redacted + // according to the default value of `sanitizeFieldNames` t.deepEqual(transactions.shift(), { name: `Kafka RECEIVE from ${kafkaTopic}`, type: 'messaging', @@ -336,7 +338,7 @@ const testFixtures = [ }, }, { - name: 'simple Kafkajs usage scenario for headers and body capturing on message reception', + name: 'simple Kafkajs usage scenario for `captureHeaders=false` and `capturebody=all` on message reception', script: 'fixtures/use-kafkajs-each-message.js', cwd: __dirname, timeout: 20000, @@ -345,8 +347,8 @@ const testFixtures = [ TEST_GROUP_ID: `elastictest-kafka-group-${rand}`, TEST_TOPIC: kafkaTopic, TEST_KAFKA_HOST: kafkaHost, - TEST_CAPTURE_HEADERS: 'true', - TEST_CAPTURE_BODY: 'all', + ELASTIC_APM_CAPTURE_HEADERS: 'false', + ELASTIC_APM_CAPTURE_BODY: 'all', // Suppres warinings about new default partitioner // https://kafka.js.org/docs/migration-guide-v2.0.0#producer-new-default-partitioner KAFKAJS_NO_PARTITIONER_WARNING: '1', @@ -449,9 +451,9 @@ const testFixtures = [ // NOTE: messages could arrive in different order so we sort them // to properly do the assertions transactions.sort((t1, t2) => { - const header1 = t1.context.message.headers.foo || 'undefined'; - const header2 = t2.context.message.headers.foo || 'undefined'; - return header1 < header2 ? -1 : 1; + const body1 = t1.context.message.body || 'undefined'; + const body2 = t2.context.message.body || 'undefined'; + return body1 < body2 ? -1 : 1; }); transactions.forEach((t) => { // Remove variable and common fields to facilitate t.deepEqual below. @@ -479,12 +481,7 @@ const testFixtures = [ service: {}, message: { queue: { name: kafkaTopic }, - headers: { - foo: 'buffer', - traceparent: `00-${tx.trace_id}-${parentId}-01`, - tracestate: 'es=s:1', - }, - body: 'each message 2', + body: 'each message 1', }, }, outcome: 'success', @@ -497,12 +494,7 @@ const testFixtures = [ service: {}, message: { queue: { name: kafkaTopic }, - headers: { - foo: 'string', - traceparent: `00-${tx.trace_id}-${parentId}-01`, - tracestate: 'es=s:1', - }, - body: 'each message 1', + body: 'each message 2', }, }, outcome: 'success', @@ -515,11 +507,6 @@ const testFixtures = [ service: {}, message: { queue: { name: kafkaTopic }, - headers: { - auth: '[REDACTED]', - traceparent: `00-${tx.trace_id}-${parentId}-01`, - tracestate: 'es=s:1', - }, body: 'each message 3', }, }, @@ -538,7 +525,6 @@ const testFixtures = [ TEST_GROUP_ID: `elastictest-kafka-group-${rand}`, TEST_TOPIC: kafkaTopic, TEST_KAFKA_HOST: kafkaHost, - TEST_CAPTURE_HEADERS: 'true', TEST_MODE: 'send', // Suppres warinings about new default partitioner // https://kafka.js.org/docs/migration-guide-v2.0.0#producer-new-default-partitioner @@ -567,7 +553,6 @@ const testFixtures = [ TEST_GROUP_ID: `elastictest-kafka-group-${rand}`, TEST_TOPIC: kafkaTopic, TEST_KAFKA_HOST: kafkaHost, - TEST_CAPTURE_HEADERS: 'true', TEST_MODE: 'consume', // Suppres warinings about new default partitioner // https://kafka.js.org/docs/migration-guide-v2.0.0#producer-new-default-partitioner