@@ -85,12 +85,19 @@ def __init__(self, check, config, connection_args):
         self.log = get_check_logger()
         self._state = StatementMetrics()
         self._obfuscate_options = to_native_string(json.dumps(self._config.obfuscator_options))
+        # last_seen: the last query execution time seen by the check
+        # This is used to limit the queries to fetch from the performance schema to only the new ones
+        self._last_seen = '1970-01-01'
         # full_statement_text_cache: limit the ingestion rate of full statement text events per query_signature
         self._full_statement_text_cache = TTLCache(
             maxsize=self._config.full_statement_text_cache_max_size,
             ttl=60 * 60 / self._config.full_statement_text_samples_per_hour_per_query,
         )  # type: TTLCache
 
+        # statement_rows: cache of all rows for each digest, keyed by (schema_name, query_signature)
+        # This is used to cache the metrics for queries that have the same query_signature but different digests
+        self._statement_rows = {}  # type: Dict[(str, str), Dict[str, PyMysqlRow]]
+
     def _get_db_connection(self):
         """
         lazy reconnect db
@@ -111,7 +118,14 @@ def _close_db_conn(self):
         self._db = None
 
     def run_job(self):
+        start = time.time()
         self.collect_per_statement_metrics()
+        self._check.gauge(
+            "dd.mysql.statement_metrics.collect_metrics.elapsed_ms",
+            (time.time() - start) * 1000,
+            tags=self._check.tags + self._check._get_debug_tags(),
+            hostname=self._check.resolved_hostname,
+        )
 
     @tracked_method(agent_check_getter=attrgetter('_check'))
     def collect_per_statement_metrics(self):
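
A minimal sketch of the timing pattern the run_job change introduces: time the collection call and emit the elapsed milliseconds as a gauge. The helper and parameter names below are illustrative stand-ins, not the check's real API.

import time

def timed_collection(collect, report_gauge):
    # 'collect' stands in for collect_per_statement_metrics, 'report_gauge' for self._check.gauge
    start = time.time()
    collect()
    report_gauge("dd.mysql.statement_metrics.collect_metrics.elapsed_ms", (time.time() - start) * 1000)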
@@ -134,12 +148,14 @@ def collect_per_statement_metrics(self):
             )
             return
 
-        rows = self._collect_per_statement_metrics()
-        if not rows:
-            return
         # Omit internal tags for dbm payloads since those are only relevant to metrics processed directly
         # by the agent
         tags = [t for t in self._tags if not t.startswith('dd.internal')]
+
+        rows = self._collect_per_statement_metrics(tags)
+        if not rows:
+            # No rows to process, can skip the rest of the payload generation and avoid an empty payload
+            return
         for event in self._rows_to_fqt_events(rows, tags):
             self._check.database_monitoring_query_sample(json.dumps(event, default=default_json_event_encoding))
         payload = {
@@ -156,20 +172,45 @@ def collect_per_statement_metrics(self):
             'mysql_rows': rows,
         }
         self._check.database_monitoring_query_metrics(json.dumps(payload, default=default_json_event_encoding))
-        self._check.count(
+        self._check.gauge(
             "dd.mysql.collect_per_statement_metrics.rows",
             len(rows),
             tags=tags + self._check._get_debug_tags(),
             hostname=self._check.resolved_hostname,
         )
 
-    def _collect_per_statement_metrics(self):
-        # type: () -> List[PyMysqlRow]
+    def _collect_per_statement_metrics(self, tags):
+        # type: (List[str]) -> List[PyMysqlRow]
+
+        self._get_statement_count(tags)
+
         monotonic_rows = self._query_summary_per_statement()
+        self._check.gauge(
+            "dd.mysql.statement_metrics.query_rows",
+            len(monotonic_rows),
+            tags=tags + self._check._get_debug_tags(),
+            hostname=self._check.resolved_hostname,
+        )
+
+        monotonic_rows = self._filter_query_rows(monotonic_rows)
         monotonic_rows = self._normalize_queries(monotonic_rows)
+        monotonic_rows = self._add_associated_rows(monotonic_rows)
         rows = self._state.compute_derivative_rows(monotonic_rows, METRICS_COLUMNS, key=_row_key)
         return rows
 
+    def _get_statement_count(self, tags):
+        with closing(self._get_db_connection().cursor(CommenterDictCursor)) as cursor:
+            cursor.execute("SELECT count(*) AS count from performance_schema.events_statements_summary_by_digest")
+
+            rows = cursor.fetchall() or []  # type: ignore
+            if rows:
+                self._check.gauge(
+                    "dd.mysql.statement_metrics.events_statements_summary_by_digest.total_rows",
+                    rows[0]['count'],
+                    tags=tags + self._check._get_debug_tags(),
+                    hostname=self._check.resolved_hostname,
+                )
+
     def _query_summary_per_statement(self):
         # type: () -> List[PyMysqlRow]
         """
@@ -178,6 +219,14 @@ def _query_summary_per_statement(self):
         values to get the counts for the elapsed period. This is similar to monotonic_count, but
         several fields must be further processed from the delta values.
         """
+        only_query_recent_statements = self._config.statement_metrics_config.get('only_query_recent_statements', False)
+        condition = (
+            "WHERE `last_seen` >= %s"
+            if only_query_recent_statements
+            else """WHERE `digest_text` NOT LIKE 'EXPLAIN %' OR `digest_text` IS NULL
+                ORDER BY `count_star` DESC
+                LIMIT 10000"""
+        )
 
         sql_statement_summary = """\
         SELECT `schema_name`,
@@ -193,19 +242,34 @@ def _query_summary_per_statement(self):
                `sum_select_scan`,
                `sum_select_full_join`,
                `sum_no_index_used`,
-               `sum_no_good_index_used`
+               `sum_no_good_index_used`,
+               `last_seen`
         FROM performance_schema.events_statements_summary_by_digest
-        WHERE `digest_text` NOT LIKE 'EXPLAIN %' OR `digest_text` IS NULL
-        ORDER BY `count_star` DESC
-        LIMIT 10000"""
+        {}
+        """.format(
+            condition
+        )
 
         with closing(self._get_db_connection().cursor(CommenterDictCursor)) as cursor:
-            cursor.execute(sql_statement_summary)
+            args = [self._last_seen] if only_query_recent_statements else None
+            cursor.execute(sql_statement_summary, args)
 
             rows = cursor.fetchall() or []  # type: ignore
 
+        if rows:
+            self._last_seen = max(row['last_seen'] for row in rows)
+
         return rows
 
+    def _filter_query_rows(self, rows):
+        # type: (List[PyMysqlRow]) -> List[PyMysqlRow]
+        """
+        Filter out rows that are EXPLAIN statements
+        """
+        return [
+            row for row in rows if row['digest_text'] is None or not row['digest_text'].lower().startswith('explain')
+        ]
+
     def _normalize_queries(self, rows):
         normalized_rows = []
         for row in rows:
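
A rough, standalone sketch of the incremental-fetch pattern the two hunks above implement when only_query_recent_statements is enabled: query only digests whose last_seen is at or after the previous watermark, then advance the watermark to the newest value returned. The helper name is made up; the table and column names are the ones used in the diff, and the cursor is assumed to be a dict cursor as in the check.

def fetch_new_digest_rows(cursor, last_seen='1970-01-01'):
    # Pull only rows executed since the previous watermark, then move the watermark forward.
    cursor.execute(
        "SELECT `schema_name`, `digest`, `digest_text`, `count_star`, `last_seen` "
        "FROM performance_schema.events_statements_summary_by_digest "
        "WHERE `last_seen` >= %s",
        [last_seen],
    )
    rows = cursor.fetchall() or []
    if rows:
        last_seen = max(row['last_seen'] for row in rows)
    return rows, last_seen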
@@ -227,6 +291,23 @@ def _normalize_queries(self, rows):
 
         return normalized_rows
 
+    def _add_associated_rows(self, rows):
+        """
+        If two or more statements with different digests have the same query_signature, they are considered the same
+        Because only one digest statement may be updated, we cache all the rows for each digest,
+        update with any new rows and then return all the rows for all the query_signatures.
+
+        We return all rows to guard against the case where a signature wasn't collected on the immediately previous run
+        but was present on runs before that.
+        """
+        for row in rows:
+            key = (row['schema_name'], row['query_signature'])
+            if key not in self._statement_rows:
+                self._statement_rows[key] = {}
+            self._statement_rows[key][row['digest']] = row
+
+        return [row for statement_row in self._statement_rows.values() for row in statement_row.values()]
+
     def _rows_to_fqt_events(self, rows, tags):
         for row in rows:
             query_cache_key = _row_key(row)
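
A self-contained sketch of the merge _add_associated_rows performs: rows are cached per (schema_name, query_signature) and keyed by digest, and the full cache contents are returned so a signature still reports even when one of its digests was not fetched on the current run. Plain dicts stand in for PyMysqlRow here; names are illustrative.

statement_rows = {}  # {(schema_name, query_signature): {digest: row}}

def add_associated_rows(rows):
    for row in rows:
        key = (row['schema_name'], row['query_signature'])
        statement_rows.setdefault(key, {})[row['digest']] = row
    return [r for per_digest in statement_rows.values() for r in per_digest.values()]

# Two digests normalizing to the same signature are both kept and re-emitted on later runs:
add_associated_rows([{'schema_name': 'app', 'query_signature': 'sig1', 'digest': 'd1'}])
merged = add_associated_rows([{'schema_name': 'app', 'query_signature': 'sig1', 'digest': 'd2'}])
assert len(merged) == 2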