From d4b395ba5964cbdcda183b2d580f2ea74124f424 Mon Sep 17 00:00:00 2001
From: tserakhau
Date: Thu, 6 Feb 2025 14:33:36 +0100
Subject: [PATCH] MySQL sharding storage

Enable sharding by mysql partitions

Closes: https://github.com/doublecloud/transfer/issues/172
---
 pkg/providers/mysql/storage_sharding.go       | 85 +++++++++++++++++++
 .../mysql/tests/sharding/dump/source.sql      | 21 +++++
 .../tests/sharding/storage_sharding_test.go   | 44 ++++++++++
 pkg/providers/mysql/utils.go                  | 13 ++-
 4 files changed, 162 insertions(+), 1 deletion(-)
 create mode 100644 pkg/providers/mysql/storage_sharding.go
 create mode 100644 pkg/providers/mysql/tests/sharding/dump/source.sql
 create mode 100644 pkg/providers/mysql/tests/sharding/storage_sharding_test.go

diff --git a/pkg/providers/mysql/storage_sharding.go b/pkg/providers/mysql/storage_sharding.go
new file mode 100644
index 00000000..fbcd1205
--- /dev/null
+++ b/pkg/providers/mysql/storage_sharding.go
@@ -0,0 +1,85 @@
+package mysql
+
+import (
+	"context"
+	"fmt"
+	"strings"
+
+	"github.com/doublecloud/transfer/internal/logger"
+	"github.com/doublecloud/transfer/library/go/core/xerrors"
+	"github.com/doublecloud/transfer/pkg/abstract"
+)
+
+// ShardTable splits table into one TableDescription per MySQL partition so
+// that every partition can be loaded as an independent shard.
+//
+// Sharding is best-effort: when the partitions cannot be resolved the error is
+// only logged and the table is returned as a single shard. The same happens
+// for a table without partitions and for a table that already carries a filter
+// or an offset, since a partition selection would replace them.
+func (s *Storage) ShardTable(ctx context.Context, table abstract.TableDescription) ([]abstract.TableDescription, error) {
+	if table.Filter != "" || table.Offset != 0 {
+		return []abstract.TableDescription{table}, nil
+	}
+
+	partitions, err := s.resolvePartitions(ctx, table)
+	if err != nil {
+		// Not fatal: loading the table as one shard is still correct.
+		logger.Log.Warnf("unable to load child tables: %v", err)
+	}
+	if len(partitions) > 0 {
+		return partitions, nil
+	}
+
+	return []abstract.TableDescription{table}, nil
+}
+
+// resolvePartitions reads the partitions of table from
+// INFORMATION_SCHEMA.PARTITIONS and returns one TableDescription per
+// partition, each limited to that partition by a `PARTITION (...)` filter. The
+// result is empty for a table that is not partitioned.
+func (s *Storage) resolvePartitions(ctx context.Context, table abstract.TableDescription) ([]abstract.TableDescription, error) {
+	// A table that is not partitioned is listed with a NULL PARTITION_NAME and
+	// a subpartitioned table is listed once per subpartition, so NULL names are
+	// skipped and subpartitions are folded into their parent partition.
+	rows, err := s.DB.QueryContext(
+		ctx,
+		`
+SELECT PARTITION_NAME, CAST(COALESCE(SUM(TABLE_ROWS), 0) AS UNSIGNED)
+FROM INFORMATION_SCHEMA.PARTITIONS
+WHERE TABLE_SCHEMA = ?
+  AND TABLE_NAME = ?
+  AND PARTITION_NAME IS NOT NULL
+GROUP BY PARTITION_NAME
+ORDER BY MIN(PARTITION_ORDINAL_POSITION);
+`,
+		table.Schema,
+		table.Name,
+	)
+	if err != nil {
+		return nil, xerrors.Errorf("unable to resolve partitions: %w", err)
+	}
+	defer rows.Close()
+
+	var res []abstract.TableDescription
+	for rows.Next() {
+		var partName string
+		var tableRows uint64
+		if err := rows.Scan(&partName, &tableRows); err != nil {
+			return nil, xerrors.Errorf("unable to scan partition: %w", err)
+		}
+		// Quote the name as an identifier; an embedded backtick is doubled.
+		quoted := "`" + strings.ReplaceAll(partName, "`", "``") + "`"
+		res = append(res, abstract.TableDescription{
+			Name:   table.Name,
+			Schema: table.Schema,
+			Filter: abstract.WhereStatement(fmt.Sprintf("PARTITION (%s)", quoted)),
+			EtaRow: tableRows,
+			Offset: 0,
+		})
+	}
+	if err := rows.Err(); err != nil {
+		return nil, xerrors.Errorf("unable to read partitions: %w", err)
+	}
+	return res, nil
+}
diff --git a/pkg/providers/mysql/tests/sharding/dump/source.sql b/pkg/providers/mysql/tests/sharding/dump/source.sql
new file mode 100644
index 00000000..417ea29b
--- /dev/null
+++ b/pkg/providers/mysql/tests/sharding/dump/source.sql
@@ -0,0 +1,21 @@
+CREATE TABLE orders (
+    id INT NOT NULL AUTO_INCREMENT,
+    order_date DATE NOT NULL,
+    customer_name VARCHAR(100),
+    amount DECIMAL(10,2),
+    PRIMARY KEY (id, order_date)
+)
+PARTITION BY RANGE (YEAR(order_date)) (
+    PARTITION p2022 VALUES LESS THAN (2023),
+    PARTITION p2023 VALUES LESS THAN (2024),
+    PARTITION p2024 VALUES LESS THAN (2025),
+    PARTITION p_future VALUES LESS THAN MAXVALUE
+);
+
+INSERT INTO orders (order_date, customer_name, amount) VALUES
+    ('2022-05-10', 'Alice', 150.00),
+    ('2022-08-21', 'Bob', 200.00),
+    ('2023-03-15', 'Charlie', 300.00),
+    ('2023-11-05', 'David', 400.00),
+    ('2024-01-10', 'Eve', 500.00),
+    ('2024-07-18', 'Frank', 600.00);
diff --git a/pkg/providers/mysql/tests/sharding/storage_sharding_test.go b/pkg/providers/mysql/tests/sharding/storage_sharding_test.go
new file mode 100644
index 00000000..69966c7b
--- /dev/null
+++ b/pkg/providers/mysql/tests/sharding/storage_sharding_test.go
@@ -0,0 +1,44 @@
+package sharding
+
+import (
+	"context"
+	"testing"
+
+	"github.com/doublecloud/transfer/pkg/abstract"
+	"github.com/doublecloud/transfer/pkg/providers/mysql"
+	"github.com/doublecloud/transfer/pkg/providers/mysql/mysqlrecipe"
+	"github.com/stretchr/testify/require"
+)
+
+// TestShardingByPartitions checks that the partitioned `orders` table (see
+// dump/source.sql) is split into one shard per partition and that loading
+// every shard yields each inserted row exactly once.
+func TestShardingByPartitions(t *testing.T) {
+	source := mysqlrecipe.RecipeMysqlSource()
+	storage, err := mysql.NewStorage(source.ToStorageParams())
+	require.NoError(t, err)
+	parts, err := storage.ShardTable(context.Background(), abstract.TableDescription{
+		Name:   "orders",
+		Schema: source.Database,
+		Filter: "",
+		EtaRow: 0,
+		Offset: 0,
+	})
+	require.NoError(t, err)
+	require.Len(t, parts, 4)
+	resRows := 0
+	for _, part := range parts {
+		require.NoError(
+			t,
+			storage.LoadTable(context.Background(), part, func(items []abstract.ChangeItem) error {
+				for _, r := range items {
+					if r.IsRowEvent() {
+						resRows++
+					}
+				}
+				return nil
+			}),
+		)
+	}
+	require.Equal(t, 6, resRows)
+}
diff --git a/pkg/providers/mysql/utils.go b/pkg/providers/mysql/utils.go
index b4a04f05..27dcfa45 100644
--- a/pkg/providers/mysql/utils.go
+++ b/pkg/providers/mysql/utils.go
@@ -118,7 +118,12 @@ func buildSelectQuery(table abstract.TableDescription, tableSchema []abstract.Co
 	)
 
 	if table.Filter != "" {
-		resultQuery += " WHERE " + string(table.Filter)
+		if IsPartition(table.Filter) {
+			// A partition selection follows the table reference directly, without WHERE.
+			resultQuery += " " + string(table.Filter)
+		} else {
+			resultQuery += " WHERE " + string(table.Filter)
+		}
 	}
 	if table.Offset != 0 {
 		resultQuery += fmt.Sprintf(" OFFSET %d", table.Offset)
@@ -127,6 +132,12 @@ func buildSelectQuery(table abstract.TableDescription, tableSchema []abstract.Co
 	return resultQuery
 }
 
+// IsPartition reports whether filter is a partition selection clause
+// (`PARTITION (...)`) rather than an ordinary WHERE condition.
+func IsPartition(filter abstract.WhereStatement) bool {
+	return strings.HasPrefix(string(filter), "PARTITION (")
+}
+
 func MakeArrBacktickedColumnNames(tableSchema *[]abstract.ColSchema) []string {
 	colNames := make([]string, len(*tableSchema))
 	for idx, col := range *tableSchema {