diff --git a/docs/connectors/mysql.md b/docs/connectors/mysql.md index e7f9158a..98b3fa3f 100644 --- a/docs/connectors/mysql.md +++ b/docs/connectors/mysql.md @@ -124,7 +124,8 @@ You can use this connector both for **source** and **target** endpoints. "ConsistentSnapshot": true, "SnapshotDegreeOfParallelism": 4, "AllowDecimalAsFloat": false, - "RootCAFiles": ["/path/to/ca1.pem", "/path/to/ca2.pem"] + "RootCAFiles": ["/path/to/ca1.pem", "/path/to/ca2.pem"], + "ReplicationFlushInterval": 5000000000 } ``` @@ -169,6 +170,9 @@ You can use this connector both for **source** and **target** endpoints. - **PlzNoHomo** (`bool`): Forces disabling of homogeneous features, primarily used for testing and specific configurations. - **RootCAFiles** (`[]string`): List of paths to root CA files for validating SSL connections to the MySQL server. + + - **ReplicationFlushInterval** (`time.Duration`): Specifies the replication flush interval. Defined in nanoseconds. + - Example: `5000000000` (5 seconds) --- diff --git a/examples/mysql2ch/transfer.yaml b/examples/mysql2ch/transfer.yaml index 8ba6bf27..c8ead4e8 100644 --- a/examples/mysql2ch/transfer.yaml +++ b/examples/mysql2ch/transfer.yaml @@ -8,6 +8,7 @@ src: Password: mypassword Database: mydb Port: 3306 + ReplicationFlushInterval: 5000000000 dst: type: ch params: diff --git a/examples/mysql2kafka/transfer.yaml b/examples/mysql2kafka/transfer.yaml index d4d11658..a6852ea6 100644 --- a/examples/mysql2kafka/transfer.yaml +++ b/examples/mysql2kafka/transfer.yaml @@ -8,7 +8,8 @@ src: "User": "testuser", "Password": "testpassword", "Database": "testdb", - "Port": 3306 + "Port": 3306, + "ReplicationFlushInterval": 5000000000 } dst: type: kafka diff --git a/pkg/providers/mysql/model_source.go b/pkg/providers/mysql/model_source.go index 0adae792..b20b5554 100644 --- a/pkg/providers/mysql/model_source.go +++ b/pkg/providers/mysql/model_source.go @@ -1,19 +1,20 @@ package mysql import ( + "github.com/doublecloud/transfer/pkg/abstract" + "github.com/doublecloud/transfer/pkg/abstract/model" "hash/fnv" "regexp" "strings" - - "github.com/doublecloud/transfer/pkg/abstract" - "github.com/doublecloud/transfer/pkg/abstract/model" + "time" ) type MysqlFlavorType string const ( - MysqlFlavorTypeMysql = "mysql" - MysqlFlavorTypeMariaDB = "mariadb" + MysqlFlavorTypeMysql = "mysql" + MysqlFlavorTypeMariaDB = "mariadb" + DefaultReplicationFlushInterval = time.Second ) type MysqlTrackerStorage string @@ -52,6 +53,8 @@ type MysqlSource struct { PlzNoHomo bool // forcefully disable homo features, mostly for tests RootCAFiles []string ConnectionID string + + ReplicationFlushInterval time.Duration } var _ model.Source = (*MysqlSource)(nil) @@ -163,6 +166,9 @@ func (s *MysqlSource) WithDefaults() { if s.SnapshotDegreeOfParallelism <= 0 { s.SnapshotDegreeOfParallelism = 4 } + if s.ReplicationFlushInterval == 0 { + s.ReplicationFlushInterval = DefaultReplicationFlushInterval + } } func (s *MysqlSource) HasTLS() bool { diff --git a/pkg/providers/mysql/source.go b/pkg/providers/mysql/source.go index ec75f92c..456210ab 100644 --- a/pkg/providers/mysql/source.go +++ b/pkg/providers/mysql/source.go @@ -90,7 +90,7 @@ func (h *binlogHandler) OnDDL(nextPos mysql.Position, queryEvent *replication.Qu if len(h.inflight) == 0 { break } - time.Sleep(time.Second) + time.Sleep(h.config.ReplicationFlushInterval) } h.logger.Warn("DDL Query", log.Any("query", string(queryEvent.Query)), log.Any("position", nextPos)) execTS := time.Now() @@ -550,7 +550,7 @@ func (p *publisher) flusher() { h := p.handler h.metrics.Master.Set(1) if len(h.inflight) == 0 { - time.Sleep(time.Second) + time.Sleep(h.config.ReplicationFlushInterval) continue } start := time.Now()