
Commit 14c90a3

update

1 parent d79c61d commit 14c90a3

File tree

2 files changed (+72, -83 lines)


kafka_consumer/datadog_checks/kafka_consumer/client.py

Lines changed: 7 additions & 5 deletions
@@ -177,13 +177,15 @@ def describe_consumer_group(self, consumer_group):
         return desc.state.name

     def get_message(self, topic, partition, offset):
-        consumer = self.__create_consumer('datadog')
-        consumer.assign([TopicPartition(topic, partition, offset)])
-        message = consumer.poll(timeout=1)
-        consumer.close()
+        self.open_consumer('datadog')
+        self._consumer.assign([TopicPartition(topic, partition, offset)])
+        message = self._consumer.poll(timeout=1)
+        self._consumer.close()
         if message is None:
             return None
         return message.value()

+
+
     def close_admin_client(self):
-        self._kafka_client = None
+        self._kafka_client = None
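
Note that get_message() now goes through open_consumer(), a helper that is not shown in this diff. A minimal sketch of what such a helper might look like, assuming the check builds on confluent-kafka (the TopicPartition usage above suggests it does) and that the bootstrap servers are available as self.config.kafka_connect_str (an assumed field, not confirmed by this commit):

from confluent_kafka import Consumer

def open_consumer(self, consumer_group):
    # Hypothetical sketch: create a consumer and store it on the client.
    # get_message() closes self._consumer again after polling one message,
    # so a fresh instance is built on every call.
    self._consumer = Consumer(
        {
            'bootstrap.servers': self.config.kafka_connect_str,  # assumed config field
            'group.id': consumer_group,
            'enable.auto.commit': False,  # a read-only probe should not commit offsets
        }
    )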

kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py

Lines changed: 65 additions & 78 deletions
@@ -4,6 +4,7 @@
 import json
 from collections import defaultdict
 from time import time
+
 import yaml

 from datadog_checks.base import AgentCheck, is_affirmative
@@ -32,23 +33,55 @@ def __init__(self, name, init_config, instances):
         self.topic_partition_cache = {}
         self.check_initializations.insert(0, self.config.validate_config)

-
     def log_message(self):
         print("logging message")
-        yamlConfig = datadog_agent.get_remote_config("test changed")
-        print("yaml config ", yamlConfig, type(yamlConfig))
-        parsedConfig = yaml.safe_load(str(yamlConfig))
+        yaml_config = datadog_agent.get_remote_config("test changed")
+        # yaml_config = """
+        # configs:
+        #   - topic: marvel
+        #     partition: 0
+        #     offset: 0
+        #     n_messages: 1
+        # """
+        print("yaml config ", yaml_config, type(yaml_config))
+        parsedConfig = yaml.safe_load(str(yaml_config))
         print("parsed config is ", parsedConfig)
         for cfg in parsedConfig.get("configs", []):
             print("config is ", cfg)
+            if not 'kafka' in cfg:
+                print("skipping config without kafka key")
+                continue
+            cfg = cfg['kafka']
             topic = cfg.get("topic", None)
             partition = cfg.get("partition", None)
-            offset = cfg.get("offset", None)
-            print("topic is ", topic, "partition is ", partition, "offset is ", offset)
-            if topic is None or partition is None or offset is None:
+            offset = cfg.get("start_offset", None)
+            n_messages = cfg.get("n_messages", None)
+            cluster = cfg.get("cluster", None)
+            print("topic is ", topic, "partition is ", partition, "offset is ", offset, "n_messages is ", n_messages, "cluster is ", cluster)
+            if topic is None or partition is None or offset is None or n_messages is None or cluster is None:
+                print("skipping config with missing keys")
                 continue
             message = self.client.get_message(topic, partition, offset)
-            self.send_event("Kafka message", message, ["topic:{}".format(topic), "partition:{}".format(partition), "offset:{}".format(offset)], 'kafka', "", severity="info")
+            self.send_event(
+                "Kafka message",
+                message,
+                ["topic:{}".format(topic), "partition:{}".format(partition), "offset:{}".format(offset)],
+                'kafka',
+                "",
+                severity="info",
+            )
+            data = {
+                'timestamp': int(time()),
+                'technology': 'kafka',
+                'cluster': str(cluster),
+                'topic': str(topic),
+                'partition': str(partition),
+                'offset': str(offset),
+                'data': 'logging message',
+            }
+            print("data is ", data)
+            self.send_log(data)
+            self.send_log({'message': 'foo piotr', 'timestamp': 1722958617.2842212})
             print("message is ", message)
             # print("now the last message")
             # message = self.client.get_message('marvel', 0, 75)
@@ -57,81 +90,35 @@ def log_message(self):

     def check(self, _):
         """The main entrypoint of the check."""
-        # Fetch Kafka consumer offsets
-
-        consumer_offsets = {}
-
+        print("Starting kafka_consumer check")
         try:
-            self.client.request_metadata_update()
-        except:
-            raise Exception(
-                "Unable to connect to the AdminClient. This is likely due to an error in the configuration."
-            )
-
-        try:
-            # Fetch consumer offsets
-            # Expected format: {consumer_group: {(topic, partition): offset}}
+            # Get consumer offsets
+            print("Getting consumer offsets")
             consumer_offsets = self.get_consumer_offsets()
-        except Exception:
-            self.log.exception("There was a problem collecting consumer offsets from Kafka.")
-            # don't raise because we might get valid broker offsets
-
-        # Fetch the broker highwater offsets
-        highwater_offsets = {}
-        broker_timestamps = defaultdict(dict)
-        cluster_id = ""
-        persistent_cache_key = "broker_timestamps_"
-        try:
-            if len(consumer_offsets) < self._context_limit:
-                # Fetch highwater offsets
-                # Expected format: ({(topic, partition): offset}, cluster_id)
-                highwater_offsets, cluster_id = self.get_highwater_offsets(consumer_offsets)
-                if self._data_streams_enabled:
-                    broker_timestamps = self._load_broker_timestamps(persistent_cache_key)
-                    self._add_broker_timestamps(broker_timestamps, highwater_offsets)
-                    self._save_broker_timestamps(broker_timestamps, persistent_cache_key)
-            else:
-                self.warning("Context limit reached. Skipping highwater offset collection.")
-        except Exception:
-            self.log.exception("There was a problem collecting the highwater mark offsets.")
-            # Unlike consumer offsets, fail immediately because we can't calculate consumer lag w/o highwater_offsets
-            if self.config._close_admin_client:
-                self.client.close_admin_client()
-            raise
-
-        total_contexts = sum(len(v) for v in consumer_offsets.values()) + len(highwater_offsets)
-        self.log.debug(
-            "Total contexts: %s, Consumer offsets: %s, Highwater offsets: %s",
-            total_contexts,
-            consumer_offsets,
-            highwater_offsets,
-        )
-        if total_contexts >= self._context_limit:
-            self.warning(
-                """Discovered %s metric contexts - this exceeds the maximum number of %s contexts permitted by the
-                check. Please narrow your target by specifying in your kafka_consumer.yaml the consumer groups, topics
-                and partitions you wish to monitor.""",
-                total_contexts,
-                self._context_limit,
+            print(f"Got consumer offsets: {consumer_offsets}")
+
+            # Get highwater offsets
+            print("Getting highwater offsets")
+            highwater_offsets, cluster_id = self.get_highwater_offsets(consumer_offsets)
+            print(f"Got highwater offsets: {highwater_offsets}")
+            print(f"Cluster ID: {cluster_id}")
+
+            # Report metrics
+            print("Reporting metrics")
+            self.report_highwater_offsets(highwater_offsets, self._context_limit, cluster_id)
+            self.report_consumer_offsets_and_lag(
+                consumer_offsets,
+                highwater_offsets,
+                self._context_limit - len(highwater_offsets),
+                {},
+                cluster_id,
             )
-
-        self.report_highwater_offsets(highwater_offsets, self._context_limit, cluster_id)
-        self.report_consumer_offsets_and_lag(
-            consumer_offsets,
-            highwater_offsets,
-            self._context_limit - len(highwater_offsets),
-            broker_timestamps,
-            cluster_id,
-        )
-
-        try:
+            print("Finished reporting metrics")
             self.log_message()
-        except Exception as e:
-            print("oops", e)
-            self.log.exception("Error retrieving payload from Kafka for Data Streams %s", str(e))

-        if self.config._close_admin_client:
-            self.client.close_admin_client()
+        except Exception as e:
+            print(f"Error in check: {e}")
+            raise

     def get_consumer_offsets(self):
         # {(consumer_group, topic, partition): offset}
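
For reference, after this change log_message() only acts on remote-config entries that are nested under a kafka key and that carry topic, partition, start_offset, n_messages, and cluster; anything else is skipped. A self-contained sketch of a payload that would pass that validation (the values are illustrative, only the key names come from the diff):

import yaml

# Stand-in for the string returned by datadog_agent.get_remote_config("test changed").
sample_remote_config = """
configs:
  - kafka:
      cluster: my-cluster
      topic: marvel
      partition: 0
      start_offset: 0
      n_messages: 1
"""

parsed = yaml.safe_load(sample_remote_config)
for cfg in parsed.get("configs", []):
    kafka_cfg = cfg.get("kafka")
    if kafka_cfg is None:
        continue  # mirrors the "skipping config without kafka key" branch
    # All five keys are present, so the check would fetch a message for this entry.
    print(kafka_cfg["topic"], kafka_cfg["partition"], kafka_cfg["start_offset"])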
