From 592f3fb90278edb8e0638e7cf368f5a7e198fd14 Mon Sep 17 00:00:00 2001 From: "alex.weisberger@datadoghq.com" Date: Wed, 1 May 2024 22:35:31 -0400 Subject: [PATCH 01/35] Revert "Revert "[SDBM-842] Postgres integration performance optimization: Limit how many records are pulled from pg_stat_statements. (#17187)" (#17397)" This reverts commit db9b87e0fc6f7f0bc352f9fe70f10fb07cd9d8fa. --- .../postgres/query_calls_cache.py | 37 +++++++++++++ .../datadog_checks/postgres/statements.py | 53 +++++++++++++++++- postgres/tests/test_query_calls_cache.py | 55 +++++++++++++++++++ 3 files changed, 142 insertions(+), 3 deletions(-) create mode 100644 postgres/datadog_checks/postgres/query_calls_cache.py create mode 100644 postgres/tests/test_query_calls_cache.py diff --git a/postgres/datadog_checks/postgres/query_calls_cache.py b/postgres/datadog_checks/postgres/query_calls_cache.py new file mode 100644 index 0000000000000..2d49f1625219c --- /dev/null +++ b/postgres/datadog_checks/postgres/query_calls_cache.py @@ -0,0 +1,37 @@ +# (C) Datadog, Inc. 2024-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +class QueryCallsCache: + """Maintains a cache of the last-known number of calls per queryid, as per pg_stat_statements""" + + def __init__(self): + self.cache = {} + self.next_cache = {} + + def end_query_call_snapshot(self): + """To prevent evicted statements from building up in the cache we + replace the cache outright after each sampling of pg_stat_statements.""" + self.cache = self.next_cache + self.next_cache = {} + + def set_calls(self, queryid, calls): + """Updates the cache of calls per query id. + + Returns whether or not the number of calls changed based on + the newly updated value. The first seen update for a queryid + does not count as a change in values since that would result + in an inflated value.""" + calls_changed = False + if queryid in self.cache: + diff = calls - self.cache[queryid] + # Positive deltas mean the statement remained in pg_stat_statements + # between check calls. Negative deltas mean the statement was evicted + # and replaced with a new call count. Both cases should count as a call + # change. 
+ calls_changed = diff != 0 + else: + calls_changed = True + + self.next_cache[queryid] = calls + + return calls_changed diff --git a/postgres/datadog_checks/postgres/statements.py b/postgres/datadog_checks/postgres/statements.py index 65d843d24df4b..71ae00486b886 100644 --- a/postgres/datadog_checks/postgres/statements.py +++ b/postgres/datadog_checks/postgres/statements.py @@ -19,6 +19,7 @@ from datadog_checks.base.utils.tracking import tracked_method from datadog_checks.postgres.cursor import CommenterCursor, CommenterDictCursor +from .query_calls_cache import QueryCallsCache from .util import DatabaseConfigurationError, payload_pg_version, warning_with_tags from .version_utils import V9_4, V14 @@ -27,6 +28,12 @@ except ImportError: from ..stubs import datadog_agent +QUERYID_TO_CALLS_QUERY = """ +SELECT queryid, calls + FROM {pg_stat_statements_view} + WHERE queryid IS NOT NULL +""" + STATEMENTS_QUERY = """ SELECT {cols} FROM {pg_stat_statements_view} as pg_stat_statements @@ -34,8 +41,8 @@ ON pg_stat_statements.userid = pg_roles.oid LEFT JOIN pg_database ON pg_stat_statements.dbid = pg_database.oid - WHERE query != '' - AND query NOT LIKE 'EXPLAIN %%' + WHERE query NOT LIKE 'EXPLAIN %%' + AND queryid = ANY('{{ {called_queryids} }}'::bigint[]) {filters} {extra_clauses} """ @@ -146,6 +153,7 @@ def __init__(self, check, config, shutdown_callback): self.tags = None self._state = StatementMetrics() self._stat_column_cache = [] + self._query_calls_cache = QueryCallsCache() self._track_io_timing_cache = None self._obfuscate_options = to_native_string(json.dumps(self._config.obfuscator_options)) # full_statement_text_cache: limit the ingestion rate of full statement text events per query_signature @@ -177,7 +185,11 @@ def _get_pg_stat_statements_columns(self): # Querying over '*' with limit 0 allows fetching only the column names from the cursor without data query = STATEMENTS_QUERY.format( - cols='*', pg_stat_statements_view=self._config.pg_stat_statements_view, extra_clauses="LIMIT 0", filters="" + cols='*', + pg_stat_statements_view=self._config.pg_stat_statements_view, + extra_clauses="LIMIT 0", + filters="", + called_queryids="", ) with self._check._get_main_db() as conn: with conn.cursor(cursor_factory=CommenterCursor) as cursor: @@ -186,6 +198,39 @@ def _get_pg_stat_statements_columns(self): self._stat_column_cache = col_names return col_names + def _check_called_queries(self): + pgss_view_without_query_text = self._config.pg_stat_statements_view + if pgss_view_without_query_text == "pg_stat_statements": + # Passing false for the showtext argument leads to a huge performance increase. This + # allows the engine to avoid retrieving the potentially large amount of text data. + # The query count query does not depend on the statement text, so it's safe for this use case. 
+ # For more info: https://www.postgresql.org/docs/current/pgstatstatements.html#PGSTATSTATEMENTS-FUNCS + pgss_view_without_query_text = "pg_stat_statements(false)" + + with self._check._get_main_db() as conn: + with conn.cursor(cursor_factory=CommenterCursor) as cursor: + called_queryids = [] + query = QUERYID_TO_CALLS_QUERY.format(pg_stat_statements_view=pgss_view_without_query_text) + rows = self._execute_query(cursor, query, params=(self._config.dbname,)) + for row in rows: + queryid = row[0] + if queryid is None: + continue + + calls = row[1] + calls_changed = self._query_calls_cache.set_calls(queryid, calls) + if calls_changed: + called_queryids.append(queryid) + self._query_calls_cache.end_query_call_snapshot() + self._check.gauge( + "dd.postgresql.pg_stat_statements.calls_changed", + len(called_queryids), + tags=self.tags, + hostname=self._check.resolved_hostname, + ) + + return called_queryids + def run_job(self): # do not emit any dd.internal metrics for DBM specific check code self.tags = [t for t in self._tags if not t.startswith('dd.internal')] @@ -222,6 +267,7 @@ def collect_per_statement_metrics(self): @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True) def _load_pg_stat_statements(self): try: + called_queryids = self._check_called_queries() available_columns = set(self._get_pg_stat_statements_columns()) missing_columns = PG_STAT_STATEMENTS_REQUIRED_COLUMNS - available_columns if len(missing_columns) > 0: @@ -296,6 +342,7 @@ def _load_pg_stat_statements(self): pg_stat_statements_view=self._config.pg_stat_statements_view, filters=filters, extra_clauses="", + called_queryids=', '.join([str(i) for i in called_queryids]), ), params=params, ) diff --git a/postgres/tests/test_query_calls_cache.py b/postgres/tests/test_query_calls_cache.py new file mode 100644 index 0000000000000..81e5e34f251cc --- /dev/null +++ b/postgres/tests/test_query_calls_cache.py @@ -0,0 +1,55 @@ +# (C) Datadog, Inc. 
2024-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +import pytest + +from datadog_checks.postgres.query_calls_cache import QueryCallsCache + +pytestmark = [pytest.mark.unit] + + +def test_statement_queryid_cache_same_calls_does_not_change(): + cache = QueryCallsCache() + cache.set_calls(123, 1) + cache.end_query_call_snapshot() + changed = cache.set_calls(123, 1) + + assert changed is False + + +def test_statement_queryid_cache_multiple_calls_change(): + cache = QueryCallsCache() + cache.set_calls(123, 1) + cache.end_query_call_snapshot() + changed = cache.set_calls(123, 2) + + assert changed is True + + +def test_statement_queryid_cache_after_pg_stat_statement_eviction(): + cache = QueryCallsCache() + cache.set_calls(123, 100) + cache.end_query_call_snapshot() + changed = cache.set_calls(123, 5) + + assert changed is True + + +def test_statement_queryid_cache_snapshot_eviction(): + cache = QueryCallsCache() + cache.set_calls(123, 100) + cache.end_query_call_snapshot() + cache.set_calls(124, 5) + cache.end_query_call_snapshot() + + assert cache.cache.get(123, None) is None + + +def test_statement_queryid_multiple_inserts(): + cache = QueryCallsCache() + cache.set_calls(123, 100) + cache.set_calls(124, 5) + cache.end_query_call_snapshot() + + assert cache.cache[123] == 100 + assert cache.cache[124] == 5 From cb6a05109bff69e9fbfa5bcc5be9f83302d86259 Mon Sep 17 00:00:00 2001 From: "alex.weisberger@datadoghq.com" Date: Tue, 7 May 2024 15:00:22 -0400 Subject: [PATCH 02/35] Logging --- .../datadog_checks/postgres/query_calls_cache.py | 4 ++++ postgres/datadog_checks/postgres/statements.py | 16 ++++++++++++++-- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/postgres/datadog_checks/postgres/query_calls_cache.py b/postgres/datadog_checks/postgres/query_calls_cache.py index 2d49f1625219c..48439ddf0330d 100644 --- a/postgres/datadog_checks/postgres/query_calls_cache.py +++ b/postgres/datadog_checks/postgres/query_calls_cache.py @@ -29,6 +29,10 @@ def set_calls(self, queryid, calls): # and replaced with a new call count. Both cases should count as a call # change. calls_changed = diff != 0 + + if calls_changed: + print("[AMW] detected called query" + str(queryid) + " with " + str(calls) + " calls") + else: calls_changed = True diff --git a/postgres/datadog_checks/postgres/statements.py b/postgres/datadog_checks/postgres/statements.py index 71ae00486b886..5115b2186a778 100644 --- a/postgres/datadog_checks/postgres/statements.py +++ b/postgres/datadog_checks/postgres/statements.py @@ -47,6 +47,7 @@ {extra_clauses} """ + # Use pg_stat_statements(false) when available as an optimization to avoid pulling SQL text from disk PG_STAT_STATEMENTS_COUNT_QUERY = "SELECT COUNT(*) FROM pg_stat_statements(false)" PG_STAT_STATEMENTS_COUNT_QUERY_LT_9_4 = "SELECT COUNT(*) FROM pg_stat_statements" @@ -205,7 +206,7 @@ def _check_called_queries(self): # allows the engine to avoid retrieving the potentially large amount of text data. # The query count query does not depend on the statement text, so it's safe for this use case. 
# For more info: https://www.postgresql.org/docs/current/pgstatstatements.html#PGSTATSTATEMENTS-FUNCS - pgss_view_without_query_text = "pg_stat_statements(false)" + pgss_view_without_query_text = "pg_stat_statements(false)" with self._check._get_main_db() as conn: with conn.cursor(cursor_factory=CommenterCursor) as cursor: @@ -218,6 +219,7 @@ def _check_called_queries(self): continue calls = row[1] + print("[AMW] queryid: " + str(queryid) + " | calls: " + str(calls)) calls_changed = self._query_calls_cache.set_calls(queryid, calls) if calls_changed: called_queryids.append(queryid) @@ -268,6 +270,7 @@ def collect_per_statement_metrics(self): def _load_pg_stat_statements(self): try: called_queryids = self._check_called_queries() +# print("[AMW] called queries: " + str(called_queryids)) available_columns = set(self._get_pg_stat_statements_columns()) missing_columns = PG_STAT_STATEMENTS_REQUIRED_COLUMNS - available_columns if len(missing_columns) > 0: @@ -345,7 +348,7 @@ def _load_pg_stat_statements(self): called_queryids=', '.join([str(i) for i in called_queryids]), ), params=params, - ) + ) except psycopg2.Error as e: error_tag = "error:database-{}".format(type(e).__name__) @@ -461,17 +464,26 @@ def _collect_metrics_rows(self): rows = self._normalize_queries(rows) if not rows: + print("[AMW] no normalized rows, returning") return [] available_columns = set(rows[0].keys()) metric_columns = available_columns & PG_STAT_STATEMENTS_METRICS_COLUMNS rows = self._state.compute_derivative_rows(rows, metric_columns, key=_row_key) +# print("[AMW] sending dd.postgres.queries.query_rows_raw - " + str(len(rows))) self._check.gauge( 'dd.postgres.queries.query_rows_raw', len(rows), tags=self.tags + self._check._get_debug_tags(), hostname=self._check.resolved_hostname, ) + print("[AMW] called queries") + for row in rows: + if "pg_" in row['query']: + continue + print("QueryId: " + str(row['queryid']) + " | Query: " + str(row['query']) + " | Calls: " + str(row['calls'])) + print("-------------------\n") + return rows def _normalize_queries(self, rows): From 2dcfba631a728d106dda3702b77f55f71d646d53 Mon Sep 17 00:00:00 2001 From: "alex.weisberger@datadoghq.com" Date: Tue, 7 May 2024 16:08:37 -0400 Subject: [PATCH 03/35] Add targeted logging --- .../datadog_checks/base/utils/db/statement_metrics.py | 3 +++ postgres/datadog_checks/postgres/query_calls_cache.py | 7 +++---- postgres/datadog_checks/postgres/statements.py | 5 +++-- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/datadog_checks_base/datadog_checks/base/utils/db/statement_metrics.py b/datadog_checks_base/datadog_checks/base/utils/db/statement_metrics.py index be0a238bcd80d..ba80e790b19a5 100644 --- a/datadog_checks_base/datadog_checks/base/utils/db/statement_metrics.py +++ b/datadog_checks_base/datadog_checks/base/utils/db/statement_metrics.py @@ -57,6 +57,9 @@ def compute_derivative_rows(self, rows, metrics, key): if prev is None: continue + if row['queryid'] == '-4540465645740005644': + print("[AMW] computing derivative values, prev calls is: " + str(prev['calls']) + " | current calls is: " + str(row['calls']) + " | diff is: " + str(row['calls'] - prev['calls']) + " | prev row: " + str(prev) + " | current row: " + str(row) + " | prev row key: " + str(key(prev)) + " | current row key: " + str(key(row)) + " | prev row key == current row key: " + str(key(prev) == key(row)) + " | prev row key == row key: " + str(key(prev) == row_key) + " | current row key == row key: " + str(key(row) == row_key) + " | prev row key == row key == current row 
key: " + str(key(prev) == row_key == key(row))) + metric_columns = metrics & set(row.keys()) # Take the diff of all metric values between the current row and the previous run's row. diff --git a/postgres/datadog_checks/postgres/query_calls_cache.py b/postgres/datadog_checks/postgres/query_calls_cache.py index 48439ddf0330d..f53ac57c1d1ce 100644 --- a/postgres/datadog_checks/postgres/query_calls_cache.py +++ b/postgres/datadog_checks/postgres/query_calls_cache.py @@ -22,6 +22,9 @@ def set_calls(self, queryid, calls): does not count as a change in values since that would result in an inflated value.""" calls_changed = False + if queryid == '-4540465645740005644': + print("[AMW] In query calls cache for " + str(queryid) + " | cached calls: " + str(self.cache[queryid]) + " | new calls: " + str(calls)) + if queryid in self.cache: diff = calls - self.cache[queryid] # Positive deltas mean the statement remained in pg_stat_statements @@ -29,10 +32,6 @@ def set_calls(self, queryid, calls): # and replaced with a new call count. Both cases should count as a call # change. calls_changed = diff != 0 - - if calls_changed: - print("[AMW] detected called query" + str(queryid) + " with " + str(calls) + " calls") - else: calls_changed = True diff --git a/postgres/datadog_checks/postgres/statements.py b/postgres/datadog_checks/postgres/statements.py index 5115b2186a778..82ac508a1a156 100644 --- a/postgres/datadog_checks/postgres/statements.py +++ b/postgres/datadog_checks/postgres/statements.py @@ -219,7 +219,7 @@ def _check_called_queries(self): continue calls = row[1] - print("[AMW] queryid: " + str(queryid) + " | calls: " + str(calls)) + # print("[AMW] queryid: " + str(queryid) + " | calls: " + str(calls)) calls_changed = self._query_calls_cache.set_calls(queryid, calls) if calls_changed: called_queryids.append(queryid) @@ -479,8 +479,9 @@ def _collect_metrics_rows(self): ) print("[AMW] called queries") for row in rows: - if "pg_" in row['query']: + if row['queryid'] != '-4540465645740005644': continue + print("QueryId: " + str(row['queryid']) + " | Query: " + str(row['query']) + " | Calls: " + str(row['calls'])) print("-------------------\n") From d586d38fc023f89fee8a415ca5ba790f529aabd9 Mon Sep 17 00:00:00 2001 From: "alex.weisberger@datadoghq.com" Date: Tue, 7 May 2024 18:06:47 -0400 Subject: [PATCH 04/35] Improve logging --- .../datadog_checks/base/utils/db/statement_metrics.py | 2 +- postgres/datadog_checks/postgres/query_calls_cache.py | 5 +++-- postgres/datadog_checks/postgres/statements.py | 6 ++---- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/datadog_checks_base/datadog_checks/base/utils/db/statement_metrics.py b/datadog_checks_base/datadog_checks/base/utils/db/statement_metrics.py index ba80e790b19a5..778e5548d6f9f 100644 --- a/datadog_checks_base/datadog_checks/base/utils/db/statement_metrics.py +++ b/datadog_checks_base/datadog_checks/base/utils/db/statement_metrics.py @@ -57,7 +57,7 @@ def compute_derivative_rows(self, rows, metrics, key): if prev is None: continue - if row['queryid'] == '-4540465645740005644': + if 'pg_' not in row['query']: print("[AMW] computing derivative values, prev calls is: " + str(prev['calls']) + " | current calls is: " + str(row['calls']) + " | diff is: " + str(row['calls'] - prev['calls']) + " | prev row: " + str(prev) + " | current row: " + str(row) + " | prev row key: " + str(key(prev)) + " | current row key: " + str(key(row)) + " | prev row key == current row key: " + str(key(prev) == key(row)) + " | prev row key == row key: " + str(key(prev) 
== row_key) + " | current row key == row key: " + str(key(row) == row_key) + " | prev row key == row key == current row key: " + str(key(prev) == row_key == key(row))) metric_columns = metrics & set(row.keys()) diff --git a/postgres/datadog_checks/postgres/query_calls_cache.py b/postgres/datadog_checks/postgres/query_calls_cache.py index f53ac57c1d1ce..456ba46787653 100644 --- a/postgres/datadog_checks/postgres/query_calls_cache.py +++ b/postgres/datadog_checks/postgres/query_calls_cache.py @@ -22,10 +22,10 @@ def set_calls(self, queryid, calls): does not count as a change in values since that would result in an inflated value.""" calls_changed = False - if queryid == '-4540465645740005644': - print("[AMW] In query calls cache for " + str(queryid) + " | cached calls: " + str(self.cache[queryid]) + " | new calls: " + str(calls)) if queryid in self.cache: + print("[AMW] Query in cache " + str(queryid) + " | cached calls: " + str(self.cache[queryid]) + " | new calls: " + str(calls)) + diff = calls - self.cache[queryid] # Positive deltas mean the statement remained in pg_stat_statements # between check calls. Negative deltas mean the statement was evicted @@ -33,6 +33,7 @@ def set_calls(self, queryid, calls): # change. calls_changed = diff != 0 else: + print("[AMW] Query not in cache " + str(queryid) + " | new calls: " + str(calls)) calls_changed = True self.next_cache[queryid] = calls diff --git a/postgres/datadog_checks/postgres/statements.py b/postgres/datadog_checks/postgres/statements.py index 82ac508a1a156..4b1b6c95a4a97 100644 --- a/postgres/datadog_checks/postgres/statements.py +++ b/postgres/datadog_checks/postgres/statements.py @@ -479,10 +479,8 @@ def _collect_metrics_rows(self): ) print("[AMW] called queries") for row in rows: - if row['queryid'] != '-4540465645740005644': - continue - - print("QueryId: " + str(row['queryid']) + " | Query: " + str(row['query']) + " | Calls: " + str(row['calls'])) + if 'pg_' not in row['query']: + print("QueryId: " + str(row['queryid']) + " | Query: " + str(row['query']) + " | Calls: " + str(row['calls'])) print("-------------------\n") return rows From 32d27dc3f5d7b1aaee78214572d823f9dcdc6a33 Mon Sep 17 00:00:00 2001 From: "alex.weisberger@datadoghq.com" Date: Tue, 7 May 2024 18:55:21 -0400 Subject: [PATCH 05/35] Observe logs based on querying all pgss rows. --- postgres/datadog_checks/postgres/statements.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/postgres/datadog_checks/postgres/statements.py b/postgres/datadog_checks/postgres/statements.py index 4b1b6c95a4a97..03da50e3a4f27 100644 --- a/postgres/datadog_checks/postgres/statements.py +++ b/postgres/datadog_checks/postgres/statements.py @@ -42,11 +42,13 @@ LEFT JOIN pg_database ON pg_stat_statements.dbid = pg_database.oid WHERE query NOT LIKE 'EXPLAIN %%' - AND queryid = ANY('{{ {called_queryids} }}'::bigint[]) {filters} {extra_clauses} """ +# AND queryid = ANY('{{ {called_queryids} }}'::bigint[]) + + # Use pg_stat_statements(false) when available as an optimization to avoid pulling SQL text from disk PG_STAT_STATEMENTS_COUNT_QUERY = "SELECT COUNT(*) FROM pg_stat_statements(false)" From e552d9cd76209fef596341a78e9795d369db7149 Mon Sep 17 00:00:00 2001 From: "alex.weisberger@datadoghq.com" Date: Thu, 9 May 2024 22:40:16 -0400 Subject: [PATCH 06/35] Reproduce inflated metrics in test. 
--- .../base/utils/db/statement_metrics.py | 2 +- .../postgres/query_calls_cache.py | 4 +- .../datadog_checks/postgres/statements.py | 20 +- postgres/tests/test_statements.py | 355 +++++++++++++++++- 4 files changed, 372 insertions(+), 9 deletions(-) diff --git a/datadog_checks_base/datadog_checks/base/utils/db/statement_metrics.py b/datadog_checks_base/datadog_checks/base/utils/db/statement_metrics.py index 778e5548d6f9f..c0d5460a89dcf 100644 --- a/datadog_checks_base/datadog_checks/base/utils/db/statement_metrics.py +++ b/datadog_checks_base/datadog_checks/base/utils/db/statement_metrics.py @@ -58,7 +58,7 @@ def compute_derivative_rows(self, rows, metrics, key): continue if 'pg_' not in row['query']: - print("[AMW] computing derivative values, prev calls is: " + str(prev['calls']) + " | current calls is: " + str(row['calls']) + " | diff is: " + str(row['calls'] - prev['calls']) + " | prev row: " + str(prev) + " | current row: " + str(row) + " | prev row key: " + str(key(prev)) + " | current row key: " + str(key(row)) + " | prev row key == current row key: " + str(key(prev) == key(row)) + " | prev row key == row key: " + str(key(prev) == row_key) + " | current row key == row key: " + str(key(row) == row_key) + " | prev row key == row key == current row key: " + str(key(prev) == row_key == key(row))) + print("[AMW] computing derivative values" + " | prev query " + prev['query'] + " | curr query " + row['query'] + " | prev queryid " + str(prev['queryid']) + " | curr queryid " + str(row['queryid']) + " | prev query_sig: " + prev['query_signature'] + " | cur query sig: " + row['query_signature'] + " | prev calls is: " + str(prev['calls']) + " | current calls is: " + str(row['calls']) + " | diff is: " + str(row['calls'] - prev['calls'])) metric_columns = metrics & set(row.keys()) diff --git a/postgres/datadog_checks/postgres/query_calls_cache.py b/postgres/datadog_checks/postgres/query_calls_cache.py index 456ba46787653..08303f2737fd1 100644 --- a/postgres/datadog_checks/postgres/query_calls_cache.py +++ b/postgres/datadog_checks/postgres/query_calls_cache.py @@ -24,7 +24,7 @@ def set_calls(self, queryid, calls): calls_changed = False if queryid in self.cache: - print("[AMW] Query in cache " + str(queryid) + " | cached calls: " + str(self.cache[queryid]) + " | new calls: " + str(calls)) +# print("[AMW] Query in cache " + str(queryid) + " | cached calls: " + str(self.cache[queryid]) + " | new calls: " + str(calls)) diff = calls - self.cache[queryid] # Positive deltas mean the statement remained in pg_stat_statements @@ -33,7 +33,7 @@ def set_calls(self, queryid, calls): # change. 
calls_changed = diff != 0 else: - print("[AMW] Query not in cache " + str(queryid) + " | new calls: " + str(calls)) + # print("[AMW] Query not in cache " + str(queryid) + " | new calls: " + str(calls)) calls_changed = True self.next_cache[queryid] = calls diff --git a/postgres/datadog_checks/postgres/statements.py b/postgres/datadog_checks/postgres/statements.py index 03da50e3a4f27..d383f5499731d 100644 --- a/postgres/datadog_checks/postgres/statements.py +++ b/postgres/datadog_checks/postgres/statements.py @@ -42,14 +42,11 @@ LEFT JOIN pg_database ON pg_stat_statements.dbid = pg_database.oid WHERE query NOT LIKE 'EXPLAIN %%' + AND queryid = ANY('{{ {called_queryids} }}'::bigint[]) {filters} {extra_clauses} """ -# AND queryid = ANY('{{ {called_queryids} }}'::bigint[]) - - - # Use pg_stat_statements(false) when available as an optimization to avoid pulling SQL text from disk PG_STAT_STATEMENTS_COUNT_QUERY = "SELECT COUNT(*) FROM pg_stat_statements(false)" PG_STAT_STATEMENTS_COUNT_QUERY_LT_9_4 = "SELECT COUNT(*) FROM pg_stat_statements" @@ -215,6 +212,7 @@ def _check_called_queries(self): called_queryids = [] query = QUERYID_TO_CALLS_QUERY.format(pg_stat_statements_view=pgss_view_without_query_text) rows = self._execute_query(cursor, query, params=(self._config.dbname,)) + for row in rows: queryid = row[0] if queryid is None: @@ -272,7 +270,7 @@ def collect_per_statement_metrics(self): def _load_pg_stat_statements(self): try: called_queryids = self._check_called_queries() -# print("[AMW] called queries: " + str(called_queryids)) + print("[AMW] called queries: " + str(called_queryids)) available_columns = set(self._get_pg_stat_statements_columns()) missing_columns = PG_STAT_STATEMENTS_REQUIRED_COLUMNS - available_columns if len(missing_columns) > 0: @@ -352,6 +350,7 @@ def _load_pg_stat_statements(self): params=params, ) except psycopg2.Error as e: + print("_load_pg_stat_statements error") error_tag = "error:database-{}".format(type(e).__name__) if ( @@ -464,11 +463,18 @@ def _collect_metrics_rows(self): self._emit_pg_stat_statements_dealloc() rows = self._load_pg_stat_statements() + for row in rows: + if 'pg_' not in row['query']: + print('[AMW] query from load_pg_stat_statments | ' + str(row['queryid'])) rows = self._normalize_queries(rows) if not rows: print("[AMW] no normalized rows, returning") return [] + for row in rows: + if 'pg_' not in row['query']: + print('[AMW] rows after normalize_queries | ' + str(row['queryid'])) + available_columns = set(rows[0].keys()) metric_columns = available_columns & PG_STAT_STATEMENTS_METRICS_COLUMNS rows = self._state.compute_derivative_rows(rows, metric_columns, key=_row_key) @@ -488,6 +494,7 @@ def _collect_metrics_rows(self): return rows def _normalize_queries(self, rows): + print("Normalizing queries") normalized_rows = [] for row in rows: normalized_row = dict(copy.copy(row)) @@ -501,6 +508,8 @@ def _normalize_queries(self, rows): continue obfuscated_query = statement['query'] + # print("Obfuscated query: " + statement['query'] + " | query_signature: " + compute_sql_signature(obfuscated_query)) + normalized_row['query'] = obfuscated_query normalized_row['query_signature'] = compute_sql_signature(obfuscated_query) metadata = statement['metadata'] @@ -509,6 +518,7 @@ def _normalize_queries(self, rows): normalized_row['dd_comments'] = metadata.get('comments', None) normalized_rows.append(normalized_row) + print("\n\n") return normalized_rows def _rows_to_fqt_events(self, rows): diff --git a/postgres/tests/test_statements.py 
b/postgres/tests/test_statements.py index a5188069626ad..531bcb469ec09 100644 --- a/postgres/tests/test_statements.py +++ b/postgres/tests/test_statements.py @@ -77,8 +77,361 @@ def test_dbm_enabled_config(integration_check, dbm_instance, dbm_enabled_key, db assert check._config.dbm_enabled == dbm_enabled -statement_samples_keys = ["query_samples", "statement_samples"] +def test_statement_metrics_optimization( + aggregator, + integration_check, + dbm_instance, +): + print("[AMW] statement metrics optimization test") + # don't need samples for this test + dbm_instance['query_samples'] = {'enabled': False} + dbm_instance['query_activity'] = {'enabled': False} + # very low collection interval for test purposes + dbm_instance['query_metrics'] = {'enabled': True, 'run_sync': True, 'collection_interval': 0.1} + connections = {} + + def _run_queries(): + for user, password, dbname, query, arg in SAMPLE_QUERIES: + if dbname not in connections: + connections[dbname] = psycopg2.connect(host=HOST, dbname=dbname, user=user, password=password) + connections[dbname].cursor().execute(query, (arg,)) + + # Reset pgss + postgres_conn = _get_superconn(dbm_instance) + with postgres_conn.cursor() as cur: + cur.execute("SELECT pg_stat_statements_reset();") + + check = integration_check(dbm_instance) + check._connect() + + # Run multiple times to fill the initial previous_statements with queries from the agent + run_one_check(check, dbm_instance, cancel=False) + run_one_check(check, dbm_instance, cancel=False) + expected_dbm_metrics_tags = _get_expected_tags(check, dbm_instance, with_db=True, with_host=False) + expected_dbm_metrics_tags.append('agent_hostname:stubbed.hostname') + aggregator.reset() + + # Get the base number of calls generated by the agent + run_one_check(check, dbm_instance, cancel=False) + base_value = aggregator.metrics('dd.postgres.queries.query_rows_raw')[0].value + print("[AMW] base value is " + str(base_value)) + assert base_value > 0 + aggregator.reset() + + # First check run, no rows generated yet + _run_queries() + run_one_check(check, dbm_instance, cancel=False) + aggregator.assert_metric('dd.postgres.queries.query_rows_raw', value=base_value, count=1, + tags=expected_dbm_metrics_tags) + aggregator.reset() + + # Second check run, we should have one row per sample + _run_queries() + run_one_check(check, dbm_instance, cancel=False) + aggregator.assert_metric('dd.postgres.queries.query_rows_raw', value=base_value + len(SAMPLE_QUERIES), count=1, + tags=expected_dbm_metrics_tags) + + events = aggregator.get_event_platform_events("dbm-metrics") + print("[AMW] events: " + str(events)) +# assert len(events) == 2 + event = events[0] # first item is from the initial dummy check to load pg_settings + + obfuscated_param = '?' 
if POSTGRES_VERSION.split('.')[0] == "9" else '$1' + for username, _, dbname, query, _ in SAMPLE_QUERIES: + expected_query = query % obfuscated_param + query_signature = compute_sql_signature(expected_query) + matching_rows = [r for r in event['postgres_rows'] if r['query_signature'] == query_signature] + + # metrics + assert len(matching_rows) == 1 + row = matching_rows[0] + assert row['calls'] == 1 + assert row['datname'] == dbname + assert row['rolname'] == username + assert row['query'] == expected_query + + aggregator.reset() + + for conn in connections.values(): + conn.close() + +def test_statement_metrics_multiple_pgss_rows_single_query_signature( + aggregator, + integration_check, + dbm_instance, + datadog_agent, +): + # don't need samples for this test + dbm_instance['query_samples'] = {'enabled': False} + dbm_instance['query_activity'] = {'enabled': False} + # very low collection interval for test purposes + dbm_instance['query_metrics'] = {'enabled': True, 'run_sync': True, 'collection_interval': 0.1} + connections = {} + + def normalize_query(q): + # Remove the quotes from below: + normalized = "" + for s in ["'one'", "'two'"]: + if s in q: + normalized = q.replace(s, "?") + break + + return normalized + + def obfuscate_sql(query, options=None): + if query.startswith('SET application_name'): + return json.dumps({'query': normalize_query(query), 'metadata': {}}) + return json.dumps({'query': query, 'metadata': {}}) + + queries = [ + "SET application_name = %s", + "SET application_name = %s" + ] + # These queries will have the same query signature but different queryids in pg_stat_statements + def _run_query(idx): + query = queries[idx] + user = "bob" + password = "bob" + dbname = "datadog_test" + if dbname not in connections: + connections[dbname] = psycopg2.connect(host=HOST, dbname=dbname, user=user, password=password) + + args = ('two',) + if idx == 1: + args = ('one',) + + print("[AMW] Executing query " + str(query) + " with args " + str(args)) + connections[dbname].cursor().execute(query, args) + + check = integration_check(dbm_instance) + check._connect() + # Execute the query with the mocked obfuscate_sql. The result should produce an event payload with the metadata. + with mock.patch.object(datadog_agent, 'obfuscate_sql', passthrough=True) as mock_agent: + mock_agent.side_effect = obfuscate_sql + + # Reset pgss + postgres_conn = _get_superconn(dbm_instance) + with postgres_conn.cursor() as cur: + cur.execute("SELECT pg_stat_statements_reset();") + + check = integration_check(dbm_instance) + check._connect() + + # Seed a bunch of calls into pg_stat_statements + for _ in range(10): + _run_query(1) + + _run_query(0) + run_one_check(check, dbm_instance, cancel=False) + + print("[AMW] clear previous statements") + run_one_check(check, dbm_instance, cancel=False) + + # Call one query + _run_query(0) + run_one_check(check, dbm_instance, cancel=False) + aggregator.reset() + + # Call other query + print('[AMW] Call other query') + _run_query(1) + run_one_check(check, dbm_instance, cancel=False) + + with postgres_conn.cursor() as cur: + cur.execute("SELECT * from pg_stat_statements where query not ilike '%pg_%'") + for row in cur.fetchall(): + print("[AMW] PGSS: " + str(row)) + +# obfuscated_param = '?' 
if POSTGRES_VERSION.split('.')[0] == "9" else '$1' + query0 = queries[0] % ('?',) + query_signature = compute_sql_signature(query0) + events = aggregator.get_event_platform_events("dbm-metrics") + + query1 = queries[1] % ('?',) + query1_sig = compute_sql_signature(query1) + print("[AMW] Looking for query signature for " + query0 + ": " + query_signature) + print("[AMW] other query sig for " + query1 + ": " + query1_sig) + + assert len(events) > 0 + + print("[AMW] Event:") + print(events[0]) + + matching_rows = [r for r in events[0]['postgres_rows'] if r['query_signature'] == query_signature] + + assert len(matching_rows) == 1 + + assert matching_rows[0]['calls'] == 1 + + for conn in connections.values(): + conn.close() + + fail("Fail") + +# Simulate an agent starting up into a long-running DB instance +# with pg_stat_statements already populated +def test_statement_metrics_existing_calls( + aggregator, + integration_check, + dbm_instance, +): + # don't need samples for this test + dbm_instance['query_samples'] = {'enabled': False} + dbm_instance['query_activity'] = {'enabled': False} + # very low collection interval for test purposes + dbm_instance['query_metrics'] = {'enabled': True, 'run_sync': True, 'collection_interval': 0.1} + connections = {} + + def _run_queries(): + user, password, dbname, query, arg = SAMPLE_QUERIES[0] + if dbname not in connections: + connections[dbname] = psycopg2.connect(host=HOST, dbname=dbname, user=user, password=password) + connections[dbname].cursor().execute(query, (arg,)) + + # Reset pgss + postgres_conn = _get_superconn(dbm_instance) + with postgres_conn.cursor() as cur: + cur.execute("SELECT pg_stat_statements_reset();") + + check = integration_check(dbm_instance) + check._connect() + + # Populate pg_stat_statements with a bunch of executions of the same query + print("Running queries to populate pg_stat_statements") + for _ in range(500): + _run_queries() + + # Run multiple times to fill the initial previous_statements with queries from the agent + run_one_check(check, dbm_instance, cancel=False) + expected_dbm_metrics_tags = _get_expected_tags(check, dbm_instance, with_db=True, with_host=False) + expected_dbm_metrics_tags.append('agent_hostname:stubbed.hostname') + + print("Running single query") + + _run_queries() + run_one_check(check, dbm_instance, cancel=False) + + aggregator.reset() + + print("Running single query") + _run_queries() + run_one_check(check, dbm_instance, cancel=False) + obfuscated_param = '?' 
if POSTGRES_VERSION.split('.')[0] == "9" else '$1' + expected_query = SAMPLE_QUERIES[0][3] % obfuscated_param + + query_signature = compute_sql_signature(expected_query) + count_metrics = aggregator.metrics('postgresql.queries.count') + expected_metric = None + print("[AMW] count metrics: " + str(count_metrics)) + for metric in count_metrics: + print("[AMW] metric: " + str(metric)) + if metric.tags['query_signature'] == query_signature: + expected_metric = metric + break + + assert expected_metric is not None + assert expected_metric.value == 1 + + for conn in connections.values(): + conn.close() + +# This results in metrics being omitted +def test_statement_metrics_unchanged( + aggregator, + integration_check, + dbm_instance, +): + # don't need samples for this test + dbm_instance['query_samples'] = {'enabled': False} + dbm_instance['query_activity'] = {'enabled': False} + # very low collection interval for test purposes + dbm_instance['query_metrics'] = {'enabled': True, 'run_sync': True, 'collection_interval': 0.1} + connections = {} + + def _run_queries(): + for user, password, dbname, query, arg in SAMPLE_QUERIES: + if dbname not in connections: + connections[dbname] = psycopg2.connect(host=HOST, dbname=dbname, user=user, password=password) + connections[dbname].cursor().execute(query, (arg,)) + + # Reset pgss + postgres_conn = _get_superconn(dbm_instance) + with postgres_conn.cursor() as cur: + cur.execute("SELECT pg_stat_statements_reset();") + + check = integration_check(dbm_instance) + check._connect() + + print("[AMW] run agent queries") + + # Run multiple times to fill the initial previous_statements with queries from the agent + run_one_check(check, dbm_instance, cancel=False) + run_one_check(check, dbm_instance, cancel=False) + expected_dbm_metrics_tags = _get_expected_tags(check, dbm_instance, with_db=True, with_host=False) + expected_dbm_metrics_tags.append('agent_hostname:stubbed.hostname') + aggregator.reset() + + # Get the base number of calls generated by the agent + run_one_check(check, dbm_instance, cancel=False) + base_value = aggregator.metrics('dd.postgres.queries.query_rows_raw')[0].value + assert base_value > 0 + aggregator.reset() + + print("[AMW] base value: " + str(base_value)) + + print("[AMW] run queries 1") + + + # First check run, no rows generated yet + _run_queries() + run_one_check(check, dbm_instance, cancel=False) + aggregator.assert_metric('dd.postgres.queries.query_rows_raw', value=base_value, count=1, + tags=expected_dbm_metrics_tags) + aggregator.reset() + + print("[AMW] run queries 2") + + + # Second check run, we should have one row per sample + _run_queries() + run_one_check(check, dbm_instance, cancel=False) + aggregator.assert_metric('dd.postgres.queries.query_rows_raw', value=base_value + len(SAMPLE_QUERIES), count=1, + tags=expected_dbm_metrics_tags) + aggregator.reset() + + print("[AMW] run queries 3") + + + # Third check run, no metrics generated since no queries were called + run_one_check(check, dbm_instance, cancel=False) + aggregator.assert_metric('dd.postgres.queries.query_rows_raw', value=base_value, count=1, + tags=expected_dbm_metrics_tags) + aggregator.reset() + + print("[AMW] run queries 4") + + # Fourth check run, we should have queries being reported + _run_queries() +# _run_queries() + run_one_check(check, dbm_instance, cancel=False) + + # Now, a metric won't be sent because previous_statements is empty +# aggregator.assert_metric('dd.postgres.queries.query_rows_raw', value=base_value + len(SAMPLE_QUERIES), count=1, +# 
tags=expected_dbm_metrics_tags) + _run_queries() + run_one_check(check, dbm_instance, cancel=False) + + # This probably results in an inflated metric being sent + + + for conn in connections.values(): + conn.close() + + fail("Fail") + + +statement_samples_keys = ["query_samples", "statement_samples"] @pytest.mark.parametrize("statement_samples_key", statement_samples_keys) @pytest.mark.parametrize("statement_samples_enabled", [True, False]) From 48d4bd9d72c8abb2702681ed1e2103186917954a Mon Sep 17 00:00:00 2001 From: "alex.weisberger@datadoghq.com" Date: Fri, 10 May 2024 09:26:31 -0400 Subject: [PATCH 07/35] Keep baseline metric cache. --- .../datadog_checks/postgres/statements.py | 58 ++++++++++++++----- postgres/tests/test_apply_deltas.py | 22 +++++++ 2 files changed, 65 insertions(+), 15 deletions(-) create mode 100644 postgres/tests/test_apply_deltas.py diff --git a/postgres/datadog_checks/postgres/statements.py b/postgres/datadog_checks/postgres/statements.py index d383f5499731d..a55c3ef4beecc 100644 --- a/postgres/datadog_checks/postgres/statements.py +++ b/postgres/datadog_checks/postgres/statements.py @@ -154,6 +154,7 @@ def __init__(self, check, config, shutdown_callback): self._state = StatementMetrics() self._stat_column_cache = [] self._query_calls_cache = QueryCallsCache() + self._baseline_metrics = {} self._track_io_timing_cache = None self._obfuscate_options = to_native_string(json.dumps(self._config.obfuscator_options)) # full_statement_text_cache: limit the ingestion rate of full statement text events per query_signature @@ -219,7 +220,6 @@ def _check_called_queries(self): continue calls = row[1] - # print("[AMW] queryid: " + str(queryid) + " | calls: " + str(calls)) calls_changed = self._query_calls_cache.set_calls(queryid, calls) if calls_changed: called_queryids.append(queryid) @@ -457,28 +457,60 @@ def _emit_pg_stat_statements_metrics(self): except psycopg2.Error as e: self._log.warning("Failed to query for pg_stat_statements count: %s", e) + # _apply_deltas expects normalized rows before any merging of duplicates. 
+ # It takes the partial rows from pg_stat_statements and aggregates metric values + # for queryids that map to the same query_signature + def _apply_deltas(self, rows, metrics): + # Apply called queries to baseline_metrics + for row in rows: + query_signature = row['query_signature'] + queryid = row['queryid'] + if query_signature not in self._baseline_metrics: + self._baseline_metrics[query_signature] = { + queryid: row + } + else: + query_sig_metrics = self._baseline_metrics[query_signature] + if queryid not in query_sig_metrics: + query_sig_metrics[queryid] = row + else: + # Test this case + baseline_row = query_sig_metrics[queryid] + for metric in metrics: + if metric in row: + baseline_row[metric] += row[metric] + + print("Built baseline: " + str(self._baseline_metrics)) + # Aggregate multiple queryids into one row per query_signature + aggregated_rows = [] + for query_signature, query_sig_metrics in self._baseline_metrics.items(): + aggregated_row = {} + for queryid, row in query_sig_metrics.items(): + if 'query_signature' not in aggregated_row: + aggregated_row = row + else: + for metric in metrics: + if metric in row: + aggregated_row[metric] += row[metric] + + aggregated_rows.append(aggregated_row) + + return aggregated_rows + @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True) def _collect_metrics_rows(self): self._emit_pg_stat_statements_metrics() self._emit_pg_stat_statements_dealloc() rows = self._load_pg_stat_statements() - - for row in rows: - if 'pg_' not in row['query']: - print('[AMW] query from load_pg_stat_statments | ' + str(row['queryid'])) rows = self._normalize_queries(rows) if not rows: - print("[AMW] no normalized rows, returning") return [] - for row in rows: - if 'pg_' not in row['query']: - print('[AMW] rows after normalize_queries | ' + str(row['queryid'])) - available_columns = set(rows[0].keys()) metric_columns = available_columns & PG_STAT_STATEMENTS_METRICS_COLUMNS + rows = self._apply_deltas(rows, metric_columns) + rows = self._state.compute_derivative_rows(rows, metric_columns, key=_row_key) -# print("[AMW] sending dd.postgres.queries.query_rows_raw - " + str(len(rows))) self._check.gauge( 'dd.postgres.queries.query_rows_raw', len(rows), @@ -494,7 +526,6 @@ def _collect_metrics_rows(self): return rows def _normalize_queries(self, rows): - print("Normalizing queries") normalized_rows = [] for row in rows: normalized_row = dict(copy.copy(row)) @@ -508,8 +539,6 @@ def _normalize_queries(self, rows): continue obfuscated_query = statement['query'] - # print("Obfuscated query: " + statement['query'] + " | query_signature: " + compute_sql_signature(obfuscated_query)) - normalized_row['query'] = obfuscated_query normalized_row['query_signature'] = compute_sql_signature(obfuscated_query) metadata = statement['metadata'] @@ -518,7 +547,6 @@ def _normalize_queries(self, rows): normalized_row['dd_comments'] = metadata.get('comments', None) normalized_rows.append(normalized_row) - print("\n\n") return normalized_rows def _rows_to_fqt_events(self, rows): diff --git a/postgres/tests/test_apply_deltas.py b/postgres/tests/test_apply_deltas.py new file mode 100644 index 0000000000000..54e335f463e6d --- /dev/null +++ b/postgres/tests/test_apply_deltas.py @@ -0,0 +1,22 @@ +# (C) Datadog, Inc. 
2024-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +import pytest + +pytestmark = [pytest.mark.unit] + + +def test_apply_deltas(pg_instance, integration_check): + pg_instance['collect_database_size_metrics'] = False + check = integration_check(pg_instance) + + rows = [ + {'queryid': 1, 'query_signature': 'abc', 'calls': 1}, + {'queryid': 2, 'query_signature': 'abc', 'calls': 2}, + ] + metrics = ['calls'] + + rows = check.statement_metrics._apply_deltas(rows, metrics) + + assert len(rows) == 1 + assert rows[0] == {'queryid': 1, 'query_signature': 'abc', 'calls': 3} From d89db22124826c2a041097a61bc6e102c968e16f Mon Sep 17 00:00:00 2001 From: "alex.weisberger@datadoghq.com" Date: Fri, 10 May 2024 09:44:57 -0400 Subject: [PATCH 08/35] Replace baseline outright --- .../datadog_checks/postgres/statements.py | 15 +--- postgres/tests/test_apply_deltas.py | 23 ++++- postgres/tests/test_statements.py | 86 +------------------ 3 files changed, 27 insertions(+), 97 deletions(-) diff --git a/postgres/datadog_checks/postgres/statements.py b/postgres/datadog_checks/postgres/statements.py index a55c3ef4beecc..8b44853f8b773 100644 --- a/postgres/datadog_checks/postgres/statements.py +++ b/postgres/datadog_checks/postgres/statements.py @@ -467,27 +467,18 @@ def _apply_deltas(self, rows, metrics): queryid = row['queryid'] if query_signature not in self._baseline_metrics: self._baseline_metrics[query_signature] = { - queryid: row + queryid: copy.copy(row) } else: - query_sig_metrics = self._baseline_metrics[query_signature] - if queryid not in query_sig_metrics: - query_sig_metrics[queryid] = row - else: - # Test this case - baseline_row = query_sig_metrics[queryid] - for metric in metrics: - if metric in row: - baseline_row[metric] += row[metric] + self._baseline_metrics[query_signature][queryid] = copy.copy(row) - print("Built baseline: " + str(self._baseline_metrics)) # Aggregate multiple queryids into one row per query_signature aggregated_rows = [] for query_signature, query_sig_metrics in self._baseline_metrics.items(): aggregated_row = {} for queryid, row in query_sig_metrics.items(): if 'query_signature' not in aggregated_row: - aggregated_row = row + aggregated_row = copy.copy(row) else: for metric in metrics: if metric in row: diff --git a/postgres/tests/test_apply_deltas.py b/postgres/tests/test_apply_deltas.py index 54e335f463e6d..9f002533d3d68 100644 --- a/postgres/tests/test_apply_deltas.py +++ b/postgres/tests/test_apply_deltas.py @@ -6,8 +6,7 @@ pytestmark = [pytest.mark.unit] -def test_apply_deltas(pg_instance, integration_check): - pg_instance['collect_database_size_metrics'] = False +def test_apply_deltas_base_case(pg_instance, integration_check): check = integration_check(pg_instance) rows = [ @@ -20,3 +19,23 @@ def test_apply_deltas(pg_instance, integration_check): assert len(rows) == 1 assert rows[0] == {'queryid': 1, 'query_signature': 'abc', 'calls': 3} + + +def test_apply_deltas_multiple_runs(pg_instance, integration_check): + check = integration_check(pg_instance) + + rows = [ + {'queryid': 1, 'query_signature': 'abc', 'calls': 1}, + {'queryid': 2, 'query_signature': 'abc', 'calls': 2}, + ] + metrics = ['calls'] + + rows = check.statement_metrics._apply_deltas(rows, metrics) + + second_rows = [ + {'queryid': 2, 'query_signature': 'abc', 'calls': 3}, + ] + rows = check.statement_metrics._apply_deltas(second_rows, metrics) + + assert len(rows) == 1 + assert rows[0] == {'queryid': 1, 'query_signature': 'abc', 'calls': 4} diff --git 
a/postgres/tests/test_statements.py b/postgres/tests/test_statements.py index 531bcb469ec09..6b97b03461935 100644 --- a/postgres/tests/test_statements.py +++ b/postgres/tests/test_statements.py @@ -238,26 +238,14 @@ def _run_query(idx): _run_query(1) run_one_check(check, dbm_instance, cancel=False) - with postgres_conn.cursor() as cur: - cur.execute("SELECT * from pg_stat_statements where query not ilike '%pg_%'") - for row in cur.fetchall(): - print("[AMW] PGSS: " + str(row)) - -# obfuscated_param = '?' if POSTGRES_VERSION.split('.')[0] == "9" else '$1' - query0 = queries[0] % ('?',) + # obfuscated_param = '?' if POSTGRES_VERSION.split('.')[0] == "9" else '$1' + obfuscated_param = '?' + query0 = queries[0] % (obfuscated_param,) query_signature = compute_sql_signature(query0) events = aggregator.get_event_platform_events("dbm-metrics") - query1 = queries[1] % ('?',) - query1_sig = compute_sql_signature(query1) - print("[AMW] Looking for query signature for " + query0 + ": " + query_signature) - print("[AMW] other query sig for " + query1 + ": " + query1_sig) - assert len(events) > 0 - print("[AMW] Event:") - print(events[0]) - matching_rows = [r for r in events[0]['postgres_rows'] if r['query_signature'] == query_signature] assert len(matching_rows) == 1 @@ -266,75 +254,7 @@ def _run_query(idx): for conn in connections.values(): conn.close() - - fail("Fail") - -# Simulate an agent starting up into a long-running DB instance -# with pg_stat_statements already populated -def test_statement_metrics_existing_calls( - aggregator, - integration_check, - dbm_instance, -): - # don't need samples for this test - dbm_instance['query_samples'] = {'enabled': False} - dbm_instance['query_activity'] = {'enabled': False} - # very low collection interval for test purposes - dbm_instance['query_metrics'] = {'enabled': True, 'run_sync': True, 'collection_interval': 0.1} - connections = {} - - def _run_queries(): - user, password, dbname, query, arg = SAMPLE_QUERIES[0] - if dbname not in connections: - connections[dbname] = psycopg2.connect(host=HOST, dbname=dbname, user=user, password=password) - connections[dbname].cursor().execute(query, (arg,)) - - # Reset pgss - postgres_conn = _get_superconn(dbm_instance) - with postgres_conn.cursor() as cur: - cur.execute("SELECT pg_stat_statements_reset();") - - check = integration_check(dbm_instance) - check._connect() - - # Populate pg_stat_statements with a bunch of executions of the same query - print("Running queries to populate pg_stat_statements") - for _ in range(500): - _run_queries() - - # Run multiple times to fill the initial previous_statements with queries from the agent - run_one_check(check, dbm_instance, cancel=False) - expected_dbm_metrics_tags = _get_expected_tags(check, dbm_instance, with_db=True, with_host=False) - expected_dbm_metrics_tags.append('agent_hostname:stubbed.hostname') - - print("Running single query") - _run_queries() - run_one_check(check, dbm_instance, cancel=False) - - aggregator.reset() - - print("Running single query") - _run_queries() - run_one_check(check, dbm_instance, cancel=False) - obfuscated_param = '?' 
if POSTGRES_VERSION.split('.')[0] == "9" else '$1' - expected_query = SAMPLE_QUERIES[0][3] % obfuscated_param - - query_signature = compute_sql_signature(expected_query) - count_metrics = aggregator.metrics('postgresql.queries.count') - expected_metric = None - print("[AMW] count metrics: " + str(count_metrics)) - for metric in count_metrics: - print("[AMW] metric: " + str(metric)) - if metric.tags['query_signature'] == query_signature: - expected_metric = metric - break - - assert expected_metric is not None - assert expected_metric.value == 1 - - for conn in connections.values(): - conn.close() # This results in metrics being omitted def test_statement_metrics_unchanged( From e957ee036e2192bfe7cc1adf373677e063239e0d Mon Sep 17 00:00:00 2001 From: "alex.weisberger@datadoghq.com" Date: Fri, 10 May 2024 10:23:16 -0400 Subject: [PATCH 09/35] Remove unneeded setup and logs --- postgres/tests/test_statements.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/postgres/tests/test_statements.py b/postgres/tests/test_statements.py index 6b97b03461935..0a6552a242f1c 100644 --- a/postgres/tests/test_statements.py +++ b/postgres/tests/test_statements.py @@ -210,11 +210,6 @@ def _run_query(idx): with mock.patch.object(datadog_agent, 'obfuscate_sql', passthrough=True) as mock_agent: mock_agent.side_effect = obfuscate_sql - # Reset pgss - postgres_conn = _get_superconn(dbm_instance) - with postgres_conn.cursor() as cur: - cur.execute("SELECT pg_stat_statements_reset();") - check = integration_check(dbm_instance) check._connect() @@ -225,7 +220,6 @@ def _run_query(idx): _run_query(0) run_one_check(check, dbm_instance, cancel=False) - print("[AMW] clear previous statements") run_one_check(check, dbm_instance, cancel=False) # Call one query @@ -234,11 +228,9 @@ def _run_query(idx): aggregator.reset() # Call other query - print('[AMW] Call other query') _run_query(1) run_one_check(check, dbm_instance, cancel=False) - # obfuscated_param = '?' if POSTGRES_VERSION.split('.')[0] == "9" else '$1' obfuscated_param = '?' 
query0 = queries[0] % (obfuscated_param,) query_signature = compute_sql_signature(query0) From d364a1d75e9e27866ea6aa576b5dc23b0d278e85 Mon Sep 17 00:00:00 2001 From: "alex.weisberger@datadoghq.com" Date: Fri, 10 May 2024 10:50:21 -0400 Subject: [PATCH 10/35] Remove logs --- .../base/utils/db/statement_metrics.py | 3 - .../datadog_checks/postgres/statements.py | 7 - postgres/tests/test_statements.py | 175 ------------------ 3 files changed, 185 deletions(-) diff --git a/datadog_checks_base/datadog_checks/base/utils/db/statement_metrics.py b/datadog_checks_base/datadog_checks/base/utils/db/statement_metrics.py index c0d5460a89dcf..be0a238bcd80d 100644 --- a/datadog_checks_base/datadog_checks/base/utils/db/statement_metrics.py +++ b/datadog_checks_base/datadog_checks/base/utils/db/statement_metrics.py @@ -57,9 +57,6 @@ def compute_derivative_rows(self, rows, metrics, key): if prev is None: continue - if 'pg_' not in row['query']: - print("[AMW] computing derivative values" + " | prev query " + prev['query'] + " | curr query " + row['query'] + " | prev queryid " + str(prev['queryid']) + " | curr queryid " + str(row['queryid']) + " | prev query_sig: " + prev['query_signature'] + " | cur query sig: " + row['query_signature'] + " | prev calls is: " + str(prev['calls']) + " | current calls is: " + str(row['calls']) + " | diff is: " + str(row['calls'] - prev['calls'])) - metric_columns = metrics & set(row.keys()) # Take the diff of all metric values between the current row and the previous run's row. diff --git a/postgres/datadog_checks/postgres/statements.py b/postgres/datadog_checks/postgres/statements.py index 8b44853f8b773..1acd609ca6688 100644 --- a/postgres/datadog_checks/postgres/statements.py +++ b/postgres/datadog_checks/postgres/statements.py @@ -270,7 +270,6 @@ def collect_per_statement_metrics(self): def _load_pg_stat_statements(self): try: called_queryids = self._check_called_queries() - print("[AMW] called queries: " + str(called_queryids)) available_columns = set(self._get_pg_stat_statements_columns()) missing_columns = PG_STAT_STATEMENTS_REQUIRED_COLUMNS - available_columns if len(missing_columns) > 0: @@ -350,7 +349,6 @@ def _load_pg_stat_statements(self): params=params, ) except psycopg2.Error as e: - print("_load_pg_stat_statements error") error_tag = "error:database-{}".format(type(e).__name__) if ( @@ -508,11 +506,6 @@ def _collect_metrics_rows(self): tags=self.tags + self._check._get_debug_tags(), hostname=self._check.resolved_hostname, ) - print("[AMW] called queries") - for row in rows: - if 'pg_' not in row['query']: - print("QueryId: " + str(row['queryid']) + " | Query: " + str(row['query']) + " | Calls: " + str(row['calls'])) - print("-------------------\n") return rows diff --git a/postgres/tests/test_statements.py b/postgres/tests/test_statements.py index 0a6552a242f1c..9c51a505ee591 100644 --- a/postgres/tests/test_statements.py +++ b/postgres/tests/test_statements.py @@ -77,85 +77,6 @@ def test_dbm_enabled_config(integration_check, dbm_instance, dbm_enabled_key, db assert check._config.dbm_enabled == dbm_enabled - -def test_statement_metrics_optimization( - aggregator, - integration_check, - dbm_instance, -): - print("[AMW] statement metrics optimization test") - # don't need samples for this test - dbm_instance['query_samples'] = {'enabled': False} - dbm_instance['query_activity'] = {'enabled': False} - # very low collection interval for test purposes - dbm_instance['query_metrics'] = {'enabled': True, 'run_sync': True, 'collection_interval': 0.1} - connections = {} 
- - def _run_queries(): - for user, password, dbname, query, arg in SAMPLE_QUERIES: - if dbname not in connections: - connections[dbname] = psycopg2.connect(host=HOST, dbname=dbname, user=user, password=password) - connections[dbname].cursor().execute(query, (arg,)) - - # Reset pgss - postgres_conn = _get_superconn(dbm_instance) - with postgres_conn.cursor() as cur: - cur.execute("SELECT pg_stat_statements_reset();") - - check = integration_check(dbm_instance) - check._connect() - - # Run multiple times to fill the initial previous_statements with queries from the agent - run_one_check(check, dbm_instance, cancel=False) - run_one_check(check, dbm_instance, cancel=False) - expected_dbm_metrics_tags = _get_expected_tags(check, dbm_instance, with_db=True, with_host=False) - expected_dbm_metrics_tags.append('agent_hostname:stubbed.hostname') - aggregator.reset() - - # Get the base number of calls generated by the agent - run_one_check(check, dbm_instance, cancel=False) - base_value = aggregator.metrics('dd.postgres.queries.query_rows_raw')[0].value - print("[AMW] base value is " + str(base_value)) - assert base_value > 0 - aggregator.reset() - - # First check run, no rows generated yet - _run_queries() - run_one_check(check, dbm_instance, cancel=False) - aggregator.assert_metric('dd.postgres.queries.query_rows_raw', value=base_value, count=1, - tags=expected_dbm_metrics_tags) - aggregator.reset() - - # Second check run, we should have one row per sample - _run_queries() - run_one_check(check, dbm_instance, cancel=False) - aggregator.assert_metric('dd.postgres.queries.query_rows_raw', value=base_value + len(SAMPLE_QUERIES), count=1, - tags=expected_dbm_metrics_tags) - - events = aggregator.get_event_platform_events("dbm-metrics") - print("[AMW] events: " + str(events)) -# assert len(events) == 2 - event = events[0] # first item is from the initial dummy check to load pg_settings - - obfuscated_param = '?' 
if POSTGRES_VERSION.split('.')[0] == "9" else '$1' - for username, _, dbname, query, _ in SAMPLE_QUERIES: - expected_query = query % obfuscated_param - query_signature = compute_sql_signature(expected_query) - matching_rows = [r for r in event['postgres_rows'] if r['query_signature'] == query_signature] - - # metrics - assert len(matching_rows) == 1 - row = matching_rows[0] - assert row['calls'] == 1 - assert row['datname'] == dbname - assert row['rolname'] == username - assert row['query'] == expected_query - - aggregator.reset() - - for conn in connections.values(): - conn.close() - def test_statement_metrics_multiple_pgss_rows_single_query_signature( aggregator, integration_check, @@ -201,7 +122,6 @@ def _run_query(idx): if idx == 1: args = ('one',) - print("[AMW] Executing query " + str(query) + " with args " + str(args)) connections[dbname].cursor().execute(query, args) check = integration_check(dbm_instance) @@ -248,101 +168,6 @@ def _run_query(idx): conn.close() -# This results in metrics being omitted -def test_statement_metrics_unchanged( - aggregator, - integration_check, - dbm_instance, -): - # don't need samples for this test - dbm_instance['query_samples'] = {'enabled': False} - dbm_instance['query_activity'] = {'enabled': False} - # very low collection interval for test purposes - dbm_instance['query_metrics'] = {'enabled': True, 'run_sync': True, 'collection_interval': 0.1} - connections = {} - - def _run_queries(): - for user, password, dbname, query, arg in SAMPLE_QUERIES: - if dbname not in connections: - connections[dbname] = psycopg2.connect(host=HOST, dbname=dbname, user=user, password=password) - connections[dbname].cursor().execute(query, (arg,)) - - # Reset pgss - postgres_conn = _get_superconn(dbm_instance) - with postgres_conn.cursor() as cur: - cur.execute("SELECT pg_stat_statements_reset();") - - check = integration_check(dbm_instance) - check._connect() - - print("[AMW] run agent queries") - - # Run multiple times to fill the initial previous_statements with queries from the agent - run_one_check(check, dbm_instance, cancel=False) - run_one_check(check, dbm_instance, cancel=False) - expected_dbm_metrics_tags = _get_expected_tags(check, dbm_instance, with_db=True, with_host=False) - expected_dbm_metrics_tags.append('agent_hostname:stubbed.hostname') - aggregator.reset() - - # Get the base number of calls generated by the agent - run_one_check(check, dbm_instance, cancel=False) - base_value = aggregator.metrics('dd.postgres.queries.query_rows_raw')[0].value - assert base_value > 0 - aggregator.reset() - - print("[AMW] base value: " + str(base_value)) - - print("[AMW] run queries 1") - - - # First check run, no rows generated yet - _run_queries() - run_one_check(check, dbm_instance, cancel=False) - aggregator.assert_metric('dd.postgres.queries.query_rows_raw', value=base_value, count=1, - tags=expected_dbm_metrics_tags) - aggregator.reset() - - print("[AMW] run queries 2") - - - # Second check run, we should have one row per sample - _run_queries() - run_one_check(check, dbm_instance, cancel=False) - aggregator.assert_metric('dd.postgres.queries.query_rows_raw', value=base_value + len(SAMPLE_QUERIES), count=1, - tags=expected_dbm_metrics_tags) - aggregator.reset() - - print("[AMW] run queries 3") - - - # Third check run, no metrics generated since no queries were called - run_one_check(check, dbm_instance, cancel=False) - aggregator.assert_metric('dd.postgres.queries.query_rows_raw', value=base_value, count=1, - tags=expected_dbm_metrics_tags) - aggregator.reset() 
- - print("[AMW] run queries 4") - - # Fourth check run, we should have queries being reported - _run_queries() -# _run_queries() - run_one_check(check, dbm_instance, cancel=False) - - # Now, a metric won't be sent because previous_statements is empty -# aggregator.assert_metric('dd.postgres.queries.query_rows_raw', value=base_value + len(SAMPLE_QUERIES), count=1, -# tags=expected_dbm_metrics_tags) - _run_queries() - run_one_check(check, dbm_instance, cancel=False) - - # This probably results in an inflated metric being sent - - - for conn in connections.values(): - conn.close() - - fail("Fail") - - statement_samples_keys = ["query_samples", "statement_samples"] @pytest.mark.parametrize("statement_samples_key", statement_samples_keys) From 4744a9265ab9f57af2f45eba5812216b1ee440a3 Mon Sep 17 00:00:00 2001 From: "alex.weisberger@datadoghq.com" Date: Fri, 10 May 2024 18:57:41 +0000 Subject: [PATCH 11/35] Changelog --- postgres/changelog.d/17554.fixed | 1 + 1 file changed, 1 insertion(+) create mode 100644 postgres/changelog.d/17554.fixed diff --git a/postgres/changelog.d/17554.fixed b/postgres/changelog.d/17554.fixed new file mode 100644 index 0000000000000..a1691bacc5883 --- /dev/null +++ b/postgres/changelog.d/17554.fixed @@ -0,0 +1 @@ +This un-reverts https://github.com/DataDog/integrations-core/pull/17187, which was reverted due to a flaw in the logic that could result in improper metric counts. The fixed version caches an initial state of pg_stat_statements and updates it with incremental partial query data. This allows the agent to only fetch full rows for queries that were called between check runs, while also accounting for queryids that can map to the same query_signature. \ No newline at end of file From 22f1803f729dda648e505c1bfd1952f9656a12da Mon Sep 17 00:00:00 2001 From: "alex.weisberger@datadoghq.com" Date: Fri, 10 May 2024 21:28:13 +0000 Subject: [PATCH 12/35] Expire metrics caches to prevent unbounded growth. --- .../postgres/query_calls_cache.py | 3 --- .../datadog_checks/postgres/statements.py | 23 +++++++++++++----- postgres/tests/test_statements.py | 24 +++++++++---------- 3 files changed, 28 insertions(+), 22 deletions(-) diff --git a/postgres/datadog_checks/postgres/query_calls_cache.py b/postgres/datadog_checks/postgres/query_calls_cache.py index 08303f2737fd1..ce8b1a031de92 100644 --- a/postgres/datadog_checks/postgres/query_calls_cache.py +++ b/postgres/datadog_checks/postgres/query_calls_cache.py @@ -24,8 +24,6 @@ def set_calls(self, queryid, calls): calls_changed = False if queryid in self.cache: -# print("[AMW] Query in cache " + str(queryid) + " | cached calls: " + str(self.cache[queryid]) + " | new calls: " + str(calls)) - diff = calls - self.cache[queryid] # Positive deltas mean the statement remained in pg_stat_statements # between check calls. Negative deltas mean the statement was evicted @@ -33,7 +31,6 @@ def set_calls(self, queryid, calls): # change. 
calls_changed = diff != 0 else: - # print("[AMW] Query not in cache " + str(queryid) + " | new calls: " + str(calls)) calls_changed = True self.next_cache[queryid] = calls diff --git a/postgres/datadog_checks/postgres/statements.py b/postgres/datadog_checks/postgres/statements.py index 1acd609ca6688..c972cecab3f4f 100644 --- a/postgres/datadog_checks/postgres/statements.py +++ b/postgres/datadog_checks/postgres/statements.py @@ -47,6 +47,8 @@ {extra_clauses} """ +BASELINE_METRICS_EXPIRY = 60 * 10 # 10 minutes + # Use pg_stat_statements(false) when available as an optimization to avoid pulling SQL text from disk PG_STAT_STATEMENTS_COUNT_QUERY = "SELECT COUNT(*) FROM pg_stat_statements(false)" PG_STAT_STATEMENTS_COUNT_QUERY_LT_9_4 = "SELECT COUNT(*) FROM pg_stat_statements" @@ -155,6 +157,7 @@ def __init__(self, check, config, shutdown_callback): self._stat_column_cache = [] self._query_calls_cache = QueryCallsCache() self._baseline_metrics = {} + self._last_baseline_metrics_expiry = None self._track_io_timing_cache = None self._obfuscate_options = to_native_string(json.dumps(self._config.obfuscator_options)) # full_statement_text_cache: limit the ingestion rate of full statement text events per query_signature @@ -206,14 +209,14 @@ def _check_called_queries(self): # allows the engine to avoid retrieving the potentially large amount of text data. # The query count query does not depend on the statement text, so it's safe for this use case. # For more info: https://www.postgresql.org/docs/current/pgstatstatements.html#PGSTATSTATEMENTS-FUNCS - pgss_view_without_query_text = "pg_stat_statements(false)" + pgss_view_without_query_text = "pg_stat_statements(false)" with self._check._get_main_db() as conn: with conn.cursor(cursor_factory=CommenterCursor) as cursor: called_queryids = [] query = QUERYID_TO_CALLS_QUERY.format(pg_stat_statements_view=pgss_view_without_query_text) rows = self._execute_query(cursor, query, params=(self._config.dbname,)) - + for row in rows: queryid = row[0] if queryid is None: @@ -347,7 +350,7 @@ def _load_pg_stat_statements(self): called_queryids=', '.join([str(i) for i in called_queryids]), ), params=params, - ) + ) except psycopg2.Error as e: error_tag = "error:database-{}".format(type(e).__name__) @@ -464,9 +467,7 @@ def _apply_deltas(self, rows, metrics): query_signature = row['query_signature'] queryid = row['queryid'] if query_signature not in self._baseline_metrics: - self._baseline_metrics[query_signature] = { - queryid: copy.copy(row) - } + self._baseline_metrics[query_signature] = {queryid: copy.copy(row)} else: self._baseline_metrics[query_signature][queryid] = copy.copy(row) @@ -486,10 +487,20 @@ def _apply_deltas(self, rows, metrics): return aggregated_rows + # To prevent the baseline metrics cache from growing indefinitely (as can happen) because of + # pg_stat_statements eviction), we clear it out periodically to force a full refetch. 
+ def _check_baseline_metrics_expiry(self): + if self._last_baseline_metrics_expiry == None or self._last_baseline_metrics_expiry + BASELINE_METRICS_EXPIRY < time.time(): + self._baseline_metrics = {} + self._query_calls_cache = QueryCallsCache() + self._last_baseline_metrics_expiry = time.time() + + @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True) def _collect_metrics_rows(self): self._emit_pg_stat_statements_metrics() self._emit_pg_stat_statements_dealloc() + self._check_baseline_metrics_expiry() rows = self._load_pg_stat_statements() rows = self._normalize_queries(rows) if not rows: diff --git a/postgres/tests/test_statements.py b/postgres/tests/test_statements.py index 9c51a505ee591..5d393c8ba5d88 100644 --- a/postgres/tests/test_statements.py +++ b/postgres/tests/test_statements.py @@ -89,7 +89,7 @@ def test_statement_metrics_multiple_pgss_rows_single_query_signature( # very low collection interval for test purposes dbm_instance['query_metrics'] = {'enabled': True, 'run_sync': True, 'collection_interval': 0.1} connections = {} - + def normalize_query(q): # Remove the quotes from below: normalized = "" @@ -97,19 +97,16 @@ def normalize_query(q): if s in q: normalized = q.replace(s, "?") break - + return normalized def obfuscate_sql(query, options=None): if query.startswith('SET application_name'): return json.dumps({'query': normalize_query(query), 'metadata': {}}) return json.dumps({'query': query, 'metadata': {}}) - - queries = [ - "SET application_name = %s", - "SET application_name = %s" - ] - # These queries will have the same query signature but different queryids in pg_stat_statements + + queries = ["SET application_name = %s", "SET application_name = %s"] + # These queries will have the same query signature but different queryids in pg_stat_statements def _run_query(idx): query = queries[idx] user = "bob" @@ -117,7 +114,7 @@ def _run_query(idx): dbname = "datadog_test" if dbname not in connections: connections[dbname] = psycopg2.connect(host=HOST, dbname=dbname, user=user, password=password) - + args = ('two',) if idx == 1: args = ('one',) @@ -129,7 +126,7 @@ def _run_query(idx): # Execute the query with the mocked obfuscate_sql. The result should produce an event payload with the metadata. with mock.patch.object(datadog_agent, 'obfuscate_sql', passthrough=True) as mock_agent: mock_agent.side_effect = obfuscate_sql - + check = integration_check(dbm_instance) check._connect() @@ -139,7 +136,7 @@ def _run_query(idx): _run_query(0) run_one_check(check, dbm_instance, cancel=False) - + run_one_check(check, dbm_instance, cancel=False) # Call one query @@ -157,7 +154,7 @@ def _run_query(idx): events = aggregator.get_event_platform_events("dbm-metrics") assert len(events) > 0 - + matching_rows = [r for r in events[0]['postgres_rows'] if r['query_signature'] == query_signature] assert len(matching_rows) == 1 @@ -165,11 +162,12 @@ def _run_query(idx): assert matching_rows[0]['calls'] == 1 for conn in connections.values(): - conn.close() + conn.close() statement_samples_keys = ["query_samples", "statement_samples"] + @pytest.mark.parametrize("statement_samples_key", statement_samples_keys) @pytest.mark.parametrize("statement_samples_enabled", [True, False]) @pytest.mark.parametrize("query_activity_enabled", [True, False]) From 1eb7dc986a2fdd2a72e495a88ff9163f4629308f Mon Sep 17 00:00:00 2001 From: "alex.weisberger@datadoghq.com" Date: Sun, 12 May 2024 22:55:21 +0000 Subject: [PATCH 13/35] Create one row per queryid in _apply_deltas. 
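Rather than pre-aggregating duplicate query signatures inside _apply_deltas, keep one cached row per
(query_signature, queryid) and emit one row per queryid; merging duplicate signatures is left to
compute_derivative_rows downstream. A simplified, self-contained sketch of that bookkeeping (the
module-level `baseline` dict and the helper name `apply_rows` are illustrative only, not part of this change):

    import copy

    baseline = {}  # query_signature -> queryid -> last seen row

    def apply_rows(rows):
        # Cache the newest counters per queryid, then emit one row per cached queryid.
        for row in rows:
            baseline.setdefault(row['query_signature'], {})[row['queryid']] = copy.copy(row)
        return [row for per_queryid in baseline.values() for row in per_queryid.values()]

    apply_rows([
        {'queryid': 1, 'query_signature': 'abc', 'calls': 1},
        {'queryid': 2, 'query_signature': 'abc', 'calls': 2},
    ])
    print(apply_rows([{'queryid': 2, 'query_signature': 'abc', 'calls': 3}]))
    # two rows come back: queryid 1 keeps calls=1, queryid 2 is updated to calls=3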
--- .../datadog_checks/postgres/statements.py | 35 ++++++++++--------- postgres/tests/test_apply_deltas.py | 30 ++++++++-------- postgres/tests/test_statements.py | 4 +-- 3 files changed, 35 insertions(+), 34 deletions(-) diff --git a/postgres/datadog_checks/postgres/statements.py b/postgres/datadog_checks/postgres/statements.py index c972cecab3f4f..85d2af4dadaff 100644 --- a/postgres/datadog_checks/postgres/statements.py +++ b/postgres/datadog_checks/postgres/statements.py @@ -459,33 +459,33 @@ def _emit_pg_stat_statements_metrics(self): self._log.warning("Failed to query for pg_stat_statements count: %s", e) # _apply_deltas expects normalized rows before any merging of duplicates. - # It takes the partial rows from pg_stat_statements and aggregates metric values - # for queryids that map to the same query_signature - def _apply_deltas(self, rows, metrics): + # It takes the incremental pg_stat_statements rows and constructs the full set of rows + # by adding the existing values in the baseline_metrics cache. This is equivalent to + # fetching the full set of rows from pg_stat_statements, but we avoid paying the price of + # actually querying the rows. + def _apply_deltas(self, rows): + # Apply called queries to baseline_metrics for row in rows: query_signature = row['query_signature'] queryid = row['queryid'] + baseline_row = copy.copy(row) if query_signature not in self._baseline_metrics: - self._baseline_metrics[query_signature] = {queryid: copy.copy(row)} + self._baseline_metrics[query_signature] = {queryid: baseline_row} else: - self._baseline_metrics[query_signature][queryid] = copy.copy(row) + self._baseline_metrics[query_signature][queryid] = baseline_row - # Aggregate multiple queryids into one row per query_signature - aggregated_rows = [] + # Apply query text, so it doesn't have to be cached. + query_text = {row['query_signature']: row['query'] for row in rows} + applied_rows = [] for query_signature, query_sig_metrics in self._baseline_metrics.items(): - aggregated_row = {} for queryid, row in query_sig_metrics.items(): - if 'query_signature' not in aggregated_row: - aggregated_row = copy.copy(row) + if query_signature in query_text: + applied_rows.append({**row, 'query': query_text[query_signature]}) else: - for metric in metrics: - if metric in row: - aggregated_row[metric] += row[metric] - - aggregated_rows.append(aggregated_row) + applied_rows.append(row) - return aggregated_rows + return applied_rows # To prevent the baseline metrics cache from growing indefinitely (as can happen) because of # pg_stat_statements eviction), we clear it out periodically to force a full refetch. 
@@ -503,12 +503,13 @@ def _collect_metrics_rows(self): self._check_baseline_metrics_expiry() rows = self._load_pg_stat_statements() rows = self._normalize_queries(rows) + rows = self._apply_deltas(rows) + if not rows: return [] available_columns = set(rows[0].keys()) metric_columns = available_columns & PG_STAT_STATEMENTS_METRICS_COLUMNS - rows = self._apply_deltas(rows, metric_columns) rows = self._state.compute_derivative_rows(rows, metric_columns, key=_row_key) self._check.gauge( diff --git a/postgres/tests/test_apply_deltas.py b/postgres/tests/test_apply_deltas.py index 9f002533d3d68..f52c0ea7e436e 100644 --- a/postgres/tests/test_apply_deltas.py +++ b/postgres/tests/test_apply_deltas.py @@ -10,32 +10,34 @@ def test_apply_deltas_base_case(pg_instance, integration_check): check = integration_check(pg_instance) rows = [ - {'queryid': 1, 'query_signature': 'abc', 'calls': 1}, - {'queryid': 2, 'query_signature': 'abc', 'calls': 2}, + {'queryid': 1, 'query_signature': 'abc', 'calls': 1, 'query': 'query 123'}, + {'queryid': 2, 'query_signature': 'abc', 'calls': 2, 'query': 'query 123'}, ] - metrics = ['calls'] - rows = check.statement_metrics._apply_deltas(rows, metrics) + rows = check.statement_metrics._apply_deltas(rows) - assert len(rows) == 1 - assert rows[0] == {'queryid': 1, 'query_signature': 'abc', 'calls': 3} + assert rows == [ + {'queryid': 1, 'query_signature': 'abc', 'calls': 1, 'query': 'query 123'}, + {'queryid': 2, 'query_signature': 'abc', 'calls': 2, 'query': 'query 123'} + ] def test_apply_deltas_multiple_runs(pg_instance, integration_check): check = integration_check(pg_instance) rows = [ - {'queryid': 1, 'query_signature': 'abc', 'calls': 1}, - {'queryid': 2, 'query_signature': 'abc', 'calls': 2}, + {'queryid': 1, 'query_signature': 'abc', 'calls': 1, 'query': 'query 123'}, + {'queryid': 2, 'query_signature': 'abc', 'calls': 2, 'query': 'query 123'}, ] - metrics = ['calls'] - rows = check.statement_metrics._apply_deltas(rows, metrics) + rows = check.statement_metrics._apply_deltas(rows) second_rows = [ - {'queryid': 2, 'query_signature': 'abc', 'calls': 3}, + {'queryid': 2, 'query_signature': 'abc', 'calls': 3, 'query': 'query 123'}, ] - rows = check.statement_metrics._apply_deltas(second_rows, metrics) + rows = check.statement_metrics._apply_deltas(second_rows) - assert len(rows) == 1 - assert rows[0] == {'queryid': 1, 'query_signature': 'abc', 'calls': 4} + assert rows == [ + {'queryid': 1, 'query_signature': 'abc', 'calls': 1, 'query': 'query 123'}, + {'queryid': 2, 'query_signature': 'abc', 'calls': 3, 'query': 'query 123'} + ] diff --git a/postgres/tests/test_statements.py b/postgres/tests/test_statements.py index 5d393c8ba5d88..52e0a1e9d2c4e 100644 --- a/postgres/tests/test_statements.py +++ b/postgres/tests/test_statements.py @@ -86,8 +86,6 @@ def test_statement_metrics_multiple_pgss_rows_single_query_signature( # don't need samples for this test dbm_instance['query_samples'] = {'enabled': False} dbm_instance['query_activity'] = {'enabled': False} - # very low collection interval for test purposes - dbm_instance['query_metrics'] = {'enabled': True, 'run_sync': True, 'collection_interval': 0.1} connections = {} def normalize_query(q): @@ -144,7 +142,7 @@ def _run_query(idx): run_one_check(check, dbm_instance, cancel=False) aggregator.reset() - # Call other query + # Call other query that maps to same query signature _run_query(1) run_one_check(check, dbm_instance, cancel=False) From 390ea869251fe626b1c33d5c0f879c83ea79d4ca Mon Sep 17 00:00:00 2001 From: 
"alex.weisberger@datadoghq.com" Date: Sun, 12 May 2024 22:52:33 -0400 Subject: [PATCH 14/35] Logging --- .../base/utils/db/statement_metrics.py | 15 +++++++++++++++ postgres/datadog_checks/postgres/statements.py | 11 ++++++++++- 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/datadog_checks_base/datadog_checks/base/utils/db/statement_metrics.py b/datadog_checks_base/datadog_checks/base/utils/db/statement_metrics.py index be0a238bcd80d..de443c7458005 100644 --- a/datadog_checks_base/datadog_checks/base/utils/db/statement_metrics.py +++ b/datadog_checks_base/datadog_checks/base/utils/db/statement_metrics.py @@ -83,6 +83,15 @@ def compute_derivative_rows(self, rows, metrics, key): if all(diffed_row[k] == 0 for k in metric_columns): continue + if diffed_row['calls'] > 10000: + print("[AMW] cur queryid: " + str(row['queryid'])) + print("[AMW] prev queryid: " + str(prev['queryid'])) + print("[AMW] cur calls: " + str(row['calls'])) + print("[AMW] prev calls: " + str(prev['calls'])) + print("[AMW] query: " + row['query']) + print("[AMW] query_signature: " + row['query_signature']) + print("") + result.append(diffed_row) self._previous_statements.clear() @@ -115,4 +124,10 @@ def _merge_duplicate_rows(rows, metrics, key): else: queries_by_key[query_key] = row + if row['query_signature'] in ['7bf8f124e953d206', '524374cff025d947', 'f944c89168db9394']: + print("[AMW] merging duplicates - to merge: " + str(dict((k, row[k]) for k in ['queryid', 'query_signature', 'calls', 'query']))) + print("[AMW] merging duplicates - dest: " + str(dict((k, queries_by_key[query_key][k]) for k in ['queryid', 'query_signature', 'calls', 'query']))) + print("[AMW] merged calls: " + str(queries_by_key[query_key]['calls']) + " query: " + queries_by_key[query_key]['query']) + print("") + return queries_by_key, dropped_metrics diff --git a/postgres/datadog_checks/postgres/statements.py b/postgres/datadog_checks/postgres/statements.py index 85d2af4dadaff..58eba9af6b81d 100644 --- a/postgres/datadog_checks/postgres/statements.py +++ b/postgres/datadog_checks/postgres/statements.py @@ -464,7 +464,15 @@ def _emit_pg_stat_statements_metrics(self): # fetching the full set of rows from pg_stat_statements, but we avoid paying the price of # actually querying the rows. def _apply_deltas(self, rows): - + print("[AMW] Baseline cache") + for query_sig, query_sig_metrics in self._baseline_metrics.items(): + for queryid, baseline_row in query_sig_metrics.items(): + if query_sig in ['7bf8f124e953d206', '524374cff025d947', 'f944c89168db9394']: + print("[AMW] baseline - queryid: " + str(baseline_row['queryid'])) + print("[AMW] baseline - query: " + baseline_row['query']) + print("[AMW] baseline - calls: " + str(baseline_row['calls'])) + + print("") # Apply called queries to baseline_metrics for row in rows: query_signature = row['query_signature'] @@ -500,6 +508,7 @@ def _check_baseline_metrics_expiry(self): def _collect_metrics_rows(self): self._emit_pg_stat_statements_metrics() self._emit_pg_stat_statements_dealloc() + self._check_baseline_metrics_expiry() rows = self._load_pg_stat_statements() rows = self._normalize_queries(rows) From 7c2e4626f3c902bfed0f9ed6e46d1178f91271c8 Mon Sep 17 00:00:00 2001 From: "alex.weisberger@datadoghq.com" Date: Mon, 13 May 2024 08:51:38 -0400 Subject: [PATCH 15/35] Copy row to prevent mutation. 
--- postgres/datadog_checks/postgres/statements.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/postgres/datadog_checks/postgres/statements.py b/postgres/datadog_checks/postgres/statements.py index 58eba9af6b81d..31bb443d939c5 100644 --- a/postgres/datadog_checks/postgres/statements.py +++ b/postgres/datadog_checks/postgres/statements.py @@ -491,7 +491,7 @@ def _apply_deltas(self, rows): if query_signature in query_text: applied_rows.append({**row, 'query': query_text[query_signature]}) else: - applied_rows.append(row) + applied_rows.append(copy.copy(row)) return applied_rows From af13bdba4b4c1c7b0ac485a1143b9e30b4f64cdb Mon Sep 17 00:00:00 2001 From: "alex.weisberger@datadoghq.com" Date: Mon, 13 May 2024 09:19:36 -0400 Subject: [PATCH 16/35] Remove logging --- .../base/utils/db/statement_metrics.py | 15 --------------- postgres/datadog_checks/postgres/statements.py | 9 --------- 2 files changed, 24 deletions(-) diff --git a/datadog_checks_base/datadog_checks/base/utils/db/statement_metrics.py b/datadog_checks_base/datadog_checks/base/utils/db/statement_metrics.py index de443c7458005..be0a238bcd80d 100644 --- a/datadog_checks_base/datadog_checks/base/utils/db/statement_metrics.py +++ b/datadog_checks_base/datadog_checks/base/utils/db/statement_metrics.py @@ -83,15 +83,6 @@ def compute_derivative_rows(self, rows, metrics, key): if all(diffed_row[k] == 0 for k in metric_columns): continue - if diffed_row['calls'] > 10000: - print("[AMW] cur queryid: " + str(row['queryid'])) - print("[AMW] prev queryid: " + str(prev['queryid'])) - print("[AMW] cur calls: " + str(row['calls'])) - print("[AMW] prev calls: " + str(prev['calls'])) - print("[AMW] query: " + row['query']) - print("[AMW] query_signature: " + row['query_signature']) - print("") - result.append(diffed_row) self._previous_statements.clear() @@ -124,10 +115,4 @@ def _merge_duplicate_rows(rows, metrics, key): else: queries_by_key[query_key] = row - if row['query_signature'] in ['7bf8f124e953d206', '524374cff025d947', 'f944c89168db9394']: - print("[AMW] merging duplicates - to merge: " + str(dict((k, row[k]) for k in ['queryid', 'query_signature', 'calls', 'query']))) - print("[AMW] merging duplicates - dest: " + str(dict((k, queries_by_key[query_key][k]) for k in ['queryid', 'query_signature', 'calls', 'query']))) - print("[AMW] merged calls: " + str(queries_by_key[query_key]['calls']) + " query: " + queries_by_key[query_key]['query']) - print("") - return queries_by_key, dropped_metrics diff --git a/postgres/datadog_checks/postgres/statements.py b/postgres/datadog_checks/postgres/statements.py index 31bb443d939c5..86132330d57dc 100644 --- a/postgres/datadog_checks/postgres/statements.py +++ b/postgres/datadog_checks/postgres/statements.py @@ -464,15 +464,6 @@ def _emit_pg_stat_statements_metrics(self): # fetching the full set of rows from pg_stat_statements, but we avoid paying the price of # actually querying the rows. 
def _apply_deltas(self, rows): - print("[AMW] Baseline cache") - for query_sig, query_sig_metrics in self._baseline_metrics.items(): - for queryid, baseline_row in query_sig_metrics.items(): - if query_sig in ['7bf8f124e953d206', '524374cff025d947', 'f944c89168db9394']: - print("[AMW] baseline - queryid: " + str(baseline_row['queryid'])) - print("[AMW] baseline - query: " + baseline_row['query']) - print("[AMW] baseline - calls: " + str(baseline_row['calls'])) - - print("") # Apply called queries to baseline_metrics for row in rows: query_signature = row['query_signature'] From ec20dd4ce2828e22ac033236fca2aa729c6210d8 Mon Sep 17 00:00:00 2001 From: "alex.weisberger@datadoghq.com" Date: Mon, 13 May 2024 13:20:16 +0000 Subject: [PATCH 17/35] Auto lint --- postgres/datadog_checks/postgres/statements.py | 8 +++++--- postgres/tests/test_apply_deltas.py | 4 ++-- postgres/tests/test_statements.py | 3 +-- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/postgres/datadog_checks/postgres/statements.py b/postgres/datadog_checks/postgres/statements.py index 86132330d57dc..7eb2b856edc43 100644 --- a/postgres/datadog_checks/postgres/statements.py +++ b/postgres/datadog_checks/postgres/statements.py @@ -47,7 +47,7 @@ {extra_clauses} """ -BASELINE_METRICS_EXPIRY = 60 * 10 # 10 minutes +BASELINE_METRICS_EXPIRY = 60 * 10 # 10 minutes # Use pg_stat_statements(false) when available as an optimization to avoid pulling SQL text from disk PG_STAT_STATEMENTS_COUNT_QUERY = "SELECT COUNT(*) FROM pg_stat_statements(false)" @@ -489,12 +489,14 @@ def _apply_deltas(self, rows): # To prevent the baseline metrics cache from growing indefinitely (as can happen) because of # pg_stat_statements eviction), we clear it out periodically to force a full refetch. def _check_baseline_metrics_expiry(self): - if self._last_baseline_metrics_expiry == None or self._last_baseline_metrics_expiry + BASELINE_METRICS_EXPIRY < time.time(): + if ( + self._last_baseline_metrics_expiry == None + or self._last_baseline_metrics_expiry + BASELINE_METRICS_EXPIRY < time.time() + ): self._baseline_metrics = {} self._query_calls_cache = QueryCallsCache() self._last_baseline_metrics_expiry = time.time() - @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True) def _collect_metrics_rows(self): self._emit_pg_stat_statements_metrics() diff --git a/postgres/tests/test_apply_deltas.py b/postgres/tests/test_apply_deltas.py index f52c0ea7e436e..0ebaadcfba066 100644 --- a/postgres/tests/test_apply_deltas.py +++ b/postgres/tests/test_apply_deltas.py @@ -18,7 +18,7 @@ def test_apply_deltas_base_case(pg_instance, integration_check): assert rows == [ {'queryid': 1, 'query_signature': 'abc', 'calls': 1, 'query': 'query 123'}, - {'queryid': 2, 'query_signature': 'abc', 'calls': 2, 'query': 'query 123'} + {'queryid': 2, 'query_signature': 'abc', 'calls': 2, 'query': 'query 123'}, ] @@ -39,5 +39,5 @@ def test_apply_deltas_multiple_runs(pg_instance, integration_check): assert rows == [ {'queryid': 1, 'query_signature': 'abc', 'calls': 1, 'query': 'query 123'}, - {'queryid': 2, 'query_signature': 'abc', 'calls': 3, 'query': 'query 123'} + {'queryid': 2, 'query_signature': 'abc', 'calls': 3, 'query': 'query 123'}, ] diff --git a/postgres/tests/test_statements.py b/postgres/tests/test_statements.py index 52e0a1e9d2c4e..c56246af203f0 100644 --- a/postgres/tests/test_statements.py +++ b/postgres/tests/test_statements.py @@ -104,6 +104,7 @@ def obfuscate_sql(query, options=None): return json.dumps({'query': query, 'metadata': {}}) 
queries = ["SET application_name = %s", "SET application_name = %s"] + # These queries will have the same query signature but different queryids in pg_stat_statements def _run_query(idx): query = queries[idx] @@ -135,8 +136,6 @@ def _run_query(idx): _run_query(0) run_one_check(check, dbm_instance, cancel=False) - run_one_check(check, dbm_instance, cancel=False) - # Call one query _run_query(0) run_one_check(check, dbm_instance, cancel=False) From 25eb3a3f22efdeb41d5673de05e5682b84c4e193 Mon Sep 17 00:00:00 2001 From: "alex.weisberger@datadoghq.com" Date: Mon, 13 May 2024 14:27:59 +0000 Subject: [PATCH 18/35] Store called queryids within the QueryCallsCache --- .../postgres/query_calls_cache.py | 8 ++++++-- postgres/datadog_checks/postgres/statements.py | 18 ++++++++---------- postgres/tests/test_query_calls_cache.py | 13 +++++++------ 3 files changed, 21 insertions(+), 18 deletions(-) diff --git a/postgres/datadog_checks/postgres/query_calls_cache.py b/postgres/datadog_checks/postgres/query_calls_cache.py index ce8b1a031de92..30e2b0d929cbb 100644 --- a/postgres/datadog_checks/postgres/query_calls_cache.py +++ b/postgres/datadog_checks/postgres/query_calls_cache.py @@ -7,12 +7,16 @@ class QueryCallsCache: def __init__(self): self.cache = {} self.next_cache = {} + self.called_queryids = [] + self.next_called_queryids = set() def end_query_call_snapshot(self): """To prevent evicted statements from building up in the cache we replace the cache outright after each sampling of pg_stat_statements.""" self.cache = self.next_cache self.next_cache = {} + self.called_queryids = self.next_called_queryids + self.next_called_queryids = set() def set_calls(self, queryid, calls): """Updates the cache of calls per query id. @@ -34,5 +38,5 @@ def set_calls(self, queryid, calls): calls_changed = True self.next_cache[queryid] = calls - - return calls_changed + if calls_changed: + self.next_called_queryids.add(queryid) diff --git a/postgres/datadog_checks/postgres/statements.py b/postgres/datadog_checks/postgres/statements.py index 7eb2b856edc43..ed8a07d898744 100644 --- a/postgres/datadog_checks/postgres/statements.py +++ b/postgres/datadog_checks/postgres/statements.py @@ -213,7 +213,6 @@ def _check_called_queries(self): with self._check._get_main_db() as conn: with conn.cursor(cursor_factory=CommenterCursor) as cursor: - called_queryids = [] query = QUERYID_TO_CALLS_QUERY.format(pg_stat_statements_view=pgss_view_without_query_text) rows = self._execute_query(cursor, query, params=(self._config.dbname,)) @@ -223,18 +222,17 @@ def _check_called_queries(self): continue calls = row[1] - calls_changed = self._query_calls_cache.set_calls(queryid, calls) - if calls_changed: - called_queryids.append(queryid) + self._query_calls_cache.set_calls(queryid, calls) + self._query_calls_cache.end_query_call_snapshot() self._check.gauge( "dd.postgresql.pg_stat_statements.calls_changed", - len(called_queryids), + len(self._query_calls_cache.called_queryids), tags=self.tags, hostname=self._check.resolved_hostname, ) - return called_queryids + return self._query_calls_cache.called_queryids def run_job(self): # do not emit any dd.internal metrics for DBM specific check code @@ -272,7 +270,7 @@ def collect_per_statement_metrics(self): @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True) def _load_pg_stat_statements(self): try: - called_queryids = self._check_called_queries() + self._check_called_queries() available_columns = set(self._get_pg_stat_statements_columns()) missing_columns = 
PG_STAT_STATEMENTS_REQUIRED_COLUMNS - available_columns if len(missing_columns) > 0: @@ -347,7 +345,7 @@ def _load_pg_stat_statements(self): pg_stat_statements_view=self._config.pg_stat_statements_view, filters=filters, extra_clauses="", - called_queryids=', '.join([str(i) for i in called_queryids]), + called_queryids=', '.join([str(i) for i in self._query_calls_cache.called_queryids]), ), params=params, ) @@ -478,7 +476,7 @@ def _apply_deltas(self, rows): query_text = {row['query_signature']: row['query'] for row in rows} applied_rows = [] for query_signature, query_sig_metrics in self._baseline_metrics.items(): - for queryid, row in query_sig_metrics.items(): + for row in query_sig_metrics.values(): if query_signature in query_text: applied_rows.append({**row, 'query': query_text[query_signature]}) else: @@ -490,7 +488,7 @@ def _apply_deltas(self, rows): # pg_stat_statements eviction), we clear it out periodically to force a full refetch. def _check_baseline_metrics_expiry(self): if ( - self._last_baseline_metrics_expiry == None + self._last_baseline_metrics_expiry is None or self._last_baseline_metrics_expiry + BASELINE_METRICS_EXPIRY < time.time() ): self._baseline_metrics = {} diff --git a/postgres/tests/test_query_calls_cache.py b/postgres/tests/test_query_calls_cache.py index 81e5e34f251cc..864aabcae979d 100644 --- a/postgres/tests/test_query_calls_cache.py +++ b/postgres/tests/test_query_calls_cache.py @@ -12,27 +12,28 @@ def test_statement_queryid_cache_same_calls_does_not_change(): cache = QueryCallsCache() cache.set_calls(123, 1) cache.end_query_call_snapshot() - changed = cache.set_calls(123, 1) + cache.set_calls(123, 1) + cache.end_query_call_snapshot() - assert changed is False + assert cache.called_queryids == set() def test_statement_queryid_cache_multiple_calls_change(): cache = QueryCallsCache() cache.set_calls(123, 1) cache.end_query_call_snapshot() - changed = cache.set_calls(123, 2) + cache.set_calls(123, 2) - assert changed is True + assert cache.called_queryids == {123} def test_statement_queryid_cache_after_pg_stat_statement_eviction(): cache = QueryCallsCache() cache.set_calls(123, 100) cache.end_query_call_snapshot() - changed = cache.set_calls(123, 5) + cache.set_calls(123, 5) - assert changed is True + assert cache.called_queryids == {123} def test_statement_queryid_cache_snapshot_eviction(): From 19e95c753500043b15d567ae5b0f68355ccaca20 Mon Sep 17 00:00:00 2001 From: "alex.weisberger@datadoghq.com" Date: Tue, 14 May 2024 03:07:08 +0000 Subject: [PATCH 19/35] Delete query text. 
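The baseline cache only needs the per-queryid counters, so drop the potentially large SQL text before
storing a row; the text is re-attached from the freshly fetched rows when metrics are emitted. A small
sketch of the cached shape (field values are illustrative):

    import copy

    def to_baseline_row(row):
        # Keep the counters, drop the statement text to keep the cache small.
        baseline_row = copy.copy(row)
        del baseline_row['query']
        return baseline_row

    row = {'queryid': 1, 'query_signature': 'abc', 'calls': 3, 'query': 'SELECT * FROM t WHERE id = $1'}
    print(to_baseline_row(row))  # {'queryid': 1, 'query_signature': 'abc', 'calls': 3}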
--- postgres/datadog_checks/postgres/statements.py | 1 + postgres/tests/test_statements.py | 1 + 2 files changed, 2 insertions(+) diff --git a/postgres/datadog_checks/postgres/statements.py b/postgres/datadog_checks/postgres/statements.py index ed8a07d898744..034aa5e82e16e 100644 --- a/postgres/datadog_checks/postgres/statements.py +++ b/postgres/datadog_checks/postgres/statements.py @@ -467,6 +467,7 @@ def _apply_deltas(self, rows): query_signature = row['query_signature'] queryid = row['queryid'] baseline_row = copy.copy(row) + del baseline_row['query'] if query_signature not in self._baseline_metrics: self._baseline_metrics[query_signature] = {queryid: baseline_row} else: diff --git a/postgres/tests/test_statements.py b/postgres/tests/test_statements.py index c56246af203f0..b849efdfa2093 100644 --- a/postgres/tests/test_statements.py +++ b/postgres/tests/test_statements.py @@ -86,6 +86,7 @@ def test_statement_metrics_multiple_pgss_rows_single_query_signature( # don't need samples for this test dbm_instance['query_samples'] = {'enabled': False} dbm_instance['query_activity'] = {'enabled': False} + dbm_instance['query_metrics'] = {'enabled': True, 'run_sync': True, 'collection_interval': 0.1} connections = {} def normalize_query(q): From 34dd2fa665faad712a970fb9ffb1b96b47872f4d Mon Sep 17 00:00:00 2001 From: "alex.weisberger@datadoghq.com" Date: Tue, 14 May 2024 03:23:38 +0000 Subject: [PATCH 20/35] This test doesn't work on v9 --- postgres/tests/test_statements.py | 1 + 1 file changed, 1 insertion(+) diff --git a/postgres/tests/test_statements.py b/postgres/tests/test_statements.py index b849efdfa2093..163ddad69e472 100644 --- a/postgres/tests/test_statements.py +++ b/postgres/tests/test_statements.py @@ -77,6 +77,7 @@ def test_dbm_enabled_config(integration_check, dbm_instance, dbm_enabled_key, db assert check._config.dbm_enabled == dbm_enabled +@requires_over_10 def test_statement_metrics_multiple_pgss_rows_single_query_signature( aggregator, integration_check, From ad9c4bfb21a199118fc1f22c132e0b4115ddaed9 Mon Sep 17 00:00:00 2001 From: "alex.weisberger@datadoghq.com" Date: Mon, 13 May 2024 23:40:52 -0400 Subject: [PATCH 21/35] Queryids are guaranteed, so can skip this check. --- postgres/datadog_checks/postgres/statements.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/postgres/datadog_checks/postgres/statements.py b/postgres/datadog_checks/postgres/statements.py index 034aa5e82e16e..c5d2f09cc91a0 100644 --- a/postgres/datadog_checks/postgres/statements.py +++ b/postgres/datadog_checks/postgres/statements.py @@ -218,9 +218,6 @@ def _check_called_queries(self): for row in rows: queryid = row[0] - if queryid is None: - continue - calls = row[1] self._query_calls_cache.set_calls(queryid, calls) From 3ef0940fbcb56f0b40693215fd52681df855a42d Mon Sep 17 00:00:00 2001 From: "alex.weisberger@datadoghq.com" Date: Tue, 14 May 2024 10:41:10 -0400 Subject: [PATCH 22/35] Fetch all pgss rows initially to avoid unnecessary filtering in the execution. 
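On the first run (and whenever the baseline cache expires) there is nothing to diff against, so fetch
every pg_stat_statements row and seed the caches; on later runs, do a cheap queryid/calls pass via
pg_stat_statements(false) and fetch full rows only for queryids whose call counts changed. A
self-contained toy of the call-count tracking that set_calls now performs over whole result rows
(class and field names are simplified stand-ins for QueryCallsCache):

    class CallsCache:
        def __init__(self):
            self.cache = {}
            self.called_queryids = set()

        def set_calls(self, rows):
            next_cache, called = {}, set()
            for row in rows:
                queryid, calls = row['queryid'], row['calls']
                # Any difference (growth, eviction + reset, or a first sighting)
                # marks the queryid so its full row is fetched this run.
                if self.cache.get(queryid) != calls:
                    called.add(queryid)
                next_cache[queryid] = calls
            self.cache, self.called_queryids = next_cache, called

    cache = CallsCache()
    cache.set_calls([{'queryid': 1, 'calls': 10}, {'queryid': 2, 'calls': 3}])
    cache.set_calls([{'queryid': 1, 'calls': 10}, {'queryid': 2, 'calls': 4}])
    print(cache.called_queryids)  # {2} -- only this queryid needs its full row fetched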
--- .../postgres/query_calls_cache.py | 33 +++--- .../datadog_checks/postgres/statements.py | 104 ++++++++++++------ postgres/tests/test_query_calls_cache.py | 27 ++--- 3 files changed, 101 insertions(+), 63 deletions(-) diff --git a/postgres/datadog_checks/postgres/query_calls_cache.py b/postgres/datadog_checks/postgres/query_calls_cache.py index 30e2b0d929cbb..6046e1030a3b8 100644 --- a/postgres/datadog_checks/postgres/query_calls_cache.py +++ b/postgres/datadog_checks/postgres/query_calls_cache.py @@ -18,25 +18,30 @@ def end_query_call_snapshot(self): self.called_queryids = self.next_called_queryids self.next_called_queryids = set() - def set_calls(self, queryid, calls): + def set_calls(self, rows): """Updates the cache of calls per query id. Returns whether or not the number of calls changed based on the newly updated value. The first seen update for a queryid does not count as a change in values since that would result in an inflated value.""" - calls_changed = False + for row in rows: + queryid = row['queryid'] + calls = row['calls'] + calls_changed = False - if queryid in self.cache: - diff = calls - self.cache[queryid] - # Positive deltas mean the statement remained in pg_stat_statements - # between check calls. Negative deltas mean the statement was evicted - # and replaced with a new call count. Both cases should count as a call - # change. - calls_changed = diff != 0 - else: - calls_changed = True + if queryid in self.cache: + diff = calls - self.cache[queryid] + # Positive deltas mean the statement remained in pg_stat_statements + # between check calls. Negative deltas mean the statement was evicted + # and replaced with a new call count. Both cases should count as a call + # change. + calls_changed = diff != 0 + else: + calls_changed = True - self.next_cache[queryid] = calls - if calls_changed: - self.next_called_queryids.add(queryid) + self.next_cache[queryid] = calls + if calls_changed: + self.next_called_queryids.add(queryid) + + self.end_query_call_snapshot() diff --git a/postgres/datadog_checks/postgres/statements.py b/postgres/datadog_checks/postgres/statements.py index c5d2f09cc91a0..dca5ed425b65d 100644 --- a/postgres/datadog_checks/postgres/statements.py +++ b/postgres/datadog_checks/postgres/statements.py @@ -21,7 +21,7 @@ from .query_calls_cache import QueryCallsCache from .util import DatabaseConfigurationError, payload_pg_version, warning_with_tags -from .version_utils import V9_4, V14 +from .version_utils import V9_4, V10, V14 try: import datadog_agent @@ -42,11 +42,31 @@ LEFT JOIN pg_database ON pg_stat_statements.dbid = pg_database.oid WHERE query NOT LIKE 'EXPLAIN %%' - AND queryid = ANY('{{ {called_queryids} }}'::bigint[]) + {queryid_filter} {filters} {extra_clauses} """ +def statements_query(**kwargs): + pg_stat_statements_view = kwargs.get('pg_stat_statements_view', 'pg_stat_statements') + cols = kwargs.get('cols', '*') + filters = kwargs.get('filters', '') + extra_clauses = kwargs.get('extra_clauses', '') + called_queryids = kwargs.get('called_queryids', []) + + queryid_filter = "" + if len(called_queryids) > 0: + queryid_filter = f"AND queryid = ANY('{{ {called_queryids} }}'::bigint[])" + + return STATEMENTS_QUERY.format( + cols=cols, + pg_stat_statements_view=pg_stat_statements_view, + filters=filters, + extra_clauses=extra_clauses, + queryid_filter=queryid_filter, + called_queryids=called_queryids, + ) + BASELINE_METRICS_EXPIRY = 60 * 10 # 10 minutes # Use pg_stat_statements(false) when available as an optimization to avoid pulling SQL text from disk @@ 
-188,12 +208,10 @@ def _get_pg_stat_statements_columns(self): return self._stat_column_cache # Querying over '*' with limit 0 allows fetching only the column names from the cursor without data - query = STATEMENTS_QUERY.format( + query = statements_query( cols='*', pg_stat_statements_view=self._config.pg_stat_statements_view, extra_clauses="LIMIT 0", - filters="", - called_queryids="", ) with self._check._get_main_db() as conn: with conn.cursor(cursor_factory=CommenterCursor) as cursor: @@ -212,16 +230,10 @@ def _check_called_queries(self): pgss_view_without_query_text = "pg_stat_statements(false)" with self._check._get_main_db() as conn: - with conn.cursor(cursor_factory=CommenterCursor) as cursor: + with conn.cursor(cursor_factory=CommenterDictCursor) as cursor: query = QUERYID_TO_CALLS_QUERY.format(pg_stat_statements_view=pgss_view_without_query_text) rows = self._execute_query(cursor, query, params=(self._config.dbname,)) - - for row in rows: - queryid = row[0] - calls = row[1] - self._query_calls_cache.set_calls(queryid, calls) - - self._query_calls_cache.end_query_call_snapshot() + self._query_calls_cache.set_calls(rows) self._check.gauge( "dd.postgresql.pg_stat_statements.calls_changed", len(self._query_calls_cache.called_queryids), @@ -267,7 +279,6 @@ def collect_per_statement_metrics(self): @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True) def _load_pg_stat_statements(self): try: - self._check_called_queries() available_columns = set(self._get_pg_stat_statements_columns()) missing_columns = PG_STAT_STATEMENTS_REQUIRED_COLUMNS - available_columns if len(missing_columns) > 0: @@ -335,17 +346,27 @@ def _load_pg_stat_statements(self): params = params + tuple(self._config.ignore_databases) with self._check._get_main_db() as conn: with conn.cursor(cursor_factory=CommenterDictCursor) as cursor: - return self._execute_query( - cursor, - STATEMENTS_QUERY.format( - cols=', '.join(query_columns), - pg_stat_statements_view=self._config.pg_stat_statements_view, - filters=filters, - extra_clauses="", - called_queryids=', '.join([str(i) for i in self._query_calls_cache.called_queryids]), - ), - params=params, - ) + if len(self._query_calls_cache.cache) > 0: + return self._execute_query( + cursor, + statements_query( + cols=', '.join(query_columns), + pg_stat_statements_view=self._config.pg_stat_statements_view, + filters=filters, + called_queryids=', '.join([str(i) for i in self._query_calls_cache.called_queryids]), + ), + params=params, + ) + else: + return self._execute_query( + cursor, + statements_query( + cols=', '.join(query_columns), + pg_stat_statements_view=self._config.pg_stat_statements_view, + filters=filters, + ), + params=params, + ) except psycopg2.Error as e: error_tag = "error:database-{}".format(type(e).__name__) @@ -453,24 +474,27 @@ def _emit_pg_stat_statements_metrics(self): except psycopg2.Error as e: self._log.warning("Failed to query for pg_stat_statements count: %s", e) - # _apply_deltas expects normalized rows before any merging of duplicates. + # _apply_called_queries expects normalized rows before any merging of duplicates. # It takes the incremental pg_stat_statements rows and constructs the full set of rows # by adding the existing values in the baseline_metrics cache. This is equivalent to # fetching the full set of rows from pg_stat_statements, but we avoid paying the price of # actually querying the rows. 
- def _apply_deltas(self, rows): + def _apply_called_queries(self, rows): # Apply called queries to baseline_metrics for row in rows: query_signature = row['query_signature'] queryid = row['queryid'] baseline_row = copy.copy(row) + + # To avoid high memory usage, don't cache the query text since it can be large. del baseline_row['query'] if query_signature not in self._baseline_metrics: self._baseline_metrics[query_signature] = {queryid: baseline_row} else: self._baseline_metrics[query_signature][queryid] = baseline_row - # Apply query text, so it doesn't have to be cached. + # Apply query text for called queries since it is not cached and uncalled queries won't get result + # in sent metrics. query_text = {row['query_signature']: row['query'] for row in rows} applied_rows = [] for query_signature, query_sig_metrics in self._baseline_metrics.items(): @@ -498,10 +522,26 @@ def _collect_metrics_rows(self): self._emit_pg_stat_statements_metrics() self._emit_pg_stat_statements_dealloc() - self._check_baseline_metrics_expiry() - rows = self._load_pg_stat_statements() - rows = self._normalize_queries(rows) - rows = self._apply_deltas(rows) + rows = [] + if self._check.version < V10: + rows = self._load_pg_stat_statements() + rows = self._normalize_queries(rows) + elif len(self._baseline_metrics) == 0: + # When we don't have baseline metrics (either on the first run or after cache expiry), + # we fetch all rows from pg_stat_statements, and update the initial state of relevant + # caches. + rows = self._load_pg_stat_statements() + rows = self._normalize_queries(rows) + self._query_calls_cache.set_calls(rows) + self._apply_called_queries(rows) + else: + # When we do have baseline metrics, use them to construct the full set of rows + # so that compute_derivative_rows can merge duplicates and calculate deltas. 
+ self._check_baseline_metrics_expiry() + self._check_called_queries() + rows = self._load_pg_stat_statements() + rows = self._normalize_queries(rows) + rows = self._apply_called_queries(rows) if not rows: return [] diff --git a/postgres/tests/test_query_calls_cache.py b/postgres/tests/test_query_calls_cache.py index 864aabcae979d..674dc717a0e18 100644 --- a/postgres/tests/test_query_calls_cache.py +++ b/postgres/tests/test_query_calls_cache.py @@ -10,47 +10,40 @@ def test_statement_queryid_cache_same_calls_does_not_change(): cache = QueryCallsCache() - cache.set_calls(123, 1) - cache.end_query_call_snapshot() - cache.set_calls(123, 1) - cache.end_query_call_snapshot() + cache.set_calls([{'queryid': 123, 'calls': 1}]) + cache.set_calls([{'queryid': 123, 'calls': 1}]) assert cache.called_queryids == set() def test_statement_queryid_cache_multiple_calls_change(): cache = QueryCallsCache() - cache.set_calls(123, 1) - cache.end_query_call_snapshot() - cache.set_calls(123, 2) + cache.set_calls([{'queryid': 123, 'calls': 1}]) + cache.set_calls([{'queryid': 123, 'calls': 2}]) assert cache.called_queryids == {123} def test_statement_queryid_cache_after_pg_stat_statement_eviction(): cache = QueryCallsCache() - cache.set_calls(123, 100) - cache.end_query_call_snapshot() - cache.set_calls(123, 5) + cache.set_calls([{'queryid': 123, 'calls': 100}]) + cache.set_calls([{'queryid': 123, 'calls': 5}]) assert cache.called_queryids == {123} def test_statement_queryid_cache_snapshot_eviction(): cache = QueryCallsCache() - cache.set_calls(123, 100) - cache.end_query_call_snapshot() - cache.set_calls(124, 5) - cache.end_query_call_snapshot() + cache.set_calls([{'queryid': 123, 'calls': 100}]) + cache.set_calls([{'queryid': 124, 'calls': 5}]) assert cache.cache.get(123, None) is None def test_statement_queryid_multiple_inserts(): cache = QueryCallsCache() - cache.set_calls(123, 100) - cache.set_calls(124, 5) - cache.end_query_call_snapshot() + cache.set_calls([{'queryid': 123, 'calls': 100}]) + cache.set_calls([{'queryid': 124, 'calls': 5}]) assert cache.cache[123] == 100 assert cache.cache[124] == 5 From 2630e2f6a3d73e9826707f7497c30ec488363beb Mon Sep 17 00:00:00 2001 From: "alex.weisberger@datadoghq.com" Date: Tue, 14 May 2024 12:07:43 -0400 Subject: [PATCH 23/35] Opt in to performance optimization --- postgres/assets/configuration/spec.yaml | 8 ++++++++ postgres/datadog_checks/postgres/config.py | 1 + postgres/datadog_checks/postgres/statements.py | 2 +- postgres/tests/test_statements.py | 2 +- 4 files changed, 11 insertions(+), 2 deletions(-) diff --git a/postgres/assets/configuration/spec.yaml b/postgres/assets/configuration/spec.yaml index 061b84866c11a..7da8a4ab7fc35 100644 --- a/postgres/assets/configuration/spec.yaml +++ b/postgres/assets/configuration/spec.yaml @@ -428,6 +428,14 @@ files: value: type: number example: 10000 + - name: incremental_query_metrics + hidden: true + description: | + Enable an experimental performance optimization that reduces the amount of data queried from `pg_stat_statements` + when calculating query metrics. This option is only available for PostgreSQL 10.0 and above. 
+ value: + type: boolean + example: true - name: query_samples description: Configure collection of query samples options: diff --git a/postgres/datadog_checks/postgres/config.py b/postgres/datadog_checks/postgres/config.py index b5efb30ae2ad7..4ac97f657d524 100644 --- a/postgres/datadog_checks/postgres/config.py +++ b/postgres/datadog_checks/postgres/config.py @@ -165,6 +165,7 @@ def __init__(self, instance, init_config): self.log_unobfuscated_queries = is_affirmative(instance.get('log_unobfuscated_queries', False)) self.log_unobfuscated_plans = is_affirmative(instance.get('log_unobfuscated_plans', False)) self.database_instance_collection_interval = instance.get('database_instance_collection_interval', 300) + self.incremental_query_metrics = is_affirmative(self.statement_metrics_config.get('incremental_query_metrics', False)) def _build_tags(self, custom_tags, agent_tags, propagate_agent_tags=True): # Clean up tags in case there was a None entry in the instance diff --git a/postgres/datadog_checks/postgres/statements.py b/postgres/datadog_checks/postgres/statements.py index dca5ed425b65d..718d6a565f79c 100644 --- a/postgres/datadog_checks/postgres/statements.py +++ b/postgres/datadog_checks/postgres/statements.py @@ -523,7 +523,7 @@ def _collect_metrics_rows(self): self._emit_pg_stat_statements_dealloc() rows = [] - if self._check.version < V10: + if (not self._config.incremental_query_metrics) or self._check.version < V10: rows = self._load_pg_stat_statements() rows = self._normalize_queries(rows) elif len(self._baseline_metrics) == 0: diff --git a/postgres/tests/test_statements.py b/postgres/tests/test_statements.py index 163ddad69e472..4696384084d4b 100644 --- a/postgres/tests/test_statements.py +++ b/postgres/tests/test_statements.py @@ -87,7 +87,7 @@ def test_statement_metrics_multiple_pgss_rows_single_query_signature( # don't need samples for this test dbm_instance['query_samples'] = {'enabled': False} dbm_instance['query_activity'] = {'enabled': False} - dbm_instance['query_metrics'] = {'enabled': True, 'run_sync': True, 'collection_interval': 0.1} + dbm_instance['query_metrics'] = {'enabled': True, 'run_sync': True, 'incremental_query_metrics': True} connections = {} def normalize_query(q): From e44b0b39fa4cbbb18a5d243f55bccbe56f216486 Mon Sep 17 00:00:00 2001 From: "alex.weisberger@datadoghq.com" Date: Tue, 14 May 2024 19:24:05 +0000 Subject: [PATCH 24/35] Lint fixes --- postgres/datadog_checks/postgres/config.py | 4 +++- postgres/datadog_checks/postgres/config_models/instance.py | 1 + postgres/datadog_checks/postgres/statements.py | 4 +++- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/postgres/datadog_checks/postgres/config.py b/postgres/datadog_checks/postgres/config.py index 4ac97f657d524..5c8fffedd250e 100644 --- a/postgres/datadog_checks/postgres/config.py +++ b/postgres/datadog_checks/postgres/config.py @@ -165,7 +165,9 @@ def __init__(self, instance, init_config): self.log_unobfuscated_queries = is_affirmative(instance.get('log_unobfuscated_queries', False)) self.log_unobfuscated_plans = is_affirmative(instance.get('log_unobfuscated_plans', False)) self.database_instance_collection_interval = instance.get('database_instance_collection_interval', 300) - self.incremental_query_metrics = is_affirmative(self.statement_metrics_config.get('incremental_query_metrics', False)) + self.incremental_query_metrics = is_affirmative( + self.statement_metrics_config.get('incremental_query_metrics', False) + ) def _build_tags(self, custom_tags, agent_tags, 
propagate_agent_tags=True): # Clean up tags in case there was a None entry in the instance diff --git a/postgres/datadog_checks/postgres/config_models/instance.py b/postgres/datadog_checks/postgres/config_models/instance.py index 0947276352658..e018818a02594 100644 --- a/postgres/datadog_checks/postgres/config_models/instance.py +++ b/postgres/datadog_checks/postgres/config_models/instance.py @@ -156,6 +156,7 @@ class QueryMetrics(BaseModel): ) collection_interval: Optional[float] = None enabled: Optional[bool] = None + incremental_query_metrics: Optional[bool] = None pg_stat_statements_max_warning_threshold: Optional[float] = None diff --git a/postgres/datadog_checks/postgres/statements.py b/postgres/datadog_checks/postgres/statements.py index 718d6a565f79c..966bd3cb3151f 100644 --- a/postgres/datadog_checks/postgres/statements.py +++ b/postgres/datadog_checks/postgres/statements.py @@ -47,6 +47,7 @@ {extra_clauses} """ + def statements_query(**kwargs): pg_stat_statements_view = kwargs.get('pg_stat_statements_view', 'pg_stat_statements') cols = kwargs.get('cols', '*') @@ -67,6 +68,7 @@ def statements_query(**kwargs): called_queryids=called_queryids, ) + BASELINE_METRICS_EXPIRY = 60 * 10 # 10 minutes # Use pg_stat_statements(false) when available as an optimization to avoid pulling SQL text from disk @@ -527,7 +529,7 @@ def _collect_metrics_rows(self): rows = self._load_pg_stat_statements() rows = self._normalize_queries(rows) elif len(self._baseline_metrics) == 0: - # When we don't have baseline metrics (either on the first run or after cache expiry), + # When we don't have baseline metrics (either on the first run or after cache expiry), # we fetch all rows from pg_stat_statements, and update the initial state of relevant # caches. rows = self._load_pg_stat_statements() From c82598398168a229d2b481fe526992489e38320c Mon Sep 17 00:00:00 2001 From: "alex.weisberger@datadoghq.com" Date: Tue, 14 May 2024 19:42:15 +0000 Subject: [PATCH 25/35] Account for function rename --- ...st_apply_deltas.py => test_apply_called_queries.py} | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) rename postgres/tests/{test_apply_deltas.py => test_apply_called_queries.py} (76%) diff --git a/postgres/tests/test_apply_deltas.py b/postgres/tests/test_apply_called_queries.py similarity index 76% rename from postgres/tests/test_apply_deltas.py rename to postgres/tests/test_apply_called_queries.py index 0ebaadcfba066..c24e4c2f7ce0e 100644 --- a/postgres/tests/test_apply_deltas.py +++ b/postgres/tests/test_apply_called_queries.py @@ -6,7 +6,7 @@ pytestmark = [pytest.mark.unit] -def test_apply_deltas_base_case(pg_instance, integration_check): +def test_apply_called_queries_base_case(pg_instance, integration_check): check = integration_check(pg_instance) rows = [ @@ -14,7 +14,7 @@ def test_apply_deltas_base_case(pg_instance, integration_check): {'queryid': 2, 'query_signature': 'abc', 'calls': 2, 'query': 'query 123'}, ] - rows = check.statement_metrics._apply_deltas(rows) + rows = check.statement_metrics._apply_called_queries(rows) assert rows == [ {'queryid': 1, 'query_signature': 'abc', 'calls': 1, 'query': 'query 123'}, @@ -22,7 +22,7 @@ def test_apply_deltas_base_case(pg_instance, integration_check): ] -def test_apply_deltas_multiple_runs(pg_instance, integration_check): +def test_apply_called_queries_multiple_runs(pg_instance, integration_check): check = integration_check(pg_instance) rows = [ @@ -30,12 +30,12 @@ def test_apply_deltas_multiple_runs(pg_instance, integration_check): {'queryid': 2, 
'query_signature': 'abc', 'calls': 2, 'query': 'query 123'}, ] - rows = check.statement_metrics._apply_deltas(rows) + rows = check.statement_metrics._apply_called_queries(rows) second_rows = [ {'queryid': 2, 'query_signature': 'abc', 'calls': 3, 'query': 'query 123'}, ] - rows = check.statement_metrics._apply_deltas(second_rows) + rows = check.statement_metrics._apply_called_queries(second_rows) assert rows == [ {'queryid': 1, 'query_signature': 'abc', 'calls': 1, 'query': 'query 123'}, From 217358dc16939a6197dd29187f9509b9adc65cf5 Mon Sep 17 00:00:00 2001 From: "alex.weisberger@datadoghq.com" Date: Tue, 14 May 2024 19:55:26 +0000 Subject: [PATCH 26/35] Account for new set_calls pattern. --- postgres/tests/test_query_calls_cache.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/postgres/tests/test_query_calls_cache.py b/postgres/tests/test_query_calls_cache.py index 674dc717a0e18..5b4a42ace717a 100644 --- a/postgres/tests/test_query_calls_cache.py +++ b/postgres/tests/test_query_calls_cache.py @@ -42,8 +42,7 @@ def test_statement_queryid_cache_snapshot_eviction(): def test_statement_queryid_multiple_inserts(): cache = QueryCallsCache() - cache.set_calls([{'queryid': 123, 'calls': 100}]) - cache.set_calls([{'queryid': 124, 'calls': 5}]) + cache.set_calls([{'queryid': 123, 'calls': 100}, {'queryid': 124, 'calls': 5}]) assert cache.cache[123] == 100 assert cache.cache[124] == 5 From d252c9a7544c353b2289e163e6440147618a49e7 Mon Sep 17 00:00:00 2001 From: "alex.weisberger@datadoghq.com" Date: Tue, 14 May 2024 20:57:26 +0000 Subject: [PATCH 27/35] Spec default --- postgres/assets/configuration/spec.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/postgres/assets/configuration/spec.yaml b/postgres/assets/configuration/spec.yaml index 7da8a4ab7fc35..66063301257d0 100644 --- a/postgres/assets/configuration/spec.yaml +++ b/postgres/assets/configuration/spec.yaml @@ -435,7 +435,8 @@ files: when calculating query metrics. This option is only available for PostgreSQL 10.0 and above. value: type: boolean - example: true + display_default: false + example: false - name: query_samples description: Configure collection of query samples options: From 8bdf3c66e3da463ebb83d6c555bf6fa85083a3ac Mon Sep 17 00:00:00 2001 From: "alex.weisberger@datadoghq.com" Date: Wed, 15 May 2024 17:18:54 -0400 Subject: [PATCH 28/35] Properly expire baseline metrics cache. --- postgres/assets/configuration/spec.yaml | 9 +++++++++ postgres/datadog_checks/postgres/config.py | 1 + .../datadog_checks/postgres/config_models/instance.py | 1 + postgres/datadog_checks/postgres/statements.py | 6 ++---- 4 files changed, 13 insertions(+), 4 deletions(-) diff --git a/postgres/assets/configuration/spec.yaml b/postgres/assets/configuration/spec.yaml index 66063301257d0..0f8727b6441e8 100644 --- a/postgres/assets/configuration/spec.yaml +++ b/postgres/assets/configuration/spec.yaml @@ -437,6 +437,15 @@ files: type: boolean display_default: false example: false + - name: baseline_metrics_expiry + hidden: true + description: | + Set the cache expiry time (in seconds) for the baseline for query metrics. This option is only available when + `incremental_query_metrics` is enabled. 
+ value: + type: number + display_default: 300 + example: 300 - name: query_samples description: Configure collection of query samples options: diff --git a/postgres/datadog_checks/postgres/config.py b/postgres/datadog_checks/postgres/config.py index 5c8fffedd250e..dc3939e046eb4 100644 --- a/postgres/datadog_checks/postgres/config.py +++ b/postgres/datadog_checks/postgres/config.py @@ -168,6 +168,7 @@ def __init__(self, instance, init_config): self.incremental_query_metrics = is_affirmative( self.statement_metrics_config.get('incremental_query_metrics', False) ) + self.baseline_metrics_expiry = self.statement_metrics_config.get('incremental_query_metrics', 300) def _build_tags(self, custom_tags, agent_tags, propagate_agent_tags=True): # Clean up tags in case there was a None entry in the instance diff --git a/postgres/datadog_checks/postgres/config_models/instance.py b/postgres/datadog_checks/postgres/config_models/instance.py index e018818a02594..6c12162692ab6 100644 --- a/postgres/datadog_checks/postgres/config_models/instance.py +++ b/postgres/datadog_checks/postgres/config_models/instance.py @@ -154,6 +154,7 @@ class QueryMetrics(BaseModel): arbitrary_types_allowed=True, frozen=True, ) + baseline_metrics_expiry: Optional[float] = None collection_interval: Optional[float] = None enabled: Optional[bool] = None incremental_query_metrics: Optional[bool] = None diff --git a/postgres/datadog_checks/postgres/statements.py b/postgres/datadog_checks/postgres/statements.py index 966bd3cb3151f..6e6b477af1ca3 100644 --- a/postgres/datadog_checks/postgres/statements.py +++ b/postgres/datadog_checks/postgres/statements.py @@ -69,8 +69,6 @@ def statements_query(**kwargs): ) -BASELINE_METRICS_EXPIRY = 60 * 10 # 10 minutes - # Use pg_stat_statements(false) when available as an optimization to avoid pulling SQL text from disk PG_STAT_STATEMENTS_COUNT_QUERY = "SELECT COUNT(*) FROM pg_stat_statements(false)" PG_STAT_STATEMENTS_COUNT_QUERY_LT_9_4 = "SELECT COUNT(*) FROM pg_stat_statements" @@ -513,7 +511,7 @@ def _apply_called_queries(self, rows): def _check_baseline_metrics_expiry(self): if ( self._last_baseline_metrics_expiry is None - or self._last_baseline_metrics_expiry + BASELINE_METRICS_EXPIRY < time.time() + or self._last_baseline_metrics_expiry + self._config.baseline_metrics_expiry < time.time() ): self._baseline_metrics = {} self._query_calls_cache = QueryCallsCache() @@ -524,6 +522,7 @@ def _collect_metrics_rows(self): self._emit_pg_stat_statements_metrics() self._emit_pg_stat_statements_dealloc() + self._check_baseline_metrics_expiry() rows = [] if (not self._config.incremental_query_metrics) or self._check.version < V10: rows = self._load_pg_stat_statements() @@ -539,7 +538,6 @@ def _collect_metrics_rows(self): else: # When we do have baseline metrics, use them to construct the full set of rows # so that compute_derivative_rows can merge duplicates and calculate deltas. 
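# Hoisting the _check_baseline_metrics_expiry() call above the branch selection
# means an expired cache empties self._baseline_metrics before the branch is
# chosen, so the same run takes the full-reload path instead of serving one
# more cycle from a stale baseline.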
- self._check_baseline_metrics_expiry() self._check_called_queries() rows = self._load_pg_stat_statements() rows = self._normalize_queries(rows) From 6edb871aa94b68f7060cefda45ec2db80ef35fdb Mon Sep 17 00:00:00 2001 From: "alex.weisberger@datadoghq.com" Date: Wed, 15 May 2024 17:44:38 -0400 Subject: [PATCH 29/35] Loggin --- postgres/datadog_checks/postgres/statements.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/postgres/datadog_checks/postgres/statements.py b/postgres/datadog_checks/postgres/statements.py index 6e6b477af1ca3..e36bd9eb6ee57 100644 --- a/postgres/datadog_checks/postgres/statements.py +++ b/postgres/datadog_checks/postgres/statements.py @@ -520,14 +520,15 @@ def _check_baseline_metrics_expiry(self): @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True) def _collect_metrics_rows(self): self._emit_pg_stat_statements_metrics() - self._emit_pg_stat_statements_dealloc() - + self._emit_pg_stat_sta self._check_baseline_metrics_expiry() rows = [] if (not self._config.incremental_query_metrics) or self._check.version < V10: + print("[AMW] feature flag off branch") rows = self._load_pg_stat_statements() rows = self._normalize_queries(rows) elif len(self._baseline_metrics) == 0: + print("[AMW] baseline queries") # When we don't have baseline metrics (either on the first run or after cache expiry), # we fetch all rows from pg_stat_statements, and update the initial state of relevant # caches. @@ -536,6 +537,7 @@ def _collect_metrics_rows(self): self._query_calls_cache.set_calls(rows) self._apply_called_queries(rows) else: + print("[AMW] incremental queries") # When we do have baseline metrics, use them to construct the full set of rows # so that compute_derivative_rows can merge duplicates and calculate deltas. self._check_called_queries() From cdefa8eee49dfcdff19394a91c93293f092b348c Mon Sep 17 00:00:00 2001 From: "alex.weisberger@datadoghq.com" Date: Thu, 16 May 2024 13:08:59 -0400 Subject: [PATCH 30/35] wip --- .../base/utils/db/statement_metrics.py | 7 ++- postgres/datadog_checks/postgres/config.py | 2 +- .../datadog_checks/postgres/statements.py | 57 +++++++++++++------ postgres/tests/test_apply_called_queries.py | 37 +++++++++--- 4 files changed, 76 insertions(+), 27 deletions(-) diff --git a/datadog_checks_base/datadog_checks/base/utils/db/statement_metrics.py b/datadog_checks_base/datadog_checks/base/utils/db/statement_metrics.py index be0a238bcd80d..ae992ca305426 100644 --- a/datadog_checks_base/datadog_checks/base/utils/db/statement_metrics.py +++ b/datadog_checks_base/datadog_checks/base/utils/db/statement_metrics.py @@ -24,7 +24,7 @@ class StatementMetrics: def __init__(self): self._previous_statements = {} - def compute_derivative_rows(self, rows, metrics, key): + def compute_derivative_rows(self, rows, metrics, log, key): """ Compute the first derivative of column-based metrics for a given set of rows. 
This function takes the difference of the previous check run's values and the current check run's values @@ -83,6 +83,11 @@ def compute_derivative_rows(self, rows, metrics, key): if all(diffed_row[k] == 0 for k in metric_columns): continue + if 'begin' in row['query']: + log.info("[AMW] metrics calc") + log.info("[AMW] prev: " + str({k: prev[k] for k in ['query', 'queryid', 'query_signature', 'datname', 'rolname', 'calls' ]})) + log.info("[AMW] curr: " + str({k: row[k] for k in ['query', 'queryid', 'query_signature', 'datname', 'rolname', 'calls' ]})) + result.append(diffed_row) self._previous_statements.clear() diff --git a/postgres/datadog_checks/postgres/config.py b/postgres/datadog_checks/postgres/config.py index dc3939e046eb4..6514dcd77e059 100644 --- a/postgres/datadog_checks/postgres/config.py +++ b/postgres/datadog_checks/postgres/config.py @@ -168,7 +168,7 @@ def __init__(self, instance, init_config): self.incremental_query_metrics = is_affirmative( self.statement_metrics_config.get('incremental_query_metrics', False) ) - self.baseline_metrics_expiry = self.statement_metrics_config.get('incremental_query_metrics', 300) + self.baseline_metrics_expiry = self.statement_metrics_config.get('baseline_metrics_expiry', 300) def _build_tags(self, custom_tags, agent_tags, propagate_agent_tags=True): # Clean up tags in case there was a None entry in the instance diff --git a/postgres/datadog_checks/postgres/statements.py b/postgres/datadog_checks/postgres/statements.py index e36bd9eb6ee57..2bbad588db233 100644 --- a/postgres/datadog_checks/postgres/statements.py +++ b/postgres/datadog_checks/postgres/statements.py @@ -41,7 +41,8 @@ ON pg_stat_statements.userid = pg_roles.oid LEFT JOIN pg_database ON pg_stat_statements.dbid = pg_database.oid - WHERE query NOT LIKE 'EXPLAIN %%' + WHERE query != '' + AND query NOT LIKE 'EXPLAIN %%' {queryid_filter} {filters} {extra_clauses} @@ -255,6 +256,7 @@ def collect_per_statement_metrics(self): # all databases on the host. For metrics the "db" tag is added during ingestion based on which database # each query came from. try: + self._log.info("[AMW] check running") rows = self._collect_metrics_rows() if not rows: return @@ -347,6 +349,7 @@ def _load_pg_stat_statements(self): with self._check._get_main_db() as conn: with conn.cursor(cursor_factory=CommenterDictCursor) as cursor: if len(self._query_calls_cache.cache) > 0: + self._log.info("[AMW] fetching incremental queries") return self._execute_query( cursor, statements_query( @@ -358,6 +361,7 @@ def _load_pg_stat_statements(self): params=params, ) else: + self._log.info("[AMW] fetching all queries") return self._execute_query( cursor, statements_query( @@ -486,23 +490,21 @@ def _apply_called_queries(self, rows): queryid = row['queryid'] baseline_row = copy.copy(row) + key = _row_key(baseline_row) + (queryid, ) + # To avoid high memory usage, don't cache the query text since it can be large. del baseline_row['query'] - if query_signature not in self._baseline_metrics: - self._baseline_metrics[query_signature] = {queryid: baseline_row} - else: - self._baseline_metrics[query_signature][queryid] = baseline_row + self._baseline_metrics[key] = baseline_row # Apply query text for called queries since it is not cached and uncalled queries won't get result # in sent metrics. 
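# (Uncalled rows are re-emitted from the baseline without text; their metric
#  deltas are all zero, so compute_derivative_rows drops them and the missing
#  SQL text never reaches an emitted payload.)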
query_text = {row['query_signature']: row['query'] for row in rows} applied_rows = [] - for query_signature, query_sig_metrics in self._baseline_metrics.items(): - for row in query_sig_metrics.values(): - if query_signature in query_text: - applied_rows.append({**row, 'query': query_text[query_signature]}) - else: - applied_rows.append(copy.copy(row)) + for row in self._baseline_metrics.values(): + if query_signature in query_text: + applied_rows.append({**row, 'query': query_text[query_signature]}) + else: + applied_rows.append(copy.copy(row)) return applied_rows @@ -513,37 +515,60 @@ def _check_baseline_metrics_expiry(self): self._last_baseline_metrics_expiry is None or self._last_baseline_metrics_expiry + self._config.baseline_metrics_expiry < time.time() ): + self._log.info("[AMW] last_baseline_metrics_expiry" + str(self._last_baseline_metrics_expiry)) + self._log.info("[AMW] baseline_metrics_expiry" + str(self._config.baseline_metrics_expiry)) + self._log.info("[AMW] resetting baseline cachce") self._baseline_metrics = {} self._query_calls_cache = QueryCallsCache() self._last_baseline_metrics_expiry = time.time() + def log_rows(self, rows): + self._log.info("[AMW] logging frame") + columns = ['query', 'queryid', 'query_signature', 'datname', 'rolname', 'calls'] + for row in rows: + if 'query' in row and 'begin' in row['query']: + self._log.info("[AMW] pgss row: " + str({k: row[k] for k in columns})) + + self._log.info("") + + for row_key, prev in self._state._previous_statements.items(): + if 'fc747ae36f14c50d' in row_key: + self._log.info("[AMW] prev row: " + str({k: prev[k] for k in columns})) + + self._log.info("[AMW] done logging rows") + @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True) def _collect_metrics_rows(self): self._emit_pg_stat_statements_metrics() - self._emit_pg_stat_sta + self._emit_pg_stat_statements_dealloc() + + # This leads to inflated metrics self._check_baseline_metrics_expiry() + rows = [] if (not self._config.incremental_query_metrics) or self._check.version < V10: - print("[AMW] feature flag off branch") + self._log.info("[AMW] feature flag off branch") rows = self._load_pg_stat_statements() rows = self._normalize_queries(rows) elif len(self._baseline_metrics) == 0: - print("[AMW] baseline queries") + self._log.info("[AMW] baseline queries") # When we don't have baseline metrics (either on the first run or after cache expiry), # we fetch all rows from pg_stat_statements, and update the initial state of relevant # caches. rows = self._load_pg_stat_statements() rows = self._normalize_queries(rows) + self.log_rows(rows) self._query_calls_cache.set_calls(rows) self._apply_called_queries(rows) else: - print("[AMW] incremental queries") + self._log.info("[AMW] incremental queries") # When we do have baseline metrics, use them to construct the full set of rows # so that compute_derivative_rows can merge duplicates and calculate deltas. 
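# A simplified, standalone sketch of that reconstruction (illustrative values
# taken from the tests; the real cache is keyed by _row_key(row) plus queryid):
baseline = {
    ('abc', 'db', 'user', 1): {'query_signature': 'abc', 'calls': 1},
    ('abc', 'db', 'user', 2): {'query_signature': 'abc', 'calls': 2},
}
called_text = {'abc': 'query 123'}  # text exists only for rows called since the last run
full_rows = [
    {**row, 'query': called_text[row['query_signature']]}
    if row['query_signature'] in called_text
    else dict(row)
    for row in baseline.values()
]
# full_rows now holds every cached (queryid, db, user) row, so entries sharing a
# query_signature can be merged and per-run deltas computed downstream.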
self._check_called_queries() rows = self._load_pg_stat_statements() rows = self._normalize_queries(rows) rows = self._apply_called_queries(rows) + self.log_rows(rows) if not rows: return [] @@ -551,7 +576,7 @@ def _collect_metrics_rows(self): available_columns = set(rows[0].keys()) metric_columns = available_columns & PG_STAT_STATEMENTS_METRICS_COLUMNS - rows = self._state.compute_derivative_rows(rows, metric_columns, key=_row_key) + rows = self._state.compute_derivative_rows(rows, metric_columns, self._log, key=_row_key) self._check.gauge( 'dd.postgres.queries.query_rows_raw', len(rows), diff --git a/postgres/tests/test_apply_called_queries.py b/postgres/tests/test_apply_called_queries.py index c24e4c2f7ce0e..01b270d753b6e 100644 --- a/postgres/tests/test_apply_called_queries.py +++ b/postgres/tests/test_apply_called_queries.py @@ -10,15 +10,15 @@ def test_apply_called_queries_base_case(pg_instance, integration_check): check = integration_check(pg_instance) rows = [ - {'queryid': 1, 'query_signature': 'abc', 'calls': 1, 'query': 'query 123'}, - {'queryid': 2, 'query_signature': 'abc', 'calls': 2, 'query': 'query 123'}, + {'queryid': 1, 'query_signature': 'abc', 'calls': 1, 'query': 'query 123', 'datname': 'db', 'rolname': 'user'}, + {'queryid': 2, 'query_signature': 'abc', 'calls': 2, 'query': 'query 123', 'datname': 'db', 'rolname': 'user'}, ] rows = check.statement_metrics._apply_called_queries(rows) assert rows == [ - {'queryid': 1, 'query_signature': 'abc', 'calls': 1, 'query': 'query 123'}, - {'queryid': 2, 'query_signature': 'abc', 'calls': 2, 'query': 'query 123'}, + {'queryid': 1, 'query_signature': 'abc', 'calls': 1, 'query': 'query 123', 'datname': 'db', 'rolname': 'user'}, + {'queryid': 2, 'query_signature': 'abc', 'calls': 2, 'query': 'query 123', 'datname': 'db', 'rolname': 'user'}, ] @@ -26,18 +26,37 @@ def test_apply_called_queries_multiple_runs(pg_instance, integration_check): check = integration_check(pg_instance) rows = [ - {'queryid': 1, 'query_signature': 'abc', 'calls': 1, 'query': 'query 123'}, - {'queryid': 2, 'query_signature': 'abc', 'calls': 2, 'query': 'query 123'}, + {'queryid': 1, 'query_signature': 'abc', 'calls': 1, 'query': 'query 123', 'datname': 'db', 'rolname': 'user'}, + {'queryid': 2, 'query_signature': 'abc', 'calls': 2, 'query': 'query 123', 'datname': 'db', 'rolname': 'user'}, ] rows = check.statement_metrics._apply_called_queries(rows) second_rows = [ - {'queryid': 2, 'query_signature': 'abc', 'calls': 3, 'query': 'query 123'}, + {'queryid': 2, 'query_signature': 'abc', 'calls': 3, 'query': 'query 123', 'datname': 'db', 'rolname': 'user'}, ] rows = check.statement_metrics._apply_called_queries(second_rows) assert rows == [ - {'queryid': 1, 'query_signature': 'abc', 'calls': 1, 'query': 'query 123'}, - {'queryid': 2, 'query_signature': 'abc', 'calls': 3, 'query': 'query 123'}, + {'queryid': 1, 'query_signature': 'abc', 'calls': 1, 'query': 'query 123', 'datname': 'db', 'rolname': 'user'}, + {'queryid': 2, 'query_signature': 'abc', 'calls': 3, 'query': 'query 123', 'datname': 'db', 'rolname': 'user'}, + ] + +def test_apply_called_queries_multiple_dbs(pg_instance, integration_check): + check = integration_check(pg_instance) + + db1 = 'db1' + db2 = 'db2' + user1 = 'usr1' + user2 = 'usr2' + rows = [ + {'queryid': 2, 'query_signature': 'abc', 'calls': 3, 'query': 'query 123', 'datname': db1, 'rolname': user1}, + {'queryid': 2, 'query_signature': 'abc', 'calls': 100, 'query': 'query 123', 'datname': db2, 'rolname': user2}, + ] + applied = 
check.statement_metrics._apply_called_queries(rows) + + # The rows should remain separate because they have separate database and user values. + assert applied == [ + {'queryid': 2, 'query_signature': 'abc', 'calls': 3, 'query': 'query 123', 'datname': db1, 'rolname': user1}, + {'queryid': 2, 'query_signature': 'abc', 'calls': 100, 'query': 'query 123', 'datname': db2, 'rolname': user2}, ] From 3ebcfbf7d9333b20dcb43bbd10a7dd9a0a1a5afb Mon Sep 17 00:00:00 2001 From: "alex.weisberger@datadoghq.com" Date: Thu, 16 May 2024 19:54:33 -0400 Subject: [PATCH 31/35] Fix query text. --- postgres/datadog_checks/postgres/statements.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/postgres/datadog_checks/postgres/statements.py b/postgres/datadog_checks/postgres/statements.py index 2bbad588db233..cd50f1f1ffafe 100644 --- a/postgres/datadog_checks/postgres/statements.py +++ b/postgres/datadog_checks/postgres/statements.py @@ -478,6 +478,9 @@ def _emit_pg_stat_statements_metrics(self): except psycopg2.Error as e: self._log.warning("Failed to query for pg_stat_statements count: %s", e) + def _baseline_metrics_query_key(self, row): + return _row_key(row) + (row['queryid'], ) + # _apply_called_queries expects normalized rows before any merging of duplicates. # It takes the incremental pg_stat_statements rows and constructs the full set of rows # by adding the existing values in the baseline_metrics cache. This is equivalent to @@ -486,11 +489,8 @@ def _emit_pg_stat_statements_metrics(self): def _apply_called_queries(self, rows): # Apply called queries to baseline_metrics for row in rows: - query_signature = row['query_signature'] - queryid = row['queryid'] baseline_row = copy.copy(row) - - key = _row_key(baseline_row) + (queryid, ) + key = self._baseline_metrics_query_key(row) # To avoid high memory usage, don't cache the query text since it can be large. 
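# For a typical normalized row, only the text column is dropped by the del below:
#   row          == {'queryid': 1, 'query_signature': 'abc', 'datname': 'db',
#                    'rolname': 'user', 'calls': 10, 'query': '<potentially large SQL>'}
#   baseline_row == the same dict minus the 'query' key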
del baseline_row['query'] @@ -501,6 +501,7 @@ def _apply_called_queries(self, rows): query_text = {row['query_signature']: row['query'] for row in rows} applied_rows = [] for row in self._baseline_metrics.values(): + query_signature = row['query_signature'] if query_signature in query_text: applied_rows.append({**row, 'query': query_text[query_signature]}) else: @@ -542,14 +543,13 @@ def _collect_metrics_rows(self): self._emit_pg_stat_statements_metrics() self._emit_pg_stat_statements_dealloc() - # This leads to inflated metrics self._check_baseline_metrics_expiry() - rows = [] if (not self._config.incremental_query_metrics) or self._check.version < V10: self._log.info("[AMW] feature flag off branch") rows = self._load_pg_stat_statements() rows = self._normalize_queries(rows) + self.log_rows(rows) elif len(self._baseline_metrics) == 0: self._log.info("[AMW] baseline queries") # When we don't have baseline metrics (either on the first run or after cache expiry), From dbc72b25e4ba6ea1ab614e4d036c47a6b53e7bd4 Mon Sep 17 00:00:00 2001 From: "alex.weisberger@datadoghq.com" Date: Thu, 16 May 2024 19:55:27 -0400 Subject: [PATCH 32/35] Remove logging --- .../base/utils/db/statement_metrics.py | 7 +---- .../datadog_checks/postgres/statements.py | 26 +------------------ 2 files changed, 2 insertions(+), 31 deletions(-) diff --git a/datadog_checks_base/datadog_checks/base/utils/db/statement_metrics.py b/datadog_checks_base/datadog_checks/base/utils/db/statement_metrics.py index ae992ca305426..be0a238bcd80d 100644 --- a/datadog_checks_base/datadog_checks/base/utils/db/statement_metrics.py +++ b/datadog_checks_base/datadog_checks/base/utils/db/statement_metrics.py @@ -24,7 +24,7 @@ class StatementMetrics: def __init__(self): self._previous_statements = {} - def compute_derivative_rows(self, rows, metrics, log, key): + def compute_derivative_rows(self, rows, metrics, key): """ Compute the first derivative of column-based metrics for a given set of rows. This function takes the difference of the previous check run's values and the current check run's values @@ -83,11 +83,6 @@ def compute_derivative_rows(self, rows, metrics, log, key): if all(diffed_row[k] == 0 for k in metric_columns): continue - if 'begin' in row['query']: - log.info("[AMW] metrics calc") - log.info("[AMW] prev: " + str({k: prev[k] for k in ['query', 'queryid', 'query_signature', 'datname', 'rolname', 'calls' ]})) - log.info("[AMW] curr: " + str({k: row[k] for k in ['query', 'queryid', 'query_signature', 'datname', 'rolname', 'calls' ]})) - result.append(diffed_row) self._previous_statements.clear() diff --git a/postgres/datadog_checks/postgres/statements.py b/postgres/datadog_checks/postgres/statements.py index cd50f1f1ffafe..271aeb6c35079 100644 --- a/postgres/datadog_checks/postgres/statements.py +++ b/postgres/datadog_checks/postgres/statements.py @@ -256,7 +256,6 @@ def collect_per_statement_metrics(self): # all databases on the host. For metrics the "db" tag is added during ingestion based on which database # each query came from. 
try: - self._log.info("[AMW] check running") rows = self._collect_metrics_rows() if not rows: return @@ -349,7 +348,6 @@ def _load_pg_stat_statements(self): with self._check._get_main_db() as conn: with conn.cursor(cursor_factory=CommenterDictCursor) as cursor: if len(self._query_calls_cache.cache) > 0: - self._log.info("[AMW] fetching incremental queries") return self._execute_query( cursor, statements_query( @@ -361,7 +359,6 @@ def _load_pg_stat_statements(self): params=params, ) else: - self._log.info("[AMW] fetching all queries") return self._execute_query( cursor, statements_query( @@ -516,28 +513,10 @@ def _check_baseline_metrics_expiry(self): self._last_baseline_metrics_expiry is None or self._last_baseline_metrics_expiry + self._config.baseline_metrics_expiry < time.time() ): - self._log.info("[AMW] last_baseline_metrics_expiry" + str(self._last_baseline_metrics_expiry)) - self._log.info("[AMW] baseline_metrics_expiry" + str(self._config.baseline_metrics_expiry)) - self._log.info("[AMW] resetting baseline cachce") self._baseline_metrics = {} self._query_calls_cache = QueryCallsCache() self._last_baseline_metrics_expiry = time.time() - def log_rows(self, rows): - self._log.info("[AMW] logging frame") - columns = ['query', 'queryid', 'query_signature', 'datname', 'rolname', 'calls'] - for row in rows: - if 'query' in row and 'begin' in row['query']: - self._log.info("[AMW] pgss row: " + str({k: row[k] for k in columns})) - - self._log.info("") - - for row_key, prev in self._state._previous_statements.items(): - if 'fc747ae36f14c50d' in row_key: - self._log.info("[AMW] prev row: " + str({k: prev[k] for k in columns})) - - self._log.info("[AMW] done logging rows") - @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True) def _collect_metrics_rows(self): self._emit_pg_stat_statements_metrics() @@ -546,12 +525,10 @@ def _collect_metrics_rows(self): self._check_baseline_metrics_expiry() rows = [] if (not self._config.incremental_query_metrics) or self._check.version < V10: - self._log.info("[AMW] feature flag off branch") rows = self._load_pg_stat_statements() rows = self._normalize_queries(rows) self.log_rows(rows) elif len(self._baseline_metrics) == 0: - self._log.info("[AMW] baseline queries") # When we don't have baseline metrics (either on the first run or after cache expiry), # we fetch all rows from pg_stat_statements, and update the initial state of relevant # caches. @@ -561,7 +538,6 @@ def _collect_metrics_rows(self): self._query_calls_cache.set_calls(rows) self._apply_called_queries(rows) else: - self._log.info("[AMW] incremental queries") # When we do have baseline metrics, use them to construct the full set of rows # so that compute_derivative_rows can merge duplicates and calculate deltas. 
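# A rough sketch of that delta step (assuming 'calls' is the only metric column
# and that rows are keyed by query_signature, datname and rolname between runs):
previous = {('abc', 'db', 'user'): {'calls': 10}}
current = [{'query_signature': 'abc', 'datname': 'db', 'rolname': 'user', 'calls': 14}]
deltas = []
for row in current:
    key = (row['query_signature'], row['datname'], row['rolname'])
    prev = previous.get(key)
    if prev is not None and row['calls'] - prev['calls'] != 0:
        deltas.append({**row, 'calls': row['calls'] - prev['calls']})
# deltas == [{'query_signature': 'abc', 'datname': 'db', 'rolname': 'user', 'calls': 4}]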
self._check_called_queries() @@ -576,7 +552,7 @@ def _collect_metrics_rows(self): available_columns = set(rows[0].keys()) metric_columns = available_columns & PG_STAT_STATEMENTS_METRICS_COLUMNS - rows = self._state.compute_derivative_rows(rows, metric_columns, self._log, key=_row_key) + rows = self._state.compute_derivative_rows(rows, metric_columns, key=_row_key) self._check.gauge( 'dd.postgres.queries.query_rows_raw', len(rows), From eff72a3c0eee0a9828cc4ff2682acc132fff06a7 Mon Sep 17 00:00:00 2001 From: "alex.weisberger@datadoghq.com" Date: Fri, 17 May 2024 00:15:11 +0000 Subject: [PATCH 33/35] Lint fixes --- postgres/datadog_checks/postgres/statements.py | 4 ++-- postgres/tests/test_apply_called_queries.py | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/postgres/datadog_checks/postgres/statements.py b/postgres/datadog_checks/postgres/statements.py index 271aeb6c35079..733022c1eb90e 100644 --- a/postgres/datadog_checks/postgres/statements.py +++ b/postgres/datadog_checks/postgres/statements.py @@ -476,7 +476,7 @@ def _emit_pg_stat_statements_metrics(self): self._log.warning("Failed to query for pg_stat_statements count: %s", e) def _baseline_metrics_query_key(self, row): - return _row_key(row) + (row['queryid'], ) + return _row_key(row) + (row['queryid'],) # _apply_called_queries expects normalized rows before any merging of duplicates. # It takes the incremental pg_stat_statements rows and constructs the full set of rows @@ -521,7 +521,7 @@ def _check_baseline_metrics_expiry(self): def _collect_metrics_rows(self): self._emit_pg_stat_statements_metrics() self._emit_pg_stat_statements_dealloc() - + self._check_baseline_metrics_expiry() rows = [] if (not self._config.incremental_query_metrics) or self._check.version < V10: diff --git a/postgres/tests/test_apply_called_queries.py b/postgres/tests/test_apply_called_queries.py index 01b270d753b6e..7a00616db893b 100644 --- a/postgres/tests/test_apply_called_queries.py +++ b/postgres/tests/test_apply_called_queries.py @@ -42,6 +42,7 @@ def test_apply_called_queries_multiple_runs(pg_instance, integration_check): {'queryid': 2, 'query_signature': 'abc', 'calls': 3, 'query': 'query 123', 'datname': 'db', 'rolname': 'user'}, ] + def test_apply_called_queries_multiple_dbs(pg_instance, integration_check): check = integration_check(pg_instance) From 89c92939713aeb460ea1f6c5446d532e5a786ae0 Mon Sep 17 00:00:00 2001 From: "alex.weisberger@datadoghq.com" Date: Fri, 17 May 2024 01:37:55 +0000 Subject: [PATCH 34/35] Remove more logs --- postgres/datadog_checks/postgres/statements.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/postgres/datadog_checks/postgres/statements.py b/postgres/datadog_checks/postgres/statements.py index 733022c1eb90e..b83c1889bd19e 100644 --- a/postgres/datadog_checks/postgres/statements.py +++ b/postgres/datadog_checks/postgres/statements.py @@ -527,14 +527,12 @@ def _collect_metrics_rows(self): if (not self._config.incremental_query_metrics) or self._check.version < V10: rows = self._load_pg_stat_statements() rows = self._normalize_queries(rows) - self.log_rows(rows) elif len(self._baseline_metrics) == 0: # When we don't have baseline metrics (either on the first run or after cache expiry), # we fetch all rows from pg_stat_statements, and update the initial state of relevant # caches. 
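# Usage sketch of the calls cache seeded just below, mirroring
# test_query_calls_cache.py from earlier in this series:
from datadog_checks.postgres.query_calls_cache import QueryCallsCache

cache = QueryCallsCache()
cache.set_calls([{'queryid': 123, 'calls': 100}, {'queryid': 124, 'calls': 5}])
# cache.cache now maps each queryid to its last-seen call count: {123: 100, 124: 5}.
# On later runs, only queryids whose counts changed are treated as called queries.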
rows = self._load_pg_stat_statements() rows = self._normalize_queries(rows) - self.log_rows(rows) self._query_calls_cache.set_calls(rows) self._apply_called_queries(rows) else: @@ -544,7 +542,6 @@ def _collect_metrics_rows(self): rows = self._load_pg_stat_statements() rows = self._normalize_queries(rows) rows = self._apply_called_queries(rows) - self.log_rows(rows) if not rows: return [] From 752355200964ed7696e6b3723f3c92700068ae1c Mon Sep 17 00:00:00 2001 From: "alex.weisberger@datadoghq.com" Date: Fri, 17 May 2024 02:02:07 +0000 Subject: [PATCH 35/35] Expire baseline metrics cache based on size. --- postgres/datadog_checks/postgres/statements.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/postgres/datadog_checks/postgres/statements.py b/postgres/datadog_checks/postgres/statements.py index b83c1889bd19e..731496ba64908 100644 --- a/postgres/datadog_checks/postgres/statements.py +++ b/postgres/datadog_checks/postgres/statements.py @@ -512,11 +512,19 @@ def _check_baseline_metrics_expiry(self): if ( self._last_baseline_metrics_expiry is None or self._last_baseline_metrics_expiry + self._config.baseline_metrics_expiry < time.time() + or len(self._baseline_metrics) > 3 * int(self._check.pg_settings.get("pg_stat_statements.max")) ): self._baseline_metrics = {} self._query_calls_cache = QueryCallsCache() self._last_baseline_metrics_expiry = time.time() + self._check.count( + "dd.postgres.statement_metrics.baseline_metrics_cache_reset", + 1, + tags=self.tags + self._check._get_debug_tags(), + hostname=self._check.resolved_hostname, + ) + @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True) def _collect_metrics_rows(self): self._emit_pg_stat_statements_metrics()
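# Two notes on the size-based expiry above: pg_stat_statements.max defaults to
# 5000, so with defaults the baseline cache is rebuilt once it tracks more than
# 3 * 5000 = 15000 (queryid, db, user) rows, which bounds memory when the set of
# distinct queryids keeps growing between time-based expiries. The
# dd.postgres.statement_metrics.baseline_metrics_cache_reset count makes those
# rebuilds observable during rollout.
#
# End to end, the opt-in might look like this on an instance: a sketch that
# mirrors the dict used in test_statements.py; the host/dbm values are purely
# illustrative.
pg_instance = {
    'host': 'localhost',
    'dbm': True,
    'query_metrics': {
        'enabled': True,
        'incremental_query_metrics': True,  # feature flag introduced in this series
        'baseline_metrics_expiry': 300,     # seconds; matches the spec default
    },
}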