Skip to content

Commit

Permalink
MySQL sharding storage
Browse files Browse the repository at this point in the history
Enable sharding by mysql partitions
Closes: #172

---

Pull Request resolved: #205

Co-authored-by: tserakhau <tserakhau@double.cloud>
Co-authored-by: tserakhau <tserakhau@double.cloud>
Co-authored-by: tserakhau <tserakhau@double.cloud>
Co-authored-by: tserakhau <tserakhau@double.cloud>
Co-authored-by: tserakhau <tserakhau@double.cloud>
Co-authored-by: tserakhau <tserakhau@double.cloud>
Co-authored-by: tserakhau <tserakhau@double.cloud>
commit_hash:9d122f2ca92d2b24309bbcb15015aacd8f828b25
  • Loading branch information
laskoviymishka authored and robot-piglet committed Feb 21, 2025
1 parent 7e1cb51 commit 33883c4
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 1 deletion.
3 changes: 3 additions & 0 deletions .mapping.json
Original file line number Diff line number Diff line change
Expand Up @@ -1554,11 +1554,14 @@
"pkg/providers/mysql/sink_test.go":"transfer_manager/go/pkg/providers/mysql/sink_test.go",
"pkg/providers/mysql/source.go":"transfer_manager/go/pkg/providers/mysql/source.go",
"pkg/providers/mysql/storage.go":"transfer_manager/go/pkg/providers/mysql/storage.go",
"pkg/providers/mysql/storage_sharding.go":"transfer_manager/go/pkg/providers/mysql/storage_sharding.go",
"pkg/providers/mysql/storage_test.go":"transfer_manager/go/pkg/providers/mysql/storage_test.go",
"pkg/providers/mysql/sync.go":"transfer_manager/go/pkg/providers/mysql/sync.go",
"pkg/providers/mysql/sync_binlog_position.go":"transfer_manager/go/pkg/providers/mysql/sync_binlog_position.go",
"pkg/providers/mysql/table_progress.go":"transfer_manager/go/pkg/providers/mysql/table_progress.go",
"pkg/providers/mysql/tasks.go":"transfer_manager/go/pkg/providers/mysql/tasks.go",
"pkg/providers/mysql/tests/sharding/source.sql":"transfer_manager/go/pkg/providers/mysql/tests/sharding/source.sql",
"pkg/providers/mysql/tests/sharding/storage_sharding_test.go":"transfer_manager/go/pkg/providers/mysql/tests/sharding/storage_sharding_test.go",
"pkg/providers/mysql/tracker.go":"transfer_manager/go/pkg/providers/mysql/tracker.go",
"pkg/providers/mysql/typesystem.go":"transfer_manager/go/pkg/providers/mysql/typesystem.go",
"pkg/providers/mysql/typesystem.md":"transfer_manager/go/pkg/providers/mysql/typesystem.md",
Expand Down
58 changes: 58 additions & 0 deletions pkg/providers/mysql/storage_sharding.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package mysql

import (
"context"
"fmt"

"github.com/doublecloud/transfer/internal/logger"
"github.com/doublecloud/transfer/library/go/core/xerrors"
"github.com/doublecloud/transfer/pkg/abstract"
)

func (s *Storage) ShardTable(ctx context.Context, table abstract.TableDescription) ([]abstract.TableDescription, error) {
partitions, err := s.resolvePartitions(ctx, table)
if err != nil {
logger.Log.Warnf("unable to load child tables: %v", err)
}
if len(partitions) > 0 {
return partitions, nil
}

return []abstract.TableDescription{table}, nil
}

func (s *Storage) resolvePartitions(ctx context.Context, table abstract.TableDescription) ([]abstract.TableDescription, error) {
rows, err := s.DB.QueryContext(
ctx,
`
SELECT PARTITION_NAME, TABLE_ROWS
FROM INFORMATION_SCHEMA.PARTITIONS
WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?;
`,
table.Schema,
table.Name,
)
if err != nil {
return nil, xerrors.Errorf("unable to resolve partitions: %w", err)
}
var res []abstract.TableDescription
for rows.Next() {
var partName string
var tableRows int
if err := rows.Scan(&partName, &tableRows); err != nil {
return nil, xerrors.Errorf("unable to scan partition: %w", err)
}
logger.Log.Infof("resolve part: %s (%v)", partName, tableRows)
res = append(res, abstract.TableDescription{
Name: table.Name,
Schema: table.Schema,
Filter: abstract.WhereStatement(fmt.Sprintf("PARTITION (%s)", partName)),
EtaRow: uint64(tableRows),
Offset: 0,
})
}
if rows.Err() != nil {
return nil, xerrors.Errorf("unable to read rows: %w", rows.Err())
}
return res, nil
}
21 changes: 21 additions & 0 deletions pkg/providers/mysql/tests/sharding/source.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
CREATE TABLE orders (
id INT NOT NULL AUTO_INCREMENT,
order_date DATE NOT NULL,
customer_name VARCHAR(100),
amount DECIMAL(10,2),
PRIMARY KEY (id, order_date)
)
PARTITION BY RANGE (YEAR(order_date)) (
PARTITION p2022 VALUES LESS THAN (2023),
PARTITION p2023 VALUES LESS THAN (2024),
PARTITION p2024 VALUES LESS THAN (2025),
PARTITION p_future VALUES LESS THAN MAXVALUE
);

INSERT INTO orders (order_date, customer_name, amount) VALUES
('2022-05-10', 'Alice', 150.00),
('2022-08-21', 'Bob', 200.00),
('2023-03-15', 'Charlie', 300.00),
('2023-11-05', 'David', 400.00),
('2024-01-10', 'Eve', 500.00),
('2024-07-18', 'Frank', 600.00);
59 changes: 59 additions & 0 deletions pkg/providers/mysql/tests/sharding/storage_sharding_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package sharding

import (
"context"
_ "embed"
"testing"

"github.com/doublecloud/transfer/pkg/abstract"
"github.com/doublecloud/transfer/pkg/providers/mysql"
"github.com/doublecloud/transfer/pkg/providers/mysql/mysqlrecipe"
default_mysql "github.com/go-sql-driver/mysql"
"github.com/stretchr/testify/require"
)

//go:embed source.sql
var sourceDB []byte

func TestShardingByPartitions(t *testing.T) {
source := mysqlrecipe.RecipeMysqlSource()
if source.Database == "" {
// init database
source.Database = "source"
}
connectionParams, err := mysql.NewConnectionParams(source.ToStorageParams())
require.NoError(t, err)
db, err := mysql.Connect(connectionParams, func(config *default_mysql.Config) error {
config.MultiStatements = true
return nil
})
require.NoError(t, err)
_, err = db.Exec(string(sourceDB))
require.NoError(t, err)
storage, err := mysql.NewStorage(source.ToStorageParams())
require.NoError(t, err)
parts, err := storage.ShardTable(context.Background(), abstract.TableDescription{
Name: "orders",
Schema: source.Database,
Filter: "",
EtaRow: 0,
Offset: 0,
})
require.NoError(t, err)
require.Len(t, parts, 4)
resRows := 0
for _, part := range parts {
require.NoError(
t,
storage.LoadTable(context.Background(), part, func(items []abstract.ChangeItem) error {
for _, r := range items {
if r.IsRowEvent() {
resRows++
}
}
return nil
}),
)
}
require.Equal(t, resRows, 6)
}
12 changes: 11 additions & 1 deletion pkg/providers/mysql/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,13 @@ func buildSelectQuery(table abstract.TableDescription, tableSchema []abstract.Co
)

if table.Filter != "" {
resultQuery += " WHERE " + string(table.Filter)
if IsPartition(table.Filter) {
// we use partition-by sharding mechanism, to query exact partition there is no need to use `where` key-word
// see: https://stackoverflow.com/questions/14112283/how-to-select-rows-from-partition-in-mysql
resultQuery += string(table.Filter)
} else {
resultQuery += " WHERE " + string(table.Filter)
}
}
if table.Offset != 0 {
resultQuery += fmt.Sprintf(" OFFSET %d", table.Offset)
Expand All @@ -127,6 +133,10 @@ func buildSelectQuery(table abstract.TableDescription, tableSchema []abstract.Co
return resultQuery
}

func IsPartition(filter abstract.WhereStatement) bool {
return strings.HasPrefix(string(filter), "PARTITION")
}

func MakeArrBacktickedColumnNames(tableSchema *[]abstract.ColSchema) []string {
colNames := make([]string, len(*tableSchema))
for idx, col := range *tableSchema {
Expand Down

0 comments on commit 33883c4

Please sign in to comment.