Skip to content

Commit f800fbf

Browse files
authored
Kafka consumer : add consumer group state as tag (#17686)
* kafka consumer group state * kafka consumer group state 1 * kafka consumer group state fix test * kafka consumer group state linter and changelog * kafka consumer group state update * kafka consumer group state remove expception * kafka consumer group state remove expception 2 * kafka consumer group state remove expception 3 * kafka consumer group state remove expception 3\4 * kafka consumer group state fix tests
1 parent e1e61e5 commit f800fbf

File tree

5 files changed

+42
-5
lines changed

5 files changed

+42
-5
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Kafka consumer : add consumer group state as tag

kafka_consumer/datadog_checks/kafka_consumer/client.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,9 +254,39 @@ def _get_consumer_groups(self):
254254
else:
255255
return self.config._consumer_groups
256256

257+
def get_consumer_group_state(self, consumer_group):
258+
consumer_group_state = ""
259+
# Get the consumer group state if present
260+
consumer_groups_future = self._describe_consumer_groups(consumer_group)
261+
consumer_groups_result = consumer_groups_future[consumer_group].result()
262+
self.log.debug(
263+
"Consumer group: %s in state %s",
264+
consumer_groups_result.group_id,
265+
consumer_groups_result.state,
266+
)
267+
consumer_group_result_state = str(consumer_groups_result.state)
268+
consumer_group_state = consumer_group_result_state.split('.')[1]
269+
270+
return consumer_group_state
271+
257272
def _list_consumer_group_offsets(self, cg_tp):
273+
"""
274+
:returns: A dict of futures for each group, keyed by the group id.
275+
The future result() method returns :class:`ConsumerGroupTopicPartitions`.
276+
277+
:rtype: dict[str, future]
278+
"""
258279
return self.kafka_client.list_consumer_group_offsets([cg_tp])
259280

281+
def _describe_consumer_groups(self, consumer_group):
282+
"""
283+
:returns: A dict of futures for each group, keyed by the group_id.
284+
The future result() method returns :class:`ConsumerGroupDescription`.
285+
286+
:rtype: dict[str, future]
287+
"""
288+
return self.kafka_client.describe_consumer_groups([consumer_group])
289+
260290
def close_admin_client(self):
261291
self._kafka_client = None
262292

kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ def report_consumer_offsets_and_lag(
138138
reported_contexts = 0
139139
self.log.debug("Reporting consumer offsets and lag metrics")
140140
for (consumer_group, topic, partition), consumer_offset in consumer_offsets.items():
141+
consumer_group_state = self.client.get_consumer_group_state(consumer_group)
141142
if reported_contexts >= contexts_limit:
142143
self.log.debug(
143144
"Reported contexts number %s greater than or equal to contexts limit of %s, returning",
@@ -151,6 +152,7 @@ def report_consumer_offsets_and_lag(
151152
'partition:%s' % partition,
152153
'consumer_group:%s' % consumer_group,
153154
'kafka_cluster_id:%s' % cluster_id,
155+
'consumer_group_state:%s' % consumer_group_state,
154156
]
155157
consumer_group_tags.extend(self.config._custom_tags)
156158

kafka_consumer/tests/common.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -102,11 +102,11 @@ def assert_check_kafka(aggregator, consumer_groups):
102102
aggregator.assert_metric(mname, tags=tags, count=1)
103103

104104
for mname in CONSUMER_METRICS:
105-
aggregator.assert_metric(
106-
mname,
107-
tags=tags + [f"consumer_group:{name}"],
108-
count=1,
109-
)
105+
aggregator.assert_metric(mname)
106+
t = tags + [f"consumer_group:{name}"]
107+
if aggregator.assert_metric_has_tags(mname, t):
108+
# Check for the tag consumer_group_state
109+
aggregator.assert_metric_has_tag_prefix(mname, tag_prefix='consumer_group_state')
110110

111111
aggregator.assert_all_metrics_covered()
112112
aggregator.assert_metrics_using_metadata(get_metadata_metrics())

kafka_consumer/tests/test_unit.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ def test_when_consumer_lag_less_than_zero_then_emit_event(
162162
mock_client.get_consumer_offsets.return_value = consumer_offset
163163
mock_client.get_highwater_offsets.return_value = (highwater_offset, "cluster_id")
164164
mock_client.get_partitions_for_topic.return_value = ['partition1']
165+
mock_client.get_consumer_group_state.return_value = "STABLE"
165166
mock_generic_client.return_value = mock_client
166167

167168
# When
@@ -183,6 +184,7 @@ def test_when_consumer_lag_less_than_zero_then_emit_event(
183184
'partition:partition1',
184185
'topic:topic1',
185186
'kafka_cluster_id:cluster_id',
187+
'consumer_group_state:STABLE',
186188
],
187189
)
188190
aggregator.assert_metric(
@@ -194,6 +196,7 @@ def test_when_consumer_lag_less_than_zero_then_emit_event(
194196
'partition:partition1',
195197
'topic:topic1',
196198
'kafka_cluster_id:cluster_id',
199+
'consumer_group_state:STABLE',
197200
],
198201
)
199202
aggregator.assert_event(
@@ -208,6 +211,7 @@ def test_when_consumer_lag_less_than_zero_then_emit_event(
208211
'partition:partition1',
209212
'topic:topic1',
210213
'kafka_cluster_id:cluster_id',
214+
'consumer_group_state:STABLE',
211215
],
212216
)
213217

0 commit comments

Comments
 (0)