Skip to content
This repository was archived by the owner on Sep 23, 2024. It is now read-only.

Commit 43abbe4

Browse files
authored
use psycopg2 2.8.4 with auto keep-alive feature (#31)
1 parent 69e7395 commit 43abbe4

File tree

2 files changed

+11
-18
lines changed

2 files changed

+11
-18
lines changed

setup.py

+2-2
Original file line number | Diff line number | Diff line change
@@ -6,7 +6,7 @@
66
long_description = f.read()
77

88
setup(name='pipelinewise-tap-postgres',
9-
version='1.1.9',
9+
version='1.2.0',
1010
description='Singer.io tap for extracting data from PostgreSQL - PipelineWise compatible',
1111
long_description=long_description,
1212
long_description_content_type='text/markdown',
@@ -18,7 +18,7 @@
1818
],
1919
install_requires=[
2020
'singer-python==5.8.1',
21-
'psycopg2==2.8.2',
21+
'psycopg2==2.8.4',
2222
'strict-rfc3339==0.7',
2323
'nose==1.3.7'
2424
],

tap_postgres/sync_strategies/logical_replication.py

+9-16
Original file line number | Diff line number | Diff line change
@@ -400,15 +400,15 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
400400
cur = conn.cursor()
401401

402402
try:
403-
LOGGER.info("{} : Starting log streaming at {} to {} (slot {})".format(datetime.datetime.utcnow(), int_to_lsn(start_lsn), int_to_lsn(end_lsn), slot))
404-
cur.start_replication(slot_name=slot, decode=True, start_lsn=start_lsn, options={'write-in-chunks': 1, 'add-tables': ','.join(selected_tables)})
403+
LOGGER.info("{} : Request wal streaming from {} to {} (slot {})".format(datetime.datetime.utcnow(), int_to_lsn(start_lsn), int_to_lsn(end_lsn), slot))
404+
# psycopg2 2.8.4 will send a keep-alive message to postgres every status_interval
405+
cur.start_replication(slot_name=slot, decode=True, start_lsn=start_lsn, status_interval=poll_interval, options={'write-in-chunks': 1, 'add-tables': ','.join(selected_tables)})
405406
except psycopg2.ProgrammingError:
406407
raise Exception("Unable to start replication with logical replication (slot {})".format(slot))
407408

408409
# Emulate some behaviour of pg_recvlogical
409410
LOGGER.info("{} : Confirming write up to 0/0, flush to 0/0".format(datetime.datetime.utcnow()))
410411
cur.send_feedback(write_lsn=0, flush_lsn=0, reply=True)
411-
time.sleep(1)
412412

413413
lsn_received_timestamp = datetime.datetime.utcnow()
414414
poll_timestamp = datetime.datetime.utcnow()
@@ -438,7 +438,7 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
438438
# This is to ensure we only flush to lsn that has completed entirely
439439
if lsn_currently_processing is None:
440440
lsn_currently_processing = msg.data_start
441-
LOGGER.info("{} : First message received is {} at {}".format(datetime.datetime.utcnow(), int_to_lsn(lsn_currently_processing), datetime.datetime.utcnow()))
441+
LOGGER.info("{} : First wal message received was {} at {}".format(datetime.datetime.utcnow(), int_to_lsn(lsn_currently_processing), datetime.datetime.utcnow()))
442442

443443
# Flush Postgres wal up to lsn committed in previous run, or first lsn received in this run
444444
lsn_to_flush = lsn_comitted
@@ -452,24 +452,18 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
452452
lsn_received_timestamp = datetime.datetime.utcnow()
453453
lsn_processed_count = lsn_processed_count + 1
454454
if lsn_processed_count >= UPDATE_BOOKMARK_PERIOD:
455-
# LOGGER.info("{} : Updating bookmarks for all streams to lsn = {} ({})".format(datetime.datetime.utcnow(), lsn_last_processed, int_to_lsn(lsn_last_processed)))
455+
LOGGER.debug("{} : Updating bookmarks for all streams to lsn = {} ({})".format(datetime.datetime.utcnow(), lsn_last_processed, int_to_lsn(lsn_last_processed)))
456456
for s in logical_streams:
457457
state = singer.write_bookmark(state, s['tap_stream_id'], 'lsn', lsn_last_processed)
458458
singer.write_message(singer.StateMessage(value=copy.deepcopy(state)))
459459
lsn_processed_count = 0
460460

461-
# When data is received, and when data is not received, a keep-alive poll needs to be returned to PostgreSQL
461+
# Every poll_interval, update latest committed lsn position from the state_file
462462
if datetime.datetime.utcnow() >= (poll_timestamp + datetime.timedelta(seconds=poll_interval)):
463463
if lsn_currently_processing is None:
464-
LOGGER.info("{} : Sending keep-alive message to source server (last message received was {} at {})".format(
465-
datetime.datetime.utcnow(), int_to_lsn(lsn_last_processed), lsn_received_timestamp))
466-
cur.send_feedback()
467-
elif state_file is None:
468-
LOGGER.info("{} : Sending keep-alive message to source server (last message received was {} at {})".format(
469-
datetime.datetime.utcnow(), int_to_lsn(lsn_last_processed), lsn_received_timestamp))
470-
cur.send_feedback()
464+
LOGGER.info("{} : Waiting for first wal message".format(datetime.datetime.utcnow()))
471465
else:
472-
# Read lsn_comitted from state.json and feeback to source server
466+
LOGGER.info("{} : Last wal message received was {} at {}".format(datetime.datetime.utcnow(), int_to_lsn(lsn_last_processed), lsn_received_timestamp))
473467
try:
474468
state_comitted_file = open(state_file)
475469
except:
@@ -480,8 +474,7 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
480474
lsn_comitted = min([get_bookmark(state_comitted, s['tap_stream_id'], 'lsn') for s in logical_streams])
481475
lsn_to_flush = lsn_comitted
482476
if lsn_currently_processing < lsn_to_flush: lsn_to_flush = lsn_currently_processing
483-
LOGGER.info("{} : Confirming write up to {}, flush to {} (last message received was {} at {})".format(
484-
datetime.datetime.utcnow(), int_to_lsn(lsn_to_flush), int_to_lsn(lsn_to_flush), int_to_lsn(lsn_last_processed), lsn_received_timestamp))
477+
LOGGER.info("{} : Confirming write up to {}, flush to {}".format(datetime.datetime.utcnow(), int_to_lsn(lsn_to_flush), int_to_lsn(lsn_to_flush)))
485478
cur.send_feedback(write_lsn=lsn_to_flush, flush_lsn=lsn_to_flush, reply=True)
486479

487480
poll_timestamp = datetime.datetime.utcnow()

0 commit comments

Comments
 (0)