Skip to content
This repository was archived by the owner on Sep 23, 2024. It is now read-only.

Commit 2890000

Browse files
authored
Update singer streams to lsn_last_processed before emiting STATE message (#26)
1 parent 7934c45 commit 2890000

File tree

1 file changed

+5
-1
lines changed

1 file changed

+5
-1
lines changed

tap_postgres/sync_strategies/logical_replication.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,9 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
434434
lsn_received_timestamp = datetime.datetime.utcnow()
435435
lsn_processed_count = lsn_processed_count + 1
436436
if lsn_processed_count >= UPDATE_BOOKMARK_PERIOD:
437+
LOGGER.info("{} : Updating bookmarks for all streams to lsn = {} ({})".format(datetime.datetime.utcnow(), lsn_last_processed, int_to_lsn(lsn_last_processed)))
438+
for s in logical_streams:
439+
state = singer.write_bookmark(state, s['tap_stream_id'], 'lsn', lsn_last_processed)
437440
singer.write_message(singer.StateMessage(value=copy.deepcopy(state)))
438441
lsn_processed_count = 0
439442

@@ -465,8 +468,9 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
465468
if lsn_comitted > lsn_last_processed:
466469
lsn_last_processed = lsn_comitted
467470
LOGGER.info("Current lsn_last_processed {} is older than lsn_comitted {}".format(int_to_lsn(lsn_last_processed), int_to_lsn(lsn_comitted)))
471+
472+
LOGGER.info("{} : Updating bookmarks for all streams to lsn = {} ({})".format(datetime.datetime.utcnow(), lsn_last_processed, int_to_lsn(lsn_last_processed)))
468473
for s in logical_streams:
469-
LOGGER.info("updating bookmark for stream {} to lsn = {} ({})".format(s['tap_stream_id'], lsn_last_processed, int_to_lsn(lsn_last_processed)))
470474
state = singer.write_bookmark(state, s['tap_stream_id'], 'lsn', lsn_last_processed)
471475

472476
singer.write_message(singer.StateMessage(value=copy.deepcopy(state)))

0 commit comments

Comments
 (0)