diff --git a/sqlserver/assets/configuration/spec.yaml b/sqlserver/assets/configuration/spec.yaml index 0126a1af7f63a..01df2387bff92 100644 --- a/sqlserver/assets/configuration/spec.yaml +++ b/sqlserver/assets/configuration/spec.yaml @@ -713,6 +713,30 @@ files: type: number example: 1800 display_default: false + - name: schemas_collection + description: | + Configure collection of schemas. If `database_autodiscovery` is not enabled, data is collected + only for the database configured with the `database` parameter. + options: + - name: enabled + description: | + Enable schema collection. Requires `dbm: true`. Defaults to false. + value: + type: boolean + example: false + - name: collection_interval + description: | + Set the database schema collection interval (in seconds). Defaults to 600 seconds. + value: + type: number + example: 600 + - name: max_execution_time + description: | + Set the maximum time for schema collection (in seconds). Defaults to 10 seconds. + Capped by `schemas_collection.collection_interval`. + value: + type: number + example: 10 - template: instances/default - template: logs example: diff --git a/sqlserver/changelog.d/17258.added b/sqlserver/changelog.d/17258.added new file mode 100644 index 0000000000000..ac15210ed68ba --- /dev/null +++ b/sqlserver/changelog.d/17258.added @@ -0,0 +1,2 @@ +Add schema collection to SQL Server. +Schema data includes information about the tables, their columns, indexes, foreign keys, and partitions. diff --git a/sqlserver/datadog_checks/sqlserver/config.py b/sqlserver/datadog_checks/sqlserver/config.py index 99c3a12aa52ed..de7dcfea0aa4e 100644 --- a/sqlserver/datadog_checks/sqlserver/config.py +++ b/sqlserver/datadog_checks/sqlserver/config.py @@ -7,7 +7,10 @@ from datadog_checks.base.config import is_affirmative from datadog_checks.base.utils.common import to_native_string -from datadog_checks.sqlserver.const import DEFAULT_AUTODISCOVERY_INTERVAL, PROC_CHAR_LIMIT +from datadog_checks.sqlserver.const import ( + DEFAULT_AUTODISCOVERY_INTERVAL, + PROC_CHAR_LIMIT, +) class SQLServerConfig: @@ -45,6 +48,7 @@ def __init__(self, init_config, instance, log): self.procedure_metrics_config: dict = instance.get('procedure_metrics', {}) or {} self.settings_config: dict = instance.get('collect_settings', {}) or {} self.activity_config: dict = instance.get('query_activity', {}) or {} + self.schema_config: dict = instance.get('schemas_collection', {}) or {} self.cloud_metadata: dict = {} aws: dict = instance.get('aws', {}) or {} gcp: dict = instance.get('gcp', {}) or {} diff --git a/sqlserver/datadog_checks/sqlserver/config_models/instance.py b/sqlserver/datadog_checks/sqlserver/config_models/instance.py index bdd5621c46a57..44d971fabc633 100644 --- a/sqlserver/datadog_checks/sqlserver/config_models/instance.py +++ b/sqlserver/datadog_checks/sqlserver/config_models/instance.py @@ -139,6 +139,16 @@ class QueryMetrics(BaseModel): samples_per_hour_per_query: Optional[int] = None +class SchemasCollection(BaseModel): + model_config = ConfigDict( + arbitrary_types_allowed=True, + frozen=True, + ) + collection_interval: Optional[float] = None + enabled: Optional[bool] = None + max_execution_time: Optional[float] = None + + class InstanceConfig(BaseModel): model_config = ConfigDict( validate_default=True, @@ -199,6 +209,7 @@ class InstanceConfig(BaseModel): query_activity: Optional[QueryActivity] = None query_metrics: Optional[QueryMetrics] = None reported_hostname: Optional[str] = None + schemas_collection: Optional[SchemasCollection] = None server_version:
Optional[str] = None service: Optional[str] = None stored_procedure: Optional[str] = None diff --git a/sqlserver/datadog_checks/sqlserver/const.py b/sqlserver/datadog_checks/sqlserver/const.py index 8b4a70ff1e6d0..3a6f77923b2aa 100644 --- a/sqlserver/datadog_checks/sqlserver/const.py +++ b/sqlserver/datadog_checks/sqlserver/const.py @@ -268,3 +268,5 @@ ] PROC_CHAR_LIMIT = 500 + +DEFAULT_SCHEMAS_COLLECTION_INTERVAL = 600 diff --git a/sqlserver/datadog_checks/sqlserver/data/conf.yaml.example b/sqlserver/datadog_checks/sqlserver/data/conf.yaml.example index 8d3fff9c006c2..97199c413facd 100644 --- a/sqlserver/datadog_checks/sqlserver/data/conf.yaml.example +++ b/sqlserver/datadog_checks/sqlserver/data/conf.yaml.example @@ -659,6 +659,27 @@ instances: # # ignore_missing_database: false + ## Configure collection of schemas. If `database_autodiscovery` is not enabled, data is collected + ## only for the database configured with the `database` parameter. + # + # schemas_collection: + + ## @param enabled - boolean - optional - default: false + ## Enable schema collection. Requires `dbm: true`. Defaults to false. + # + # enabled: false + + ## @param collection_interval - number - optional - default: 600 + ## Set the database schema collection interval (in seconds). Defaults to 600 seconds. + # + # collection_interval: 600 + + ## @param max_execution_time - number - optional - default: 10 + ## Set the maximum time for schema collection (in seconds). Defaults to 10 seconds. + ## Capped by `schemas_collection.collection_interval`. + # + # max_execution_time: 10 + ## @param tags - list of strings - optional ## A list of tags to attach to every metric and service check emitted by this instance. ## diff --git a/sqlserver/datadog_checks/sqlserver/queries.py b/sqlserver/datadog_checks/sqlserver/queries.py index 9f41eb09ccde9..f88d3f7231394 100644 --- a/sqlserver/datadog_checks/sqlserver/queries.py +++ b/sqlserver/datadog_checks/sqlserver/queries.py @@ -143,6 +143,73 @@ ], } +DB_QUERY = """ +SELECT + db.database_id AS id, db.name AS name, db.collation_name AS collation, dp.name AS owner +FROM + sys.databases db LEFT JOIN sys.database_principals dp ON db.owner_sid = dp.sid +WHERE db.name IN ({}); +""" + +SCHEMA_QUERY = """ +SELECT + s.name AS name, s.schema_id AS id, dp.name AS owner_name +FROM + sys.schemas AS s JOIN sys.database_principals dp ON s.principal_id = dp.principal_id +WHERE s.name NOT IN ('sys', 'information_schema') +""" + +TABLES_IN_SCHEMA_QUERY = """ +SELECT + object_id AS id, name +FROM + sys.tables +WHERE schema_id=?
+""" + +COLUMN_QUERY = """ +SELECT + column_name AS name, data_type, column_default, is_nullable AS nullable , table_name, ordinal_position +FROM + information_schema.columns +WHERE + table_name IN ({}) and table_schema='{}'; +""" + +PARTITIONS_QUERY = """ +SELECT + object_id AS id, COUNT(*) AS partition_count +FROM + sys.partitions +WHERE + object_id IN ({}) GROUP BY object_id; +""" + +INDEX_QUERY = """ +SELECT + i.object_id AS id, i.name, i.type, i.is_unique, i.is_primary_key, i.is_unique_constraint, + i.is_disabled, STRING_AGG(c.name, ',') AS column_names +FROM + sys.indexes i JOIN sys.index_columns ic ON i.object_id = ic.object_id + AND i.index_id = ic.index_id JOIN sys.columns c ON ic.object_id = c.object_id AND ic.column_id = c.column_id +WHERE + i.object_id IN ({}) GROUP BY i.object_id, i.name, i.type, + i.is_unique, i.is_primary_key, i.is_unique_constraint, i.is_disabled; +""" + +FOREIGN_KEY_QUERY = """ +SELECT + FK.referenced_object_id AS id, FK.name AS foreign_key_name, + OBJECT_NAME(FK.parent_object_id) AS referencing_table, + STRING_AGG(COL_NAME(FKC.parent_object_id, FKC.parent_column_id),',') AS referencing_column, + OBJECT_NAME(FK.referenced_object_id) AS referenced_table, + STRING_AGG(COL_NAME(FKC.referenced_object_id, FKC.referenced_column_id),',') AS referenced_column +FROM + sys.foreign_keys AS FK JOIN sys.foreign_key_columns AS FKC ON FK.object_id = FKC.constraint_object_id +WHERE + FK.referenced_object_id IN ({}) GROUP BY FK.name, FK.parent_object_id, FK.referenced_object_id; +""" + def get_query_ao_availability_groups(sqlserver_major_version): """ diff --git a/sqlserver/datadog_checks/sqlserver/schemas.py b/sqlserver/datadog_checks/sqlserver/schemas.py new file mode 100644 index 0000000000000..8888ea7c0e0bf --- /dev/null +++ b/sqlserver/datadog_checks/sqlserver/schemas.py @@ -0,0 +1,417 @@ +# (C) Datadog, Inc. 
2024-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) + +try: + import datadog_agent +except ImportError: + from ..stubs import datadog_agent + +import json +import time + +from datadog_checks.base import is_affirmative +from datadog_checks.base.utils.db.utils import DBMAsyncJob, default_json_event_encoding +from datadog_checks.base.utils.tracking import tracked_method +from datadog_checks.sqlserver.const import ( + DEFAULT_SCHEMAS_COLLECTION_INTERVAL, + STATIC_INFO_ENGINE_EDITION, + STATIC_INFO_VERSION, + SWITCH_DB_STATEMENT, +) +from datadog_checks.sqlserver.queries import ( + COLUMN_QUERY, + DB_QUERY, + FOREIGN_KEY_QUERY, + INDEX_QUERY, + PARTITIONS_QUERY, + SCHEMA_QUERY, + TABLES_IN_SCHEMA_QUERY, +) +from datadog_checks.sqlserver.utils import convert_to_bool, execute_query, get_list_chunks, is_azure_sql_database + + +class SubmitData: + + def __init__(self, submit_data_function, base_event, logger): + self._submit_to_agent_queue = submit_data_function + self._base_event = base_event + self._log = logger + + self._columns_count = 0 + self._total_columns_sent = 0 + self.db_to_schemas = {} # dbname : { id : schema } + self.db_info = {} # name to info + + def set_base_event_data(self, hostname, tags, cloud_metadata, dbms_version): + self._base_event["host"] = hostname + self._base_event["tags"] = tags + self._base_event["cloud_metadata"] = cloud_metadata + self._base_event["dbms_version"] = dbms_version + + def reset(self): + self._total_columns_sent = 0 + self._columns_count = 0 + self.db_to_schemas.clear() + self.db_info.clear() + + def store_db_infos(self, db_infos): + for db_info in db_infos: + self.db_info[db_info['name']] = db_info + + def store(self, db_name, schema, tables, columns_count): + self._columns_count += columns_count + schemas = self.db_to_schemas.setdefault(db_name, {}) + if schema["id"] in schemas: + known_tables = schemas[schema["id"]].setdefault("tables", []) + known_tables = known_tables.extend(tables) + else: + schemas[schema["id"]] = schema + schemas[schema["id"]]["tables"] = tables + + def columns_since_last_submit(self): + return self._columns_count + + def truncate(self, json_event): + max_length = 1000 + if len(json_event) > max_length: + return json_event[:max_length] + " ... 
(truncated)" + else: + return json_event + + def send_truncated_msg(self, db_name, time_spent): + event = { + **self._base_event, + "metadata": [], + "timestamp": time.time() * 1000, + "collection_errors": [{"error_type": "truncated", "message": ""}], + } + db_info = self.db_info[db_name] + event["metadata"] = [{**(db_info)}] + event["collection_errors"][0]["message"] = ( + "Truncated after fetching {} columns, elapsed time is {}s, database is {}".format( + self._total_columns_sent, time_spent, db_name + ) + ) + json_event = json.dumps(event, default=default_json_event_encoding) + self._log.debug("Reporting truncation of schema collection: {}".format(self.truncate(json_event))) + self._submit_to_agent_queue(json_event) + + def submit(self): + if not self.db_to_schemas: + return + self._total_columns_sent += self._columns_count + self._columns_count = 0 + event = {**self._base_event, "metadata": [], "timestamp": time.time() * 1000} + for db, schemas_by_id in self.db_to_schemas.items(): + db_info = {} + db_info = self.db_info[db] + event["metadata"] = event["metadata"] + [{**(db_info), "schemas": list(schemas_by_id.values())}] + json_event = json.dumps(event, default=default_json_event_encoding) + self._log.debug("Reporting the following payload for schema collection: {}".format(self.truncate(json_event))) + self._submit_to_agent_queue(json_event) + self.db_to_schemas.clear() + + +def agent_check_getter(self): + return self._check + + +class Schemas(DBMAsyncJob): + + TABLES_CHUNK_SIZE = 500 + # Note: in async mode execution time also cannot exceed 2 checks. + DEFAULT_MAX_EXECUTION_TIME = 10 + MAX_COLUMNS_PER_EVENT = 100_000 + + def __init__(self, check, config): + self._check = check + self._log = check.log + self.schemas_per_db = {} + self._last_schemas_collect_time = None + collection_interval = config.schema_config.get('collection_interval', DEFAULT_SCHEMAS_COLLECTION_INTERVAL) + self._max_execution_time = min( + config.schema_config.get('max_execution_time', self.DEFAULT_MAX_EXECUTION_TIME), collection_interval + ) + super(Schemas, self).__init__( + check, + run_sync=True, + enabled=is_affirmative(config.schema_config.get('enabled', False)), + expected_db_exceptions=(), + # min collection interval is a desired collection interval for a check as a whole. 
+ min_collection_interval=config.min_collection_interval, + dbms="sqlserver", + rate_limit=1 / float(collection_interval), + job_name="schemas", + shutdown_callback=self.shut_down, + ) + base_event = { + "host": None, + "agent_version": datadog_agent.get_version(), + "dbms": "sqlserver", + "kind": "sqlserver_databases", + "collection_interval": collection_interval, + "dbms_version": None, + "tags": self._check.non_internal_tags, + "cloud_metadata": self._check._config.cloud_metadata, + } + self._data_submitter = SubmitData(self._check.database_monitoring_metadata, base_event, self._log) + + def run_job(self): + self._collect_schemas_data() + + def shut_down(self): + self._data_submitter.submit() + + @tracked_method(agent_check_getter=agent_check_getter) + def _fetch_schema_data(self, cursor, start_time, db_name): + schemas = self._query_schema_information(cursor) + for schema in schemas: + tables = self._get_tables(schema, cursor) + tables_chunks = list(get_list_chunks(tables, self.TABLES_CHUNK_SIZE)) + for tables_chunk in tables_chunks: + schema_collection_elapsed_time = time.time() - start_time + if schema_collection_elapsed_time > self._max_execution_time: + self._data_submitter.submit() + self._data_submitter.send_truncated_msg(db_name, schema_collection_elapsed_time) + raise StopIteration( + """Schema collection took {}s which is longer than the allowed limit of {}s, + stopped while collecting for db - {}""".format( + schema_collection_elapsed_time, self._max_execution_time, db_name + ) + ) + columns_count, tables_info = self._get_tables_data(tables_chunk, schema, cursor) + self._data_submitter.store(db_name, schema, tables_info, columns_count) + if self._data_submitter.columns_since_last_submit() > self.MAX_COLUMNS_PER_EVENT: + self._data_submitter.submit() + self._data_submitter.submit() + return False + + def _fetch_for_databases(self): + start_time = time.time() + databases = self._check.get_databases() + engine_edition = self._check.static_info_cache.get(STATIC_INFO_ENGINE_EDITION) + with self._check.connection.open_managed_default_connection(): + with self._check.connection.get_managed_cursor() as cursor: + for db_name in databases: + try: + if not is_azure_sql_database(engine_edition): + cursor.execute(SWITCH_DB_STATEMENT.format(db_name)) + self._fetch_schema_data(cursor, start_time, db_name) + except StopIteration as e: + self._log.error( + "While fetching schemas for database {}, the following exception occurred: {}".format( + db_name, e + ) + ) + return + except Exception as e: + self._log.error( + "While fetching schemas for database {}, the following exception occurred: {}".format( + db_name, e + ) + ) + # Switch DB back to MASTER + if not is_azure_sql_database(engine_edition): + cursor.execute(SWITCH_DB_STATEMENT.format(self._check.connection.DEFAULT_DATABASE)) + + @tracked_method(agent_check_getter=agent_check_getter) + def _collect_schemas_data(self): + """Collects database information and schemas and submits to the agent's queue as dictionaries + schema dict + key/value: + "name": str + "id": str + "owner_name": str + "tables" : list of tables dicts + table + key/value: + "id" : str + "name" : str + columns: list of columns dicts + columns + key/value: + "name": str + "data_type": str + "default": str + "nullable": bool + indexes : list of index dicts + index + key/value: + "name": str + "type": str + "is_unique": bool + "is_primary_key": bool + "is_unique_constraint": bool + "is_disabled": bool, + "column_names": str + foreign_keys : list of foreign key dicts +
foreign_key + key/value: + "foreign_key_name": str + "referencing_table": str + "referencing_column": str + "referenced_table": str + "referenced_column": str + partitions: partition dict + partition + key/value: + "partition_count": int + """ + self._data_submitter.reset() + self._data_submitter.set_base_event_data( + self._check.resolved_hostname, + self._check.non_internal_tags, + self._check._config.cloud_metadata, + "{},{}".format( + self._check.static_info_cache.get(STATIC_INFO_VERSION, ""), + self._check.static_info_cache.get(STATIC_INFO_ENGINE_EDITION, ""), + ), + ) + + databases = self._check.get_databases() + db_infos = self._query_db_information(databases) + self._data_submitter.store_db_infos(db_infos) + self._fetch_for_databases() + self._data_submitter.submit() + self._log.debug("Finished collect_schemas_data") + + def _query_db_information(self, db_names): + with self._check.connection.open_managed_default_connection(): + with self._check.connection.get_managed_cursor() as cursor: + db_names_formatted = ",".join(["'{}'".format(t) for t in db_names]) + return execute_query(DB_QUERY.format(db_names_formatted), cursor, convert_results_to_str=True) + + @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True) + def _get_tables(self, schema, cursor): + """returns a list of tables for schema with their names and empty column array + list of table dicts + "id": str + "name": str + "columns": [] + """ + tables_info = execute_query(TABLES_IN_SCHEMA_QUERY, cursor, convert_results_to_str=True, parameter=schema["id"]) + for t in tables_info: + t.setdefault("columns", []) + return tables_info + + @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True) + def _query_schema_information(self, cursor): + """returns a list of schema dicts + schema + dict: + "name": str + "id": str + "owner_name": str + """ + return execute_query(SCHEMA_QUERY, cursor, convert_results_to_str=True) + + @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True) + def _get_tables_data(self, table_list, schema, cursor): + """returns extracted column numbers and a list of tables + "tables" : list of tables dicts + table + key/value: + "id" : str + "name" : str + columns: list of columns dicts + columns + key/value: + "name": str + "data_type": str + "default": str + "nullable": bool + indexes : list of index dicts + index + key/value: + "name": str + "type": str + "is_unique": bool + "is_primary_key": bool + "is_unique_constraint": bool + "is_disabled": bool, + "column_names": str + foreign_keys : list of foreign key dicts + foreign_key + key/value: + "foreign_key_name": str + "referencing_table": str + "referencing_column": str + "referenced_table": str + "referenced_column": str + partitions: partition dict + partition + key/value: + "partition_count": int + """ + if len(table_list) == 0: + return + name_to_id = {} + id_to_table_data = {} + table_ids_object = ",".join(["OBJECT_NAME({})".format(t.get("id")) for t in table_list]) + table_ids = ",".join(["{}".format(t.get("id")) for t in table_list]) + for t in table_list: + name_to_id[t["name"]] = t["id"] + id_to_table_data[t["id"]] = t + total_columns_number = self._populate_with_columns_data( + table_ids_object, name_to_id, id_to_table_data, schema, cursor + ) + self._populate_with_partitions_data(table_ids, id_to_table_data, cursor) + self._populate_with_foreign_keys_data(table_ids, id_to_table_data, cursor) + self._populate_with_index_data(table_ids, id_to_table_data, cursor) + return 
total_columns_number, list(id_to_table_data.values()) + + @tracked_method(agent_check_getter=agent_check_getter) + def _populate_with_columns_data(self, table_ids, name_to_id, id_to_table_data, schema, cursor): + cursor.execute(COLUMN_QUERY.format(table_ids, schema["name"])) + data = cursor.fetchall() + # AS default - cannot be used in sqlserver query as this word is reserved + columns = [ + "default" if str(i[0]).lower() == "column_default" else str(i[0]).lower() for i in cursor.description + ] + rows = [dict(zip(columns, [str(item) for item in row])) for row in data] + for row in rows: + table_name = str(row.get("table_name")) + table_id = name_to_id.get(table_name) + row.pop("table_name", None) + if "nullable" in row: + if row["nullable"].lower() == "no" or row["nullable"].lower() == "false": + row["nullable"] = False + else: + row["nullable"] = True + id_to_table_data.get(table_id)["columns"] = id_to_table_data.get(table_id).get("columns", []) + [row] + return len(data) + + @tracked_method(agent_check_getter=agent_check_getter) + def _populate_with_partitions_data(self, table_ids, table_id_to_table_data, cursor): + rows = execute_query(PARTITIONS_QUERY.format(table_ids), cursor) + for row in rows: + table_id = row.pop("id", None) + table_id_str = str(table_id) + table_id_to_table_data[table_id_str]["partitions"] = row + + @tracked_method(agent_check_getter=agent_check_getter) + def _populate_with_index_data(self, table_ids, table_id_to_table_data, cursor): + rows = execute_query(INDEX_QUERY.format(table_ids), cursor) + for row in rows: + table_id = row.pop("id", None) + table_id_str = str(table_id) + if "is_unique" in row: + row["is_unique"] = convert_to_bool(row["is_unique"]) + if "is_primary_key" in row: + row["is_primary_key"] = convert_to_bool(row["is_primary_key"]) + if "is_disabled" in row: + row["is_disabled"] = convert_to_bool(row["is_disabled"]) + if "is_unique_constraint" in row: + row["is_unique_constraint"] = convert_to_bool(row["is_unique_constraint"]) + table_id_to_table_data[table_id_str].setdefault("indexes", []) + table_id_to_table_data[table_id_str]["indexes"].append(row) + + @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True) + def _populate_with_foreign_keys_data(self, table_ids, table_id_to_table_data, cursor): + rows = execute_query(FOREIGN_KEY_QUERY.format(table_ids), cursor) + for row in rows: + table_id = row.pop("id", None) + table_id_str = str(table_id) + table_id_to_table_data.get(table_id_str).setdefault("foreign_keys", []) + table_id_to_table_data.get(table_id_str)["foreign_keys"].append(row) diff --git a/sqlserver/datadog_checks/sqlserver/sqlserver.py b/sqlserver/datadog_checks/sqlserver/sqlserver.py index 3a661d8147d71..17aecaaa6fee7 100644 --- a/sqlserver/datadog_checks/sqlserver/sqlserver.py +++ b/sqlserver/datadog_checks/sqlserver/sqlserver.py @@ -23,6 +23,7 @@ SqlserverIndexUsageMetrics, ) from datadog_checks.sqlserver.metadata import SqlserverMetadata +from datadog_checks.sqlserver.schemas import Schemas from datadog_checks.sqlserver.statements import SqlserverStatementMetrics from datadog_checks.sqlserver.stored_procedures import SqlserverProcedureMetrics from datadog_checks.sqlserver.utils import Database, construct_use_statement, parse_sqlserver_major_version @@ -144,7 +145,7 @@ def __init__(self, name, init_config, instances): ) # type: TTLCache # Keep a copy of the tags before the internal resource tags are set so they can be used for paths that don't # go through the agent internal metrics submission processing 
those tags - self._non_internal_tags = copy.deepcopy(self.tags) + self.non_internal_tags = copy.deepcopy(self.tags) self.check_initializations.append(self.initialize_connection) self.check_initializations.append(self.set_resolved_hostname) self.check_initializations.append(self.set_resolved_hostname_metadata) @@ -159,11 +160,14 @@ def __init__(self, name, init_config, instances): self._database_metrics = None + self._schemas = Schemas(self, self._config) + def cancel(self): self.statement_metrics.cancel() self.procedure_metrics.cancel() self.activity.cancel() self.sql_metadata.cancel() + self._schemas.cancel() def config_checks(self): if self._config.autodiscovery and self.instance.get("database"): @@ -724,6 +728,16 @@ def _check_connections_by_use_db(self): # Switch DB back to MASTER cursor.execute(SWITCH_DB_STATEMENT.format(self.connection.DEFAULT_DATABASE)) + def get_databases(self): + engine_edition = self.static_info_cache.get(STATIC_INFO_ENGINE_EDITION) + if not is_azure_sql_database(engine_edition): + db_names = [d.name for d in self.databases] or [ + self.instance.get('database', self.connection.DEFAULT_DATABASE) + ] + else: + db_names = [self.instance.get('database', self.connection.DEFAULT_DATABASE)] + return db_names + def _check_database_conns(self): engine_edition = self.static_info_cache.get(STATIC_INFO_ENGINE_EDITION) if is_azure_sql_database(engine_edition): @@ -760,6 +774,7 @@ def check(self, _): self.procedure_metrics.run_job_loop(self.tags) self.activity.run_job_loop(self.tags) self.sql_metadata.run_job_loop(self.tags) + self._schemas.run_job_loop(self.tags) else: self.log.debug("Skipping check") @@ -1024,7 +1039,7 @@ def _send_database_instance_metadata(self): self.static_info_cache.get(STATIC_INFO_ENGINE_EDITION, ""), ), "integration_version": __version__, - "tags": self._non_internal_tags, + "tags": self.non_internal_tags, "timestamp": time.time() * 1000, "cloud_metadata": self._config.cloud_metadata, "metadata": { diff --git a/sqlserver/datadog_checks/sqlserver/utils.py b/sqlserver/datadog_checks/sqlserver/utils.py index 4664f768dcc10..a35106bd1ce09 100644 --- a/sqlserver/datadog_checks/sqlserver/utils.py +++ b/sqlserver/datadog_checks/sqlserver/utils.py @@ -3,6 +3,7 @@ # Licensed under a 3-clause BSD style license (see LICENSE) import os import re +from typing import Dict from datadog_checks.base.utils.platform import Platform from datadog_checks.sqlserver.const import ENGINE_EDITION_AZURE_MANAGED_INSTANCE, ENGINE_EDITION_SQL_DATABASE @@ -137,3 +138,30 @@ def is_azure_sql_database(engine_edition): :return: bool """ return engine_edition == ENGINE_EDITION_SQL_DATABASE + + +def execute_query(query, cursor, convert_results_to_str=False, parameter=None) -> Dict[str, str]: + if parameter is not None: + cursor.execute(query, (parameter,)) + else: + cursor.execute(query) + columns = [str(column[0]).lower() for column in cursor.description] + rows = [] + if convert_results_to_str: + rows = [dict(zip(columns, [str(item) for item in row])) for row in cursor.fetchall()] + else: + rows = [dict(zip(columns, row)) for row in cursor.fetchall()] + return rows + + +def get_list_chunks(lst, n): + """Yield successive n-sized chunks from lst.""" + for i in range(0, len(lst), n): + yield lst[i : i + n] + + +def convert_to_bool(value): + if isinstance(value, int): + return bool(value) + else: + return value diff --git a/sqlserver/hatch.toml b/sqlserver/hatch.toml index a305f161e8fcf..27cd54574a225 100644 --- a/sqlserver/hatch.toml +++ b/sqlserver/hatch.toml @@ -1,6 +1,9 @@ 
[env.collectors.datadog-checks] base-package-features = ["deps", "db", "json"] +[envs.default] +dependencies = ["deepdiff"] + [[envs.default.matrix]] python = ["3.11"] os = ["linux"] diff --git a/sqlserver/pyproject.toml b/sqlserver/pyproject.toml index 1d04d0124de61..dccce892d132f 100644 --- a/sqlserver/pyproject.toml +++ b/sqlserver/pyproject.toml @@ -28,7 +28,7 @@ classifiers = [ "Private :: Do Not Upload", ] dependencies = [ - "datadog-checks-base>=36.5.0", + "datadog-checks-base>=36.8.0", ] dynamic = [ "version", diff --git a/sqlserver/tests/compose-ha/sql/aoag_primary.sql b/sqlserver/tests/compose-ha/sql/aoag_primary.sql index 9ed17b021f6b6..07c79b03b6aa5 100644 --- a/sqlserver/tests/compose-ha/sql/aoag_primary.sql +++ b/sqlserver/tests/compose-ha/sql/aoag_primary.sql @@ -36,6 +36,75 @@ GO ALTER DATABASE restricted_db SET RESTRICTED_USER GO +-- Create test database for integration schema tests +CREATE DATABASE datadog_test_schemas; +GO +USE datadog_test_schemas; +GO + +CREATE SCHEMA test_schema; +GO + +-- Create the partition function +CREATE PARTITION FUNCTION CityPartitionFunction (INT) +AS RANGE LEFT FOR VALUES (100, 200, 300); -- Define your partition boundaries here + +-- Create the partition scheme +CREATE PARTITION SCHEME CityPartitionScheme +AS PARTITION CityPartitionFunction ALL TO ([PRIMARY]); -- Assign partitions to filegroups + +-- Create the partitioned table +CREATE TABLE datadog_test_schemas.test_schema.cities ( + id INT NOT NULL DEFAULT 0, + name VARCHAR(255), + population INT NOT NULL DEFAULT 0, + CONSTRAINT PK_Cities PRIMARY KEY (id) +) ON CityPartitionScheme(id); -- Assign the partition scheme to the table + +-- Create indexes +CREATE INDEX two_columns_index ON datadog_test_schemas.test_schema.cities (id, name); +CREATE INDEX single_column_index ON datadog_test_schemas.test_schema.cities (population); + +INSERT INTO datadog_test_schemas.test_schema.cities VALUES (1, 'yey', 100), (2, 'bar', 200); +GO + +-- Create table with a foreign key +CREATE TABLE datadog_test_schemas.test_schema.landmarks (name varchar(255), city_id int DEFAULT 0); +GO +ALTER TABLE datadog_test_schemas.test_schema.landmarks ADD CONSTRAINT FK_CityId FOREIGN KEY (city_id) REFERENCES datadog_test_schemas.test_schema.cities(id); +GO + +-- Create table with unique constraint +CREATE TABLE datadog_test_schemas.test_schema.Restaurants ( + RestaurantName VARCHAR(255), + District VARCHAR(100), + Cuisine VARCHAR(100), + CONSTRAINT UC_RestaurantNameDistrict UNIQUE (RestaurantName, District) +); +GO + +-- Create table with a foreign key on two columns +CREATE TABLE datadog_test_schemas.test_schema.RestaurantReviews ( + RestaurantName VARCHAR(255), + District VARCHAR(100), + Review VARCHAR(MAX), + CONSTRAINT FK_RestaurantNameDistrict FOREIGN KEY (RestaurantName, District) REFERENCES datadog_test_schemas.test_schema.Restaurants(RestaurantName, District) +); +GO + +-- Create second test database for integration schema tests +CREATE DATABASE datadog_test_schemas_second; +GO +USE datadog_test_schemas_second; +-- This table is pronounced "things" except we've replaced "th" with the greek lower case "theta" to ensure we +-- correctly support unicode throughout the integration. 
+CREATE TABLE datadog_test_schemas_second.dbo.ϑings (id int DEFAULT 0, name varchar(255)); +INSERT INTO datadog_test_schemas_second.dbo.ϑings VALUES (1, 'foo'), (2, 'bar'); +CREATE USER bob FOR LOGIN bob; +CREATE USER fred FOR LOGIN fred; +CREATE CLUSTERED INDEX thingsindex ON datadog_test_schemas_second.dbo.ϑings (name); +GO + -- Create test database for integration tests -- only bob and fred have read/write access to this database USE [datadog_test-1]; diff --git a/sqlserver/tests/compose-high-cardinality-windows/setup.sql b/sqlserver/tests/compose-high-cardinality-windows/setup.sql index fd4c0efa3d4cf..f33ceff2df42e 100644 --- a/sqlserver/tests/compose-high-cardinality-windows/setup.sql +++ b/sqlserver/tests/compose-high-cardinality-windows/setup.sql @@ -30,6 +30,75 @@ GO CREATE USER datadog FOR LOGIN datadog; GO +-- Create test database for integration schema tests +CREATE DATABASE datadog_test_schemas; +GO +USE datadog_test_schemas; +GO + +CREATE SCHEMA test_schema; +GO + +-- Create the partition function +CREATE PARTITION FUNCTION CityPartitionFunction (INT) +AS RANGE LEFT FOR VALUES (100, 200, 300); -- Define your partition boundaries here + +-- Create the partition scheme +CREATE PARTITION SCHEME CityPartitionScheme +AS PARTITION CityPartitionFunction ALL TO ([PRIMARY]); -- Assign partitions to filegroups + +-- Create the partitioned table +CREATE TABLE datadog_test_schemas.test_schema.cities ( + id INT NOT NULL DEFAULT 0, + name VARCHAR(255), + population INT NOT NULL DEFAULT 0, + CONSTRAINT PK_Cities PRIMARY KEY (id) +) ON CityPartitionScheme(id); -- Assign the partition scheme to the table + +-- Create indexes +CREATE INDEX two_columns_index ON datadog_test_schemas.test_schema.cities (id, name); +CREATE INDEX single_column_index ON datadog_test_schemas.test_schema.cities (population); + +INSERT INTO datadog_test_schemas.test_schema.cities VALUES (1, 'yey', 100), (2, 'bar', 200); +GO + +-- Create table with a foreign key +CREATE TABLE datadog_test_schemas.test_schema.landmarks (name varchar(255), city_id int DEFAULT 0); +GO +ALTER TABLE datadog_test_schemas.test_schema.landmarks ADD CONSTRAINT FK_CityId FOREIGN KEY (city_id) REFERENCES datadog_test_schemas.test_schema.cities(id); +GO + +-- Create table with unique constraint +CREATE TABLE datadog_test_schemas.test_schema.Restaurants ( + RestaurantName VARCHAR(255), + District VARCHAR(100), + Cuisine VARCHAR(100), + CONSTRAINT UC_RestaurantNameDistrict UNIQUE (RestaurantName, District) +); +GO + +-- Create table with a foreign key on two columns +CREATE TABLE datadog_test_schemas.test_schema.RestaurantReviews ( + RestaurantName VARCHAR(255), + District VARCHAR(100), + Review VARCHAR(MAX), + CONSTRAINT FK_RestaurantNameDistrict FOREIGN KEY (RestaurantName, District) REFERENCES datadog_test_schemas.test_schema.Restaurants(RestaurantName, District) +); +GO + +-- Create second test database for integration schema tests +CREATE DATABASE datadog_test_schemas_second; +GO +USE datadog_test_schemas_second; +-- This table is pronounced "things" except we've replaced "th" with the greek lower case "theta" to ensure we +-- correctly support unicode throughout the integration. 
+CREATE TABLE datadog_test_schemas_second.dbo.ϑings (id int DEFAULT 0, name varchar(255)); +INSERT INTO datadog_test_schemas_second.dbo.ϑings VALUES (1, 'foo'), (2, 'bar'); +CREATE USER bob FOR LOGIN bob; +CREATE USER fred FOR LOGIN fred; +CREATE CLUSTERED INDEX thingsindex ON datadog_test_schemas_second.dbo.ϑings (name); +GO + -- Create test database for integration tests -- only bob and fred have read/write access to this database -- the datadog user has only connect access but can't read any objects diff --git a/sqlserver/tests/compose-high-cardinality/setup.sql b/sqlserver/tests/compose-high-cardinality/setup.sql index f8c2cc506500b..839fd7c690679 100644 --- a/sqlserver/tests/compose-high-cardinality/setup.sql +++ b/sqlserver/tests/compose-high-cardinality/setup.sql @@ -123,6 +123,75 @@ GRANT EXECUTE on nullCharTest to bob; GRANT EXECUTE on nullCharTest to fred; GO +-- Create test database for integration schema tests +CREATE DATABASE datadog_test_schemas; +GO +USE datadog_test_schemas; +GO + +CREATE SCHEMA test_schema; +GO + +-- Create the partition function +CREATE PARTITION FUNCTION CityPartitionFunction (INT) +AS RANGE LEFT FOR VALUES (100, 200, 300); -- Define your partition boundaries here + +-- Create the partition scheme +CREATE PARTITION SCHEME CityPartitionScheme +AS PARTITION CityPartitionFunction ALL TO ([PRIMARY]); -- Assign partitions to filegroups + +-- Create the partitioned table +CREATE TABLE datadog_test_schemas.test_schema.cities ( + id INT NOT NULL DEFAULT 0, + name VARCHAR(255), + population INT NOT NULL DEFAULT 0, + CONSTRAINT PK_Cities PRIMARY KEY (id) +) ON CityPartitionScheme(id); -- Assign the partition scheme to the table + +-- Create indexes +CREATE INDEX two_columns_index ON datadog_test_schemas.test_schema.cities (id, name); +CREATE INDEX single_column_index ON datadog_test_schemas.test_schema.cities (population); + +INSERT INTO datadog_test_schemas.test_schema.cities VALUES (1, 'yey', 100), (2, 'bar', 200); +GO + +-- Create table with a foreign key +CREATE TABLE datadog_test_schemas.test_schema.landmarks (name varchar(255), city_id int DEFAULT 0); +GO +ALTER TABLE datadog_test_schemas.test_schema.landmarks ADD CONSTRAINT FK_CityId FOREIGN KEY (city_id) REFERENCES datadog_test_schemas.test_schema.cities(id); +GO + +-- Create table with unique constraint +CREATE TABLE datadog_test_schemas.test_schema.Restaurants ( + RestaurantName VARCHAR(255), + District VARCHAR(100), + Cuisine VARCHAR(100), + CONSTRAINT UC_RestaurantNameDistrict UNIQUE (RestaurantName, District) +); +GO + +-- Create table with a foreign key on two columns +CREATE TABLE datadog_test_schemas.test_schema.RestaurantReviews ( + RestaurantName VARCHAR(255), + District VARCHAR(100), + Review VARCHAR(MAX), + CONSTRAINT FK_RestaurantNameDistrict FOREIGN KEY (RestaurantName, District) REFERENCES datadog_test_schemas.test_schema.Restaurants(RestaurantName, District) +); +GO + +-- Create second test database for integration schema tests +CREATE DATABASE datadog_test_schemas_second; +GO +USE datadog_test_schemas_second; +-- This table is pronounced "things" except we've replaced "th" with the greek lower case "theta" to ensure we +-- correctly support unicode throughout the integration. 
+CREATE TABLE datadog_test_schemas_second.dbo.ϑings (id int DEFAULT 0, name varchar(255)); +INSERT INTO datadog_test_schemas_second.dbo.ϑings VALUES (1, 'foo'), (2, 'bar'); +CREATE USER bob FOR LOGIN bob; +CREATE USER fred FOR LOGIN fred; +CREATE CLUSTERED INDEX thingsindex ON datadog_test_schemas_second.dbo.ϑings (name); +GO + -- Create test database for integration tests. -- Only bob and fred have read/write access to this database. CREATE DATABASE [datadog_test-1]; diff --git a/sqlserver/tests/compose-windows/setup.sql b/sqlserver/tests/compose-windows/setup.sql index 3df6386c8b4f2..d0f7c7cf5409d 100644 --- a/sqlserver/tests/compose-windows/setup.sql +++ b/sqlserver/tests/compose-windows/setup.sql @@ -30,6 +30,75 @@ GO CREATE USER datadog FOR LOGIN datadog; GO +-- Create test database for integration schema tests +CREATE DATABASE datadog_test_schemas; +GO +USE datadog_test_schemas; +GO + +CREATE SCHEMA test_schema; +GO + +-- Create the partition function +CREATE PARTITION FUNCTION CityPartitionFunction (INT) +AS RANGE LEFT FOR VALUES (100, 200, 300); -- Define your partition boundaries here + +-- Create the partition scheme +CREATE PARTITION SCHEME CityPartitionScheme +AS PARTITION CityPartitionFunction ALL TO ([PRIMARY]); -- Assign partitions to filegroups + +-- Create the partitioned table +CREATE TABLE datadog_test_schemas.test_schema.cities ( + id INT NOT NULL DEFAULT 0, + name VARCHAR(255), + population INT NOT NULL DEFAULT 0, + CONSTRAINT PK_Cities PRIMARY KEY (id) +) ON CityPartitionScheme(id); -- Assign the partition scheme to the table + +-- Create indexes +CREATE INDEX two_columns_index ON datadog_test_schemas.test_schema.cities (id, name); +CREATE INDEX single_column_index ON datadog_test_schemas.test_schema.cities (population); + +INSERT INTO datadog_test_schemas.test_schema.cities VALUES (1, 'yey', 100), (2, 'bar', 200); +GO + +-- Create table with a foreign key +CREATE TABLE datadog_test_schemas.test_schema.landmarks (name varchar(255), city_id int DEFAULT 0); +GO +ALTER TABLE datadog_test_schemas.test_schema.landmarks ADD CONSTRAINT FK_CityId FOREIGN KEY (city_id) REFERENCES datadog_test_schemas.test_schema.cities(id); +GO + +-- Create table with unique constraint +CREATE TABLE datadog_test_schemas.test_schema.Restaurants ( + RestaurantName VARCHAR(255), + District VARCHAR(100), + Cuisine VARCHAR(100), + CONSTRAINT UC_RestaurantNameDistrict UNIQUE (RestaurantName, District) +); +GO + +-- Create table with a foreign key on two columns +CREATE TABLE datadog_test_schemas.test_schema.RestaurantReviews ( + RestaurantName VARCHAR(255), + District VARCHAR(100), + Review VARCHAR(MAX), + CONSTRAINT FK_RestaurantNameDistrict FOREIGN KEY (RestaurantName, District) REFERENCES datadog_test_schemas.test_schema.Restaurants(RestaurantName, District) +); +GO + +-- Create second test database for integration schema tests +CREATE DATABASE datadog_test_schemas_second; +GO +USE datadog_test_schemas_second; +-- This table is pronounced "things" except we've replaced "th" with the greek lower case "theta" to ensure we +-- correctly support unicode throughout the integration. 
+CREATE TABLE datadog_test_schemas_second.dbo.ϑings (id int DEFAULT 0, name varchar(255)); +INSERT INTO datadog_test_schemas_second.dbo.ϑings VALUES (1, 'foo'), (2, 'bar'); +CREATE USER bob FOR LOGIN bob; +CREATE USER fred FOR LOGIN fred; +CREATE CLUSTERED INDEX thingsindex ON datadog_test_schemas_second.dbo.ϑings (name); +GO + -- Create test database for integration tests -- only bob and fred have read/write access to this database -- the datadog user has only connect access but can't read any objects diff --git a/sqlserver/tests/compose/setup.sql b/sqlserver/tests/compose/setup.sql index 86b2934a43c79..aac5c217160f2 100644 --- a/sqlserver/tests/compose/setup.sql +++ b/sqlserver/tests/compose/setup.sql @@ -15,6 +15,75 @@ CREATE USER fred FOR LOGIN fred; GRANT CONNECT ANY DATABASE to fred; GO +-- Create test database for integration schema tests +CREATE DATABASE datadog_test_schemas; +GO +USE datadog_test_schemas; +GO + +CREATE SCHEMA test_schema; +GO + +-- Create the partition function +CREATE PARTITION FUNCTION CityPartitionFunction (INT) +AS RANGE LEFT FOR VALUES (100, 200, 300); -- Define your partition boundaries here + +-- Create the partition scheme +CREATE PARTITION SCHEME CityPartitionScheme +AS PARTITION CityPartitionFunction ALL TO ([PRIMARY]); -- Assign partitions to filegroups + +-- Create the partitioned table +CREATE TABLE datadog_test_schemas.test_schema.cities ( + id INT NOT NULL DEFAULT 0, + name VARCHAR(255), + population INT NOT NULL DEFAULT 0, + CONSTRAINT PK_Cities PRIMARY KEY (id) +) ON CityPartitionScheme(id); -- Assign the partition scheme to the table + +-- Create indexes +CREATE INDEX two_columns_index ON datadog_test_schemas.test_schema.cities (id, name); +CREATE INDEX single_column_index ON datadog_test_schemas.test_schema.cities (population); + +INSERT INTO datadog_test_schemas.test_schema.cities VALUES (1, 'yey', 100), (2, 'bar', 200); +GO + +-- Create table with a foreign key +CREATE TABLE datadog_test_schemas.test_schema.landmarks (name varchar(255), city_id int DEFAULT 0); +GO +ALTER TABLE datadog_test_schemas.test_schema.landmarks ADD CONSTRAINT FK_CityId FOREIGN KEY (city_id) REFERENCES datadog_test_schemas.test_schema.cities(id); +GO + +-- Create table with unique constraint +CREATE TABLE datadog_test_schemas.test_schema.Restaurants ( + RestaurantName VARCHAR(255), + District VARCHAR(100), + Cuisine VARCHAR(100), + CONSTRAINT UC_RestaurantNameDistrict UNIQUE (RestaurantName, District) +); +GO + +-- Create table with a foreign key on two columns +CREATE TABLE datadog_test_schemas.test_schema.RestaurantReviews ( + RestaurantName VARCHAR(255), + District VARCHAR(100), + Review VARCHAR(MAX), + CONSTRAINT FK_RestaurantNameDistrict FOREIGN KEY (RestaurantName, District) REFERENCES datadog_test_schemas.test_schema.Restaurants(RestaurantName, District) +); +GO + +-- Create second test database for integration schema tests +CREATE DATABASE datadog_test_schemas_second; +GO +USE datadog_test_schemas_second; +-- This table is pronounced "things" except we've replaced "th" with the greek lower case "theta" to ensure we +-- correctly support unicode throughout the integration. 
+CREATE TABLE datadog_test_schemas_second.dbo.ϑings (id int DEFAULT 0, name varchar(255)); +INSERT INTO datadog_test_schemas_second.dbo.ϑings VALUES (1, 'foo'), (2, 'bar'); +CREATE USER bob FOR LOGIN bob; +CREATE USER fred FOR LOGIN fred; +CREATE CLUSTERED INDEX thingsindex ON datadog_test_schemas_second.dbo.ϑings (name); +GO + -- Create test database for integration tests -- only bob and fred have read/write access to this database CREATE DATABASE [datadog_test-1]; @@ -22,7 +91,7 @@ GO USE [datadog_test-1]; -- This table is pronounced "things" except we've replaced "th" with the greek lower case "theta" to ensure we -- correctly support unicode throughout the integration. -CREATE TABLE [datadog_test-1].dbo.ϑings (id int, name varchar(255)); +CREATE TABLE [datadog_test-1].dbo.ϑings (id int DEFAULT 0, name varchar(255)); INSERT INTO [datadog_test-1].dbo.ϑings VALUES (1, 'foo'), (2, 'bar'); CREATE USER bob FOR LOGIN bob; CREATE USER fred FOR LOGIN fred; diff --git a/sqlserver/tests/test_metadata.py b/sqlserver/tests/test_metadata.py index 226519eb6ebdb..361add055f9db 100644 --- a/sqlserver/tests/test_metadata.py +++ b/sqlserver/tests/test_metadata.py @@ -5,13 +5,17 @@ from __future__ import unicode_literals import logging +import re from copy import copy import pytest +from deepdiff import DeepDiff +from datadog_checks.dev.utils import running_on_windows_ci from datadog_checks.sqlserver import SQLServer from .common import CHECK_NAME +from .utils import normalize_ids, normalize_indexes_columns try: import pyodbc @@ -90,3 +94,308 @@ def test_sqlserver_collect_settings(aggregator, dd_run_check, dbm_instance): assert event['dbms'] == "sqlserver" assert event['kind'] == "sqlserver_configs" assert len(event["metadata"]) > 0 + + +def test_collect_schemas(aggregator, dd_run_check, dbm_instance): + databases_to_find = ['datadog_test_schemas', 'datadog_test_schemas_second'] + exp_datadog_test = { + 'id': 'normalized_value', + 'name': 'datadog_test_schemas_second', + "collation": "SQL_Latin1_General_CP1_CI_AS", + 'owner': 'dbo', + 'schemas': [ + { + 'name': 'dbo', + 'id': 'normalized_value', + 'owner_name': 'dbo', + 'tables': [ + { + 'id': 'normalized_value', + 'name': 'ϑings', + 'columns': [ + { + 'name': 'id', + 'data_type': 'int', + 'default': '((0))', + 'nullable': True, + 'ordinal_position': '1', + }, + { + 'name': 'name', + 'data_type': 'varchar', + 'default': 'None', + 'nullable': True, + 'ordinal_position': '2', + }, + ], + 'partitions': {'partition_count': 1}, + 'indexes': [ + { + 'name': 'thingsindex', + 'type': 1, + 'is_unique': False, + 'is_primary_key': False, + 'is_unique_constraint': False, + 'is_disabled': False, + 'column_names': 'name', + } + ], + } + ], + } + ], + } + exp_datadog_test_schemas = { + 'id': 'normalized_value', + 'name': 'datadog_test_schemas', + "collation": "SQL_Latin1_General_CP1_CI_AS", + 'owner': 'dbo', + 'schemas': [ + { + 'name': 'test_schema', + 'id': 'normalized_value', + 'owner_name': 'dbo', + 'tables': [ + { + 'id': 'normalized_value', + 'name': 'cities', + 'columns': [ + { + 'name': 'id', + 'data_type': 'int', + 'default': '((0))', + 'nullable': False, + 'ordinal_position': '1', + }, + { + 'name': 'name', + 'data_type': 'varchar', + 'default': 'None', + 'nullable': True, + 'ordinal_position': '2', + }, + { + 'name': 'population', + 'data_type': 'int', + 'default': '((0))', + 'nullable': False, + 'ordinal_position': '3', + }, + ], + 'partitions': {'partition_count': 12}, + 'foreign_keys': [ + { + 'foreign_key_name': 'FK_CityId', + 'referencing_table': 
'landmarks', + 'referencing_column': 'city_id', + 'referenced_table': 'cities', + 'referenced_column': 'id', + } + ], + 'indexes': [ + { + 'name': 'PK_Cities', + 'type': 1, + 'is_unique': True, + 'is_primary_key': True, + 'is_unique_constraint': False, + 'is_disabled': False, + 'column_names': 'id', + }, + { + 'name': 'single_column_index', + 'type': 2, + 'is_unique': False, + 'is_primary_key': False, + 'is_unique_constraint': False, + 'is_disabled': False, + 'column_names': 'id,population', + }, + { + 'name': 'two_columns_index', + 'type': 2, + 'is_unique': False, + 'is_primary_key': False, + 'is_unique_constraint': False, + 'is_disabled': False, + 'column_names': 'id,name', + }, + ], + }, + { + 'id': 'normalized_value', + 'name': 'landmarks', + 'columns': [ + { + 'name': 'name', + 'data_type': 'varchar', + 'default': 'None', + 'nullable': True, + 'ordinal_position': '1', + }, + { + 'name': 'city_id', + 'data_type': 'int', + 'default': '((0))', + 'nullable': True, + 'ordinal_position': '2', + }, + ], + 'partitions': {'partition_count': 1}, + }, + { + 'id': 'normalized_value', + 'name': 'RestaurantReviews', + 'columns': [ + { + 'name': 'RestaurantName', + 'data_type': 'varchar', + 'default': 'None', + 'nullable': True, + 'ordinal_position': '1', + }, + { + 'name': 'District', + 'data_type': 'varchar', + 'default': 'None', + 'nullable': True, + 'ordinal_position': '2', + }, + { + 'name': 'Review', + 'data_type': 'varchar', + 'default': 'None', + 'nullable': True, + 'ordinal_position': '3', + }, + ], + 'partitions': {'partition_count': 1}, + }, + { + 'id': 'normalized_value', + 'name': 'Restaurants', + 'columns': [ + { + 'name': 'RestaurantName', + 'data_type': 'varchar', + 'default': 'None', + 'nullable': True, + 'ordinal_position': '1', + }, + { + 'name': 'District', + 'data_type': 'varchar', + 'default': 'None', + 'nullable': True, + 'ordinal_position': '2', + }, + { + 'name': 'Cuisine', + 'data_type': 'varchar', + 'default': 'None', + 'nullable': True, + 'ordinal_position': '3', + }, + ], + 'partitions': {'partition_count': 2}, + 'foreign_keys': [ + { + 'foreign_key_name': 'FK_RestaurantNameDistrict', + 'referencing_table': 'RestaurantReviews', + 'referencing_column': 'RestaurantName,District', + 'referenced_table': 'Restaurants', + 'referenced_column': 'RestaurantName,District', + } + ], + 'indexes': [ + { + 'name': 'UC_RestaurantNameDistrict', + 'type': 2, + 'is_unique': True, + 'is_primary_key': False, + 'is_unique_constraint': True, + 'is_disabled': False, + 'column_names': 'District,RestaurantName', + } + ], + }, + ], + } + ], + } + + if running_on_windows_ci(): + exp_datadog_test['owner'] = 'None' + exp_datadog_test_schemas['owner'] = 'None' + + expected_data_for_db = { + 'datadog_test_schemas_second': exp_datadog_test, + 'datadog_test_schemas': exp_datadog_test_schemas, + } + + dbm_instance['database_autodiscovery'] = True + dbm_instance['autodiscovery_include'] = ['datadog_test_schemas', 'datadog_test_schemas_second'] + dbm_instance['dbm'] = True + dbm_instance['schemas_collection'] = {"enabled": True} + + check = SQLServer(CHECK_NAME, {}, [dbm_instance]) + dd_run_check(check) + + dbm_metadata = aggregator.get_event_platform_events("dbm-metadata") + + actual_payloads = {} + + for schema_event in (e for e in dbm_metadata if e['kind'] == 'sqlserver_databases'): + assert schema_event.get("timestamp") is not None + assert schema_event["host"] == "stubbed.hostname" + assert schema_event["agent_version"] == "0.0.0" + assert schema_event["dbms"] == "sqlserver" + assert 
schema_event.get("collection_interval") is not None + assert schema_event.get("dbms_version") is not None + + database_metadata = schema_event['metadata'] + assert len(database_metadata) == 1 + db_name = database_metadata[0]['name'] + + if db_name in actual_payloads: + actual_payloads[db_name]['schemas'] = actual_payloads[db_name]['schemas'] + database_metadata[0]['schemas'] + else: + actual_payloads[db_name] = database_metadata[0] + + assert len(actual_payloads) == len(expected_data_for_db) + + for db_name, actual_payload in actual_payloads.items(): + + assert db_name in databases_to_find + + # id's are env dependant + normalize_ids(actual_payload) + + # index columns may be in any order + normalize_indexes_columns(actual_payload) + + difference = DeepDiff(actual_payload, expected_data_for_db[db_name], ignore_order=True) + + diff_keys = list(difference.keys()) + # schema data also collects certain builtin default schemas which are ignored in the test + if len(diff_keys) > 0 and diff_keys != ['iterable_item_removed']: + raise AssertionError(Exception("found the following diffs: " + str(difference))) + + +def test_schemas_collection_truncated(aggregator, dd_run_check, dbm_instance): + dbm_instance['database_autodiscovery'] = True + dbm_instance['autodiscovery_include'] = ['datadog_test_schemas'] + dbm_instance['dbm'] = True + dbm_instance['schemas_collection'] = {"enabled": True, "max_execution_time": 0} + expected_pattern = r"^Truncated after fetching \d+ columns, elapsed time is \d+(\.\d+)?s, database is .*" + check = SQLServer(CHECK_NAME, {}, [dbm_instance]) + dd_run_check(check) + dbm_metadata = aggregator.get_event_platform_events("dbm-metadata") + found = False + for schema_event in (e for e in dbm_metadata if e['kind'] == 'sqlserver_databases'): + if "collection_errors" in schema_event: + if schema_event["collection_errors"][0]["error_type"] == "truncated" and re.fullmatch( + expected_pattern, schema_event["collection_errors"][0]["message"] + ): + found = True + assert found diff --git a/sqlserver/tests/test_unit.py b/sqlserver/tests/test_unit.py index 0f65e631a01cc..35776ab816025 100644 --- a/sqlserver/tests/test_unit.py +++ b/sqlserver/tests/test_unit.py @@ -2,17 +2,21 @@ # All rights reserved # Licensed under a 3-clause BSD style license (see LICENSE) import copy +import json import os import re +import time from collections import namedtuple import mock import pytest +from deepdiff import DeepDiff from datadog_checks.dev import EnvVars from datadog_checks.sqlserver import SQLServer from datadog_checks.sqlserver.connection import split_sqlserver_host_port from datadog_checks.sqlserver.metrics import SqlFractionMetric, SqlMasterDatabaseFileStats +from datadog_checks.sqlserver.schemas import Schemas, SubmitData from datadog_checks.sqlserver.sqlserver import SQLConnectionError from datadog_checks.sqlserver.utils import ( Database, @@ -735,3 +739,114 @@ def test_extract_sql_comments_and_procedure_name(query, expected_comments, is_pr assert comments == expected_comments assert p == is_proc assert re.match(name, expected_name, re.IGNORECASE) if expected_name else expected_name == name + + +class DummyLogger: + def debug(*args): + pass + + def error(*args): + pass + + +def set_up_submitter_unit_test(): + submitted_data = [] + base_event = { + "host": "some", + "agent_version": 0, + "dbms": "sqlserver", + "kind": "sqlserver_databases", + "collection_interval": 1200, + "dbms_version": "some", + "tags": "some", + "cloud_metadata": "some", + } + + def submitData(data): + 
submitted_data.append(data) + + dataSubmitter = SubmitData(submitData, base_event, DummyLogger()) + return dataSubmitter, submitted_data + + +def test_submit_data(): + + dataSubmitter, submitted_data = set_up_submitter_unit_test() + + dataSubmitter.store_db_infos([{"id": 3, "name": "test_db1"}, {"id": 4, "name": "test_db2"}]) + schema1 = {"id": "1"} + schema2 = {"id": "2"} + schema3 = {"id": "3"} + + dataSubmitter.store("test_db1", schema1, [1, 2], 5) + dataSubmitter.store("test_db2", schema3, [1, 2], 5) + assert dataSubmitter.columns_since_last_submit() == 10 + dataSubmitter.store("test_db1", schema2, [1, 2], 10) + + dataSubmitter.submit() + + assert dataSubmitter.columns_since_last_submit() == 0 + + expected_data = { + "host": "some", + "agent_version": 0, + "dbms": "sqlserver", + "kind": "sqlserver_databases", + "collection_interval": 1200, + "dbms_version": "some", + "tags": "some", + "cloud_metadata": "some", + "metadata": [ + {"id": 3, "name": "test_db1", "schemas": [{"id": "1", "tables": [1, 2]}, {"id": "2", "tables": [1, 2]}]}, + {"id": 4, "name": "test_db2", "schemas": [{"id": "3", "tables": [1, 2]}]}, + ], + "timestamp": 1.1, + } + difference = DeepDiff( + json.loads(submitted_data[0]), expected_data, exclude_paths="root['timestamp']", ignore_order=True + ) + assert len(difference) == 0 + + +def test_fetch_throws(instance_docker): + check = SQLServer(CHECK_NAME, {}, [instance_docker]) + schemas = Schemas(check, check._config) + with mock.patch('time.time', side_effect=[0, 9999999]), mock.patch( + 'datadog_checks.sqlserver.schemas.Schemas._query_schema_information', return_value={"id": 1} + ), mock.patch('datadog_checks.sqlserver.schemas.Schemas._get_tables', return_value=[1, 2]): + with pytest.raises(StopIteration): + schemas._fetch_schema_data("dummy_cursor", time.time(), "my_db") + + +def test_submit_is_called_if_too_many_columns(instance_docker): + check = SQLServer(CHECK_NAME, {}, [instance_docker]) + schemas = Schemas(check, check._config) + with mock.patch('time.time', side_effect=[0, 0]), mock.patch( + 'datadog_checks.sqlserver.schemas.Schemas._query_schema_information', return_value={"id": 1} + ), mock.patch('datadog_checks.sqlserver.schemas.Schemas._get_tables', return_value=[1, 2]), mock.patch( + 'datadog_checks.sqlserver.schemas.SubmitData.submit' + ) as mocked_submit, mock.patch( + 'datadog_checks.sqlserver.schemas.Schemas._get_tables_data', return_value=(1000_000, {"id": 1}) + ): + with pytest.raises(StopIteration): + schemas._fetch_schema_data("dummy_cursor", time.time(), "my_db") + mocked_submit.called_once() + + +def test_exception_handling_by_do_for_dbs(instance_docker): + check = SQLServer(CHECK_NAME, {}, [instance_docker]) + check.initialize_connection() + schemas = Schemas(check, check._config) + mock_cursor = mock.MagicMock() + with mock.patch( + 'datadog_checks.sqlserver.schemas.Schemas._fetch_schema_data', side_effect=Exception("Can't connect to DB") + ), mock.patch('datadog_checks.sqlserver.sqlserver.SQLServer.get_databases', return_value=["db1"]), mock.patch( + 'cachetools.TTLCache.get', return_value="dummy" + ), mock.patch( + 'datadog_checks.sqlserver.connection.Connection.open_managed_default_connection' + ), mock.patch( + 'datadog_checks.sqlserver.connection.Connection.get_managed_cursor', return_value=mock_cursor + ), mock.patch( + 'datadog_checks.sqlserver.utils.is_azure_sql_database', return_value={} + ): + schemas._fetch_for_databases() diff --git a/sqlserver/tests/utils.py b/sqlserver/tests/utils.py index 1d009b47ed6f5..eac8dceebde69 100644 --- 
a/sqlserver/tests/utils.py +++ b/sqlserver/tests/utils.py @@ -220,3 +220,23 @@ def run_query_and_ignore_exception(conn, query): @staticmethod def _create_rand_string(length=5): return ''.join(choice(string.ascii_lowercase + string.digits) for _ in range(length)) + + +def normalize_ids(actual_payload): + actual_payload['id'] = 'normalized_value' + for schema in actual_payload['schemas']: + schema['id'] = 'normalized_value' + for table in schema['tables']: + table['id'] = 'normalized_value' + + +def normalize_indexes_columns(actual_payload): + for schema in actual_payload['schemas']: + schema['id'] = 'normalized_value' + for table in schema['tables']: + if 'indexes' in table: + for index in table['indexes']: + column_names = index['column_names'] + columns = column_names.split(',') + sorted_columns = sorted(columns) + index['column_names'] = ','.join(sorted_columns)
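For reviewers who want to exercise the new settings end to end, below is a minimal, illustrative instance configuration built from the `schemas_collection` options defined in `spec.yaml`/`conf.yaml.example` above. This is a sketch, not part of the patch: the host and credential values are placeholders, and `database_autodiscovery`/`autodiscovery_include` simply mirror what the integration tests set rather than anything the feature requires.

    instances:
      - host: localhost,1433          # placeholder connection settings
        username: datadog
        password: '<PASSWORD>'
        dbm: true                     # schema collection requires dbm
        database_autodiscovery: true
        autodiscovery_include:
          - datadog_test_schemas
        schemas_collection:
          enabled: true
          collection_interval: 600    # seconds; default shown
          max_execution_time: 10      # seconds; capped by collection_interval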