Skip to content

Commit 61a7887

Browse files
committed
Create one row per queryid in _apply_deltas.
1 parent decf7e8 commit 61a7887

File tree

3 files changed

+35
-34
lines changed

3 files changed

+35
-34
lines changed

postgres/datadog_checks/postgres/statements.py

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -459,33 +459,33 @@ def _emit_pg_stat_statements_metrics(self):
459459
self._log.warning("Failed to query for pg_stat_statements count: %s", e)
460460

461461
# _apply_deltas expects normalized rows before any merging of duplicates.
462-
# It takes the partial rows from pg_stat_statements and aggregates metric values
463-
# for queryids that map to the same query_signature
464-
def _apply_deltas(self, rows, metrics):
462+
# It takes the incremental pg_stat_statements rows and constructs the full set of rows
463+
# by adding the existing values in the baseline_metrics cache. This is equivalent to
464+
# fetching the full set of rows from pg_stat_statements, but we avoid paying the price of
465+
# actually querying the rows.
466+
def _apply_deltas(self, rows):
467+
465468
# Apply called queries to baseline_metrics
466469
for row in rows:
467470
query_signature = row['query_signature']
468471
queryid = row['queryid']
472+
baseline_row = copy.copy(row)
469473
if query_signature not in self._baseline_metrics:
470-
self._baseline_metrics[query_signature] = {queryid: copy.copy(row)}
474+
self._baseline_metrics[query_signature] = {queryid: baseline_row}
471475
else:
472-
self._baseline_metrics[query_signature][queryid] = copy.copy(row)
476+
self._baseline_metrics[query_signature][queryid] = baseline_row
473477

474-
# Aggregate multiple queryids into one row per query_signature
475-
aggregated_rows = []
478+
# Apply query text, so it doesn't have to be cached.
479+
query_text = {row['query_signature']: row['query'] for row in rows}
480+
applied_rows = []
476481
for query_signature, query_sig_metrics in self._baseline_metrics.items():
477-
aggregated_row = {}
478482
for queryid, row in query_sig_metrics.items():
479-
if 'query_signature' not in aggregated_row:
480-
aggregated_row = copy.copy(row)
483+
if query_signature in query_text:
484+
applied_rows.append({**row, 'query': query_text[query_signature]})
481485
else:
482-
for metric in metrics:
483-
if metric in row:
484-
aggregated_row[metric] += row[metric]
485-
486-
aggregated_rows.append(aggregated_row)
486+
applied_rows.append(row)
487487

488-
return aggregated_rows
488+
return applied_rows
489489

490490
# To prevent the baseline metrics cache from growing indefinitely (as can happen because of
491491
# pg_stat_statements eviction), we clear it out periodically to force a full refetch.
@@ -503,12 +503,13 @@ def _collect_metrics_rows(self):
503503
self._check_baseline_metrics_expiry()
504504
rows = self._load_pg_stat_statements()
505505
rows = self._normalize_queries(rows)
506+
rows = self._apply_deltas(rows)
507+
506508
if not rows:
507509
return []
508510

509511
available_columns = set(rows[0].keys())
510512
metric_columns = available_columns & PG_STAT_STATEMENTS_METRICS_COLUMNS
511-
rows = self._apply_deltas(rows, metric_columns)
512513

513514
rows = self._state.compute_derivative_rows(rows, metric_columns, key=_row_key)
514515
self._check.gauge(

postgres/tests/test_apply_deltas.py

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,32 +10,34 @@ def test_apply_deltas_base_case(pg_instance, integration_check):
1010
check = integration_check(pg_instance)
1111

1212
rows = [
13-
{'queryid': 1, 'query_signature': 'abc', 'calls': 1},
14-
{'queryid': 2, 'query_signature': 'abc', 'calls': 2},
13+
{'queryid': 1, 'query_signature': 'abc', 'calls': 1, 'query': 'query 123'},
14+
{'queryid': 2, 'query_signature': 'abc', 'calls': 2, 'query': 'query 123'},
1515
]
16-
metrics = ['calls']
1716

18-
rows = check.statement_metrics._apply_deltas(rows, metrics)
17+
rows = check.statement_metrics._apply_deltas(rows)
1918

20-
assert len(rows) == 1
21-
assert rows[0] == {'queryid': 1, 'query_signature': 'abc', 'calls': 3}
19+
assert rows == [
20+
{'queryid': 1, 'query_signature': 'abc', 'calls': 1, 'query': 'query 123'},
21+
{'queryid': 2, 'query_signature': 'abc', 'calls': 2, 'query': 'query 123'}
22+
]
2223

2324

2425
def test_apply_deltas_multiple_runs(pg_instance, integration_check):
2526
check = integration_check(pg_instance)
2627

2728
rows = [
28-
{'queryid': 1, 'query_signature': 'abc', 'calls': 1},
29-
{'queryid': 2, 'query_signature': 'abc', 'calls': 2},
29+
{'queryid': 1, 'query_signature': 'abc', 'calls': 1, 'query': 'query 123'},
30+
{'queryid': 2, 'query_signature': 'abc', 'calls': 2, 'query': 'query 123'},
3031
]
31-
metrics = ['calls']
3232

33-
rows = check.statement_metrics._apply_deltas(rows, metrics)
33+
rows = check.statement_metrics._apply_deltas(rows)
3434

3535
second_rows = [
36-
{'queryid': 2, 'query_signature': 'abc', 'calls': 3},
36+
{'queryid': 2, 'query_signature': 'abc', 'calls': 3, 'query': 'query 123'},
3737
]
38-
rows = check.statement_metrics._apply_deltas(second_rows, metrics)
38+
rows = check.statement_metrics._apply_deltas(second_rows)
3939

40-
assert len(rows) == 1
41-
assert rows[0] == {'queryid': 1, 'query_signature': 'abc', 'calls': 4}
40+
assert rows == [
41+
{'queryid': 1, 'query_signature': 'abc', 'calls': 1, 'query': 'query 123'},
42+
{'queryid': 2, 'query_signature': 'abc', 'calls': 3, 'query': 'query 123'}
43+
]

postgres/tests/test_statements.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,6 @@ def test_statement_metrics_multiple_pgss_rows_single_query_signature(
8686
# don't need samples for this test
8787
dbm_instance['query_samples'] = {'enabled': False}
8888
dbm_instance['query_activity'] = {'enabled': False}
89-
# very low collection interval for test purposes
90-
dbm_instance['query_metrics'] = {'enabled': True, 'run_sync': True, 'collection_interval': 0.1}
9189
connections = {}
9290

9391
def normalize_query(q):
@@ -144,7 +142,7 @@ def _run_query(idx):
144142
run_one_check(check, dbm_instance, cancel=False)
145143
aggregator.reset()
146144

147-
# Call other query
145+
# Call other query that maps to same query signature
148146
_run_query(1)
149147
run_one_check(check, dbm_instance, cancel=False)
150148

0 commit comments

Comments
 (0)