Commit a39c641

Gather and transmit Postgres schema data in segments (#17381)
1 parent fcdefa3 commit a39c641

File tree

6 files changed: +240 -130 lines changed


postgres/changelog.d/17381.fixed

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+Improved performance of database schema collection.

postgres/datadog_checks/postgres/metadata.py

Lines changed: 158 additions & 81 deletions
@@ -17,6 +17,7 @@
 from datadog_checks.base import is_affirmative
 from datadog_checks.base.utils.db.utils import DBMAsyncJob, default_json_event_encoding
 from datadog_checks.base.utils.tracking import tracked_method
+from datadog_checks.postgres.util import get_list_chunks

 from .util import payload_pg_version
 from .version_utils import VersionUtils
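The get_list_chunks helper imported above lives in postgres/datadog_checks/postgres/util.py, whose body is not part of this diff. A minimal sketch of what such a chunking utility typically looks like (the shipped implementation may differ):

# Hypothetical sketch of a list-chunking helper; the actual util.py
# implementation added in this commit is not shown in the diff above.
def get_list_chunks(items, chunk_size):
    """Yield successive chunk_size-sized slices of items."""
    for i in range(0, len(items), chunk_size):
        yield items[i:i + chunk_size]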
@@ -58,7 +59,7 @@
 PG_TABLES_QUERY_V10_PLUS = """
 SELECT c.oid AS id,
        c.relname AS name,
-       c.relhasindex AS hasindexes,
+       c.relhasindex AS has_indexes,
        c.relowner :: regrole AS owner,
        ( CASE
          WHEN c.relkind = 'p' THEN TRUE
@@ -76,7 +77,7 @@
 PG_TABLES_QUERY_V9 = """
 SELECT c.oid AS id,
        c.relname AS name,
-       c.relhasindex AS hasindexes,
+       c.relhasindex AS has_indexes,
        c.relowner :: regrole AS owner,
        t.relname AS toast_table
 FROM pg_class c
@@ -102,9 +103,10 @@

 PG_INDEXES_QUERY = """
 SELECT indexname AS NAME,
-       indexdef AS definition
+       indexdef AS definition,
+       tablename
 FROM pg_indexes
-WHERE tablename LIKE '{tablename}';
+WHERE {table_names_like};
 """

 PG_CHECK_FOR_FOREIGN_KEY = """
@@ -116,22 +118,24 @@

 PG_CONSTRAINTS_QUERY = """
 SELECT conname AS name,
-       pg_get_constraintdef(oid) AS definition
+       pg_get_constraintdef(oid) AS definition,
+       conrelid AS id
 FROM pg_constraint
 WHERE contype = 'f'
-      AND conrelid = {oid};
+      AND conrelid IN ({table_ids});
 """

 COLUMNS_QUERY = """
 SELECT attname AS name,
        Format_type(atttypid, atttypmod) AS data_type,
        NOT attnotnull AS nullable,
-       pg_get_expr(adbin, adrelid) AS default
+       pg_get_expr(adbin, adrelid) AS default,
+       attrelid AS id
 FROM pg_attribute
 LEFT JOIN pg_attrdef ad
        ON adrelid = attrelid
       AND adnum = attnum
-WHERE attrelid = {oid}
+WHERE attrelid IN ({table_ids})
   AND attnum > 0
   AND NOT attisdropped;
 """
@@ -140,13 +144,14 @@
 SELECT relname,
        pg_get_partkeydef(oid) AS partition_key
 FROM pg_class
-WHERE '{parent}' = relname;
+WHERE relname in ({table_names});
 """

 NUM_PARTITIONS_QUERY = """
-SELECT count(inhrelid :: regclass) AS num_partitions
+SELECT count(inhrelid :: regclass) AS num_partitions, inhparent as id
 FROM pg_inherits
-WHERE inhparent = {parent_oid};
+WHERE inhparent IN ({table_ids})
+GROUP BY inhparent;
 """

 PARTITION_ACTIVITY_QUERY = """
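Taken together, these query changes swap per-table parameters ({oid}, {parent}, {parent_oid}) for list parameters ({table_ids}, {table_names}, {table_names_like}), so one round trip covers a whole chunk of tables instead of one query per table. An illustrative sketch of how such a template gets filled later in this diff (the sample tables are made-up values):

# Illustrative only; the sample tables are made-up values.
table_info = [{"id": "16385", "name": "orders"}, {"id": "16401", "name": "users"}]

# Quoted, comma-separated OID list for the IN (...) clauses.
table_ids = ",".join("'{}'".format(t.get("id")) for t in table_info)

query = "SELECT count(inhrelid :: regclass) AS num_partitions, inhparent as id FROM pg_inherits WHERE inhparent IN ({table_ids}) GROUP BY inhparent;"
print(query.format(table_ids=table_ids))
# ... WHERE inhparent IN ('16385','16401') GROUP BY inhparent;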
@@ -174,26 +179,27 @@ class PostgresMetadata(DBMAsyncJob):
     """

     def __init__(self, check, config, shutdown_callback):
-        self.pg_settings_collection_interval = config.settings_metadata_config.get(
-            "collection_interval", DEFAULT_SETTINGS_COLLECTION_INTERVAL
-        )
         self.pg_settings_ignored_patterns = config.settings_metadata_config.get(
             "ignored_settings_patterns", DEFAULT_SETTINGS_IGNORED_PATTERNS
         )
+        self.pg_settings_collection_interval = config.settings_metadata_config.get(
+            "collection_interval", DEFAULT_SETTINGS_COLLECTION_INTERVAL
+        )
         self.schemas_collection_interval = config.schemas_metadata_config.get(
             "collection_interval", DEFAULT_SCHEMAS_COLLECTION_INTERVAL
         )
-
-        collection_interval = config.resources_metadata_config.get(
+        resources_collection_interval = config.resources_metadata_config.get(
             "collection_interval", DEFAULT_RESOURCES_COLLECTION_INTERVAL
         )

         # by default, send resources every 5 minutes
-        self.collection_interval = min(collection_interval, self.pg_settings_collection_interval)
+        self.collection_interval = min(
+            resources_collection_interval, self.pg_settings_collection_interval, self.schemas_collection_interval
+        )

         super(PostgresMetadata, self).__init__(
             check,
-            rate_limit=1 / self.collection_interval,
+            rate_limit=1 / float(self.collection_interval),
             run_sync=is_affirmative(config.settings_metadata_config.get("run_sync", False)),
             enabled=is_affirmative(config.resources_metadata_config.get("enabled", True)),
             dbms="postgres",
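The job's wake-up rate is now the minimum of all three configured intervals, where it previously ignored the schemas interval. A rough sketch of the scheduling idea (not the check's actual loop, which lives in datadog_checks.base's DBMAsyncJob; the interval values are examples):

# Rough sketch of the scheduling idea; values are examples.
import time

settings_interval, schemas_interval, resources_interval = 600, 600, 300
collection_interval = min(resources_interval, settings_interval, schemas_interval)

last_schemas = 0.0

def tick():
    """Runs every collection_interval seconds; each slower collection
    fires only when its own elapsed-time check passes."""
    global last_schemas
    if time.time() - last_schemas >= schemas_interval:
        last_schemas = time.time()
        # ... collect and emit schema metadata here ...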
@@ -207,9 +213,10 @@ def __init__(self, check, config, shutdown_callback):
         self.db_pool = self._check.db_pool
         self._collect_pg_settings_enabled = is_affirmative(config.settings_metadata_config.get("enabled", False))
         self._collect_schemas_enabled = is_affirmative(config.schemas_metadata_config.get("enabled", False))
+        self._is_schemas_collection_in_progress = False
         self._pg_settings_cached = None
         self._time_since_last_settings_query = 0
-        self._time_since_last_schemas_query = 0
+        self._last_schemas_query_time = 0
         self._conn_ttl_ms = self._config.idle_connection_timeout
         self._tags_no_db = None
         self.tags = None
@@ -253,24 +260,65 @@ def report_postgres_metadata(self):
             }
             self._check.database_monitoring_metadata(json.dumps(event, default=default_json_event_encoding))

-        elapsed_s_schemas = time.time() - self._time_since_last_schemas_query
-        if elapsed_s_schemas >= self.schemas_collection_interval and self._collect_schemas_enabled:
-            metadata = self._collect_schema_info()
-            event = {
+        elapsed_s_schemas = time.time() - self._last_schemas_query_time
+        if (
+            self._collect_schemas_enabled
+            and not self._is_schemas_collection_in_progress
+            and elapsed_s_schemas >= self.schemas_collection_interval
+        ):
+            self._is_schemas_collection_in_progress = True
+            schema_metadata = self._collect_schema_info()
+            # We emit an event for each batch of tables to reduce total data in memory and keep event size reasonable
+            base_event = {
                 "host": self._check.resolved_hostname,
                 "agent_version": datadog_agent.get_version(),
                 "dbms": "postgres",
                 "kind": "pg_databases",
                 "collection_interval": self.schemas_collection_interval,
                 "dbms_version": self._payload_pg_version(),
                 "tags": self._tags_no_db,
-                "timestamp": time.time() * 1000,
-                "metadata": metadata,
                 "cloud_metadata": self._config.cloud_metadata,
             }
-            json_event = json.dumps(event, default=default_json_event_encoding)
-            self._log.debug("Reporting the following payload for schema collection: {}".format(json_event))
-            self._check.database_monitoring_metadata(json_event)
+
+            # Tuned from experiments on staging, we may want to make this dynamic based on schema size in the future
+            chunk_size = 50
+
+            for database in schema_metadata:
+                dbname = database["name"]
+                with self.db_pool.get_connection(dbname, self._config.idle_connection_timeout) as conn:
+                    with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
+                        for schema in database["schemas"]:
+                            tables = self._query_tables_for_schema(cursor, schema["id"], dbname)
+                            table_chunks = list(get_list_chunks(tables, chunk_size))
+
+                            buffer_column_count = 0
+                            tables_buffer = []
+
+                            for tables in table_chunks:
+                                table_info = self._query_table_information(cursor, tables)
+
+                                tables_buffer = [*tables_buffer, *table_info]
+                                for t in table_info:
+                                    buffer_column_count += len(t.get("columns", []))
+
+                                if buffer_column_count >= 100_000:
+                                    self._flush_schema(base_event, database, schema, tables_buffer)
+                                    tables_buffer = []
+                                    buffer_column_count = 0
+
+                            if len(tables_buffer) > 0:
+                                self._flush_schema(base_event, database, schema, tables_buffer)
+            self._is_schemas_collection_in_progress = False
+
+    def _flush_schema(self, base_event, database, schema, tables):
+        event = {
+            **base_event,
+            "metadata": [{**database, "schemas": [{**schema, "tables": tables}]}],
+            "timestamp": time.time() * 1000,
+        }
+        json_event = json.dumps(event, default=default_json_event_encoding)
+        self._log.debug("Reporting the following payload for schema collection: {}".format(json_event))
+        self._check.database_monitoring_metadata(json_event)

     def _payload_pg_version(self):
         version = self._check.version
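The loop above is the heart of the change: tables are fetched in chunks of 50 and accumulated until the buffer holds roughly 100,000 columns, at which point one event is flushed. A condensed, standalone sketch of that buffering pattern, where fetch_chunk_info and flush stand in for _query_table_information and _flush_schema:

# Condensed sketch of the chunk-and-flush pattern above; fetch_chunk_info
# and flush are stand-ins for _query_table_information and _flush_schema.
def collect_in_segments(tables, fetch_chunk_info, flush, chunk_size=50, max_columns=100_000):
    tables_buffer, column_count = [], 0
    for i in range(0, len(tables), chunk_size):
        chunk_info = fetch_chunk_info(tables[i:i + chunk_size])
        tables_buffer.extend(chunk_info)
        column_count += sum(len(t.get("columns", [])) for t in chunk_info)
        if column_count >= max_columns:
            flush(tables_buffer)  # emit one event, then reset the buffer
            tables_buffer, column_count = [], 0
    if tables_buffer:
        flush(tables_buffer)  # emit whatever is left for this schema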
@@ -289,7 +337,7 @@ def _collect_schema_info(self):
         for database in databases:
            metadata.append(self._collect_metadata_for_database(database))

-        self._time_since_last_schemas_query = time.time()
+        self._last_schemas_query_time = time.time()
         return metadata

     def _query_database_information(
@@ -356,26 +404,61 @@ def sort_tables(info):
                 or not info["has_partitions"]
             ):
                 # if we don't have metrics in our cache for this table, return 0
-                table_data = cache[dbname].get(
+                table_data = cache.get(dbname, {}).get(
                     info["name"],
                     {"postgresql.index_scans": 0, "postgresql.seq_scans": 0},
                 )
-                return table_data["postgresql.index_scans"] + table_data["postgresql.seq_scans"]
+                return table_data.get("postgresql.index_scans", 0) + table_data.get("postgresql.seq_scans", 0)
             else:
                 # get activity
                 cursor.execute(PARTITION_ACTIVITY_QUERY.format(parent_oid=info["id"]))
                 row = cursor.fetchone()
-                return row.get("total_activity", 0) if row else 0
+                return row.get("total_activity", 0) if row is not None else 0
+
+        # We only sort to filter by top so no need to waste resources if we're going to return everything
+        if len(table_info) <= limit:
+            return table_info

         # if relation metrics are enabled, sorted based on last activity information
         table_info = sorted(table_info, key=sort_tables, reverse=True)
         return table_info[:limit]

-    def _query_table_information_for_schema(
+    def _query_tables_for_schema(
         self, cursor: psycopg2.extensions.cursor, schema_id: str, dbname: str
     ) -> List[Dict[str, Union[str, Dict]]]:
         """
-        Collect table information per schema. Returns a list of dictionaries
+        Collect list of tables for a schema. Returns a list of dictionaries
+        with key/values:
+            "id": str
+            "name": str
+            "owner": str
+            "has_indexes": bool
+            "has_partitions": bool
+            "toast_table": str (if associated toast table exists)
+            "num_partitions": int (if has partitions)
+
+        """
+        tables_info = self._get_table_info(cursor, dbname, schema_id)
+        table_payloads = []
+        for table in tables_info:
+            this_payload = {}
+            this_payload.update({"id": str(table["id"])})
+            this_payload.update({"name": table["name"]})
+            this_payload.update({"owner": table["owner"]})
+            this_payload.update({"has_indexes": table["has_indexes"]})
+            this_payload.update({"has_partitions": table.get("has_partitions", False)})
+            if table["toast_table"] is not None:
+                this_payload.update({"toast_table": table["toast_table"]})
+
+            table_payloads.append(this_payload)
+
+        return table_payloads
+
+    def _query_table_information(
+        self, cursor: psycopg2.extensions.cursor, table_info: List[Dict[str, Union[str, bool]]]
+    ) -> List[Dict[str, Union[str, Dict]]]:
+        """
+        Collect table information. Returns a dictionary
         with key/values:
             "id": str
             "name": str
@@ -395,55 +478,51 @@ def _query_table_information_for_schema(
             "partition_key": str (if has partitions)
             "num_partitions": int (if has partitions)
         """
-        tables_info = self._get_table_info(cursor, dbname, schema_id)
-        table_payloads = []
-        for table in tables_info:
-            this_payload = {}
-            name = table["name"]
-            table_id = table["id"]
-            table_owner = table["owner"]
-            this_payload.update({"id": str(table["id"])})
-            this_payload.update({"name": name})
-            this_payload.update({"owner": table_owner})
-            if table["hasindexes"]:
-                cursor.execute(PG_INDEXES_QUERY.format(tablename=name))
-                rows = cursor.fetchall()
-                idxs = [dict(row) for row in rows]
-                this_payload.update({"indexes": idxs})
-
-            if VersionUtils.transform_version(str(self._check.version))["version.major"] != "9":
-                if table["has_partitions"]:
-                    cursor.execute(PARTITION_KEY_QUERY.format(parent=name))
-                    row = cursor.fetchone()
-                    this_payload.update({"partition_key": row["partition_key"]})
-
-                    cursor.execute(NUM_PARTITIONS_QUERY.format(parent_oid=table_id))
-                    row = cursor.fetchone()
-                    this_payload.update({"num_partitions": row["num_partitions"]})
-
-            if table["toast_table"] is not None:
-                this_payload.update({"toast_table": table["toast_table"]})
+        tables = {t.get("name"): {**t, "num_partitions": 0} for t in table_info}
+        table_name_lookup = {t.get("id"): t.get("name") for t in table_info}
+        table_ids = ",".join(["'{}'".format(t.get("id")) for t in table_info])
+        table_names = ",".join(["'{}'".format(t.get("name")) for t in table_info])
+        table_names_like = " OR ".join(["tablename LIKE '{}%'".format(t.get("name")) for t in table_info])
+
+        # Get indexes
+        cursor.execute(PG_INDEXES_QUERY.format(table_names_like=table_names_like))
+        rows = cursor.fetchall()
+        for row in rows:
+            # Partition indexes in some versions of Postgres have appended digits for each partition
+            table_name = row.get("tablename")
+            while tables.get(table_name) is None and len(table_name) > 1 and table_name[-1].isdigit():
+                table_name = table_name[0:-1]
+            if tables.get(table_name) is not None:
+                tables.get(table_name)["indexes"] = tables.get(table_name).get("indexes", []) + [dict(row)]
+
+        # Get partitions
+        if VersionUtils.transform_version(str(self._check.version))["version.major"] != "9":
+            cursor.execute(PARTITION_KEY_QUERY.format(table_names=table_names))
+            rows = cursor.fetchall()
+            for row in rows:
+                tables.get(row.get("relname"))["partition_key"] = row.get("partition_key")

-            # Get foreign keys
-            cursor.execute(PG_CHECK_FOR_FOREIGN_KEY.format(oid=table_id))
-            row = cursor.fetchone()
-            if row["count"] > 0:
-                cursor.execute(PG_CONSTRAINTS_QUERY.format(oid=table_id))
-                rows = cursor.fetchall()
-                if rows:
-                    fks = [dict(row) for row in rows]
-                    this_payload.update({"foreign_keys": fks})
+            cursor.execute(NUM_PARTITIONS_QUERY.format(table_ids=table_ids))
+            rows = cursor.fetchall()
+            for row in rows:
+                table_name = table_name_lookup.get(str(row.get("id")))
+                tables.get(table_name)["num_partitions"] = row.get("num_partitions", 0)

-            # Get columns
-            cursor.execute(COLUMNS_QUERY.format(oid=table_id))
-            rows = cursor.fetchall()[:]
-            max_columns = self._config.schemas_metadata_config.get("max_columns", 50)
-            columns = [dict(row) for row in rows][:max_columns]
-            this_payload.update({"columns": columns})
+        # Get foreign keys
+        cursor.execute(PG_CONSTRAINTS_QUERY.format(table_ids=table_ids))
+        rows = cursor.fetchall()
+        for row in rows:
+            table_name = table_name_lookup.get(str(row.get("id")))
+            tables.get(table_name)["foreign_keys"] = tables.get(table_name).get("foreign_keys", []) + [dict(row)]

-            table_payloads.append(this_payload)
+        # Get columns
+        cursor.execute(COLUMNS_QUERY.format(table_ids=table_ids))
+        rows = cursor.fetchall()
+        for row in rows:
+            table_name = table_name_lookup.get(str(row.get("id")))
+            tables.get(table_name)["columns"] = tables.get(table_name).get("columns", []) + [dict(row)]

-        return table_payloads
+        return tables.values()

     def _collect_metadata_for_database(self, dbname):
         metadata = {}
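One subtlety in the rewritten method: pg_indexes reports partition indexes under names that can carry per-partition numeric suffixes, so each row is matched back to a parent table by trimming trailing digits. A standalone version of that matching step:

# Standalone version of the suffix-trimming match used above, mirroring
# the while loop in _query_table_information.
def resolve_parent_table(tables, table_name):
    while tables.get(table_name) is None and len(table_name) > 1 and table_name[-1].isdigit():
        table_name = table_name[:-1]
    return table_name if table_name in tables else None

known = {"measurements": {}}
assert resolve_parent_table(known, "measurements2024") == "measurements"
assert resolve_parent_table(known, "unrelated") is None

Note also that the batched rewrite drops the old per-table PG_CHECK_FOR_FOREIGN_KEY pre-check and the max_columns truncation; foreign keys and columns now each arrive from a single IN-list query per chunk.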
@@ -462,8 +541,6 @@ def _collect_metadata_for_database(self, dbname):
             )
             schema_info = self._query_schema_information(cursor, dbname)
             for schema in schema_info:
-                tables_info = self._query_table_information_for_schema(cursor, schema["id"], dbname)
-                schema.update({"tables": tables_info})
                 metadata["schemas"].append(schema)

         return metadata
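With table collection removed from _collect_metadata_for_database, each emitted event now carries exactly one database/schema/table-batch slice. An illustrative payload shape (field values here are invented; the keys follow base_event plus the per-flush slice built in _flush_schema above, and real database entries may carry additional fields):

# Invented example values; only the key structure is taken from the diff.
event = {
    "host": "pg-prod-1",
    "agent_version": "7.52.0",
    "dbms": "postgres",
    "kind": "pg_databases",
    "collection_interval": 600,
    "dbms_version": "14.5",
    "tags": ["env:staging"],
    "cloud_metadata": {},
    "timestamp": 1700000000000.0,
    "metadata": [
        {
            "name": "orders_db",
            "schemas": [
                {
                    "id": "2200",
                    "name": "public",
                    "tables": [
                        {"id": "16385", "name": "orders", "owner": "postgres", "has_indexes": True, "has_partitions": False},
                    ],
                }
            ],
        }
    ],
}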
