@@ -49,15 +49,16 @@ def check(self, _):
49
49
highwater_offsets = {}
50
50
broker_timestamps = defaultdict (dict )
51
51
cluster_id = ""
52
+ persistent_cache_key = "broker_timestamps_"
52
53
try :
53
54
if len (consumer_offsets ) < self ._context_limit :
54
55
# Fetch highwater offsets
55
56
# Expected format: ({(topic, partition): offset}, cluster_id)
56
57
highwater_offsets , cluster_id = self .client .get_highwater_offsets (consumer_offsets )
57
58
if self ._data_streams_enabled :
58
- broker_timestamps = self ._load_broker_timestamps ()
59
+ broker_timestamps = self ._load_broker_timestamps (persistent_cache_key )
59
60
self ._add_broker_timestamps (broker_timestamps , highwater_offsets )
60
- self ._save_broker_timestamps (broker_timestamps )
61
+ self ._save_broker_timestamps (broker_timestamps , persistent_cache_key )
61
62
else :
62
63
self .warning ("Context limit reached. Skipping highwater offset collection." )
63
64
except Exception :
@@ -94,11 +95,11 @@ def check(self, _):
94
95
if self .config ._close_admin_client :
95
96
self .client .close_admin_client ()
96
97
97
- def _load_broker_timestamps (self ):
98
+ def _load_broker_timestamps (self , persistent_cache_key ):
98
99
"""Loads broker timestamps from persistent cache."""
99
100
broker_timestamps = defaultdict (dict )
100
101
try :
101
- for topic_partition , content in json .loads (self .read_persistent_cache ("broker_timestamps_" )).items ():
102
+ for topic_partition , content in json .loads (self .read_persistent_cache (persistent_cache_key )).items ():
102
103
for offset , timestamp in content .items ():
103
104
broker_timestamps [topic_partition ][int (offset )] = timestamp
104
105
except Exception as e :
@@ -113,9 +114,9 @@ def _add_broker_timestamps(self, broker_timestamps, highwater_offsets):
113
114
if len (timestamps ) > self ._max_timestamps :
114
115
del timestamps [min (timestamps )]
115
116
116
- def _save_broker_timestamps (self , broker_timestamps ):
117
+ def _save_broker_timestamps (self , broker_timestamps , persistent_cache_key ):
117
118
"""Saves broker timestamps to persistent cache."""
118
- self .write_persistent_cache ("broker_timestamps" , json .dumps (broker_timestamps ))
119
+ self .write_persistent_cache (persistent_cache_key , json .dumps (broker_timestamps ))
119
120
120
121
def report_highwater_offsets (self , highwater_offsets , contexts_limit , cluster_id ):
121
122
"""Report the broker highwater offsets."""
0 commit comments