Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make replication flush interval configurable in mysql source #202

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion docs/connectors/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ You can use this connector both for **source** and **target** endpoints.
"ConsistentSnapshot": true,
"SnapshotDegreeOfParallelism": 4,
"AllowDecimalAsFloat": false,
"RootCAFiles": ["/path/to/ca1.pem", "/path/to/ca2.pem"]
"RootCAFiles": ["/path/to/ca1.pem", "/path/to/ca2.pem"],
"ReplicationFlushInterval": 5000000000
}
```

Expand Down Expand Up @@ -169,6 +170,9 @@ You can use this connector both for **source** and **target** endpoints.
- **PlzNoHomo** (`bool`): Forces disabling of homogeneous features, primarily used for testing and specific configurations.

- **RootCAFiles** (`[]string`): List of paths to root CA files for validating SSL connections to the MySQL server.

- **ReplicationFlushInterval** (`time.Duration`): Specifies the replication flush interval. Defined in nanoseconds.
- Example: `5000000000` (5 seconds)
Comment on lines +174 to +175
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's change it to milliseconds (1/1000 of second). I don't see a case when user will need to specify interval with such accuracy

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see: #215
this would allow to type 5s instead of 5000000000

Copy link
Contributor Author

@saurabhojha saurabhojha Feb 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Migrating to yaml.v3 wouldn't solve this issue. Since we first convert yaml configuration to a TransferYamlView which parses entire config as any type. Converting from json (done later in NewSource and NewDestination methods) to the desired config struct again would raise the following error:

unable to construct missed fields: failed to decode: 1 error(s) decoding:

  • 'ReplicationFlushInterval' expected type 'time.Duration', got unconvertible type 'string', value: '5s'


---

Expand Down
1 change: 1 addition & 0 deletions examples/mysql2ch/transfer.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ src:
Password: mypassword
Database: mydb
Port: 3306
ReplicationFlushInterval: 5000000000
dst:
type: ch
params:
Expand Down
3 changes: 2 additions & 1 deletion examples/mysql2kafka/transfer.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ src:
"User": "testuser",
"Password": "testpassword",
"Database": "testdb",
"Port": 3306
"Port": 3306,
"ReplicationFlushInterval": 5000000000
}
dst:
type: kafka
Expand Down
16 changes: 11 additions & 5 deletions pkg/providers/mysql/model_source.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
package mysql

import (
"github.com/doublecloud/transfer/pkg/abstract"
"github.com/doublecloud/transfer/pkg/abstract/model"
"hash/fnv"
"regexp"
"strings"

"github.com/doublecloud/transfer/pkg/abstract"
"github.com/doublecloud/transfer/pkg/abstract/model"
"time"
)

type MysqlFlavorType string

const (
MysqlFlavorTypeMysql = "mysql"
MysqlFlavorTypeMariaDB = "mariadb"
MysqlFlavorTypeMysql = "mysql"
MysqlFlavorTypeMariaDB = "mariadb"
DefaultReplicationFlushInterval = time.Second
)

type MysqlTrackerStorage string
Expand Down Expand Up @@ -52,6 +53,8 @@ type MysqlSource struct {
PlzNoHomo bool // forcefully disable homo features, mostly for tests
RootCAFiles []string
ConnectionID string

ReplicationFlushInterval time.Duration
}

var _ model.Source = (*MysqlSource)(nil)
Expand Down Expand Up @@ -163,6 +166,9 @@ func (s *MysqlSource) WithDefaults() {
if s.SnapshotDegreeOfParallelism <= 0 {
s.SnapshotDegreeOfParallelism = 4
}
if s.ReplicationFlushInterval == 0 {
s.ReplicationFlushInterval = DefaultReplicationFlushInterval
}
}

func (s *MysqlSource) HasTLS() bool {
Expand Down
4 changes: 2 additions & 2 deletions pkg/providers/mysql/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (h *binlogHandler) OnDDL(nextPos mysql.Position, queryEvent *replication.Qu
if len(h.inflight) == 0 {
break
}
time.Sleep(time.Second)
time.Sleep(h.config.ReplicationFlushInterval)
}
h.logger.Warn("DDL Query", log.Any("query", string(queryEvent.Query)), log.Any("position", nextPos))
execTS := time.Now()
Expand Down Expand Up @@ -550,7 +550,7 @@ func (p *publisher) flusher() {
h := p.handler
h.metrics.Master.Set(1)
if len(h.inflight) == 0 {
time.Sleep(time.Second)
time.Sleep(h.config.ReplicationFlushInterval)
continue
}
start := time.Now()
Expand Down
Loading