Skip to content

Commit

Permalink
Make replication flush interval configurable in mysql source
Browse files Browse the repository at this point in the history
Add a replication flush interval in mysql source config.
This pr closes : #59

---

Pull Request resolved: #202

Co-authored-by: tserakhau <tserakhau@double.cloud>
Co-authored-by: tserakhau <tserakhau@double.cloud>
commit_hash:02ea17e2169b6dac62f1ba31bac43828f96be5ba
  • Loading branch information
saurabhojha authored and robot-piglet committed Feb 17, 2025
1 parent 28cea64 commit 6730607
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 6 deletions.
6 changes: 5 additions & 1 deletion docs/connectors/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ You can use this connector both for **source** and **target** endpoints.
"ConsistentSnapshot": true,
"SnapshotDegreeOfParallelism": 4,
"AllowDecimalAsFloat": false,
"RootCAFiles": ["/path/to/ca1.pem", "/path/to/ca2.pem"]
"RootCAFiles": ["/path/to/ca1.pem", "/path/to/ca2.pem"],
"ReplicationFlushInterval": 5000000000
}
```

Expand Down Expand Up @@ -169,6 +170,9 @@ You can use this connector both for **source** and **target** endpoints.
- **PlzNoHomo** (`bool`): Forces disabling of homogeneous features, primarily used for testing and specific configurations.

- **RootCAFiles** (`[]string`): List of paths to root CA files for validating SSL connections to the MySQL server.

- **ReplicationFlushInterval** (`time.Duration`): Specifies the replication flush interval. Defined in nanoseconds.
- Example: `5000000000` (5 seconds)

---

Expand Down
1 change: 1 addition & 0 deletions examples/mysql2ch/transfer.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ src:
Password: mypassword
Database: mydb
Port: 3306
ReplicationFlushInterval: 5000000000
dst:
type: ch
params:
Expand Down
3 changes: 2 additions & 1 deletion examples/mysql2kafka/transfer.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ src:
"User": "testuser",
"Password": "testpassword",
"Database": "testdb",
"Port": 3306
"Port": 3306,
"ReplicationFlushInterval": 5000000000
}
dst:
type: kafka
Expand Down
11 changes: 9 additions & 2 deletions pkg/providers/mysql/model_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"hash/fnv"
"regexp"
"strings"
"time"

"github.com/doublecloud/transfer/pkg/abstract"
"github.com/doublecloud/transfer/pkg/abstract/model"
Expand All @@ -12,8 +13,9 @@ import (
type MysqlFlavorType string

const (
MysqlFlavorTypeMysql = "mysql"
MysqlFlavorTypeMariaDB = "mariadb"
MysqlFlavorTypeMysql = "mysql"
MysqlFlavorTypeMariaDB = "mariadb"
DefaultReplicationFlushInterval = time.Second
)

type MysqlTrackerStorage string
Expand Down Expand Up @@ -52,6 +54,8 @@ type MysqlSource struct {
PlzNoHomo bool // forcefully disable homo features, mostly for tests
RootCAFiles []string
ConnectionID string

ReplicationFlushInterval time.Duration
}

var _ model.Source = (*MysqlSource)(nil)
Expand Down Expand Up @@ -163,6 +167,9 @@ func (s *MysqlSource) WithDefaults() {
if s.SnapshotDegreeOfParallelism <= 0 {
s.SnapshotDegreeOfParallelism = 4
}
if s.ReplicationFlushInterval == 0 {
s.ReplicationFlushInterval = DefaultReplicationFlushInterval
}
}

func (s *MysqlSource) HasTLS() bool {
Expand Down
4 changes: 2 additions & 2 deletions pkg/providers/mysql/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (h *binlogHandler) OnDDL(nextPos mysql.Position, queryEvent *replication.Qu
if len(h.inflight) == 0 {
break
}
time.Sleep(time.Second)
time.Sleep(h.config.ReplicationFlushInterval)
}
h.logger.Warn("DDL Query", log.Any("query", string(queryEvent.Query)), log.Any("position", nextPos))
execTS := time.Now()
Expand Down Expand Up @@ -550,7 +550,7 @@ func (p *publisher) flusher() {
h := p.handler
h.metrics.Master.Set(1)
if len(h.inflight) == 0 {
time.Sleep(time.Second)
time.Sleep(h.config.ReplicationFlushInterval)
continue
}
start := time.Now()
Expand Down

0 comments on commit 6730607

Please sign in to comment.