From e5db119277c7b3908c1c97862f6482a2789547d0 Mon Sep 17 00:00:00 2001 From: Roland Teichert Date: Wed, 21 May 2025 16:26:10 +0200 Subject: [PATCH 01/13] Removed zongji type mappings which are now provided by the Zongji package directly Added check for tablemap events --- modules/module-mysql/package.json | 2 +- .../src/replication/zongji/zongji-utils.ts | 5 + .../src/replication/zongji/zongji.d.ts | 129 ------------------ modules/module-mysql/test/tsconfig.json | 2 +- pnpm-lock.yaml | 10 +- 5 files changed, 12 insertions(+), 136 deletions(-) delete mode 100644 modules/module-mysql/src/replication/zongji/zongji.d.ts diff --git a/modules/module-mysql/package.json b/modules/module-mysql/package.json index 07fe335f..c87e698b 100644 --- a/modules/module-mysql/package.json +++ b/modules/module-mysql/package.json @@ -33,7 +33,7 @@ "@powersync/service-sync-rules": "workspace:*", "@powersync/service-types": "workspace:*", "@powersync/service-jsonbig": "workspace:*", - "@powersync/mysql-zongji": "^0.1.0", + "@powersync/mysql-zongji": "0.0.0-dev-20250521092520", "semver": "^7.5.4", "async": "^3.2.4", "mysql2": "^3.11.0", diff --git a/modules/module-mysql/src/replication/zongji/zongji-utils.ts b/modules/module-mysql/src/replication/zongji/zongji-utils.ts index 36122b63..24d01663 100644 --- a/modules/module-mysql/src/replication/zongji/zongji-utils.ts +++ b/modules/module-mysql/src/replication/zongji/zongji-utils.ts @@ -3,6 +3,7 @@ import { BinLogGTIDLogEvent, BinLogMutationEvent, BinLogRotationEvent, + BinLogTableMapEvent, BinLogUpdateEvent, BinLogXidEvent } from '@powersync/mysql-zongji'; @@ -11,6 +12,10 @@ export function eventIsGTIDLog(event: BinLogEvent): event is BinLogGTIDLogEvent return event.getEventName() == 'gtidlog'; } +export function eventIsTableMap(event: BinLogEvent): event is BinLogTableMapEvent { + return event.getEventName() == 'tablemap'; +} + export function eventIsXid(event: BinLogEvent): event is BinLogXidEvent { return event.getEventName() == 'xid'; } diff --git a/modules/module-mysql/src/replication/zongji/zongji.d.ts b/modules/module-mysql/src/replication/zongji/zongji.d.ts deleted file mode 100644 index f5640497..00000000 --- a/modules/module-mysql/src/replication/zongji/zongji.d.ts +++ /dev/null @@ -1,129 +0,0 @@ -declare module '@powersync/mysql-zongji' { - import { Socket } from 'net'; - - export type ZongjiOptions = { - host: string; - user: string; - password: string; - dateStrings?: boolean; - timeZone?: string; - }; - - interface DatabaseFilter { - [databaseName: string]: string[] | true; - } - - export type StartOptions = { - includeEvents?: string[]; - excludeEvents?: string[]; - /** - * Describe which databases and tables to include (Only for row events). Use database names as the key and pass an array of table names or true (for the entire database). - * Example: { 'my_database': ['allow_table', 'another_table'], 'another_db': true } - */ - includeSchema?: DatabaseFilter; - /** - * Object describing which databases and tables to exclude (Same format as includeSchema) - * Example: { 'other_db': ['disallowed_table'], 'ex_db': true } - */ - excludeSchema?: DatabaseFilter; - /** - * BinLog position filename to start reading events from - */ - filename?: string; - /** - * BinLog position offset to start reading events from in file specified - */ - position?: number; - - /** - * Unique server ID for this replication client. - */ - serverId?: number; - }; - - export type ColumnSchema = { - COLUMN_NAME: string; - COLLATION_NAME: string; - CHARACTER_SET_NAME: string; - COLUMN_COMMENT: string; - COLUMN_TYPE: string; - }; - - export type ColumnDefinition = { - name: string; - charset: string; - type: number; - metadata: Record; - }; - - export type TableMapEntry = { - columnSchemas: ColumnSchema[]; - parentSchema: string; - tableName: string; - columns: ColumnDefinition[]; - }; - - export type BaseBinLogEvent = { - timestamp: number; - getEventName(): string; - - /** - * Next position in BinLog file to read from after - * this event. - */ - nextPosition: number; - /** - * Size of this event - */ - size: number; - flags: number; - useChecksum: boolean; - }; - - export type BinLogRotationEvent = BaseBinLogEvent & { - binlogName: string; - position: number; - }; - - export type BinLogGTIDLogEvent = BaseBinLogEvent & { - serverId: Buffer; - transactionRange: number; - }; - - export type BinLogXidEvent = BaseBinLogEvent & { - xid: number; - }; - - export type BinLogMutationEvent = BaseBinLogEvent & { - tableId: number; - numberOfColumns: number; - tableMap: Record; - rows: Record[]; - }; - - export type BinLogUpdateEvent = Omit & { - rows: { - before: Record; - after: Record; - }[]; - }; - - export type BinLogEvent = BinLogRotationEvent | BinLogGTIDLogEvent | BinLogXidEvent | BinLogMutationEvent; - - // @vlasky/mysql Connection - export interface MySQLConnection { - _socket?: Socket; - /** There are other forms of this method as well - this is the most basic one. */ - query(sql: string, callback: (error: any, results: any, fields: any) => void): void; - } - - export default class ZongJi { - connection: MySQLConnection; - constructor(options: ZongjiOptions); - - start(options: StartOptions): void; - stop(): void; - - on(type: 'binlog' | string, callback: (event: BinLogEvent) => void); - } -} diff --git a/modules/module-mysql/test/tsconfig.json b/modules/module-mysql/test/tsconfig.json index 5257b273..18898c4e 100644 --- a/modules/module-mysql/test/tsconfig.json +++ b/modules/module-mysql/test/tsconfig.json @@ -13,7 +13,7 @@ "@core-tests/*": ["../../../packages/service-core/test/src/*"] } }, - "include": ["src", "../src/replication/zongji/zongji.d.ts"], + "include": ["src"], "references": [ { "path": "../" diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 6f0c20ca..2a61565d 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -271,8 +271,8 @@ importers: specifier: workspace:* version: link:../../libs/lib-services '@powersync/mysql-zongji': - specifier: ^0.1.0 - version: 0.1.0 + specifier: 0.0.0-dev-20250521092520 + version: 0.0.0-dev-20250521092520 '@powersync/service-core': specifier: workspace:* version: link:../../packages/service-core @@ -1312,8 +1312,8 @@ packages: resolution: {integrity: sha512-UA91GwWPhFExt3IizW6bOeY/pQ0BkuNwKjk9iQW9KqxluGCrg4VenZ0/L+2Y0+ZOtme72EVvg6v0zo3AMQRCeA==} engines: {node: '>=12'} - '@powersync/mysql-zongji@0.1.0': - resolution: {integrity: sha512-2GjOxVws+wtbb+xFUJe4Ozzkp/f0Gsna0fje9art76bmz6yfLCW4K3Mf2/M310xMnAIp8eP9hsJ6DYwwZCo1RA==} + '@powersync/mysql-zongji@0.0.0-dev-20250521092520': + resolution: {integrity: sha512-AZ03eO5O/LQ8MFl/Z6OWyLJ4Mykd/gSbfIA8Iy0XImIKQt+XY8MqvtU/u3LLIZOJ+1ea43h0BfPvnFMsgwVxZg==} engines: {node: '>=20.0.0'} '@powersync/service-jsonbig@0.17.10': @@ -4778,7 +4778,7 @@ snapshots: '@pnpm/network.ca-file': 1.0.2 config-chain: 1.1.13 - '@powersync/mysql-zongji@0.1.0': + '@powersync/mysql-zongji@0.0.0-dev-20250521092520': dependencies: '@vlasky/mysql': 2.18.6 big-integer: 1.6.51 From d03360ea89e36f0322389b3afd600cbc62db4f8d Mon Sep 17 00:00:00 2001 From: Roland Teichert Date: Wed, 21 May 2025 16:29:53 +0200 Subject: [PATCH 02/13] Moved most of the binlog event handling logic to a separate BinlogListener class. Introduced a mechanism to limit the maximum size of the binlog processing queue, thus also limiting memory usage. This maximum processing queue size is configurable --- .../src/replication/MySQLConnectionManager.ts | 2 +- .../src/replication/zongji/BinlogListener.ts | 186 ++++++++++++++++++ modules/module-mysql/src/types/types.ts | 9 +- .../test/src/BinlogListener.test.ts | 136 +++++++++++++ .../test/src/mysql-to-sqlite.test.ts | 2 +- 5 files changed, 332 insertions(+), 3 deletions(-) create mode 100644 modules/module-mysql/src/replication/zongji/BinlogListener.ts create mode 100644 modules/module-mysql/test/src/BinlogListener.test.ts diff --git a/modules/module-mysql/src/replication/MySQLConnectionManager.ts b/modules/module-mysql/src/replication/MySQLConnectionManager.ts index 4548d838..d464a975 100644 --- a/modules/module-mysql/src/replication/MySQLConnectionManager.ts +++ b/modules/module-mysql/src/replication/MySQLConnectionManager.ts @@ -2,8 +2,8 @@ import { NormalizedMySQLConnectionConfig } from '../types/types.js'; import mysqlPromise from 'mysql2/promise'; import mysql, { FieldPacket, RowDataPacket } from 'mysql2'; import * as mysql_utils from '../utils/mysql-utils.js'; -import ZongJi from '@powersync/mysql-zongji'; import { logger } from '@powersync/lib-services-framework'; +import { ZongJi } from '@powersync/mysql-zongji'; export class MySQLConnectionManager { /** diff --git a/modules/module-mysql/src/replication/zongji/BinlogListener.ts b/modules/module-mysql/src/replication/zongji/BinlogListener.ts new file mode 100644 index 00000000..f67af740 --- /dev/null +++ b/modules/module-mysql/src/replication/zongji/BinlogListener.ts @@ -0,0 +1,186 @@ +import * as common from '../../common/common-index.js'; +import async from 'async'; +import { BinLogEvent, StartOptions, TableMapEntry, ZongJi } from '@powersync/mysql-zongji'; +import * as zongji_utils from './zongji-utils.js'; +import { logger } from '@powersync/lib-services-framework'; +import { MySQLConnectionManager } from '../MySQLConnectionManager.js'; + +export type Row = Record; + +export interface BinlogEventHandler { + onWrite: (rows: Row[], tableMap: TableMapEntry) => Promise; + onUpdate: (rowsAfter: Row[], rowsBefore: Row[], tableMap: TableMapEntry) => Promise; + onDelete: (rows: Row[], tableMap: TableMapEntry) => Promise; + onCommit: (lsn: string) => Promise; +} + +export interface BinlogListenerOptions { + connectionManager: MySQLConnectionManager; + eventHandler: BinlogEventHandler; + includedTables: string[]; + serverId: number; + startPosition: common.BinLogPosition; + abortSignal: AbortSignal; +} + +export class BinlogListener { + private connectionManager: MySQLConnectionManager; + private eventHandler: BinlogEventHandler; + private binLogPosition: common.BinLogPosition; + private currentGTID: common.ReplicatedGTID | null; + + zongji: ZongJi; + processingQueue: async.QueueObject; + + constructor(public options: BinlogListenerOptions) { + this.connectionManager = options.connectionManager; + this.eventHandler = options.eventHandler; + this.binLogPosition = options.startPosition; + this.currentGTID = null; + + this.processingQueue = async.queue(this.createQueueWorker(), 1); + this.zongji = this.createZongjiListener(); + } + + public async start(): Promise { + logger.info(`Starting replication. Created replica client with serverId:${this.options.serverId}`); + // Set a heartbeat interval for the Zongji replication connection + // Zongji does not explicitly handle the heartbeat events - they are categorized as event:unknown + // The heartbeat events are enough to keep the connection alive for setTimeout to work on the socket. + await new Promise((resolve, reject) => { + this.zongji.connection.query( + // In nanoseconds, 10^9 = 1s + 'set @master_heartbeat_period=28*1000000000', + function (error: any, results: any, fields: any) { + if (error) { + reject(error); + } else { + resolve(results); + } + } + ); + }); + logger.info('Successfully set up replication connection heartbeat...'); + + // The _socket member is only set after a query is run on the connection, so we set the timeout after setting the heartbeat. + // The timeout here must be greater than the master_heartbeat_period. + const socket = this.zongji.connection._socket!; + socket.setTimeout(60_000, () => { + socket.destroy(new Error('Replication connection timeout.')); + }); + + logger.info(`Reading binlog from: ${this.binLogPosition.filename}:${this.binLogPosition.offset}`); + this.zongji.start({ + // We ignore the unknown/heartbeat event since it currently serves no purpose other than to keep the connection alive + // tablemap events always need to be included for the other row events to work + includeEvents: ['tablemap', 'writerows', 'updaterows', 'deleterows', 'xid', 'rotate', 'gtidlog'], + includeSchema: { [this.connectionManager.databaseName]: this.options.includedTables }, + filename: this.binLogPosition.filename, + position: this.binLogPosition.offset, + serverId: this.options.serverId + } satisfies StartOptions); + + await new Promise((resolve, reject) => { + this.zongji.on('error', (error) => { + logger.error('Binlog listener error:', error); + this.zongji.stop(); + this.processingQueue.kill(); + reject(error); + }); + + this.processingQueue.error((error) => { + logger.error('BinlogEvent processing error:', error); + this.zongji.stop(); + this.processingQueue.kill(); + reject(error); + }); + + this.zongji.on('stopped', () => { + logger.info('Binlog listener stopped. Replication ended.'); + resolve(); + }); + + const stop = () => { + logger.info('Abort signal received, stopping replication...'); + this.zongji.stop(); + this.processingQueue.kill(); + resolve(); + }; + + this.options.abortSignal.addEventListener('abort', stop, { once: true }); + + if (this.options.abortSignal.aborted) { + // Generally this should have been picked up early, but we add this here as a failsafe. + stop(); + } + }); + } + + private createZongjiListener(): ZongJi { + const zongji = this.connectionManager.createBinlogListener(); + + zongji.on('binlog', async (evt) => { + logger.info(`Received Binlog event:${evt.getEventName()}`); + this.processingQueue.push(evt); + + // When the processing queue grows past the threshold, we pause the binlog listener + if (this.processingQueue.length() > this.connectionManager.options.max_binlog_queue_size) { + logger.info( + `Max Binlog processing queue length [${this.connectionManager.options.max_binlog_queue_size}] reached. Pausing Binlog listener.` + ); + zongji.pause(); + await this.processingQueue.empty(); + logger.info(`Binlog processing queue backlog cleared. Resuming Binlog listener.`); + zongji.resume(); + } + }); + + return zongji; + } + + private createQueueWorker() { + return async (evt: BinLogEvent) => { + switch (true) { + case zongji_utils.eventIsGTIDLog(evt): + this.currentGTID = common.ReplicatedGTID.fromBinLogEvent({ + raw_gtid: { + server_id: evt.serverId, + transaction_range: evt.transactionRange + }, + position: { + filename: this.binLogPosition.filename, + offset: evt.nextPosition + } + }); + break; + case zongji_utils.eventIsRotation(evt): + this.binLogPosition.filename = evt.binlogName; + this.binLogPosition.offset = evt.position; + break; + case zongji_utils.eventIsWriteMutation(evt): + await this.eventHandler.onWrite(evt.rows, evt.tableMap[evt.tableId]); + break; + case zongji_utils.eventIsUpdateMutation(evt): + await this.eventHandler.onUpdate( + evt.rows.map((row) => row.after), + evt.rows.map((row) => row.before), + evt.tableMap[evt.tableId] + ); + break; + case zongji_utils.eventIsDeleteMutation(evt): + await this.eventHandler.onDelete(evt.rows, evt.tableMap[evt.tableId]); + break; + case zongji_utils.eventIsXid(evt): + const LSN = new common.ReplicatedGTID({ + raw_gtid: this.currentGTID!.raw, + position: { + filename: this.binLogPosition.filename, + offset: evt.nextPosition + } + }).comparable; + await this.eventHandler.onCommit(LSN); + break; + } + }; + } +} diff --git a/modules/module-mysql/src/types/types.ts b/modules/module-mysql/src/types/types.ts index c0826187..d8079d1f 100644 --- a/modules/module-mysql/src/types/types.ts +++ b/modules/module-mysql/src/types/types.ts @@ -23,6 +23,8 @@ export interface NormalizedMySQLConnectionConfig { client_private_key?: string; lookup?: LookupFunction; + + max_binlog_queue_size: number; } export const MySQLConnectionConfig = service_types.configFile.DataSourceConfig.and( @@ -40,7 +42,9 @@ export const MySQLConnectionConfig = service_types.configFile.DataSourceConfig.a client_certificate: t.string.optional(), client_private_key: t.string.optional(), - reject_ip_ranges: t.array(t.string).optional() + reject_ip_ranges: t.array(t.string).optional(), + // The maximum number of binlog events that can be queued in memory before throttling is applied. + max_binlog_queue_size: t.number.optional() }) ); @@ -114,6 +118,9 @@ export function normalizeConnectionConfig(options: MySQLConnectionConfig): Norma server_id: options.server_id ?? 1, + // Based on profiling, a queue size of 1000 uses about 50MB of memory. + max_binlog_queue_size: options.max_binlog_queue_size ?? 1000, + lookup }; } diff --git a/modules/module-mysql/test/src/BinlogListener.test.ts b/modules/module-mysql/test/src/BinlogListener.test.ts new file mode 100644 index 00000000..278812cd --- /dev/null +++ b/modules/module-mysql/test/src/BinlogListener.test.ts @@ -0,0 +1,136 @@ +import { describe, test, beforeEach, vi, expect, afterEach } from 'vitest'; +import { BinlogEventHandler, BinlogListener, Row } from '@module/replication/zongji/BinlogListener.js'; +import { MySQLConnectionManager } from '@module/replication/MySQLConnectionManager.js'; +import { clearTestDb, TEST_CONNECTION_OPTIONS } from './util.js'; +import { v4 as uuid } from 'uuid'; +import * as common from '@module/common/common-index.js'; +import { createRandomServerId } from '@module/utils/mysql-utils.js'; +import { TableMapEntry } from '@powersync/mysql-zongji'; + +describe('BinlogListener tests', () => { + const MAX_QUEUE_SIZE = 10; + const BINLOG_LISTENER_CONNECTION_OPTIONS = { + ...TEST_CONNECTION_OPTIONS, + max_binlog_queue_size: MAX_QUEUE_SIZE + }; + + let connectionManager: MySQLConnectionManager; + let abortController: AbortController; + let eventHandler: TestBinlogEventHandler; + let binlogListener: BinlogListener; + + beforeEach(async () => { + connectionManager = new MySQLConnectionManager(BINLOG_LISTENER_CONNECTION_OPTIONS, {}); + const connection = await connectionManager.getConnection(); + await clearTestDb(connection); + await connection.query(`CREATE TABLE test_DATA (id CHAR(36) PRIMARY KEY, description text)`); + connection.release(); + const fromGTID = await getFromGTID(connectionManager); + + abortController = new AbortController(); + eventHandler = new TestBinlogEventHandler(); + binlogListener = new BinlogListener({ + connectionManager: connectionManager, + eventHandler: eventHandler, + startPosition: fromGTID.position, + includedTables: ['test_DATA'], + serverId: createRandomServerId(1), + abortSignal: abortController.signal + }); + }); + + afterEach(async () => { + await connectionManager.end(); + }); + + test('Binlog listener stops on abort signal', async () => { + const stopSpy = vi.spyOn(binlogListener.zongji, 'stop'); + + setTimeout(() => abortController.abort(), 10); + await expect(binlogListener.start()).resolves.toBeUndefined(); + expect(stopSpy).toHaveBeenCalled(); + }); + + test('Pause Zongji binlog listener when processing queue reaches max size', async () => { + const pauseSpy = vi.spyOn(binlogListener.zongji, 'pause'); + const resumeSpy = vi.spyOn(binlogListener.zongji, 'resume'); + const queueSpy = vi.spyOn(binlogListener.processingQueue, 'length'); + + const ROW_COUNT = 100; + await insertRows(connectionManager, ROW_COUNT); + + const startPromise = binlogListener.start(); + + await vi.waitFor(() => expect(eventHandler.rowsWritten).equals(ROW_COUNT), { timeout: 5000 }); + abortController.abort(); + await expect(startPromise).resolves.toBeUndefined(); + + // Count how many times the queue reached the max size. Consequently, we expect the listener to have paused and resumed that many times. + const overThresholdCount = queueSpy.mock.results.map((r) => r.value).filter((v) => v === MAX_QUEUE_SIZE).length; + expect(pauseSpy).toHaveBeenCalledTimes(overThresholdCount); + expect(resumeSpy).toHaveBeenCalledTimes(overThresholdCount); + }); + + test('Binlog events are correctly forwarded to provided binlog events handler', async () => { + const startPromise = binlogListener.start(); + + const ROW_COUNT = 10; + await insertRows(connectionManager, ROW_COUNT); + await vi.waitFor(() => expect(eventHandler.rowsWritten).equals(ROW_COUNT), { timeout: 5000 }); + expect(eventHandler.commitCount).equals(ROW_COUNT); + + await updateRows(connectionManager); + await vi.waitFor(() => expect(eventHandler.rowsUpdated).equals(ROW_COUNT), { timeout: 5000 }); + + await deleteRows(connectionManager); + await vi.waitFor(() => expect(eventHandler.rowsDeleted).equals(ROW_COUNT), { timeout: 5000 }); + + abortController.abort(); + await expect(startPromise).resolves.toBeUndefined(); + }); +}); + +async function getFromGTID(connectionManager: MySQLConnectionManager) { + const connection = await connectionManager.getConnection(); + const fromGTID = await common.readExecutedGtid(connection); + connection.release(); + + return fromGTID; +} + +async function insertRows(connectionManager: MySQLConnectionManager, count: number) { + for (let i = 0; i < count; i++) { + await connectionManager.query(`INSERT INTO test_DATA(id, description) VALUES('${uuid()}','test${i}')`); + } +} + +async function updateRows(connectionManager: MySQLConnectionManager) { + await connectionManager.query(`UPDATE test_DATA SET description='updated'`); +} + +async function deleteRows(connectionManager: MySQLConnectionManager) { + await connectionManager.query(`DELETE FROM test_DATA`); +} + +class TestBinlogEventHandler implements BinlogEventHandler { + rowsWritten = 0; + rowsUpdated = 0; + rowsDeleted = 0; + commitCount = 0; + + async onWrite(rows: Row[], tableMap: TableMapEntry) { + this.rowsWritten = this.rowsWritten + rows.length; + } + + async onUpdate(afterRows: Row[], beforeRows: Row[], tableMap: TableMapEntry) { + this.rowsUpdated = this.rowsUpdated + afterRows.length; + } + + async onDelete(rows: Row[], tableMap: TableMapEntry) { + this.rowsDeleted = this.rowsDeleted + rows.length; + } + + async onCommit(lsn: string) { + this.commitCount++; + } +} diff --git a/modules/module-mysql/test/src/mysql-to-sqlite.test.ts b/modules/module-mysql/test/src/mysql-to-sqlite.test.ts index 3b172955..97c5db93 100644 --- a/modules/module-mysql/test/src/mysql-to-sqlite.test.ts +++ b/modules/module-mysql/test/src/mysql-to-sqlite.test.ts @@ -3,7 +3,7 @@ import { afterAll, describe, expect, test } from 'vitest'; import { clearTestDb, TEST_CONNECTION_OPTIONS } from './util.js'; import { eventIsWriteMutation, eventIsXid } from '@module/replication/zongji/zongji-utils.js'; import * as common from '@module/common/common-index.js'; -import ZongJi, { BinLogEvent } from '@powersync/mysql-zongji'; +import { BinLogEvent, ZongJi } from '@powersync/mysql-zongji'; import { MySQLConnectionManager } from '@module/replication/MySQLConnectionManager.js'; import { toColumnDescriptors } from '@module/common/common-index.js'; From 924ecd87f81d979436d5b44e496a2c8017064ba5 Mon Sep 17 00:00:00 2001 From: Roland Teichert Date: Wed, 21 May 2025 16:30:30 +0200 Subject: [PATCH 03/13] Updated the BinLogStream to use the new BinLogListener --- .../src/replication/BinLogStream.ts | 245 +++++------------- 1 file changed, 65 insertions(+), 180 deletions(-) diff --git a/modules/module-mysql/src/replication/BinLogStream.ts b/modules/module-mysql/src/replication/BinLogStream.ts index a1af98ac..be4fabfb 100644 --- a/modules/module-mysql/src/replication/BinLogStream.ts +++ b/modules/module-mysql/src/replication/BinLogStream.ts @@ -1,6 +1,5 @@ import { logger, ReplicationAbortedError, ReplicationAssertionError } from '@powersync/lib-services-framework'; import * as sync_rules from '@powersync/service-sync-rules'; -import async from 'async'; import { ColumnDescriptor, @@ -9,16 +8,15 @@ import { MetricsEngine, storage } from '@powersync/service-core'; -import mysql, { FieldPacket } from 'mysql2'; - -import { BinLogEvent, StartOptions, TableMapEntry } from '@powersync/mysql-zongji'; +import mysql from 'mysql2'; import mysqlPromise from 'mysql2/promise'; + +import { TableMapEntry } from '@powersync/mysql-zongji'; import * as common from '../common/common-index.js'; -import { isBinlogStillAvailable, ReplicatedGTID, toColumnDescriptors } from '../common/common-index.js'; import { createRandomServerId, escapeMysqlTableName } from '../utils/mysql-utils.js'; import { MySQLConnectionManager } from './MySQLConnectionManager.js'; -import * as zongji_utils from './zongji/zongji-utils.js'; import { ReplicationMetric } from '@powersync/service-types'; +import { BinlogEventHandler, BinlogListener, Row } from './zongji/BinlogListener.js'; export interface BinLogStreamOptions { connections: MySQLConnectionManager; @@ -34,16 +32,14 @@ interface MysqlRelId { interface WriteChangePayload { type: storage.SaveOperationTag; - data: Data; - previous_data?: Data; + row: Row; + previous_row?: Row; database: string; table: string; sourceTable: storage.SourceTable; columns: Map; } -export type Data = Record; - export class BinlogConfigurationError extends Error { constructor(message: string) { super(message); @@ -247,7 +243,7 @@ AND table_type = 'BASE TABLE';`, // Check if the binlog is still available. If it isn't we need to snapshot again. const connection = await this.connections.getConnection(); try { - const isAvailable = await isBinlogStillAvailable(connection, lastKnowGTID.position.filename); + const isAvailable = await common.isBinlogStillAvailable(connection, lastKnowGTID.position.filename); if (!isAvailable) { logger.info( `Binlog file ${lastKnowGTID.position.filename} is no longer available, starting initial replication again.` @@ -288,7 +284,7 @@ AND table_type = 'BASE TABLE';`, const sourceTables = this.syncRules.getSourceTables(); await this.storage.startBatch( - { zeroLSN: ReplicatedGTID.ZERO.comparable, defaultSchema: this.defaultSchema, storeCurrentData: true }, + { zeroLSN: common.ReplicatedGTID.ZERO.comparable, defaultSchema: this.defaultSchema, storeCurrentData: true }, async (batch) => { for (let tablePattern of sourceTables) { const tables = await this.getQualifiedTableNames(batch, tablePattern); @@ -324,9 +320,9 @@ AND table_type = 'BASE TABLE';`, const stream = query.stream(); let columns: Map | undefined = undefined; - stream.on('fields', (fields: FieldPacket[]) => { + stream.on('fields', (fields: mysql.FieldPacket[]) => { // Map the columns and their types - columns = toColumnDescriptors(fields); + columns = common.toColumnDescriptors(fields); }); for await (let row of stream) { @@ -383,7 +379,7 @@ AND table_type = 'BASE TABLE';`, // This is needed for includeSchema to work correctly. const sourceTables = this.syncRules.getSourceTables(); await this.storage.startBatch( - { zeroLSN: ReplicatedGTID.ZERO.comparable, defaultSchema: this.defaultSchema, storeCurrentData: true }, + { zeroLSN: common.ReplicatedGTID.ZERO.comparable, defaultSchema: this.defaultSchema, storeCurrentData: true }, async (batch) => { for (let tablePattern of sourceTables) { await this.getQualifiedTableNames(batch, tablePattern); @@ -407,14 +403,12 @@ AND table_type = 'BASE TABLE';`, // Auto-activate as soon as initial replication is done await this.storage.autoActivate(); const serverId = createRandomServerId(this.storage.group_id); - logger.info(`Starting replication. Created replica client with serverId:${serverId}`); const connection = await this.connections.getConnection(); const { checkpoint_lsn } = await this.storage.getStatus(); if (checkpoint_lsn) { logger.info(`Existing checkpoint found: ${checkpoint_lsn}`); } - const fromGTID = checkpoint_lsn ? common.ReplicatedGTID.fromSerialized(checkpoint_lsn) : await common.readExecutedGtid(connection); @@ -423,179 +417,70 @@ AND table_type = 'BASE TABLE';`, if (!this.stopped) { await this.storage.startBatch( - { zeroLSN: ReplicatedGTID.ZERO.comparable, defaultSchema: this.defaultSchema, storeCurrentData: true }, + { zeroLSN: common.ReplicatedGTID.ZERO.comparable, defaultSchema: this.defaultSchema, storeCurrentData: true }, async (batch) => { - const zongji = this.connections.createBinlogListener(); - - let currentGTID: common.ReplicatedGTID | null = null; - - const queue = async.queue(async (evt: BinLogEvent) => { - // State machine - switch (true) { - case zongji_utils.eventIsGTIDLog(evt): - currentGTID = common.ReplicatedGTID.fromBinLogEvent({ - raw_gtid: { - server_id: evt.serverId, - transaction_range: evt.transactionRange - }, - position: { - filename: binLogPositionState.filename, - offset: evt.nextPosition - } - }); - break; - case zongji_utils.eventIsRotation(evt): - // Update the position - binLogPositionState.filename = evt.binlogName; - binLogPositionState.offset = evt.position; - break; - case zongji_utils.eventIsWriteMutation(evt): - const writeTableInfo = evt.tableMap[evt.tableId]; - await this.writeChanges(batch, { - type: storage.SaveOperationTag.INSERT, - data: evt.rows, - tableEntry: writeTableInfo - }); - break; - case zongji_utils.eventIsUpdateMutation(evt): - const updateTableInfo = evt.tableMap[evt.tableId]; - await this.writeChanges(batch, { - type: storage.SaveOperationTag.UPDATE, - data: evt.rows.map((row) => row.after), - previous_data: evt.rows.map((row) => row.before), - tableEntry: updateTableInfo - }); - break; - case zongji_utils.eventIsDeleteMutation(evt): - const deleteTableInfo = evt.tableMap[evt.tableId]; - await this.writeChanges(batch, { - type: storage.SaveOperationTag.DELETE, - data: evt.rows, - tableEntry: deleteTableInfo - }); - break; - case zongji_utils.eventIsXid(evt): - this.metrics.getCounter(ReplicationMetric.TRANSACTIONS_REPLICATED).add(1); - // Need to commit with a replicated GTID with updated next position - await batch.commit( - new common.ReplicatedGTID({ - raw_gtid: currentGTID!.raw, - position: { - filename: binLogPositionState.filename, - offset: evt.nextPosition - } - }).comparable - ); - currentGTID = null; - // chunks_replicated_total.add(1); - break; - } - }, 1); - - zongji.on('binlog', (evt: BinLogEvent) => { - if (!this.stopped) { - logger.info(`Received Binlog event:${evt.getEventName()}`); - queue.push(evt); - } else { - logger.info(`Replication is busy stopping, ignoring event ${evt.getEventName()}`); - } - }); - - // Set a heartbeat interval for the Zongji replication connection - // Zongji does not explicitly handle the heartbeat events - they are categorized as event:unknown - // The heartbeat events are enough to keep the connection alive for setTimeout to work on the socket. - await new Promise((resolve, reject) => { - zongji.connection.query( - // In nanoseconds, 10^9 = 1s - 'set @master_heartbeat_period=28*1000000000', - function (error: any, results: any, fields: any) { - if (error) { - reject(error); - } else { - resolve(results); - } - } - ); - }); - logger.info('Successfully set up replication connection heartbeat...'); - - // The _socket member is only set after a query is run on the connection, so we set the timeout after setting the heartbeat. - // The timeout here must be greater than the master_heartbeat_period. - const socket = zongji.connection._socket!; - socket.setTimeout(60_000, () => { - socket.destroy(new Error('Replication connection timeout.')); - }); - - if (this.stopped) { - // Powersync is shutting down, don't start replicating - return; - } - - logger.info(`Reading binlog from: ${binLogPositionState.filename}:${binLogPositionState.offset}`); + const binlogEventHandler = this.createBinlogEventHandler(batch); // Only listen for changes to tables in the sync rules const includedTables = [...this.tableCache.values()].map((table) => table.table); - zongji.start({ - // We ignore the unknown/heartbeat event since it currently serves no purpose other than to keep the connection alive - includeEvents: ['tablemap', 'writerows', 'updaterows', 'deleterows', 'xid', 'rotate', 'gtidlog'], - excludeEvents: [], - includeSchema: { [this.defaultSchema]: includedTables }, - filename: binLogPositionState.filename, - position: binLogPositionState.offset, - serverId: serverId - } satisfies StartOptions); - - // Forever young - await new Promise((resolve, reject) => { - zongji.on('error', (error) => { - logger.error('Binlog listener error:', error); - zongji.stop(); - queue.kill(); - reject(error); - }); - - zongji.on('stopped', () => { - logger.info('Binlog listener stopped. Replication ended.'); - resolve(); - }); - - queue.error((error) => { - logger.error('Binlog listener queue error:', error); - zongji.stop(); - queue.kill(); - reject(error); - }); - - const stop = () => { - logger.info('Abort signal received, stopping replication...'); - zongji.stop(); - queue.kill(); - resolve(); - }; - - this.abortSignal.addEventListener('abort', stop, { once: true }); - - if (this.stopped) { - // Generally this should have been picked up early, but we add this here as a failsafe. - stop(); - } + const binlogListener = new BinlogListener({ + abortSignal: this.abortSignal, + includedTables: includedTables, + startPosition: binLogPositionState, + connectionManager: this.connections, + serverId: serverId, + eventHandler: binlogEventHandler }); + + await binlogListener.start(); } ); } } + private createBinlogEventHandler(batch: storage.BucketStorageBatch): BinlogEventHandler { + return { + onWrite: async (rows: Row[], tableMap: TableMapEntry) => { + await this.writeChanges(batch, { + type: storage.SaveOperationTag.INSERT, + rows: rows, + tableEntry: tableMap + }); + }, + + onUpdate: async (rowsAfter: Row[], rowsBefore: Row[], tableMap: TableMapEntry) => { + await this.writeChanges(batch, { + type: storage.SaveOperationTag.UPDATE, + rows: rowsAfter, + rows_before: rowsBefore, + tableEntry: tableMap + }); + }, + onDelete: async (rows: Row[], tableMap: TableMapEntry) => { + await this.writeChanges(batch, { + type: storage.SaveOperationTag.DELETE, + rows: rows, + tableEntry: tableMap + }); + }, + onCommit: async (lsn: string) => { + this.metrics.getCounter(ReplicationMetric.TRANSACTIONS_REPLICATED).add(1); + await batch.commit(lsn); + } + }; + } + private async writeChanges( batch: storage.BucketStorageBatch, msg: { type: storage.SaveOperationTag; - data: Data[]; - previous_data?: Data[]; + rows: Row[]; + rows_before?: Row[]; tableEntry: TableMapEntry; } ): Promise { - const columns = toColumnDescriptors(msg.tableEntry); + const columns = common.toColumnDescriptors(msg.tableEntry); - for (const [index, row] of msg.data.entries()) { + for (const [index, row] of msg.rows.entries()) { await this.writeChange(batch, { type: msg.type, database: msg.tableEntry.parentSchema, @@ -607,8 +492,8 @@ AND table_type = 'BASE TABLE';`, ), table: msg.tableEntry.tableName, columns: columns, - data: row, - previous_data: msg.previous_data?.[index] + row: row, + previous_row: msg.rows_before?.[index] }); } return null; @@ -621,7 +506,7 @@ AND table_type = 'BASE TABLE';`, switch (payload.type) { case storage.SaveOperationTag.INSERT: this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1); - const record = common.toSQLiteRow(payload.data, payload.columns); + const record = common.toSQLiteRow(payload.row, payload.columns); return await batch.save({ tag: storage.SaveOperationTag.INSERT, sourceTable: payload.sourceTable, @@ -634,10 +519,10 @@ AND table_type = 'BASE TABLE';`, this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1); // "before" may be null if the replica id columns are unchanged // It's fine to treat that the same as an insert. - const beforeUpdated = payload.previous_data - ? common.toSQLiteRow(payload.previous_data, payload.columns) + const beforeUpdated = payload.previous_row + ? common.toSQLiteRow(payload.previous_row, payload.columns) : undefined; - const after = common.toSQLiteRow(payload.data, payload.columns); + const after = common.toSQLiteRow(payload.row, payload.columns); return await batch.save({ tag: storage.SaveOperationTag.UPDATE, @@ -652,7 +537,7 @@ AND table_type = 'BASE TABLE';`, case storage.SaveOperationTag.DELETE: this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1); - const beforeDeleted = common.toSQLiteRow(payload.data, payload.columns); + const beforeDeleted = common.toSQLiteRow(payload.row, payload.columns); return await batch.save({ tag: storage.SaveOperationTag.DELETE, From 404dcdeddf1a6812fed1a6f4c788375113f2e2d8 Mon Sep 17 00:00:00 2001 From: Roland Teichert Date: Wed, 21 May 2025 16:35:02 +0200 Subject: [PATCH 04/13] Renamed BinlogListener to BinLogListener --- .../src/replication/BinLogStream.ts | 6 ++--- .../{BinlogListener.ts => BinLogListener.ts} | 12 ++++----- ...istener.test.ts => BinLogListener.test.ts} | 26 +++++++++---------- 3 files changed, 22 insertions(+), 22 deletions(-) rename modules/module-mysql/src/replication/zongji/{BinlogListener.ts => BinLogListener.ts} (96%) rename modules/module-mysql/test/src/{BinlogListener.test.ts => BinLogListener.test.ts} (85%) diff --git a/modules/module-mysql/src/replication/BinLogStream.ts b/modules/module-mysql/src/replication/BinLogStream.ts index be4fabfb..ff1120a5 100644 --- a/modules/module-mysql/src/replication/BinLogStream.ts +++ b/modules/module-mysql/src/replication/BinLogStream.ts @@ -16,7 +16,7 @@ import * as common from '../common/common-index.js'; import { createRandomServerId, escapeMysqlTableName } from '../utils/mysql-utils.js'; import { MySQLConnectionManager } from './MySQLConnectionManager.js'; import { ReplicationMetric } from '@powersync/service-types'; -import { BinlogEventHandler, BinlogListener, Row } from './zongji/BinlogListener.js'; +import { BinLogEventHandler, BinLogListener, Row } from './zongji/BinLogListener.js'; export interface BinLogStreamOptions { connections: MySQLConnectionManager; @@ -422,7 +422,7 @@ AND table_type = 'BASE TABLE';`, const binlogEventHandler = this.createBinlogEventHandler(batch); // Only listen for changes to tables in the sync rules const includedTables = [...this.tableCache.values()].map((table) => table.table); - const binlogListener = new BinlogListener({ + const binlogListener = new BinLogListener({ abortSignal: this.abortSignal, includedTables: includedTables, startPosition: binLogPositionState, @@ -437,7 +437,7 @@ AND table_type = 'BASE TABLE';`, } } - private createBinlogEventHandler(batch: storage.BucketStorageBatch): BinlogEventHandler { + private createBinlogEventHandler(batch: storage.BucketStorageBatch): BinLogEventHandler { return { onWrite: async (rows: Row[], tableMap: TableMapEntry) => { await this.writeChanges(batch, { diff --git a/modules/module-mysql/src/replication/zongji/BinlogListener.ts b/modules/module-mysql/src/replication/zongji/BinLogListener.ts similarity index 96% rename from modules/module-mysql/src/replication/zongji/BinlogListener.ts rename to modules/module-mysql/src/replication/zongji/BinLogListener.ts index f67af740..9ee8563b 100644 --- a/modules/module-mysql/src/replication/zongji/BinlogListener.ts +++ b/modules/module-mysql/src/replication/zongji/BinLogListener.ts @@ -7,32 +7,32 @@ import { MySQLConnectionManager } from '../MySQLConnectionManager.js'; export type Row = Record; -export interface BinlogEventHandler { +export interface BinLogEventHandler { onWrite: (rows: Row[], tableMap: TableMapEntry) => Promise; onUpdate: (rowsAfter: Row[], rowsBefore: Row[], tableMap: TableMapEntry) => Promise; onDelete: (rows: Row[], tableMap: TableMapEntry) => Promise; onCommit: (lsn: string) => Promise; } -export interface BinlogListenerOptions { +export interface BinLogListenerOptions { connectionManager: MySQLConnectionManager; - eventHandler: BinlogEventHandler; + eventHandler: BinLogEventHandler; includedTables: string[]; serverId: number; startPosition: common.BinLogPosition; abortSignal: AbortSignal; } -export class BinlogListener { +export class BinLogListener { private connectionManager: MySQLConnectionManager; - private eventHandler: BinlogEventHandler; + private eventHandler: BinLogEventHandler; private binLogPosition: common.BinLogPosition; private currentGTID: common.ReplicatedGTID | null; zongji: ZongJi; processingQueue: async.QueueObject; - constructor(public options: BinlogListenerOptions) { + constructor(public options: BinLogListenerOptions) { this.connectionManager = options.connectionManager; this.eventHandler = options.eventHandler; this.binLogPosition = options.startPosition; diff --git a/modules/module-mysql/test/src/BinlogListener.test.ts b/modules/module-mysql/test/src/BinLogListener.test.ts similarity index 85% rename from modules/module-mysql/test/src/BinlogListener.test.ts rename to modules/module-mysql/test/src/BinLogListener.test.ts index 278812cd..038621c6 100644 --- a/modules/module-mysql/test/src/BinlogListener.test.ts +++ b/modules/module-mysql/test/src/BinLogListener.test.ts @@ -1,5 +1,5 @@ import { describe, test, beforeEach, vi, expect, afterEach } from 'vitest'; -import { BinlogEventHandler, BinlogListener, Row } from '@module/replication/zongji/BinlogListener.js'; +import { BinLogEventHandler, BinLogListener, Row } from '@module/replication/zongji/BinLogListener.js'; import { MySQLConnectionManager } from '@module/replication/MySQLConnectionManager.js'; import { clearTestDb, TEST_CONNECTION_OPTIONS } from './util.js'; import { v4 as uuid } from 'uuid'; @@ -16,8 +16,8 @@ describe('BinlogListener tests', () => { let connectionManager: MySQLConnectionManager; let abortController: AbortController; - let eventHandler: TestBinlogEventHandler; - let binlogListener: BinlogListener; + let eventHandler: TestBinLogEventHandler; + let binLogListener: BinLogListener; beforeEach(async () => { connectionManager = new MySQLConnectionManager(BINLOG_LISTENER_CONNECTION_OPTIONS, {}); @@ -28,8 +28,8 @@ describe('BinlogListener tests', () => { const fromGTID = await getFromGTID(connectionManager); abortController = new AbortController(); - eventHandler = new TestBinlogEventHandler(); - binlogListener = new BinlogListener({ + eventHandler = new TestBinLogEventHandler(); + binLogListener = new BinLogListener({ connectionManager: connectionManager, eventHandler: eventHandler, startPosition: fromGTID.position, @@ -44,22 +44,22 @@ describe('BinlogListener tests', () => { }); test('Binlog listener stops on abort signal', async () => { - const stopSpy = vi.spyOn(binlogListener.zongji, 'stop'); + const stopSpy = vi.spyOn(binLogListener.zongji, 'stop'); setTimeout(() => abortController.abort(), 10); - await expect(binlogListener.start()).resolves.toBeUndefined(); + await expect(binLogListener.start()).resolves.toBeUndefined(); expect(stopSpy).toHaveBeenCalled(); }); test('Pause Zongji binlog listener when processing queue reaches max size', async () => { - const pauseSpy = vi.spyOn(binlogListener.zongji, 'pause'); - const resumeSpy = vi.spyOn(binlogListener.zongji, 'resume'); - const queueSpy = vi.spyOn(binlogListener.processingQueue, 'length'); + const pauseSpy = vi.spyOn(binLogListener.zongji, 'pause'); + const resumeSpy = vi.spyOn(binLogListener.zongji, 'resume'); + const queueSpy = vi.spyOn(binLogListener.processingQueue, 'length'); const ROW_COUNT = 100; await insertRows(connectionManager, ROW_COUNT); - const startPromise = binlogListener.start(); + const startPromise = binLogListener.start(); await vi.waitFor(() => expect(eventHandler.rowsWritten).equals(ROW_COUNT), { timeout: 5000 }); abortController.abort(); @@ -72,7 +72,7 @@ describe('BinlogListener tests', () => { }); test('Binlog events are correctly forwarded to provided binlog events handler', async () => { - const startPromise = binlogListener.start(); + const startPromise = binLogListener.start(); const ROW_COUNT = 10; await insertRows(connectionManager, ROW_COUNT); @@ -112,7 +112,7 @@ async function deleteRows(connectionManager: MySQLConnectionManager) { await connectionManager.query(`DELETE FROM test_DATA`); } -class TestBinlogEventHandler implements BinlogEventHandler { +class TestBinLogEventHandler implements BinLogEventHandler { rowsWritten = 0; rowsUpdated = 0; rowsDeleted = 0; From b1b8c30074dc0d33e7a4d1cf4288f784e35149a6 Mon Sep 17 00:00:00 2001 From: Roland Teichert Date: Wed, 21 May 2025 16:42:21 +0200 Subject: [PATCH 05/13] Added changeset --- .changeset/honest-ties-crash.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/honest-ties-crash.md diff --git a/.changeset/honest-ties-crash.md b/.changeset/honest-ties-crash.md new file mode 100644 index 00000000..1644a321 --- /dev/null +++ b/.changeset/honest-ties-crash.md @@ -0,0 +1,5 @@ +--- +'@powersync/service-module-mysql': minor +--- + +Added a configurable limit for the MySQL binlog processing queue to limit memory usage. From 126f9b3104122bafe71068a7d0f66578a409b79e Mon Sep 17 00:00:00 2001 From: Roland Teichert Date: Thu, 22 May 2025 15:25:12 +0200 Subject: [PATCH 06/13] Simplified BinLogListener stopping mechanism Cleaned up BinLogStream logs a bit --- modules/module-mysql/package.json | 2 +- .../src/replication/BinLogStream.ts | 16 +++++++++--- .../src/replication/zongji/BinLogListener.ts | 25 ++++++++----------- .../src/replication/zongji/zongji-utils.ts | 10 ++++---- .../test/src/BinLogListener.test.ts | 19 +++++++------- pnpm-lock.yaml | 20 +++++++++------ 6 files changed, 51 insertions(+), 41 deletions(-) diff --git a/modules/module-mysql/package.json b/modules/module-mysql/package.json index e4bea339..2d19145a 100644 --- a/modules/module-mysql/package.json +++ b/modules/module-mysql/package.json @@ -33,7 +33,7 @@ "@powersync/service-sync-rules": "workspace:*", "@powersync/service-types": "workspace:*", "@powersync/service-jsonbig": "workspace:*", - "@powersync/mysql-zongji": "0.0.0-dev-20250521092520", + "@powersync/mysql-zongji": "0.0.0-dev-20250522110942", "async": "^3.2.4", "mysql2": "^3.11.0", "semver": "^7.5.4", diff --git a/modules/module-mysql/src/replication/BinLogStream.ts b/modules/module-mysql/src/replication/BinLogStream.ts index ff1120a5..f8be521b 100644 --- a/modules/module-mysql/src/replication/BinLogStream.ts +++ b/modules/module-mysql/src/replication/BinLogStream.ts @@ -246,7 +246,7 @@ AND table_type = 'BASE TABLE';`, const isAvailable = await common.isBinlogStillAvailable(connection, lastKnowGTID.position.filename); if (!isAvailable) { logger.info( - `Binlog file ${lastKnowGTID.position.filename} is no longer available, starting initial replication again.` + `BinLog file ${lastKnowGTID.position.filename} is no longer available, starting initial replication again.` ); } return isAvailable; @@ -355,7 +355,7 @@ AND table_type = 'BASE TABLE';`, // all connections automatically closed, including this one. await this.initReplication(); await this.streamChanges(); - logger.info('BinlogStream has been shut down'); + logger.info('BinLogStream has been shut down.'); } catch (e) { await this.storage.reportError(e); throw e; @@ -368,7 +368,7 @@ AND table_type = 'BASE TABLE';`, connection.release(); if (errors.length > 0) { - throw new BinlogConfigurationError(`Binlog Configuration Errors: ${errors.join(', ')}`); + throw new BinlogConfigurationError(`BinLog Configuration Errors: ${errors.join(', ')}`); } const initialReplicationCompleted = await this.checkInitialReplicated(); @@ -423,7 +423,6 @@ AND table_type = 'BASE TABLE';`, // Only listen for changes to tables in the sync rules const includedTables = [...this.tableCache.values()].map((table) => table.table); const binlogListener = new BinLogListener({ - abortSignal: this.abortSignal, includedTables: includedTables, startPosition: binLogPositionState, connectionManager: this.connections, @@ -431,6 +430,15 @@ AND table_type = 'BASE TABLE';`, eventHandler: binlogEventHandler }); + this.abortSignal.addEventListener( + 'abort', + () => { + logger.info('Abort signal received, stopping replication...'); + binlogListener.stop(); + }, + { once: true } + ); + await binlogListener.start(); } ); diff --git a/modules/module-mysql/src/replication/zongji/BinLogListener.ts b/modules/module-mysql/src/replication/zongji/BinLogListener.ts index 9ee8563b..d0dfef4a 100644 --- a/modules/module-mysql/src/replication/zongji/BinLogListener.ts +++ b/modules/module-mysql/src/replication/zongji/BinLogListener.ts @@ -20,9 +20,12 @@ export interface BinLogListenerOptions { includedTables: string[]; serverId: number; startPosition: common.BinLogPosition; - abortSignal: AbortSignal; } +/** + * Wrapper class for the Zongji BinLog listener. Internally handles the creation and management of the listener and posts + * events on the provided BinLogEventHandler. + */ export class BinLogListener { private connectionManager: MySQLConnectionManager; private eventHandler: BinLogEventHandler; @@ -96,24 +99,16 @@ export class BinLogListener { }); this.zongji.on('stopped', () => { - logger.info('Binlog listener stopped. Replication ended.'); - resolve(); - }); - - const stop = () => { - logger.info('Abort signal received, stopping replication...'); - this.zongji.stop(); this.processingQueue.kill(); resolve(); - }; + }); + }); - this.options.abortSignal.addEventListener('abort', stop, { once: true }); + logger.info('BinLog listener stopped. Replication ended.'); + } - if (this.options.abortSignal.aborted) { - // Generally this should have been picked up early, but we add this here as a failsafe. - stop(); - } - }); + public stop(): void { + this.zongji.stop(); } private createZongjiListener(): ZongJi { diff --git a/modules/module-mysql/src/replication/zongji/zongji-utils.ts b/modules/module-mysql/src/replication/zongji/zongji-utils.ts index 24d01663..ee9e4c53 100644 --- a/modules/module-mysql/src/replication/zongji/zongji-utils.ts +++ b/modules/module-mysql/src/replication/zongji/zongji-utils.ts @@ -1,10 +1,10 @@ import { BinLogEvent, BinLogGTIDLogEvent, - BinLogMutationEvent, + BinLogRowEvent, BinLogRotationEvent, BinLogTableMapEvent, - BinLogUpdateEvent, + BinLogRowUpdateEvent, BinLogXidEvent } from '@powersync/mysql-zongji'; @@ -24,14 +24,14 @@ export function eventIsRotation(event: BinLogEvent): event is BinLogRotationEven return event.getEventName() == 'rotate'; } -export function eventIsWriteMutation(event: BinLogEvent): event is BinLogMutationEvent { +export function eventIsWriteMutation(event: BinLogEvent): event is BinLogRowEvent { return event.getEventName() == 'writerows'; } -export function eventIsDeleteMutation(event: BinLogEvent): event is BinLogMutationEvent { +export function eventIsDeleteMutation(event: BinLogEvent): event is BinLogRowEvent { return event.getEventName() == 'deleterows'; } -export function eventIsUpdateMutation(event: BinLogEvent): event is BinLogUpdateEvent { +export function eventIsUpdateMutation(event: BinLogEvent): event is BinLogRowUpdateEvent { return event.getEventName() == 'updaterows'; } diff --git a/modules/module-mysql/test/src/BinLogListener.test.ts b/modules/module-mysql/test/src/BinLogListener.test.ts index 038621c6..cf385f52 100644 --- a/modules/module-mysql/test/src/BinLogListener.test.ts +++ b/modules/module-mysql/test/src/BinLogListener.test.ts @@ -15,7 +15,6 @@ describe('BinlogListener tests', () => { }; let connectionManager: MySQLConnectionManager; - let abortController: AbortController; let eventHandler: TestBinLogEventHandler; let binLogListener: BinLogListener; @@ -27,15 +26,13 @@ describe('BinlogListener tests', () => { connection.release(); const fromGTID = await getFromGTID(connectionManager); - abortController = new AbortController(); eventHandler = new TestBinLogEventHandler(); binLogListener = new BinLogListener({ connectionManager: connectionManager, eventHandler: eventHandler, startPosition: fromGTID.position, includedTables: ['test_DATA'], - serverId: createRandomServerId(1), - abortSignal: abortController.signal + serverId: createRandomServerId(1) }); }); @@ -43,12 +40,16 @@ describe('BinlogListener tests', () => { await connectionManager.end(); }); - test('Binlog listener stops on abort signal', async () => { + test('Stop binlog listener', async () => { const stopSpy = vi.spyOn(binLogListener.zongji, 'stop'); + const queueStopSpy = vi.spyOn(binLogListener.processingQueue, 'kill'); - setTimeout(() => abortController.abort(), 10); - await expect(binLogListener.start()).resolves.toBeUndefined(); + const startPromise = binLogListener.start(); + setTimeout(async () => binLogListener.stop(), 50); + + await expect(startPromise).resolves.toBeUndefined(); expect(stopSpy).toHaveBeenCalled(); + expect(queueStopSpy).toHaveBeenCalled(); }); test('Pause Zongji binlog listener when processing queue reaches max size', async () => { @@ -62,7 +63,7 @@ describe('BinlogListener tests', () => { const startPromise = binLogListener.start(); await vi.waitFor(() => expect(eventHandler.rowsWritten).equals(ROW_COUNT), { timeout: 5000 }); - abortController.abort(); + binLogListener.stop(); await expect(startPromise).resolves.toBeUndefined(); // Count how many times the queue reached the max size. Consequently, we expect the listener to have paused and resumed that many times. @@ -85,7 +86,7 @@ describe('BinlogListener tests', () => { await deleteRows(connectionManager); await vi.waitFor(() => expect(eventHandler.rowsDeleted).equals(ROW_COUNT), { timeout: 5000 }); - abortController.abort(); + binLogListener.stop(); await expect(startPromise).resolves.toBeUndefined(); }); }); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 60d6c7b2..33c54c7d 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -258,8 +258,8 @@ importers: specifier: workspace:* version: link:../../libs/lib-services '@powersync/mysql-zongji': - specifier: 0.0.0-dev-20250521092520 - version: 0.0.0-dev-20250521092520 + specifier: 0.0.0-dev-20250522110942 + version: 0.0.0-dev-20250522110942 '@powersync/service-core': specifier: workspace:* version: link:../../packages/service-core @@ -1281,9 +1281,9 @@ packages: resolution: {integrity: sha512-UA91GwWPhFExt3IizW6bOeY/pQ0BkuNwKjk9iQW9KqxluGCrg4VenZ0/L+2Y0+ZOtme72EVvg6v0zo3AMQRCeA==} engines: {node: '>=12'} - '@powersync/mysql-zongji@0.0.0-dev-20250521092520': - resolution: {integrity: sha512-AZ03eO5O/LQ8MFl/Z6OWyLJ4Mykd/gSbfIA8Iy0XImIKQt+XY8MqvtU/u3LLIZOJ+1ea43h0BfPvnFMsgwVxZg==} - engines: {node: '>=20.0.0'} + '@powersync/mysql-zongji@0.0.0-dev-20250522110942': + resolution: {integrity: sha512-6Sx6FUQeWBdOxUp8NucRAAyMKc+jkWIKZoPW4V2Y8hiEJZsqEOK7+HEOvkVMTHKF/dK5MOGtNk5tcUqbd23d2g==} + engines: {node: '>=22.0.0'} '@powersync/service-jsonbig@0.17.10': resolution: {integrity: sha512-BgxgUewuw4HFCM9MzuzlIuRKHya6rimNPYqUItt7CO3ySUeUnX8Qn9eZpMxu9AT5Y8zqkSyxvduY36zZueNojg==} @@ -1753,6 +1753,10 @@ packages: resolution: {integrity: sha512-GPEid2Y9QU1Exl1rpO9B2IPJGHPSupF5GnVIP0blYvNOMer2bTvSWs1jGOUg04hTmu67nmLsQ9TBo1puaotBHg==} engines: {node: '>=0.6'} + big-integer@1.6.52: + resolution: {integrity: sha512-QxD8cf2eVqJOOz63z6JIN9BzvVs/dlySa5HGSBH5xtR8dPteIRQnBxxKqkNTiT6jbDTF6jAfrd4oMcND9RGbQg==} + engines: {node: '>=0.6'} + bignumber.js@9.1.1: resolution: {integrity: sha512-pHm4LsMJ6lzgNGVfZHjMoO8sdoRhOzOH4MLmY65Jg70bpxCKu5iOHNJyfF6OyvYw7t8Fpf35RuzUyqnQsj8Vig==} @@ -4744,10 +4748,10 @@ snapshots: '@pnpm/network.ca-file': 1.0.2 config-chain: 1.1.13 - '@powersync/mysql-zongji@0.0.0-dev-20250521092520': + '@powersync/mysql-zongji@0.0.0-dev-20250522110942': dependencies: '@vlasky/mysql': 2.18.6 - big-integer: 1.6.51 + big-integer: 1.6.52 iconv-lite: 0.6.3 '@powersync/service-jsonbig@0.17.10': @@ -5192,6 +5196,8 @@ snapshots: big-integer@1.6.51: {} + big-integer@1.6.52: {} + bignumber.js@9.1.1: {} binary-extensions@2.3.0: {} From a03260f8203db56f7c78190d5f0fea04626774b8 Mon Sep 17 00:00:00 2001 From: Roland Teichert Date: Mon, 26 May 2025 14:22:13 +0200 Subject: [PATCH 07/13] Corrected BinLogListener name. Simplified BinLogListener stopping mechanism --- modules/module-mysql/package.json | 2 +- modules/module-mysql/src/replication/BinLogStream.ts | 4 +++- .../src/replication/zongji/BinLogListener.ts | 2 +- pnpm-lock.yaml | 10 +++++----- 4 files changed, 10 insertions(+), 8 deletions(-) diff --git a/modules/module-mysql/package.json b/modules/module-mysql/package.json index 2d19145a..2db0f5e9 100644 --- a/modules/module-mysql/package.json +++ b/modules/module-mysql/package.json @@ -33,7 +33,7 @@ "@powersync/service-sync-rules": "workspace:*", "@powersync/service-types": "workspace:*", "@powersync/service-jsonbig": "workspace:*", - "@powersync/mysql-zongji": "0.0.0-dev-20250522110942", + "@powersync/mysql-zongji": "0.0.0-dev-20250526121208", "async": "^3.2.4", "mysql2": "^3.11.0", "semver": "^7.5.4", diff --git a/modules/module-mysql/src/replication/BinLogStream.ts b/modules/module-mysql/src/replication/BinLogStream.ts index f8be521b..5829fdaf 100644 --- a/modules/module-mysql/src/replication/BinLogStream.ts +++ b/modules/module-mysql/src/replication/BinLogStream.ts @@ -439,7 +439,9 @@ AND table_type = 'BASE TABLE';`, { once: true } ); - await binlogListener.start(); + if (!this.stopped) { + await binlogListener.start(); + } } ); } diff --git a/modules/module-mysql/src/replication/zongji/BinLogListener.ts b/modules/module-mysql/src/replication/zongji/BinLogListener.ts index d0dfef4a..7e2e08cf 100644 --- a/modules/module-mysql/src/replication/zongji/BinLogListener.ts +++ b/modules/module-mysql/src/replication/zongji/BinLogListener.ts @@ -73,7 +73,7 @@ export class BinLogListener { }); logger.info(`Reading binlog from: ${this.binLogPosition.filename}:${this.binLogPosition.offset}`); - this.zongji.start({ + await this.zongji.start({ // We ignore the unknown/heartbeat event since it currently serves no purpose other than to keep the connection alive // tablemap events always need to be included for the other row events to work includeEvents: ['tablemap', 'writerows', 'updaterows', 'deleterows', 'xid', 'rotate', 'gtidlog'], diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 33c54c7d..129b7ac3 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -258,8 +258,8 @@ importers: specifier: workspace:* version: link:../../libs/lib-services '@powersync/mysql-zongji': - specifier: 0.0.0-dev-20250522110942 - version: 0.0.0-dev-20250522110942 + specifier: 0.0.0-dev-20250526121208 + version: 0.0.0-dev-20250526121208 '@powersync/service-core': specifier: workspace:* version: link:../../packages/service-core @@ -1281,8 +1281,8 @@ packages: resolution: {integrity: sha512-UA91GwWPhFExt3IizW6bOeY/pQ0BkuNwKjk9iQW9KqxluGCrg4VenZ0/L+2Y0+ZOtme72EVvg6v0zo3AMQRCeA==} engines: {node: '>=12'} - '@powersync/mysql-zongji@0.0.0-dev-20250522110942': - resolution: {integrity: sha512-6Sx6FUQeWBdOxUp8NucRAAyMKc+jkWIKZoPW4V2Y8hiEJZsqEOK7+HEOvkVMTHKF/dK5MOGtNk5tcUqbd23d2g==} + '@powersync/mysql-zongji@0.0.0-dev-20250526121208': + resolution: {integrity: sha512-YOMYUU7oTHDlMrgboy3Zj0StThlBc26yley4UCvtykwxcc75+OkZZ5f3flCkKuhUk8aghG5aNNpMtdAaR9uOWQ==} engines: {node: '>=22.0.0'} '@powersync/service-jsonbig@0.17.10': @@ -4748,7 +4748,7 @@ snapshots: '@pnpm/network.ca-file': 1.0.2 config-chain: 1.1.13 - '@powersync/mysql-zongji@0.0.0-dev-20250522110942': + '@powersync/mysql-zongji@0.0.0-dev-20250526121208': dependencies: '@vlasky/mysql': 2.18.6 big-integer: 1.6.52 From e147318136303eb5774abeaad0c446b35bac3f83 Mon Sep 17 00:00:00 2001 From: Roland Teichert Date: Tue, 27 May 2025 09:42:13 +0200 Subject: [PATCH 08/13] Supply port for binlog listener connections. --- modules/module-mysql/src/replication/MySQLConnectionManager.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/modules/module-mysql/src/replication/MySQLConnectionManager.ts b/modules/module-mysql/src/replication/MySQLConnectionManager.ts index d464a975..b648ab26 100644 --- a/modules/module-mysql/src/replication/MySQLConnectionManager.ts +++ b/modules/module-mysql/src/replication/MySQLConnectionManager.ts @@ -46,6 +46,7 @@ export class MySQLConnectionManager { createBinlogListener(): ZongJi { const listener = new ZongJi({ host: this.options.hostname, + port: this.options.port, user: this.options.username, password: this.options.password }); From 999a8dce6f2ba3efe38fcc6150e90f94e0665ac8 Mon Sep 17 00:00:00 2001 From: Roland Teichert Date: Tue, 27 May 2025 11:57:33 +0200 Subject: [PATCH 09/13] Only set up binlog heartbeat once the listener is fully started up. Added a few more defensive stopped checks to the binlog listener --- modules/module-mysql/package.json | 2 +- .../src/replication/BinLogStream.ts | 5 +- .../src/replication/zongji/BinLogListener.ts | 103 +++++++++++------- pnpm-lock.yaml | 10 +- 4 files changed, 72 insertions(+), 48 deletions(-) diff --git a/modules/module-mysql/package.json b/modules/module-mysql/package.json index 2db0f5e9..eb0de824 100644 --- a/modules/module-mysql/package.json +++ b/modules/module-mysql/package.json @@ -33,7 +33,7 @@ "@powersync/service-sync-rules": "workspace:*", "@powersync/service-types": "workspace:*", "@powersync/service-jsonbig": "workspace:*", - "@powersync/mysql-zongji": "0.0.0-dev-20250526121208", + "@powersync/mysql-zongji": "0.0.0-dev-20250527085137", "async": "^3.2.4", "mysql2": "^3.11.0", "semver": "^7.5.4", diff --git a/modules/module-mysql/src/replication/BinLogStream.ts b/modules/module-mysql/src/replication/BinLogStream.ts index 5829fdaf..fe137ae0 100644 --- a/modules/module-mysql/src/replication/BinLogStream.ts +++ b/modules/module-mysql/src/replication/BinLogStream.ts @@ -439,9 +439,8 @@ AND table_type = 'BASE TABLE';`, { once: true } ); - if (!this.stopped) { - await binlogListener.start(); - } + // Only returns when the replication is stopped or interrupted by an error + await binlogListener.start(); } ); } diff --git a/modules/module-mysql/src/replication/zongji/BinLogListener.ts b/modules/module-mysql/src/replication/zongji/BinLogListener.ts index 7e2e08cf..21163f84 100644 --- a/modules/module-mysql/src/replication/zongji/BinLogListener.ts +++ b/modules/module-mysql/src/replication/zongji/BinLogListener.ts @@ -46,34 +46,12 @@ export class BinLogListener { } public async start(): Promise { + if (this.isStopped) { + return; + } logger.info(`Starting replication. Created replica client with serverId:${this.options.serverId}`); - // Set a heartbeat interval for the Zongji replication connection - // Zongji does not explicitly handle the heartbeat events - they are categorized as event:unknown - // The heartbeat events are enough to keep the connection alive for setTimeout to work on the socket. - await new Promise((resolve, reject) => { - this.zongji.connection.query( - // In nanoseconds, 10^9 = 1s - 'set @master_heartbeat_period=28*1000000000', - function (error: any, results: any, fields: any) { - if (error) { - reject(error); - } else { - resolve(results); - } - } - ); - }); - logger.info('Successfully set up replication connection heartbeat...'); - - // The _socket member is only set after a query is run on the connection, so we set the timeout after setting the heartbeat. - // The timeout here must be greater than the master_heartbeat_period. - const socket = this.zongji.connection._socket!; - socket.setTimeout(60_000, () => { - socket.destroy(new Error('Replication connection timeout.')); - }); - logger.info(`Reading binlog from: ${this.binLogPosition.filename}:${this.binLogPosition.offset}`); - await this.zongji.start({ + this.zongji.start({ // We ignore the unknown/heartbeat event since it currently serves no purpose other than to keep the connection alive // tablemap events always need to be included for the other row events to work includeEvents: ['tablemap', 'writerows', 'updaterows', 'deleterows', 'xid', 'rotate', 'gtidlog'], @@ -83,32 +61,49 @@ export class BinLogListener { serverId: this.options.serverId } satisfies StartOptions); - await new Promise((resolve, reject) => { + return new Promise((resolve, reject) => { this.zongji.on('error', (error) => { - logger.error('Binlog listener error:', error); - this.zongji.stop(); - this.processingQueue.kill(); - reject(error); + if (!this.isStopped) { + logger.error('Binlog listener error:', error); + this.stop(); + reject(error); + } else { + logger.warn('Binlog listener error during shutdown:', error); + } }); this.processingQueue.error((error) => { - logger.error('BinlogEvent processing error:', error); - this.zongji.stop(); - this.processingQueue.kill(); - reject(error); + if (!this.isStopped) { + logger.error('BinlogEvent processing error:', error); + this.stop(); + reject(error); + } else { + logger.warn('BinlogEvent processing error during shutdown:', error); + } }); this.zongji.on('stopped', () => { - this.processingQueue.kill(); resolve(); + logger.info('BinLog listener stopped. Replication ended.'); }); - }); - logger.info('BinLog listener stopped. Replication ended.'); + // Handle the edge case where the listener has already been stopped before completing startup + if (this.isStopped) { + logger.info('BinLog listener was stopped before startup completed.'); + resolve(); + } + }); } public stop(): void { - this.zongji.stop(); + if (!this.isStopped) { + this.zongji.stop(); + this.processingQueue.kill(); + } + } + + private get isStopped(): boolean { + return this.zongji.stopped; } private createZongjiListener(): ZongJi { @@ -130,6 +125,36 @@ export class BinLogListener { } }); + zongji.on('ready', async () => { + // Set a heartbeat interval for the Zongji replication connection + // Zongji does not explicitly handle the heartbeat events - they are categorized as event:unknown + // The heartbeat events are enough to keep the connection alive for setTimeout to work on the socket. + await new Promise((resolve, reject) => { + this.zongji.connection.query( + // In nanoseconds, 10^9 = 1s + 'set @master_heartbeat_period=28*1000000000', + function (error: any, results: any, fields: any) { + if (error) { + reject(error); + } else { + logger.info('Successfully set up replication connection heartbeat...'); + resolve(results); + } + } + ); + }); + + // The _socket member is only set after a query is run on the connection, so we set the timeout after setting the heartbeat. + // The timeout here must be greater than the master_heartbeat_period. + const socket = this.zongji.connection._socket!; + socket.setTimeout(60_000, () => { + socket.destroy(new Error('Replication connection timeout.')); + }); + logger.info( + `BinLog listener setup complete. Reading binlog from: ${this.binLogPosition.filename}:${this.binLogPosition.offset}` + ); + }); + return zongji; } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 129b7ac3..94b215cf 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -258,8 +258,8 @@ importers: specifier: workspace:* version: link:../../libs/lib-services '@powersync/mysql-zongji': - specifier: 0.0.0-dev-20250526121208 - version: 0.0.0-dev-20250526121208 + specifier: 0.0.0-dev-20250527085137 + version: 0.0.0-dev-20250527085137 '@powersync/service-core': specifier: workspace:* version: link:../../packages/service-core @@ -1281,8 +1281,8 @@ packages: resolution: {integrity: sha512-UA91GwWPhFExt3IizW6bOeY/pQ0BkuNwKjk9iQW9KqxluGCrg4VenZ0/L+2Y0+ZOtme72EVvg6v0zo3AMQRCeA==} engines: {node: '>=12'} - '@powersync/mysql-zongji@0.0.0-dev-20250526121208': - resolution: {integrity: sha512-YOMYUU7oTHDlMrgboy3Zj0StThlBc26yley4UCvtykwxcc75+OkZZ5f3flCkKuhUk8aghG5aNNpMtdAaR9uOWQ==} + '@powersync/mysql-zongji@0.0.0-dev-20250527085137': + resolution: {integrity: sha512-3NPUfq1rcLpTCMkOwCGsJeS/zKoidDYsPqrJ/sy4Vcsgp1vXMkdnLlQyiAq7vYu5a15zUTK+mQoVruwUi82ANg==} engines: {node: '>=22.0.0'} '@powersync/service-jsonbig@0.17.10': @@ -4748,7 +4748,7 @@ snapshots: '@pnpm/network.ca-file': 1.0.2 config-chain: 1.1.13 - '@powersync/mysql-zongji@0.0.0-dev-20250526121208': + '@powersync/mysql-zongji@0.0.0-dev-20250527085137': dependencies: '@vlasky/mysql': 2.18.6 big-integer: 1.6.52 From 07201e8e321a9aa5a63a530ae5829aeddc70c057 Mon Sep 17 00:00:00 2001 From: Roland Teichert Date: Tue, 27 May 2025 13:29:25 +0200 Subject: [PATCH 10/13] Updated changeset --- .changeset/honest-ties-crash.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.changeset/honest-ties-crash.md b/.changeset/honest-ties-crash.md index 1644a321..71533ce1 100644 --- a/.changeset/honest-ties-crash.md +++ b/.changeset/honest-ties-crash.md @@ -3,3 +3,5 @@ --- Added a configurable limit for the MySQL binlog processing queue to limit memory usage. +Removed MySQL Zongji type definitions, they are now instead imported from the `@powersync/mysql-zongji` package. +Now passing in port for the Zongji connection, so that it can be used with MySQL servers that are not running on the default port 3306. From 079a2f5f8d693735fc92871e31328324fb57746e Mon Sep 17 00:00:00 2001 From: Roland Teichert Date: Thu, 29 May 2025 13:02:44 +0200 Subject: [PATCH 11/13] Changed binlog backpressure mechanism to be based on processing queue memory usage rather than number of events --- modules/module-mysql/package.json | 2 +- .../src/replication/zongji/BinLogListener.ts | 36 +++++++++++---- modules/module-mysql/src/types/types.ts | 10 ++--- .../test/src/BinLogListener.test.ts | 45 ++++++++++++++----- pnpm-lock.yaml | 10 ++--- 5 files changed, 72 insertions(+), 31 deletions(-) diff --git a/modules/module-mysql/package.json b/modules/module-mysql/package.json index eb0de824..d8a4ec0d 100644 --- a/modules/module-mysql/package.json +++ b/modules/module-mysql/package.json @@ -33,7 +33,7 @@ "@powersync/service-sync-rules": "workspace:*", "@powersync/service-types": "workspace:*", "@powersync/service-jsonbig": "workspace:*", - "@powersync/mysql-zongji": "0.0.0-dev-20250527085137", + "@powersync/mysql-zongji": "0.0.0-dev-20250528105319", "async": "^3.2.4", "mysql2": "^3.11.0", "semver": "^7.5.4", diff --git a/modules/module-mysql/src/replication/zongji/BinLogListener.ts b/modules/module-mysql/src/replication/zongji/BinLogListener.ts index 21163f84..0a1adfce 100644 --- a/modules/module-mysql/src/replication/zongji/BinLogListener.ts +++ b/modules/module-mysql/src/replication/zongji/BinLogListener.ts @@ -34,6 +34,10 @@ export class BinLogListener { zongji: ZongJi; processingQueue: async.QueueObject; + /** + * The combined size in bytes of all the binlog events currently in the processing queue. + */ + queueMemoryUsage: number; constructor(public options: BinLogListenerOptions) { this.connectionManager = options.connectionManager; @@ -42,9 +46,18 @@ export class BinLogListener { this.currentGTID = null; this.processingQueue = async.queue(this.createQueueWorker(), 1); + this.queueMemoryUsage = 0; this.zongji = this.createZongjiListener(); } + /** + * The queue memory limit in bytes as defined in the connection options. + * @private + */ + private get queueMemoryLimit(): number { + return this.connectionManager.options.binlog_queue_memory_limit * 1024 * 1024; + } + public async start(): Promise { if (this.isStopped) { return; @@ -62,6 +75,12 @@ export class BinLogListener { } satisfies StartOptions); return new Promise((resolve, reject) => { + // Handle an edge case where the listener has already been stopped before completing startup + if (this.isStopped) { + logger.info('BinLog listener was stopped before startup completed.'); + resolve(); + } + this.zongji.on('error', (error) => { if (!this.isStopped) { logger.error('Binlog listener error:', error); @@ -86,12 +105,6 @@ export class BinLogListener { resolve(); logger.info('BinLog listener stopped. Replication ended.'); }); - - // Handle the edge case where the listener has already been stopped before completing startup - if (this.isStopped) { - logger.info('BinLog listener was stopped before startup completed.'); - resolve(); - } }); } @@ -112,11 +125,12 @@ export class BinLogListener { zongji.on('binlog', async (evt) => { logger.info(`Received Binlog event:${evt.getEventName()}`); this.processingQueue.push(evt); + this.queueMemoryUsage += evt.size; // When the processing queue grows past the threshold, we pause the binlog listener - if (this.processingQueue.length() > this.connectionManager.options.max_binlog_queue_size) { + if (this.isQueueOverCapacity()) { logger.info( - `Max Binlog processing queue length [${this.connectionManager.options.max_binlog_queue_size}] reached. Pausing Binlog listener.` + `Binlog processing queue has reached its memory limit of [${this.connectionManager.options.binlog_queue_memory_limit}MB]. Pausing Binlog listener.` ); zongji.pause(); await this.processingQueue.empty(); @@ -201,6 +215,12 @@ export class BinLogListener { await this.eventHandler.onCommit(LSN); break; } + + this.queueMemoryUsage -= evt.size; }; } + + isQueueOverCapacity(): boolean { + return this.queueMemoryUsage >= this.queueMemoryLimit; + } } diff --git a/modules/module-mysql/src/types/types.ts b/modules/module-mysql/src/types/types.ts index d8079d1f..aae72f5b 100644 --- a/modules/module-mysql/src/types/types.ts +++ b/modules/module-mysql/src/types/types.ts @@ -24,7 +24,7 @@ export interface NormalizedMySQLConnectionConfig { lookup?: LookupFunction; - max_binlog_queue_size: number; + binlog_queue_memory_limit: number; } export const MySQLConnectionConfig = service_types.configFile.DataSourceConfig.and( @@ -43,8 +43,8 @@ export const MySQLConnectionConfig = service_types.configFile.DataSourceConfig.a client_private_key: t.string.optional(), reject_ip_ranges: t.array(t.string).optional(), - // The maximum number of binlog events that can be queued in memory before throttling is applied. - max_binlog_queue_size: t.number.optional() + // The combined size of binlog events that can be queued in memory before throttling is applied. + binlog_queue_memory_limit: t.number.optional() }) ); @@ -118,8 +118,8 @@ export function normalizeConnectionConfig(options: MySQLConnectionConfig): Norma server_id: options.server_id ?? 1, - // Based on profiling, a queue size of 1000 uses about 50MB of memory. - max_binlog_queue_size: options.max_binlog_queue_size ?? 1000, + // Binlog processing queue memory limit before throttling is applied. + binlog_queue_memory_limit: options.binlog_queue_memory_limit ?? 50, lookup }; diff --git a/modules/module-mysql/test/src/BinLogListener.test.ts b/modules/module-mysql/test/src/BinLogListener.test.ts index cf385f52..263eff26 100644 --- a/modules/module-mysql/test/src/BinLogListener.test.ts +++ b/modules/module-mysql/test/src/BinLogListener.test.ts @@ -6,12 +6,13 @@ import { v4 as uuid } from 'uuid'; import * as common from '@module/common/common-index.js'; import { createRandomServerId } from '@module/utils/mysql-utils.js'; import { TableMapEntry } from '@powersync/mysql-zongji'; +import crypto from 'crypto'; describe('BinlogListener tests', () => { - const MAX_QUEUE_SIZE = 10; + const MAX_QUEUE_CAPACITY_MB = 1; const BINLOG_LISTENER_CONNECTION_OPTIONS = { ...TEST_CONNECTION_OPTIONS, - max_binlog_queue_size: MAX_QUEUE_SIZE + binlog_queue_memory_limit: MAX_QUEUE_CAPACITY_MB }; let connectionManager: MySQLConnectionManager; @@ -22,7 +23,7 @@ describe('BinlogListener tests', () => { connectionManager = new MySQLConnectionManager(BINLOG_LISTENER_CONNECTION_OPTIONS, {}); const connection = await connectionManager.getConnection(); await clearTestDb(connection); - await connection.query(`CREATE TABLE test_DATA (id CHAR(36) PRIMARY KEY, description text)`); + await connection.query(`CREATE TABLE test_DATA (id CHAR(36) PRIMARY KEY, description MEDIUMTEXT)`); connection.release(); const fromGTID = await getFromGTID(connectionManager); @@ -52,24 +53,30 @@ describe('BinlogListener tests', () => { expect(queueStopSpy).toHaveBeenCalled(); }); - test('Pause Zongji binlog listener when processing queue reaches max size', async () => { + test('Pause Zongji binlog listener when processing queue reaches maximum memory size', async () => { const pauseSpy = vi.spyOn(binLogListener.zongji, 'pause'); const resumeSpy = vi.spyOn(binLogListener.zongji, 'resume'); - const queueSpy = vi.spyOn(binLogListener.processingQueue, 'length'); - const ROW_COUNT = 100; + // Pause the event handler to force a backlog on the processing queue + eventHandler.pause(); + + const ROW_COUNT = 10; await insertRows(connectionManager, ROW_COUNT); const startPromise = binLogListener.start(); + // Wait for listener to pause due to queue reaching capacity + await vi.waitFor(() => expect(pauseSpy).toHaveBeenCalled(), { timeout: 5000 }); + + expect(binLogListener.isQueueOverCapacity()).toBeTruthy(); + // Resume event processing + eventHandler.unpause!(); + await vi.waitFor(() => expect(eventHandler.rowsWritten).equals(ROW_COUNT), { timeout: 5000 }); binLogListener.stop(); await expect(startPromise).resolves.toBeUndefined(); - - // Count how many times the queue reached the max size. Consequently, we expect the listener to have paused and resumed that many times. - const overThresholdCount = queueSpy.mock.results.map((r) => r.value).filter((v) => v === MAX_QUEUE_SIZE).length; - expect(pauseSpy).toHaveBeenCalledTimes(overThresholdCount); - expect(resumeSpy).toHaveBeenCalledTimes(overThresholdCount); + // Confirm resume was called after unpausing + expect(resumeSpy).toHaveBeenCalled(); }); test('Binlog events are correctly forwarded to provided binlog events handler', async () => { @@ -101,7 +108,9 @@ async function getFromGTID(connectionManager: MySQLConnectionManager) { async function insertRows(connectionManager: MySQLConnectionManager, count: number) { for (let i = 0; i < count; i++) { - await connectionManager.query(`INSERT INTO test_DATA(id, description) VALUES('${uuid()}','test${i}')`); + await connectionManager.query( + `INSERT INTO test_DATA(id, description) VALUES('${uuid()}','test${i} ${crypto.randomBytes(100_000).toString('hex')}')` + ); } } @@ -119,7 +128,19 @@ class TestBinLogEventHandler implements BinLogEventHandler { rowsDeleted = 0; commitCount = 0; + unpause: ((value: void | PromiseLike) => void) | undefined; + private pausedPromise: Promise | undefined; + + pause() { + this.pausedPromise = new Promise((resolve) => { + this.unpause = resolve; + }); + } + async onWrite(rows: Row[], tableMap: TableMapEntry) { + if (this.pausedPromise) { + await this.pausedPromise; + } this.rowsWritten = this.rowsWritten + rows.length; } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 94b215cf..bba138f3 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -258,8 +258,8 @@ importers: specifier: workspace:* version: link:../../libs/lib-services '@powersync/mysql-zongji': - specifier: 0.0.0-dev-20250527085137 - version: 0.0.0-dev-20250527085137 + specifier: 0.0.0-dev-20250528105319 + version: 0.0.0-dev-20250528105319 '@powersync/service-core': specifier: workspace:* version: link:../../packages/service-core @@ -1281,8 +1281,8 @@ packages: resolution: {integrity: sha512-UA91GwWPhFExt3IizW6bOeY/pQ0BkuNwKjk9iQW9KqxluGCrg4VenZ0/L+2Y0+ZOtme72EVvg6v0zo3AMQRCeA==} engines: {node: '>=12'} - '@powersync/mysql-zongji@0.0.0-dev-20250527085137': - resolution: {integrity: sha512-3NPUfq1rcLpTCMkOwCGsJeS/zKoidDYsPqrJ/sy4Vcsgp1vXMkdnLlQyiAq7vYu5a15zUTK+mQoVruwUi82ANg==} + '@powersync/mysql-zongji@0.0.0-dev-20250528105319': + resolution: {integrity: sha512-67MRLJi7hHb0371/6gffkZlAaDoAFy1pVBGKP17i3MltumAF+ZlukC8Q0nKsqOcEwVMvbhmaalMeVgtJeZ/VfA==} engines: {node: '>=22.0.0'} '@powersync/service-jsonbig@0.17.10': @@ -4748,7 +4748,7 @@ snapshots: '@pnpm/network.ca-file': 1.0.2 config-chain: 1.1.13 - '@powersync/mysql-zongji@0.0.0-dev-20250527085137': + '@powersync/mysql-zongji@0.0.0-dev-20250528105319': dependencies: '@vlasky/mysql': 2.18.6 big-integer: 1.6.52 From 286ba164ec23a7afb55f9bdc08a2a0b1bdfc70a0 Mon Sep 17 00:00:00 2001 From: Roland Teichert Date: Thu, 29 May 2025 16:20:23 +0200 Subject: [PATCH 12/13] Changed binlog backpressure mechanism to be based on processing queue memory usage rather than number of events. Introduced a maximum timeout that the binlog processing queue can be paused before auto-resuming. This is to prevent the replication connection timing out. --- modules/module-mysql/package.json | 2 +- .../src/replication/zongji/BinLogListener.ts | 12 +++++++++++- pnpm-lock.yaml | 10 +++++----- 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/modules/module-mysql/package.json b/modules/module-mysql/package.json index 384326eb..09016a42 100644 --- a/modules/module-mysql/package.json +++ b/modules/module-mysql/package.json @@ -33,7 +33,7 @@ "@powersync/service-sync-rules": "workspace:*", "@powersync/service-types": "workspace:*", "@powersync/service-jsonbig": "workspace:*", - "@powersync/mysql-zongji": "0.0.0-dev-20250528105319", + "@powersync/mysql-zongji": "0.2.0", "async": "^3.2.4", "mysql2": "^3.11.0", "semver": "^7.5.4", diff --git a/modules/module-mysql/src/replication/zongji/BinLogListener.ts b/modules/module-mysql/src/replication/zongji/BinLogListener.ts index 0a1adfce..e22ff8b2 100644 --- a/modules/module-mysql/src/replication/zongji/BinLogListener.ts +++ b/modules/module-mysql/src/replication/zongji/BinLogListener.ts @@ -5,6 +5,10 @@ import * as zongji_utils from './zongji-utils.js'; import { logger } from '@powersync/lib-services-framework'; import { MySQLConnectionManager } from '../MySQLConnectionManager.js'; +// Maximum time the processing queue can be paused before resuming automatically +// MySQL server will automatically terminate replication connections after 60 seconds of inactivity, so this guards against that. +const MAX_QUEUE_PAUSE_TIME_MS = 45_000; + export type Row = Record; export interface BinLogEventHandler { @@ -133,7 +137,12 @@ export class BinLogListener { `Binlog processing queue has reached its memory limit of [${this.connectionManager.options.binlog_queue_memory_limit}MB]. Pausing Binlog listener.` ); zongji.pause(); - await this.processingQueue.empty(); + const resumeTimeoutPromise = new Promise((resolve) => { + setTimeout(() => resolve('timeout'), MAX_QUEUE_PAUSE_TIME_MS); + }); + + await Promise.race([this.processingQueue.empty(), resumeTimeoutPromise]); + logger.info(`Binlog processing queue backlog cleared. Resuming Binlog listener.`); zongji.resume(); } @@ -162,6 +171,7 @@ export class BinLogListener { // The timeout here must be greater than the master_heartbeat_period. const socket = this.zongji.connection._socket!; socket.setTimeout(60_000, () => { + logger.info('Destroying socket due to replication connection timeout.'); socket.destroy(new Error('Replication connection timeout.')); }); logger.info( diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index bba138f3..c7367649 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -258,8 +258,8 @@ importers: specifier: workspace:* version: link:../../libs/lib-services '@powersync/mysql-zongji': - specifier: 0.0.0-dev-20250528105319 - version: 0.0.0-dev-20250528105319 + specifier: 0.2.0 + version: 0.2.0 '@powersync/service-core': specifier: workspace:* version: link:../../packages/service-core @@ -1281,8 +1281,8 @@ packages: resolution: {integrity: sha512-UA91GwWPhFExt3IizW6bOeY/pQ0BkuNwKjk9iQW9KqxluGCrg4VenZ0/L+2Y0+ZOtme72EVvg6v0zo3AMQRCeA==} engines: {node: '>=12'} - '@powersync/mysql-zongji@0.0.0-dev-20250528105319': - resolution: {integrity: sha512-67MRLJi7hHb0371/6gffkZlAaDoAFy1pVBGKP17i3MltumAF+ZlukC8Q0nKsqOcEwVMvbhmaalMeVgtJeZ/VfA==} + '@powersync/mysql-zongji@0.2.0': + resolution: {integrity: sha512-ua/n7WFfoiXmqfgwLikcm/AaDE6+t5gFVTWHWsbiuRQMNtXE1F2gXpZJdwKhr8WsOCYkB/A1ZOgbJKi4tK342g==} engines: {node: '>=22.0.0'} '@powersync/service-jsonbig@0.17.10': @@ -4748,7 +4748,7 @@ snapshots: '@pnpm/network.ca-file': 1.0.2 config-chain: 1.1.13 - '@powersync/mysql-zongji@0.0.0-dev-20250528105319': + '@powersync/mysql-zongji@0.2.0': dependencies: '@vlasky/mysql': 2.18.6 big-integer: 1.6.52 From cce00b9b24ee0558abe6a16b71a6f8a77033305f Mon Sep 17 00:00:00 2001 From: Roland Teichert Date: Mon, 2 Jun 2025 14:40:50 +0200 Subject: [PATCH 13/13] Fix: Use port for mysql pool creation --- modules/module-mysql/src/utils/mysql-utils.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/modules/module-mysql/src/utils/mysql-utils.ts b/modules/module-mysql/src/utils/mysql-utils.ts index 61a2e5d3..f1733f3e 100644 --- a/modules/module-mysql/src/utils/mysql-utils.ts +++ b/modules/module-mysql/src/utils/mysql-utils.ts @@ -41,6 +41,7 @@ export function createPool(config: types.NormalizedMySQLConnectionConfig, option return mysql.createPool({ host: config.hostname, user: config.username, + port: config.port, password: config.password, database: config.database, ssl: hasSSLOptions ? sslOptions : undefined,