Skip to content
This repository was archived by the owner on Sep 23, 2024. It is now read-only.

Commit c295d7c

Browse files
authored
AP-1424: limit config to mitigate long running incremental queries (#211)
1 parent b418f2e commit c295d7c

File tree

4 files changed

+44
-40
lines changed

4 files changed

+44
-40
lines changed

README.md

+20-19
Original file line numberDiff line numberDiff line change
@@ -50,25 +50,26 @@ These are the same basic configuration properties used by the PostgreSQL command
5050

5151
Full list of options in `config.json`:
5252

53-
| Property | Type | Required? | Description |
54-
|-------------------------------------|---------|------------|---------------------------------------------------------------|
55-
| host | String | Yes | PostgreSQL host |
56-
| port | Integer | Yes | PostgreSQL port |
57-
| user | String | Yes | PostgreSQL user |
58-
| password | String | Yes | PostgreSQL password |
59-
| dbname | String | Yes | PostgreSQL database name |
60-
| filter_schemas | String | No | Comma separated schema names to scan only the required schemas to improve the performance of data extraction. (Default: None) |
61-
| ssl | String | No | If set to `"true"` then use SSL via postgres sslmode `require` option. If the server does not accept SSL connections or the client certificate is not recognized the connection will fail. (Default: None) |
62-
| logical_poll_total_seconds | Integer | No | Stop running the tap when no data received from wal after certain number of seconds. (Default: 10800) |
63-
| break_at_end_lsn | Boolean | No | Stop running the tap if the newly received lsn is after the max lsn that was detected when the tap started. (Default: true) |
64-
| max_run_seconds | Integer | No | Stop running the tap after certain number of seconds. (Default: 43200) |
65-
| debug_lsn | String | No | If set to `"true"` then add `_sdc_lsn` property to the singer messages to debug postgres LSN position in the WAL stream. (Default: None) |
66-
| tap_id | String | No | ID of the pipeline/tap (Default: None) |
67-
| itersize | Integer | No | Size of PG cursor iterator when doing INCREMENTAL or FULL_TABLE (Default: 20000) |
68-
| default_replication_method | String | No | Default replication method to use when no one is provided in the catalog (Values: `LOG_BASED`, `INCREMENTAL` or `FULL_TABLE`) (Default: None) |
69-
| use_secondary | Boolean | No | Use a database replica for `INCREMENTAL` and `FULL_TABLE` replication (Default : False) |
70-
| secondary_host | String | No | PostgreSQL Replica host (required if `use_secondary` is `True`) |
71-
| secondary_port | Integer | No | PostgreSQL Replica port (required if `use_secondary` is `True`) |
53+
| Property | Type | Required? | Default | Description |
54+
|----------------------------|---------|----------|---------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
55+
| host | String | Yes | - | PostgreSQL host |
56+
| port | Integer | Yes | - | PostgreSQL port |
57+
| user | String | Yes | - | PostgreSQL user |
58+
| password | String | Yes | - | PostgreSQL password |
59+
| dbname | String | Yes | - | PostgreSQL database name |
60+
| filter_schemas | String | No | None | Comma separated schema names to scan only the required schemas to improve the performance of data extraction. |
61+
| ssl | String | No | None | If set to `"true"` then use SSL via postgres sslmode `require` option. If the server does not accept SSL connections or the client certificate is not recognized the connection will fail. |
62+
| logical_poll_total_seconds | Integer | No | 10800 | Stop running the tap when no data received from wal after certain number of seconds. |
63+
| break_at_end_lsn | Boolean | No | true | Stop running the tap if the newly received lsn is after the max lsn that was detected when the tap started. |
64+
| max_run_seconds | Integer | No | 43200 | Stop running the tap after certain number of seconds. |
65+
| debug_lsn | String | No | None | If set to `"true"` then add `_sdc_lsn` property to the singer messages to debug postgres LSN position in the WAL stream. |
66+
| tap_id | String | No | None | ID of the pipeline/tap |
67+
| itersize | Integer | No | 20000 | Size of PG cursor iterator when doing INCREMENTAL or FULL_TABLE |
68+
| default_replication_method | String | No | None | Default replication method to use when no one is provided in the catalog (Values: `LOG_BASED`, `INCREMENTAL` or `FULL_TABLE`) |
69+
| use_secondary | Boolean | No | False | Use a database replica for `INCREMENTAL` and `FULL_TABLE` replication |
70+
| secondary_host | String | No | - | PostgreSQL Replica host (required if `use_secondary` is `True`) |
71+
| secondary_port | Integer | No | - | PostgreSQL Replica port (required if `use_secondary` is `True`) |
72+
| limit | Integer | No | None | Adds a limit to INCREMENTAL queries to limit the number of records returns per run |
7273

7374

7475
### Run the tap in Discovery Mode

tap_postgres/__init__.py

+3
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,8 @@ def main_impl():
391391
Main method
392392
"""
393393
args = parse_args(REQUIRED_CONFIG_KEYS)
394+
395+
limit = args.config.get('limit')
394396
conn_config = {
395397
# Required config keys
396398
'host': args.config['host'],
@@ -407,6 +409,7 @@ def main_impl():
407409
'break_at_end_lsn': args.config.get('break_at_end_lsn', True),
408410
'logical_poll_total_seconds': float(args.config.get('logical_poll_total_seconds', 0)),
409411
'use_secondary': args.config.get('use_secondary', False),
412+
'limit': int(limit) if limit else None
410413
}
411414

412415
if conn_config['use_secondary']:

tap_postgres/sync_strategies/incremental.py

+19-20
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,9 @@ def sync_table(conn_info, stream, state, desired_columns, md_map):
8686
"replication_key_sql_datatype": replication_key_sql_datatype,
8787
"replication_key_value": replication_key_value,
8888
"schema_name": schema_name,
89-
"stream": stream})
89+
"table_name": stream['table_name'],
90+
"limit": conn_info['limit']
91+
})
9092
LOGGER.info('select statement: %s with itersize %s', select_sql, cur.itersize)
9193
cur.execute(select_sql)
9294

@@ -111,7 +113,6 @@ def sync_table(conn_info, stream, state, desired_columns, md_map):
111113
'replication_key_value',
112114
record_message.record[replication_key])
113115

114-
115116
if rows_saved % UPDATE_BOOKMARK_PERIOD == 0:
116117
singer.write_message(singer.StateMessage(value=copy.deepcopy(state)))
117118

@@ -122,25 +123,23 @@ def sync_table(conn_info, stream, state, desired_columns, md_map):
122123

123124
def _get_select_sql(params):
124125
escaped_columns = params['escaped_columns']
125-
replication_key = params['replication_key']
126+
replication_key = post_db.prepare_columns_sql(params['replication_key'])
126127
replication_key_sql_datatype = params['replication_key_sql_datatype']
127128
replication_key_value = params['replication_key_value']
128129
schema_name = params['schema_name']
129-
stream = params['stream']
130-
if replication_key_value:
131-
select_sql = f"""
132-
SELECT {','.join(escaped_columns)}
133-
FROM (
134-
SELECT *
135-
FROM {post_db.fully_qualified_table_name(schema_name, stream['table_name'])}
136-
WHERE {post_db.prepare_columns_sql(replication_key)} >= '{replication_key_value}'::{replication_key_sql_datatype}
137-
ORDER BY {post_db.prepare_columns_sql(replication_key)} ASC
138-
) pg_speedup_trick"""
139-
else:
140-
# if not replication_key_value
141-
select_sql = f"""
142-
SELECT {','.join(escaped_columns)}
143-
FROM {post_db.fully_qualified_table_name(schema_name, stream['table_name'])}
144-
ORDER BY {post_db.prepare_columns_sql(replication_key)} ASC
145-
"""
130+
table_name = params['table_name']
131+
132+
limit_statement = f'LIMIT {params["limit"]}' if params["limit"] else ''
133+
where_statement = f"WHERE {replication_key} >= '{replication_key_value}'::{replication_key_sql_datatype}" \
134+
if replication_key_value else ""
135+
136+
select_sql = f"""
137+
SELECT {','.join(escaped_columns)}
138+
FROM (
139+
SELECT *
140+
FROM {post_db.fully_qualified_table_name(schema_name, table_name)}
141+
{where_statement}
142+
ORDER BY {replication_key} ASC {limit_statement}
143+
) pg_speedup_trick;"""
144+
146145
return select_sql

tests/unit/test_incremental.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ def setUp(self) -> None:
2929
'user': 'foo_user',
3030
'password': 'foo_pass',
3131
'port': 12345,
32-
'use_secondary': False
32+
'use_secondary': False,
33+
'limit': None
3334
}
3435
self.stream = {'tap_stream_id': 5, 'stream': 'bar', 'table_name': 'pg_tbl'}
3536
self.md_map = {

0 commit comments

Comments
 (0)