diff --git a/.changeset/honest-ties-crash.md b/.changeset/honest-ties-crash.md new file mode 100644 index 00000000..71533ce1 --- /dev/null +++ b/.changeset/honest-ties-crash.md @@ -0,0 +1,7 @@ +--- +'@powersync/service-module-mysql': minor +--- + +Added a configurable limit for the MySQL binlog processing queue to limit memory usage. +Removed MySQL Zongji type definitions, they are now instead imported from the `@powersync/mysql-zongji` package. +Now passing in port for the Zongji connection, so that it can be used with MySQL servers that are not running on the default port 3306. diff --git a/modules/module-mysql/package.json b/modules/module-mysql/package.json index 77783da8..09016a42 100644 --- a/modules/module-mysql/package.json +++ b/modules/module-mysql/package.json @@ -29,11 +29,11 @@ }, "dependencies": { "@powersync/lib-services-framework": "workspace:*", - "@powersync/mysql-zongji": "^0.1.0", "@powersync/service-core": "workspace:*", - "@powersync/service-jsonbig": "workspace:*", "@powersync/service-sync-rules": "workspace:*", "@powersync/service-types": "workspace:*", + "@powersync/service-jsonbig": "workspace:*", + "@powersync/mysql-zongji": "0.2.0", "async": "^3.2.4", "mysql2": "^3.11.0", "semver": "^7.5.4", diff --git a/modules/module-mysql/src/replication/BinLogStream.ts b/modules/module-mysql/src/replication/BinLogStream.ts index a1af98ac..fe137ae0 100644 --- a/modules/module-mysql/src/replication/BinLogStream.ts +++ b/modules/module-mysql/src/replication/BinLogStream.ts @@ -1,6 +1,5 @@ import { logger, ReplicationAbortedError, ReplicationAssertionError } from '@powersync/lib-services-framework'; import * as sync_rules from '@powersync/service-sync-rules'; -import async from 'async'; import { ColumnDescriptor, @@ -9,16 +8,15 @@ import { MetricsEngine, storage } from '@powersync/service-core'; -import mysql, { FieldPacket } from 'mysql2'; - -import { BinLogEvent, StartOptions, TableMapEntry } from '@powersync/mysql-zongji'; +import mysql from 'mysql2'; import mysqlPromise from 'mysql2/promise'; + +import { TableMapEntry } from '@powersync/mysql-zongji'; import * as common from '../common/common-index.js'; -import { isBinlogStillAvailable, ReplicatedGTID, toColumnDescriptors } from '../common/common-index.js'; import { createRandomServerId, escapeMysqlTableName } from '../utils/mysql-utils.js'; import { MySQLConnectionManager } from './MySQLConnectionManager.js'; -import * as zongji_utils from './zongji/zongji-utils.js'; import { ReplicationMetric } from '@powersync/service-types'; +import { BinLogEventHandler, BinLogListener, Row } from './zongji/BinLogListener.js'; export interface BinLogStreamOptions { connections: MySQLConnectionManager; @@ -34,16 +32,14 @@ interface MysqlRelId { interface WriteChangePayload { type: storage.SaveOperationTag; - data: Data; - previous_data?: Data; + row: Row; + previous_row?: Row; database: string; table: string; sourceTable: storage.SourceTable; columns: Map; } -export type Data = Record; - export class BinlogConfigurationError extends Error { constructor(message: string) { super(message); @@ -247,10 +243,10 @@ AND table_type = 'BASE TABLE';`, // Check if the binlog is still available. If it isn't we need to snapshot again. const connection = await this.connections.getConnection(); try { - const isAvailable = await isBinlogStillAvailable(connection, lastKnowGTID.position.filename); + const isAvailable = await common.isBinlogStillAvailable(connection, lastKnowGTID.position.filename); if (!isAvailable) { logger.info( - `Binlog file ${lastKnowGTID.position.filename} is no longer available, starting initial replication again.` + `BinLog file ${lastKnowGTID.position.filename} is no longer available, starting initial replication again.` ); } return isAvailable; @@ -288,7 +284,7 @@ AND table_type = 'BASE TABLE';`, const sourceTables = this.syncRules.getSourceTables(); await this.storage.startBatch( - { zeroLSN: ReplicatedGTID.ZERO.comparable, defaultSchema: this.defaultSchema, storeCurrentData: true }, + { zeroLSN: common.ReplicatedGTID.ZERO.comparable, defaultSchema: this.defaultSchema, storeCurrentData: true }, async (batch) => { for (let tablePattern of sourceTables) { const tables = await this.getQualifiedTableNames(batch, tablePattern); @@ -324,9 +320,9 @@ AND table_type = 'BASE TABLE';`, const stream = query.stream(); let columns: Map | undefined = undefined; - stream.on('fields', (fields: FieldPacket[]) => { + stream.on('fields', (fields: mysql.FieldPacket[]) => { // Map the columns and their types - columns = toColumnDescriptors(fields); + columns = common.toColumnDescriptors(fields); }); for await (let row of stream) { @@ -359,7 +355,7 @@ AND table_type = 'BASE TABLE';`, // all connections automatically closed, including this one. await this.initReplication(); await this.streamChanges(); - logger.info('BinlogStream has been shut down'); + logger.info('BinLogStream has been shut down.'); } catch (e) { await this.storage.reportError(e); throw e; @@ -372,7 +368,7 @@ AND table_type = 'BASE TABLE';`, connection.release(); if (errors.length > 0) { - throw new BinlogConfigurationError(`Binlog Configuration Errors: ${errors.join(', ')}`); + throw new BinlogConfigurationError(`BinLog Configuration Errors: ${errors.join(', ')}`); } const initialReplicationCompleted = await this.checkInitialReplicated(); @@ -383,7 +379,7 @@ AND table_type = 'BASE TABLE';`, // This is needed for includeSchema to work correctly. const sourceTables = this.syncRules.getSourceTables(); await this.storage.startBatch( - { zeroLSN: ReplicatedGTID.ZERO.comparable, defaultSchema: this.defaultSchema, storeCurrentData: true }, + { zeroLSN: common.ReplicatedGTID.ZERO.comparable, defaultSchema: this.defaultSchema, storeCurrentData: true }, async (batch) => { for (let tablePattern of sourceTables) { await this.getQualifiedTableNames(batch, tablePattern); @@ -407,14 +403,12 @@ AND table_type = 'BASE TABLE';`, // Auto-activate as soon as initial replication is done await this.storage.autoActivate(); const serverId = createRandomServerId(this.storage.group_id); - logger.info(`Starting replication. Created replica client with serverId:${serverId}`); const connection = await this.connections.getConnection(); const { checkpoint_lsn } = await this.storage.getStatus(); if (checkpoint_lsn) { logger.info(`Existing checkpoint found: ${checkpoint_lsn}`); } - const fromGTID = checkpoint_lsn ? common.ReplicatedGTID.fromSerialized(checkpoint_lsn) : await common.readExecutedGtid(connection); @@ -423,179 +417,79 @@ AND table_type = 'BASE TABLE';`, if (!this.stopped) { await this.storage.startBatch( - { zeroLSN: ReplicatedGTID.ZERO.comparable, defaultSchema: this.defaultSchema, storeCurrentData: true }, + { zeroLSN: common.ReplicatedGTID.ZERO.comparable, defaultSchema: this.defaultSchema, storeCurrentData: true }, async (batch) => { - const zongji = this.connections.createBinlogListener(); - - let currentGTID: common.ReplicatedGTID | null = null; - - const queue = async.queue(async (evt: BinLogEvent) => { - // State machine - switch (true) { - case zongji_utils.eventIsGTIDLog(evt): - currentGTID = common.ReplicatedGTID.fromBinLogEvent({ - raw_gtid: { - server_id: evt.serverId, - transaction_range: evt.transactionRange - }, - position: { - filename: binLogPositionState.filename, - offset: evt.nextPosition - } - }); - break; - case zongji_utils.eventIsRotation(evt): - // Update the position - binLogPositionState.filename = evt.binlogName; - binLogPositionState.offset = evt.position; - break; - case zongji_utils.eventIsWriteMutation(evt): - const writeTableInfo = evt.tableMap[evt.tableId]; - await this.writeChanges(batch, { - type: storage.SaveOperationTag.INSERT, - data: evt.rows, - tableEntry: writeTableInfo - }); - break; - case zongji_utils.eventIsUpdateMutation(evt): - const updateTableInfo = evt.tableMap[evt.tableId]; - await this.writeChanges(batch, { - type: storage.SaveOperationTag.UPDATE, - data: evt.rows.map((row) => row.after), - previous_data: evt.rows.map((row) => row.before), - tableEntry: updateTableInfo - }); - break; - case zongji_utils.eventIsDeleteMutation(evt): - const deleteTableInfo = evt.tableMap[evt.tableId]; - await this.writeChanges(batch, { - type: storage.SaveOperationTag.DELETE, - data: evt.rows, - tableEntry: deleteTableInfo - }); - break; - case zongji_utils.eventIsXid(evt): - this.metrics.getCounter(ReplicationMetric.TRANSACTIONS_REPLICATED).add(1); - // Need to commit with a replicated GTID with updated next position - await batch.commit( - new common.ReplicatedGTID({ - raw_gtid: currentGTID!.raw, - position: { - filename: binLogPositionState.filename, - offset: evt.nextPosition - } - }).comparable - ); - currentGTID = null; - // chunks_replicated_total.add(1); - break; - } - }, 1); - - zongji.on('binlog', (evt: BinLogEvent) => { - if (!this.stopped) { - logger.info(`Received Binlog event:${evt.getEventName()}`); - queue.push(evt); - } else { - logger.info(`Replication is busy stopping, ignoring event ${evt.getEventName()}`); - } - }); - - // Set a heartbeat interval for the Zongji replication connection - // Zongji does not explicitly handle the heartbeat events - they are categorized as event:unknown - // The heartbeat events are enough to keep the connection alive for setTimeout to work on the socket. - await new Promise((resolve, reject) => { - zongji.connection.query( - // In nanoseconds, 10^9 = 1s - 'set @master_heartbeat_period=28*1000000000', - function (error: any, results: any, fields: any) { - if (error) { - reject(error); - } else { - resolve(results); - } - } - ); - }); - logger.info('Successfully set up replication connection heartbeat...'); - - // The _socket member is only set after a query is run on the connection, so we set the timeout after setting the heartbeat. - // The timeout here must be greater than the master_heartbeat_period. - const socket = zongji.connection._socket!; - socket.setTimeout(60_000, () => { - socket.destroy(new Error('Replication connection timeout.')); - }); - - if (this.stopped) { - // Powersync is shutting down, don't start replicating - return; - } - - logger.info(`Reading binlog from: ${binLogPositionState.filename}:${binLogPositionState.offset}`); + const binlogEventHandler = this.createBinlogEventHandler(batch); // Only listen for changes to tables in the sync rules const includedTables = [...this.tableCache.values()].map((table) => table.table); - zongji.start({ - // We ignore the unknown/heartbeat event since it currently serves no purpose other than to keep the connection alive - includeEvents: ['tablemap', 'writerows', 'updaterows', 'deleterows', 'xid', 'rotate', 'gtidlog'], - excludeEvents: [], - includeSchema: { [this.defaultSchema]: includedTables }, - filename: binLogPositionState.filename, - position: binLogPositionState.offset, - serverId: serverId - } satisfies StartOptions); - - // Forever young - await new Promise((resolve, reject) => { - zongji.on('error', (error) => { - logger.error('Binlog listener error:', error); - zongji.stop(); - queue.kill(); - reject(error); - }); - - zongji.on('stopped', () => { - logger.info('Binlog listener stopped. Replication ended.'); - resolve(); - }); - - queue.error((error) => { - logger.error('Binlog listener queue error:', error); - zongji.stop(); - queue.kill(); - reject(error); - }); - - const stop = () => { - logger.info('Abort signal received, stopping replication...'); - zongji.stop(); - queue.kill(); - resolve(); - }; + const binlogListener = new BinLogListener({ + includedTables: includedTables, + startPosition: binLogPositionState, + connectionManager: this.connections, + serverId: serverId, + eventHandler: binlogEventHandler + }); - this.abortSignal.addEventListener('abort', stop, { once: true }); + this.abortSignal.addEventListener( + 'abort', + () => { + logger.info('Abort signal received, stopping replication...'); + binlogListener.stop(); + }, + { once: true } + ); - if (this.stopped) { - // Generally this should have been picked up early, but we add this here as a failsafe. - stop(); - } - }); + // Only returns when the replication is stopped or interrupted by an error + await binlogListener.start(); } ); } } + private createBinlogEventHandler(batch: storage.BucketStorageBatch): BinLogEventHandler { + return { + onWrite: async (rows: Row[], tableMap: TableMapEntry) => { + await this.writeChanges(batch, { + type: storage.SaveOperationTag.INSERT, + rows: rows, + tableEntry: tableMap + }); + }, + + onUpdate: async (rowsAfter: Row[], rowsBefore: Row[], tableMap: TableMapEntry) => { + await this.writeChanges(batch, { + type: storage.SaveOperationTag.UPDATE, + rows: rowsAfter, + rows_before: rowsBefore, + tableEntry: tableMap + }); + }, + onDelete: async (rows: Row[], tableMap: TableMapEntry) => { + await this.writeChanges(batch, { + type: storage.SaveOperationTag.DELETE, + rows: rows, + tableEntry: tableMap + }); + }, + onCommit: async (lsn: string) => { + this.metrics.getCounter(ReplicationMetric.TRANSACTIONS_REPLICATED).add(1); + await batch.commit(lsn); + } + }; + } + private async writeChanges( batch: storage.BucketStorageBatch, msg: { type: storage.SaveOperationTag; - data: Data[]; - previous_data?: Data[]; + rows: Row[]; + rows_before?: Row[]; tableEntry: TableMapEntry; } ): Promise { - const columns = toColumnDescriptors(msg.tableEntry); + const columns = common.toColumnDescriptors(msg.tableEntry); - for (const [index, row] of msg.data.entries()) { + for (const [index, row] of msg.rows.entries()) { await this.writeChange(batch, { type: msg.type, database: msg.tableEntry.parentSchema, @@ -607,8 +501,8 @@ AND table_type = 'BASE TABLE';`, ), table: msg.tableEntry.tableName, columns: columns, - data: row, - previous_data: msg.previous_data?.[index] + row: row, + previous_row: msg.rows_before?.[index] }); } return null; @@ -621,7 +515,7 @@ AND table_type = 'BASE TABLE';`, switch (payload.type) { case storage.SaveOperationTag.INSERT: this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1); - const record = common.toSQLiteRow(payload.data, payload.columns); + const record = common.toSQLiteRow(payload.row, payload.columns); return await batch.save({ tag: storage.SaveOperationTag.INSERT, sourceTable: payload.sourceTable, @@ -634,10 +528,10 @@ AND table_type = 'BASE TABLE';`, this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1); // "before" may be null if the replica id columns are unchanged // It's fine to treat that the same as an insert. - const beforeUpdated = payload.previous_data - ? common.toSQLiteRow(payload.previous_data, payload.columns) + const beforeUpdated = payload.previous_row + ? common.toSQLiteRow(payload.previous_row, payload.columns) : undefined; - const after = common.toSQLiteRow(payload.data, payload.columns); + const after = common.toSQLiteRow(payload.row, payload.columns); return await batch.save({ tag: storage.SaveOperationTag.UPDATE, @@ -652,7 +546,7 @@ AND table_type = 'BASE TABLE';`, case storage.SaveOperationTag.DELETE: this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1); - const beforeDeleted = common.toSQLiteRow(payload.data, payload.columns); + const beforeDeleted = common.toSQLiteRow(payload.row, payload.columns); return await batch.save({ tag: storage.SaveOperationTag.DELETE, diff --git a/modules/module-mysql/src/replication/MySQLConnectionManager.ts b/modules/module-mysql/src/replication/MySQLConnectionManager.ts index 4548d838..b648ab26 100644 --- a/modules/module-mysql/src/replication/MySQLConnectionManager.ts +++ b/modules/module-mysql/src/replication/MySQLConnectionManager.ts @@ -2,8 +2,8 @@ import { NormalizedMySQLConnectionConfig } from '../types/types.js'; import mysqlPromise from 'mysql2/promise'; import mysql, { FieldPacket, RowDataPacket } from 'mysql2'; import * as mysql_utils from '../utils/mysql-utils.js'; -import ZongJi from '@powersync/mysql-zongji'; import { logger } from '@powersync/lib-services-framework'; +import { ZongJi } from '@powersync/mysql-zongji'; export class MySQLConnectionManager { /** @@ -46,6 +46,7 @@ export class MySQLConnectionManager { createBinlogListener(): ZongJi { const listener = new ZongJi({ host: this.options.hostname, + port: this.options.port, user: this.options.username, password: this.options.password }); diff --git a/modules/module-mysql/src/replication/zongji/BinLogListener.ts b/modules/module-mysql/src/replication/zongji/BinLogListener.ts new file mode 100644 index 00000000..e22ff8b2 --- /dev/null +++ b/modules/module-mysql/src/replication/zongji/BinLogListener.ts @@ -0,0 +1,236 @@ +import * as common from '../../common/common-index.js'; +import async from 'async'; +import { BinLogEvent, StartOptions, TableMapEntry, ZongJi } from '@powersync/mysql-zongji'; +import * as zongji_utils from './zongji-utils.js'; +import { logger } from '@powersync/lib-services-framework'; +import { MySQLConnectionManager } from '../MySQLConnectionManager.js'; + +// Maximum time the processing queue can be paused before resuming automatically +// MySQL server will automatically terminate replication connections after 60 seconds of inactivity, so this guards against that. +const MAX_QUEUE_PAUSE_TIME_MS = 45_000; + +export type Row = Record; + +export interface BinLogEventHandler { + onWrite: (rows: Row[], tableMap: TableMapEntry) => Promise; + onUpdate: (rowsAfter: Row[], rowsBefore: Row[], tableMap: TableMapEntry) => Promise; + onDelete: (rows: Row[], tableMap: TableMapEntry) => Promise; + onCommit: (lsn: string) => Promise; +} + +export interface BinLogListenerOptions { + connectionManager: MySQLConnectionManager; + eventHandler: BinLogEventHandler; + includedTables: string[]; + serverId: number; + startPosition: common.BinLogPosition; +} + +/** + * Wrapper class for the Zongji BinLog listener. Internally handles the creation and management of the listener and posts + * events on the provided BinLogEventHandler. + */ +export class BinLogListener { + private connectionManager: MySQLConnectionManager; + private eventHandler: BinLogEventHandler; + private binLogPosition: common.BinLogPosition; + private currentGTID: common.ReplicatedGTID | null; + + zongji: ZongJi; + processingQueue: async.QueueObject; + /** + * The combined size in bytes of all the binlog events currently in the processing queue. + */ + queueMemoryUsage: number; + + constructor(public options: BinLogListenerOptions) { + this.connectionManager = options.connectionManager; + this.eventHandler = options.eventHandler; + this.binLogPosition = options.startPosition; + this.currentGTID = null; + + this.processingQueue = async.queue(this.createQueueWorker(), 1); + this.queueMemoryUsage = 0; + this.zongji = this.createZongjiListener(); + } + + /** + * The queue memory limit in bytes as defined in the connection options. + * @private + */ + private get queueMemoryLimit(): number { + return this.connectionManager.options.binlog_queue_memory_limit * 1024 * 1024; + } + + public async start(): Promise { + if (this.isStopped) { + return; + } + logger.info(`Starting replication. Created replica client with serverId:${this.options.serverId}`); + + this.zongji.start({ + // We ignore the unknown/heartbeat event since it currently serves no purpose other than to keep the connection alive + // tablemap events always need to be included for the other row events to work + includeEvents: ['tablemap', 'writerows', 'updaterows', 'deleterows', 'xid', 'rotate', 'gtidlog'], + includeSchema: { [this.connectionManager.databaseName]: this.options.includedTables }, + filename: this.binLogPosition.filename, + position: this.binLogPosition.offset, + serverId: this.options.serverId + } satisfies StartOptions); + + return new Promise((resolve, reject) => { + // Handle an edge case where the listener has already been stopped before completing startup + if (this.isStopped) { + logger.info('BinLog listener was stopped before startup completed.'); + resolve(); + } + + this.zongji.on('error', (error) => { + if (!this.isStopped) { + logger.error('Binlog listener error:', error); + this.stop(); + reject(error); + } else { + logger.warn('Binlog listener error during shutdown:', error); + } + }); + + this.processingQueue.error((error) => { + if (!this.isStopped) { + logger.error('BinlogEvent processing error:', error); + this.stop(); + reject(error); + } else { + logger.warn('BinlogEvent processing error during shutdown:', error); + } + }); + + this.zongji.on('stopped', () => { + resolve(); + logger.info('BinLog listener stopped. Replication ended.'); + }); + }); + } + + public stop(): void { + if (!this.isStopped) { + this.zongji.stop(); + this.processingQueue.kill(); + } + } + + private get isStopped(): boolean { + return this.zongji.stopped; + } + + private createZongjiListener(): ZongJi { + const zongji = this.connectionManager.createBinlogListener(); + + zongji.on('binlog', async (evt) => { + logger.info(`Received Binlog event:${evt.getEventName()}`); + this.processingQueue.push(evt); + this.queueMemoryUsage += evt.size; + + // When the processing queue grows past the threshold, we pause the binlog listener + if (this.isQueueOverCapacity()) { + logger.info( + `Binlog processing queue has reached its memory limit of [${this.connectionManager.options.binlog_queue_memory_limit}MB]. Pausing Binlog listener.` + ); + zongji.pause(); + const resumeTimeoutPromise = new Promise((resolve) => { + setTimeout(() => resolve('timeout'), MAX_QUEUE_PAUSE_TIME_MS); + }); + + await Promise.race([this.processingQueue.empty(), resumeTimeoutPromise]); + + logger.info(`Binlog processing queue backlog cleared. Resuming Binlog listener.`); + zongji.resume(); + } + }); + + zongji.on('ready', async () => { + // Set a heartbeat interval for the Zongji replication connection + // Zongji does not explicitly handle the heartbeat events - they are categorized as event:unknown + // The heartbeat events are enough to keep the connection alive for setTimeout to work on the socket. + await new Promise((resolve, reject) => { + this.zongji.connection.query( + // In nanoseconds, 10^9 = 1s + 'set @master_heartbeat_period=28*1000000000', + function (error: any, results: any, fields: any) { + if (error) { + reject(error); + } else { + logger.info('Successfully set up replication connection heartbeat...'); + resolve(results); + } + } + ); + }); + + // The _socket member is only set after a query is run on the connection, so we set the timeout after setting the heartbeat. + // The timeout here must be greater than the master_heartbeat_period. + const socket = this.zongji.connection._socket!; + socket.setTimeout(60_000, () => { + logger.info('Destroying socket due to replication connection timeout.'); + socket.destroy(new Error('Replication connection timeout.')); + }); + logger.info( + `BinLog listener setup complete. Reading binlog from: ${this.binLogPosition.filename}:${this.binLogPosition.offset}` + ); + }); + + return zongji; + } + + private createQueueWorker() { + return async (evt: BinLogEvent) => { + switch (true) { + case zongji_utils.eventIsGTIDLog(evt): + this.currentGTID = common.ReplicatedGTID.fromBinLogEvent({ + raw_gtid: { + server_id: evt.serverId, + transaction_range: evt.transactionRange + }, + position: { + filename: this.binLogPosition.filename, + offset: evt.nextPosition + } + }); + break; + case zongji_utils.eventIsRotation(evt): + this.binLogPosition.filename = evt.binlogName; + this.binLogPosition.offset = evt.position; + break; + case zongji_utils.eventIsWriteMutation(evt): + await this.eventHandler.onWrite(evt.rows, evt.tableMap[evt.tableId]); + break; + case zongji_utils.eventIsUpdateMutation(evt): + await this.eventHandler.onUpdate( + evt.rows.map((row) => row.after), + evt.rows.map((row) => row.before), + evt.tableMap[evt.tableId] + ); + break; + case zongji_utils.eventIsDeleteMutation(evt): + await this.eventHandler.onDelete(evt.rows, evt.tableMap[evt.tableId]); + break; + case zongji_utils.eventIsXid(evt): + const LSN = new common.ReplicatedGTID({ + raw_gtid: this.currentGTID!.raw, + position: { + filename: this.binLogPosition.filename, + offset: evt.nextPosition + } + }).comparable; + await this.eventHandler.onCommit(LSN); + break; + } + + this.queueMemoryUsage -= evt.size; + }; + } + + isQueueOverCapacity(): boolean { + return this.queueMemoryUsage >= this.queueMemoryLimit; + } +} diff --git a/modules/module-mysql/src/replication/zongji/zongji-utils.ts b/modules/module-mysql/src/replication/zongji/zongji-utils.ts index 36122b63..ee9e4c53 100644 --- a/modules/module-mysql/src/replication/zongji/zongji-utils.ts +++ b/modules/module-mysql/src/replication/zongji/zongji-utils.ts @@ -1,9 +1,10 @@ import { BinLogEvent, BinLogGTIDLogEvent, - BinLogMutationEvent, + BinLogRowEvent, BinLogRotationEvent, - BinLogUpdateEvent, + BinLogTableMapEvent, + BinLogRowUpdateEvent, BinLogXidEvent } from '@powersync/mysql-zongji'; @@ -11,6 +12,10 @@ export function eventIsGTIDLog(event: BinLogEvent): event is BinLogGTIDLogEvent return event.getEventName() == 'gtidlog'; } +export function eventIsTableMap(event: BinLogEvent): event is BinLogTableMapEvent { + return event.getEventName() == 'tablemap'; +} + export function eventIsXid(event: BinLogEvent): event is BinLogXidEvent { return event.getEventName() == 'xid'; } @@ -19,14 +24,14 @@ export function eventIsRotation(event: BinLogEvent): event is BinLogRotationEven return event.getEventName() == 'rotate'; } -export function eventIsWriteMutation(event: BinLogEvent): event is BinLogMutationEvent { +export function eventIsWriteMutation(event: BinLogEvent): event is BinLogRowEvent { return event.getEventName() == 'writerows'; } -export function eventIsDeleteMutation(event: BinLogEvent): event is BinLogMutationEvent { +export function eventIsDeleteMutation(event: BinLogEvent): event is BinLogRowEvent { return event.getEventName() == 'deleterows'; } -export function eventIsUpdateMutation(event: BinLogEvent): event is BinLogUpdateEvent { +export function eventIsUpdateMutation(event: BinLogEvent): event is BinLogRowUpdateEvent { return event.getEventName() == 'updaterows'; } diff --git a/modules/module-mysql/src/replication/zongji/zongji.d.ts b/modules/module-mysql/src/replication/zongji/zongji.d.ts deleted file mode 100644 index f5640497..00000000 --- a/modules/module-mysql/src/replication/zongji/zongji.d.ts +++ /dev/null @@ -1,129 +0,0 @@ -declare module '@powersync/mysql-zongji' { - import { Socket } from 'net'; - - export type ZongjiOptions = { - host: string; - user: string; - password: string; - dateStrings?: boolean; - timeZone?: string; - }; - - interface DatabaseFilter { - [databaseName: string]: string[] | true; - } - - export type StartOptions = { - includeEvents?: string[]; - excludeEvents?: string[]; - /** - * Describe which databases and tables to include (Only for row events). Use database names as the key and pass an array of table names or true (for the entire database). - * Example: { 'my_database': ['allow_table', 'another_table'], 'another_db': true } - */ - includeSchema?: DatabaseFilter; - /** - * Object describing which databases and tables to exclude (Same format as includeSchema) - * Example: { 'other_db': ['disallowed_table'], 'ex_db': true } - */ - excludeSchema?: DatabaseFilter; - /** - * BinLog position filename to start reading events from - */ - filename?: string; - /** - * BinLog position offset to start reading events from in file specified - */ - position?: number; - - /** - * Unique server ID for this replication client. - */ - serverId?: number; - }; - - export type ColumnSchema = { - COLUMN_NAME: string; - COLLATION_NAME: string; - CHARACTER_SET_NAME: string; - COLUMN_COMMENT: string; - COLUMN_TYPE: string; - }; - - export type ColumnDefinition = { - name: string; - charset: string; - type: number; - metadata: Record; - }; - - export type TableMapEntry = { - columnSchemas: ColumnSchema[]; - parentSchema: string; - tableName: string; - columns: ColumnDefinition[]; - }; - - export type BaseBinLogEvent = { - timestamp: number; - getEventName(): string; - - /** - * Next position in BinLog file to read from after - * this event. - */ - nextPosition: number; - /** - * Size of this event - */ - size: number; - flags: number; - useChecksum: boolean; - }; - - export type BinLogRotationEvent = BaseBinLogEvent & { - binlogName: string; - position: number; - }; - - export type BinLogGTIDLogEvent = BaseBinLogEvent & { - serverId: Buffer; - transactionRange: number; - }; - - export type BinLogXidEvent = BaseBinLogEvent & { - xid: number; - }; - - export type BinLogMutationEvent = BaseBinLogEvent & { - tableId: number; - numberOfColumns: number; - tableMap: Record; - rows: Record[]; - }; - - export type BinLogUpdateEvent = Omit & { - rows: { - before: Record; - after: Record; - }[]; - }; - - export type BinLogEvent = BinLogRotationEvent | BinLogGTIDLogEvent | BinLogXidEvent | BinLogMutationEvent; - - // @vlasky/mysql Connection - export interface MySQLConnection { - _socket?: Socket; - /** There are other forms of this method as well - this is the most basic one. */ - query(sql: string, callback: (error: any, results: any, fields: any) => void): void; - } - - export default class ZongJi { - connection: MySQLConnection; - constructor(options: ZongjiOptions); - - start(options: StartOptions): void; - stop(): void; - - on(type: 'binlog' | string, callback: (event: BinLogEvent) => void); - } -} diff --git a/modules/module-mysql/src/types/types.ts b/modules/module-mysql/src/types/types.ts index c0826187..aae72f5b 100644 --- a/modules/module-mysql/src/types/types.ts +++ b/modules/module-mysql/src/types/types.ts @@ -23,6 +23,8 @@ export interface NormalizedMySQLConnectionConfig { client_private_key?: string; lookup?: LookupFunction; + + binlog_queue_memory_limit: number; } export const MySQLConnectionConfig = service_types.configFile.DataSourceConfig.and( @@ -40,7 +42,9 @@ export const MySQLConnectionConfig = service_types.configFile.DataSourceConfig.a client_certificate: t.string.optional(), client_private_key: t.string.optional(), - reject_ip_ranges: t.array(t.string).optional() + reject_ip_ranges: t.array(t.string).optional(), + // The combined size of binlog events that can be queued in memory before throttling is applied. + binlog_queue_memory_limit: t.number.optional() }) ); @@ -114,6 +118,9 @@ export function normalizeConnectionConfig(options: MySQLConnectionConfig): Norma server_id: options.server_id ?? 1, + // Binlog processing queue memory limit before throttling is applied. + binlog_queue_memory_limit: options.binlog_queue_memory_limit ?? 50, + lookup }; } diff --git a/modules/module-mysql/src/utils/mysql-utils.ts b/modules/module-mysql/src/utils/mysql-utils.ts index 61a2e5d3..f1733f3e 100644 --- a/modules/module-mysql/src/utils/mysql-utils.ts +++ b/modules/module-mysql/src/utils/mysql-utils.ts @@ -41,6 +41,7 @@ export function createPool(config: types.NormalizedMySQLConnectionConfig, option return mysql.createPool({ host: config.hostname, user: config.username, + port: config.port, password: config.password, database: config.database, ssl: hasSSLOptions ? sslOptions : undefined, diff --git a/modules/module-mysql/test/src/BinLogListener.test.ts b/modules/module-mysql/test/src/BinLogListener.test.ts new file mode 100644 index 00000000..263eff26 --- /dev/null +++ b/modules/module-mysql/test/src/BinLogListener.test.ts @@ -0,0 +1,158 @@ +import { describe, test, beforeEach, vi, expect, afterEach } from 'vitest'; +import { BinLogEventHandler, BinLogListener, Row } from '@module/replication/zongji/BinLogListener.js'; +import { MySQLConnectionManager } from '@module/replication/MySQLConnectionManager.js'; +import { clearTestDb, TEST_CONNECTION_OPTIONS } from './util.js'; +import { v4 as uuid } from 'uuid'; +import * as common from '@module/common/common-index.js'; +import { createRandomServerId } from '@module/utils/mysql-utils.js'; +import { TableMapEntry } from '@powersync/mysql-zongji'; +import crypto from 'crypto'; + +describe('BinlogListener tests', () => { + const MAX_QUEUE_CAPACITY_MB = 1; + const BINLOG_LISTENER_CONNECTION_OPTIONS = { + ...TEST_CONNECTION_OPTIONS, + binlog_queue_memory_limit: MAX_QUEUE_CAPACITY_MB + }; + + let connectionManager: MySQLConnectionManager; + let eventHandler: TestBinLogEventHandler; + let binLogListener: BinLogListener; + + beforeEach(async () => { + connectionManager = new MySQLConnectionManager(BINLOG_LISTENER_CONNECTION_OPTIONS, {}); + const connection = await connectionManager.getConnection(); + await clearTestDb(connection); + await connection.query(`CREATE TABLE test_DATA (id CHAR(36) PRIMARY KEY, description MEDIUMTEXT)`); + connection.release(); + const fromGTID = await getFromGTID(connectionManager); + + eventHandler = new TestBinLogEventHandler(); + binLogListener = new BinLogListener({ + connectionManager: connectionManager, + eventHandler: eventHandler, + startPosition: fromGTID.position, + includedTables: ['test_DATA'], + serverId: createRandomServerId(1) + }); + }); + + afterEach(async () => { + await connectionManager.end(); + }); + + test('Stop binlog listener', async () => { + const stopSpy = vi.spyOn(binLogListener.zongji, 'stop'); + const queueStopSpy = vi.spyOn(binLogListener.processingQueue, 'kill'); + + const startPromise = binLogListener.start(); + setTimeout(async () => binLogListener.stop(), 50); + + await expect(startPromise).resolves.toBeUndefined(); + expect(stopSpy).toHaveBeenCalled(); + expect(queueStopSpy).toHaveBeenCalled(); + }); + + test('Pause Zongji binlog listener when processing queue reaches maximum memory size', async () => { + const pauseSpy = vi.spyOn(binLogListener.zongji, 'pause'); + const resumeSpy = vi.spyOn(binLogListener.zongji, 'resume'); + + // Pause the event handler to force a backlog on the processing queue + eventHandler.pause(); + + const ROW_COUNT = 10; + await insertRows(connectionManager, ROW_COUNT); + + const startPromise = binLogListener.start(); + + // Wait for listener to pause due to queue reaching capacity + await vi.waitFor(() => expect(pauseSpy).toHaveBeenCalled(), { timeout: 5000 }); + + expect(binLogListener.isQueueOverCapacity()).toBeTruthy(); + // Resume event processing + eventHandler.unpause!(); + + await vi.waitFor(() => expect(eventHandler.rowsWritten).equals(ROW_COUNT), { timeout: 5000 }); + binLogListener.stop(); + await expect(startPromise).resolves.toBeUndefined(); + // Confirm resume was called after unpausing + expect(resumeSpy).toHaveBeenCalled(); + }); + + test('Binlog events are correctly forwarded to provided binlog events handler', async () => { + const startPromise = binLogListener.start(); + + const ROW_COUNT = 10; + await insertRows(connectionManager, ROW_COUNT); + await vi.waitFor(() => expect(eventHandler.rowsWritten).equals(ROW_COUNT), { timeout: 5000 }); + expect(eventHandler.commitCount).equals(ROW_COUNT); + + await updateRows(connectionManager); + await vi.waitFor(() => expect(eventHandler.rowsUpdated).equals(ROW_COUNT), { timeout: 5000 }); + + await deleteRows(connectionManager); + await vi.waitFor(() => expect(eventHandler.rowsDeleted).equals(ROW_COUNT), { timeout: 5000 }); + + binLogListener.stop(); + await expect(startPromise).resolves.toBeUndefined(); + }); +}); + +async function getFromGTID(connectionManager: MySQLConnectionManager) { + const connection = await connectionManager.getConnection(); + const fromGTID = await common.readExecutedGtid(connection); + connection.release(); + + return fromGTID; +} + +async function insertRows(connectionManager: MySQLConnectionManager, count: number) { + for (let i = 0; i < count; i++) { + await connectionManager.query( + `INSERT INTO test_DATA(id, description) VALUES('${uuid()}','test${i} ${crypto.randomBytes(100_000).toString('hex')}')` + ); + } +} + +async function updateRows(connectionManager: MySQLConnectionManager) { + await connectionManager.query(`UPDATE test_DATA SET description='updated'`); +} + +async function deleteRows(connectionManager: MySQLConnectionManager) { + await connectionManager.query(`DELETE FROM test_DATA`); +} + +class TestBinLogEventHandler implements BinLogEventHandler { + rowsWritten = 0; + rowsUpdated = 0; + rowsDeleted = 0; + commitCount = 0; + + unpause: ((value: void | PromiseLike) => void) | undefined; + private pausedPromise: Promise | undefined; + + pause() { + this.pausedPromise = new Promise((resolve) => { + this.unpause = resolve; + }); + } + + async onWrite(rows: Row[], tableMap: TableMapEntry) { + if (this.pausedPromise) { + await this.pausedPromise; + } + this.rowsWritten = this.rowsWritten + rows.length; + } + + async onUpdate(afterRows: Row[], beforeRows: Row[], tableMap: TableMapEntry) { + this.rowsUpdated = this.rowsUpdated + afterRows.length; + } + + async onDelete(rows: Row[], tableMap: TableMapEntry) { + this.rowsDeleted = this.rowsDeleted + rows.length; + } + + async onCommit(lsn: string) { + this.commitCount++; + } +} diff --git a/modules/module-mysql/test/src/mysql-to-sqlite.test.ts b/modules/module-mysql/test/src/mysql-to-sqlite.test.ts index 3b172955..97c5db93 100644 --- a/modules/module-mysql/test/src/mysql-to-sqlite.test.ts +++ b/modules/module-mysql/test/src/mysql-to-sqlite.test.ts @@ -3,7 +3,7 @@ import { afterAll, describe, expect, test } from 'vitest'; import { clearTestDb, TEST_CONNECTION_OPTIONS } from './util.js'; import { eventIsWriteMutation, eventIsXid } from '@module/replication/zongji/zongji-utils.js'; import * as common from '@module/common/common-index.js'; -import ZongJi, { BinLogEvent } from '@powersync/mysql-zongji'; +import { BinLogEvent, ZongJi } from '@powersync/mysql-zongji'; import { MySQLConnectionManager } from '@module/replication/MySQLConnectionManager.js'; import { toColumnDescriptors } from '@module/common/common-index.js'; diff --git a/modules/module-mysql/test/tsconfig.json b/modules/module-mysql/test/tsconfig.json index 5257b273..18898c4e 100644 --- a/modules/module-mysql/test/tsconfig.json +++ b/modules/module-mysql/test/tsconfig.json @@ -13,7 +13,7 @@ "@core-tests/*": ["../../../packages/service-core/test/src/*"] } }, - "include": ["src", "../src/replication/zongji/zongji.d.ts"], + "include": ["src"], "references": [ { "path": "../" diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 9eb06d74..c7367649 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -258,8 +258,8 @@ importers: specifier: workspace:* version: link:../../libs/lib-services '@powersync/mysql-zongji': - specifier: ^0.1.0 - version: 0.1.0 + specifier: 0.2.0 + version: 0.2.0 '@powersync/service-core': specifier: workspace:* version: link:../../packages/service-core @@ -1281,9 +1281,9 @@ packages: resolution: {integrity: sha512-UA91GwWPhFExt3IizW6bOeY/pQ0BkuNwKjk9iQW9KqxluGCrg4VenZ0/L+2Y0+ZOtme72EVvg6v0zo3AMQRCeA==} engines: {node: '>=12'} - '@powersync/mysql-zongji@0.1.0': - resolution: {integrity: sha512-2GjOxVws+wtbb+xFUJe4Ozzkp/f0Gsna0fje9art76bmz6yfLCW4K3Mf2/M310xMnAIp8eP9hsJ6DYwwZCo1RA==} - engines: {node: '>=20.0.0'} + '@powersync/mysql-zongji@0.2.0': + resolution: {integrity: sha512-ua/n7WFfoiXmqfgwLikcm/AaDE6+t5gFVTWHWsbiuRQMNtXE1F2gXpZJdwKhr8WsOCYkB/A1ZOgbJKi4tK342g==} + engines: {node: '>=22.0.0'} '@powersync/service-jsonbig@0.17.10': resolution: {integrity: sha512-BgxgUewuw4HFCM9MzuzlIuRKHya6rimNPYqUItt7CO3ySUeUnX8Qn9eZpMxu9AT5Y8zqkSyxvduY36zZueNojg==} @@ -1753,6 +1753,10 @@ packages: resolution: {integrity: sha512-GPEid2Y9QU1Exl1rpO9B2IPJGHPSupF5GnVIP0blYvNOMer2bTvSWs1jGOUg04hTmu67nmLsQ9TBo1puaotBHg==} engines: {node: '>=0.6'} + big-integer@1.6.52: + resolution: {integrity: sha512-QxD8cf2eVqJOOz63z6JIN9BzvVs/dlySa5HGSBH5xtR8dPteIRQnBxxKqkNTiT6jbDTF6jAfrd4oMcND9RGbQg==} + engines: {node: '>=0.6'} + bignumber.js@9.1.1: resolution: {integrity: sha512-pHm4LsMJ6lzgNGVfZHjMoO8sdoRhOzOH4MLmY65Jg70bpxCKu5iOHNJyfF6OyvYw7t8Fpf35RuzUyqnQsj8Vig==} @@ -4744,10 +4748,10 @@ snapshots: '@pnpm/network.ca-file': 1.0.2 config-chain: 1.1.13 - '@powersync/mysql-zongji@0.1.0': + '@powersync/mysql-zongji@0.2.0': dependencies: '@vlasky/mysql': 2.18.6 - big-integer: 1.6.51 + big-integer: 1.6.52 iconv-lite: 0.6.3 '@powersync/service-jsonbig@0.17.10': @@ -5192,6 +5196,8 @@ snapshots: big-integer@1.6.51: {} + big-integer@1.6.52: {} + bignumber.js@9.1.1: {} binary-extensions@2.3.0: {}