
Postgres Performance Optimization: Cache baseline metrics and apply updates incrementally #17554


Merged
Commits (35)
592f3fb
Revert "Revert "[SDBM-842] Postgres integration performance optimizat…
amw-zero May 2, 2024
cb6a051
Logging
amw-zero May 7, 2024
2dcfba6
Add targeted logging
amw-zero May 7, 2024
d586d38
Improve logging
amw-zero May 7, 2024
32d27dc
Observe logs based on querying all pgss rows.
amw-zero May 7, 2024
e552d9c
Reproduce inflated metrics in test.
amw-zero May 10, 2024
48d4bd9
Keep baseline metric cache.
amw-zero May 10, 2024
d89db22
Replace baseline outright
amw-zero May 10, 2024
e957ee0
Remove unneeded setup and logs
amw-zero May 10, 2024
d364a1d
Remove logs
amw-zero May 10, 2024
4744a92
Changelog
amw-zero May 10, 2024
22f1803
Expire metrics caches to prevent unbounded growth.
amw-zero May 10, 2024
1eb7dc9
Create one row per queryid in _apply_deltas.
amw-zero May 12, 2024
390ea86
Logging
amw-zero May 13, 2024
7c2e462
Copy row to prevent mutation.
amw-zero May 13, 2024
af13bdb
Remove logging
amw-zero May 13, 2024
ec20dd4
Auto lint
amw-zero May 13, 2024
25eb3a3
Store called queryids within the QueryCallsCache
amw-zero May 13, 2024
19e95c7
Delete query text.
amw-zero May 14, 2024
34dd2fa
This test doesn't work on v9
amw-zero May 14, 2024
ad9c4bf
Queryids are guaranteed, so can skip this check.
amw-zero May 14, 2024
3ef0940
Fetch all pgss rows initially to avoid unnecessary filtering in the e…
amw-zero May 14, 2024
2630e2f
Opt in to performance optimization
amw-zero May 14, 2024
e44b0b3
Lint fixes
amw-zero May 14, 2024
c825983
Account for function rename
amw-zero May 14, 2024
217358d
Account for new set_calls pattern.
amw-zero May 14, 2024
d252c9a
Spec default
amw-zero May 14, 2024
8bdf3c6
Properly expire baseline metrics cache.
amw-zero May 15, 2024
6edb871
Loggin
amw-zero May 15, 2024
cdefa8e
wip
amw-zero May 16, 2024
3ebcfbf
Fix query text.
amw-zero May 16, 2024
dbc72b2
Remove logging
amw-zero May 16, 2024
eff72a3
Lint fixes
amw-zero May 17, 2024
89c9293
Remove more logs
amw-zero May 17, 2024
7523552
Expire baseline metrics cache based on size.
amw-zero May 17, 2024
18 changes: 18 additions & 0 deletions postgres/assets/configuration/spec.yaml
@@ -428,6 +428,24 @@ files:
          value:
            type: number
            example: 10000
        - name: incremental_query_metrics
          hidden: true
          description: |
            Enable an experimental performance optimization that reduces the amount of data queried from `pg_stat_statements`
            when calculating query metrics. This option is only available for PostgreSQL 10.0 and above.
          value:
            type: boolean
            display_default: false
            example: false
        - name: baseline_metrics_expiry
          hidden: true
          description: |
            Set the cache expiry time (in seconds) for the baseline for query metrics. This option is only available when
            `incremental_query_metrics` is enabled.
          value:
            type: number
            display_default: 300
            example: 300
    - name: query_samples
      description: Configure collection of query samples
      options:
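
For context, an instance opting into these hidden options would set them under query_metrics in postgres.d/conf.yaml, along the lines of the following illustrative snippet (example values, not part of this diff):

instances:
  - host: localhost
    port: 5432
    dbm: true
    query_metrics:
      # Hidden, experimental options; defaults are false and 300 respectively.
      incremental_query_metrics: true
      baseline_metrics_expiry: 600
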
1 change: 1 addition & 0 deletions postgres/changelog.d/17554.fixed
@@ -0,0 +1 @@
This un-reverts https://github.com/DataDog/integrations-core/pull/17187, which was reverted due to a flaw in the logic that could result in improper metric counts. The fixed version caches an initial state of pg_stat_statements and updates it with incremental partial query data. This allows the agent to only fetch full rows for queries that were called between check runs, while also accounting for queryids that can map to the same query_signature.
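
To make the original flaw concrete: when several queryids normalize to the same query_signature, a snapshot built only from the rows that changed understates that signature's total, and the downstream delta computation misreads it as a counter reset. A toy sketch with made-up numbers (not code from this PR):

# Two queryids (101, 102) normalize to the same query_signature "sig".
baseline = {("sig", 101): 10, ("sig", 102): 5}   # full snapshot cached from the previous run
changed = {("sig", 101): 12}                     # only queryid 101 was called since then

# Naive: rebuild the snapshot from changed rows only. The per-signature total drops
# from 15 to 12, which looks like a pg_stat_statements reset and corrupts the deltas.
naive_total = sum(changed.values())              # 12

# Fixed: overlay the changed rows onto the cached baseline before summing.
merged = {**baseline, **changed}
total = sum(merged.values())                     # 12 + 5 = 17
delta = total - sum(baseline.values())           # 2 additional calls, as expected
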
4 changes: 4 additions & 0 deletions postgres/datadog_checks/postgres/config.py
@@ -165,6 +165,10 @@ def __init__(self, instance, init_config):
        self.log_unobfuscated_queries = is_affirmative(instance.get('log_unobfuscated_queries', False))
        self.log_unobfuscated_plans = is_affirmative(instance.get('log_unobfuscated_plans', False))
        self.database_instance_collection_interval = instance.get('database_instance_collection_interval', 300)
        self.incremental_query_metrics = is_affirmative(
            self.statement_metrics_config.get('incremental_query_metrics', False)
        )
        self.baseline_metrics_expiry = self.statement_metrics_config.get('baseline_metrics_expiry', 300)

    def _build_tags(self, custom_tags, agent_tags, propagate_agent_tags=True):
        # Clean up tags in case there was a None entry in the instance
2 changes: 2 additions & 0 deletions postgres/datadog_checks/postgres/config_models/instance.py
@@ -154,8 +154,10 @@ class QueryMetrics(BaseModel):
        arbitrary_types_allowed=True,
        frozen=True,
    )
    baseline_metrics_expiry: Optional[float] = None
    collection_interval: Optional[float] = None
    enabled: Optional[bool] = None
    incremental_query_metrics: Optional[bool] = None
    pg_stat_statements_max_warning_threshold: Optional[float] = None


47 changes: 47 additions & 0 deletions postgres/datadog_checks/postgres/query_calls_cache.py
@@ -0,0 +1,47 @@
# (C) Datadog, Inc. 2024-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
class QueryCallsCache:
    """Maintains a cache of the last-known number of calls per queryid, as per pg_stat_statements"""

    def __init__(self):
        self.cache = {}
        self.next_cache = {}
        self.called_queryids = []
        self.next_called_queryids = set()

    def end_query_call_snapshot(self):
        """To prevent evicted statements from building up in the cache we
        replace the cache outright after each sampling of pg_stat_statements."""
        self.cache = self.next_cache
        self.next_cache = {}
        self.called_queryids = self.next_called_queryids
        self.next_called_queryids = set()

    def set_calls(self, rows):
        """Updates the cache of calls per query id.

        Returns whether or not the number of calls changed based on
        the newly updated value. The first seen update for a queryid
        does not count as a change in values since that would result
        in an inflated value."""
        for row in rows:
            queryid = row['queryid']
            calls = row['calls']
            calls_changed = False

            if queryid in self.cache:
                diff = calls - self.cache[queryid]
                # Positive deltas mean the statement remained in pg_stat_statements
                # between check calls. Negative deltas mean the statement was evicted
                # and replaced with a new call count. Both cases should count as a call
                # change.
                calls_changed = diff != 0
            else:
                calls_changed = True

            self.next_cache[queryid] = calls
            if calls_changed:
                self.next_called_queryids.add(queryid)

        self.end_query_call_snapshot()
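
A quick usage sketch of the cache above, with toy rows (in the check these rows come from the queryid/calls query in statements.py):

cache = QueryCallsCache()

# First snapshot: unseen queryids are recorded and flagged as called, so their
# full rows get fetched once to seed the baseline metrics cache.
cache.set_calls([{'queryid': 101, 'calls': 7}, {'queryid': 102, 'calls': 3}])
print(sorted(cache.called_queryids))  # [101, 102]

# Second snapshot: only queryid 101 moved, so only it needs a full-row fetch.
cache.set_calls([{'queryid': 101, 'calls': 9}, {'queryid': 102, 'calls': 3}])
print(sorted(cache.called_queryids))  # [101]
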
170 changes: 155 additions & 15 deletions postgres/datadog_checks/postgres/statements.py
@@ -19,14 +19,21 @@
from datadog_checks.base.utils.tracking import tracked_method
from datadog_checks.postgres.cursor import CommenterCursor, CommenterDictCursor

from .query_calls_cache import QueryCallsCache
from .util import DatabaseConfigurationError, payload_pg_version, warning_with_tags
from .version_utils import V9_4, V14
from .version_utils import V9_4, V10, V14

try:
    import datadog_agent
except ImportError:
    from ..stubs import datadog_agent

QUERYID_TO_CALLS_QUERY = """
SELECT queryid, calls
FROM {pg_stat_statements_view}
WHERE queryid IS NOT NULL
"""

STATEMENTS_QUERY = """
SELECT {cols}
FROM {pg_stat_statements_view} as pg_stat_statements
@@ -36,10 +43,33 @@
ON pg_stat_statements.dbid = pg_database.oid
WHERE query != '<insufficient privilege>'
AND query NOT LIKE 'EXPLAIN %%'
{queryid_filter}
{filters}
{extra_clauses}
"""


def statements_query(**kwargs):
    pg_stat_statements_view = kwargs.get('pg_stat_statements_view', 'pg_stat_statements')
    cols = kwargs.get('cols', '*')
    filters = kwargs.get('filters', '')
    extra_clauses = kwargs.get('extra_clauses', '')
    called_queryids = kwargs.get('called_queryids', [])

    queryid_filter = ""
    if len(called_queryids) > 0:
        queryid_filter = f"AND queryid = ANY('{{ {called_queryids} }}'::bigint[])"

    return STATEMENTS_QUERY.format(
        cols=cols,
        pg_stat_statements_view=pg_stat_statements_view,
        filters=filters,
        extra_clauses=extra_clauses,
        queryid_filter=queryid_filter,
        called_queryids=called_queryids,
    )
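
As an illustration, a call with called_queryids set (hypothetical values) renders the incremental filter into the statement:

# Hypothetical invocation; the queryids and filter text are examples only.
sql = statements_query(
    cols='queryid, query, calls',
    pg_stat_statements_view='pg_stat_statements',
    filters='AND pg_database.datname = %s',
    called_queryids='101, 102',
)
assert "AND queryid = ANY('{ 101, 102 }'::bigint[])" in sql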


# Use pg_stat_statements(false) when available as an optimization to avoid pulling SQL text from disk
PG_STAT_STATEMENTS_COUNT_QUERY = "SELECT COUNT(*) FROM pg_stat_statements(false)"
PG_STAT_STATEMENTS_COUNT_QUERY_LT_9_4 = "SELECT COUNT(*) FROM pg_stat_statements"
@@ -146,6 +176,9 @@ def __init__(self, check, config, shutdown_callback):
        self.tags = None
        self._state = StatementMetrics()
        self._stat_column_cache = []
        self._query_calls_cache = QueryCallsCache()
        self._baseline_metrics = {}
        self._last_baseline_metrics_expiry = None
        self._track_io_timing_cache = None
        self._obfuscate_options = to_native_string(json.dumps(self._config.obfuscator_options))
        # full_statement_text_cache: limit the ingestion rate of full statement text events per query_signature
@@ -176,8 +209,10 @@ def _get_pg_stat_statements_columns(self):
            return self._stat_column_cache

        # Querying over '*' with limit 0 allows fetching only the column names from the cursor without data
        query = STATEMENTS_QUERY.format(
            cols='*', pg_stat_statements_view=self._config.pg_stat_statements_view, extra_clauses="LIMIT 0", filters=""
        query = statements_query(
            cols='*',
            pg_stat_statements_view=self._config.pg_stat_statements_view,
            extra_clauses="LIMIT 0",
        )
        with self._check._get_main_db() as conn:
            with conn.cursor(cursor_factory=CommenterCursor) as cursor:
@@ -186,6 +221,29 @@ def _get_pg_stat_statements_columns(self):
        self._stat_column_cache = col_names
        return col_names

    def _check_called_queries(self):
        pgss_view_without_query_text = self._config.pg_stat_statements_view
        if pgss_view_without_query_text == "pg_stat_statements":
            # Passing false for the showtext argument leads to a huge performance increase. This
            # allows the engine to avoid retrieving the potentially large amount of text data.
            # The query count query does not depend on the statement text, so it's safe for this use case.
            # For more info: https://www.postgresql.org/docs/current/pgstatstatements.html#PGSTATSTATEMENTS-FUNCS
            pgss_view_without_query_text = "pg_stat_statements(false)"

        with self._check._get_main_db() as conn:
            with conn.cursor(cursor_factory=CommenterDictCursor) as cursor:
                query = QUERYID_TO_CALLS_QUERY.format(pg_stat_statements_view=pgss_view_without_query_text)
                rows = self._execute_query(cursor, query, params=(self._config.dbname,))
                self._query_calls_cache.set_calls(rows)
                self._check.gauge(
                    "dd.postgresql.pg_stat_statements.calls_changed",
                    len(self._query_calls_cache.called_queryids),
                    tags=self.tags,
                    hostname=self._check.resolved_hostname,
                )

        return self._query_calls_cache.called_queryids

    def run_job(self):
        # do not emit any dd.internal metrics for DBM specific check code
        self.tags = [t for t in self._tags if not t.startswith('dd.internal')]
@@ -289,16 +347,27 @@ def _load_pg_stat_statements(self):
                params = params + tuple(self._config.ignore_databases)
            with self._check._get_main_db() as conn:
                with conn.cursor(cursor_factory=CommenterDictCursor) as cursor:
                    return self._execute_query(
                        cursor,
                        STATEMENTS_QUERY.format(
                            cols=', '.join(query_columns),
                            pg_stat_statements_view=self._config.pg_stat_statements_view,
                            filters=filters,
                            extra_clauses="",
                        ),
                        params=params,
                    )
                    if len(self._query_calls_cache.cache) > 0:
                        return self._execute_query(
                            cursor,
                            statements_query(
                                cols=', '.join(query_columns),
                                pg_stat_statements_view=self._config.pg_stat_statements_view,
                                filters=filters,
                                called_queryids=', '.join([str(i) for i in self._query_calls_cache.called_queryids]),
                            ),
                            params=params,
                        )
                    else:
                        return self._execute_query(
                            cursor,
                            statements_query(
                                cols=', '.join(query_columns),
                                pg_stat_statements_view=self._config.pg_stat_statements_view,
                                filters=filters,
                            ),
                            params=params,
                        )
        except psycopg2.Error as e:
            error_tag = "error:database-{}".format(type(e).__name__)

@@ -406,25 +475,96 @@ def _emit_pg_stat_statements_metrics(self):
        except psycopg2.Error as e:
            self._log.warning("Failed to query for pg_stat_statements count: %s", e)

    def _baseline_metrics_query_key(self, row):
        return _row_key(row) + (row['queryid'],)

    # _apply_called_queries expects normalized rows before any merging of duplicates.
    # It takes the incremental pg_stat_statements rows and constructs the full set of rows
    # by adding the existing values in the baseline_metrics cache. This is equivalent to
    # fetching the full set of rows from pg_stat_statements, but we avoid paying the price of
    # actually querying the rows.
    def _apply_called_queries(self, rows):
        # Apply called queries to baseline_metrics
        for row in rows:
            baseline_row = copy.copy(row)
            key = self._baseline_metrics_query_key(row)

            # To avoid high memory usage, don't cache the query text since it can be large.
            del baseline_row['query']
            self._baseline_metrics[key] = baseline_row

        # Apply query text for called queries since it is not cached; uncalled queries won't result
        # in sent metrics.
        query_text = {row['query_signature']: row['query'] for row in rows}
        applied_rows = []
        for row in self._baseline_metrics.values():
            query_signature = row['query_signature']
            if query_signature in query_text:
                applied_rows.append({**row, 'query': query_text[query_signature]})
            else:
                applied_rows.append(copy.copy(row))

        return applied_rows
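
The overlay is easiest to see with toy data. The following is a simplified, self-contained restatement of the same idea (not the method above verbatim; the real cache key also includes datname and rolname via _row_key):

# Simplified sketch with toy rows and a toy key, for illustration only.
def apply_called_queries(baseline, incremental_rows, key):
    for row in incremental_rows:
        cached = dict(row)
        del cached['query']          # query text is never cached, to bound memory
        baseline[key(row)] = cached  # refresh baseline entries for called queryids
    text = {r['query_signature']: r['query'] for r in incremental_rows}
    # Re-attach text where available; uncalled rows stay text-less and emit no new metrics.
    return [
        dict(r, query=text[r['query_signature']]) if r['query_signature'] in text else dict(r)
        for r in baseline.values()
    ]

key = lambda r: (r['query_signature'], r['queryid'])
baseline = {
    ('sig_a', 101): {'query_signature': 'sig_a', 'queryid': 101, 'calls': 10},
    ('sig_b', 202): {'query_signature': 'sig_b', 'queryid': 202, 'calls': 4},
}
incremental = [{'query_signature': 'sig_a', 'queryid': 101, 'calls': 12, 'query': 'SELECT 1'}]
rows = apply_called_queries(baseline, incremental, key)
# rows holds sig_a refreshed to calls=12 with its text plus the untouched sig_b baseline row,
# the same set a full pg_stat_statements fetch would have produced.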

    # To prevent the baseline metrics cache from growing indefinitely (as can happen because of
    # pg_stat_statements eviction), we clear it out periodically to force a full refetch.
    def _check_baseline_metrics_expiry(self):
        if (
            self._last_baseline_metrics_expiry is None
            or self._last_baseline_metrics_expiry + self._config.baseline_metrics_expiry < time.time()
            or len(self._baseline_metrics) > 3 * int(self._check.pg_settings.get("pg_stat_statements.max"))
        ):
            self._baseline_metrics = {}
            self._query_calls_cache = QueryCallsCache()
            self._last_baseline_metrics_expiry = time.time()

            self._check.count(
                "dd.postgres.statement_metrics.baseline_metrics_cache_reset",
                1,
                tags=self.tags + self._check._get_debug_tags(),
                hostname=self._check.resolved_hostname,
            )
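
Restated as a standalone predicate for clarity (illustrative only; the default expiry is 300 seconds and the size bound is three times pg_stat_statements.max):

import time

# Illustrative restatement of the three reset conditions, not the method above verbatim.
def baseline_cache_expired(last_expiry, expiry_seconds, cache_size, pgss_max):
    return (
        last_expiry is None                            # never initialized
        or last_expiry + expiry_seconds < time.time()  # time-based expiry, 300 s by default
        or cache_size > 3 * pgss_max                   # size bound tied to pg_stat_statements.max
    )

# With PostgreSQL's default pg_stat_statements.max of 5000, the baseline cache would be
# rebuilt once it holds more than 15000 rows, even inside the 300 s window.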

    @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True)
    def _collect_metrics_rows(self):
        self._emit_pg_stat_statements_metrics()
        self._emit_pg_stat_statements_dealloc()
        rows = self._load_pg_stat_statements()

        rows = self._normalize_queries(rows)
        self._check_baseline_metrics_expiry()
        rows = []
        if (not self._config.incremental_query_metrics) or self._check.version < V10:
            rows = self._load_pg_stat_statements()
            rows = self._normalize_queries(rows)
        elif len(self._baseline_metrics) == 0:
            # When we don't have baseline metrics (either on the first run or after cache expiry),
            # we fetch all rows from pg_stat_statements, and update the initial state of relevant
            # caches.
            rows = self._load_pg_stat_statements()
            rows = self._normalize_queries(rows)
            self._query_calls_cache.set_calls(rows)
            self._apply_called_queries(rows)
        else:
            # When we do have baseline metrics, use them to construct the full set of rows
            # so that compute_derivative_rows can merge duplicates and calculate deltas.
            self._check_called_queries()
            rows = self._load_pg_stat_statements()
            rows = self._normalize_queries(rows)
            rows = self._apply_called_queries(rows)

        if not rows:
            return []

        available_columns = set(rows[0].keys())
        metric_columns = available_columns & PG_STAT_STATEMENTS_METRICS_COLUMNS

        rows = self._state.compute_derivative_rows(rows, metric_columns, key=_row_key)
        self._check.gauge(
            'dd.postgres.queries.query_rows_raw',
            len(rows),
            tags=self.tags + self._check._get_debug_tags(),
            hostname=self._check.resolved_hostname,
        )

        return rows

    def _normalize_queries(self, rows):