Skip to content

Commit

Permalink
TRANSFER-825: Allow * in ch-source
Browse files Browse the repository at this point in the history
1324d55dfbb24bb5d8bfa0c63dce74b73620d98f
  • Loading branch information
laskoviymishka committed Aug 27, 2024
1 parent 13d08c1 commit dbc3616
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 7 deletions.
16 changes: 10 additions & 6 deletions transfer_manager/go/pkg/providers/clickhouse/a2_target.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,15 @@ type HTTPTarget struct {

var syntaxErrorRegexp = regexp.MustCompile(`^.*\(at row ([0-9]+)\).*$`)

func (c *HTTPTarget) toAltName(tableName string) string {
func (c *HTTPTarget) toAltName(dbName string, tableName string) string {
targetDB := c.config.Database()
if targetDB == "" {
targetDB = dbName
}
if altName, ok := c.altNames[tableName]; ok {
tableName = altName
}
return tableName
return fmt.Sprintf("`%s`.`%s`", targetDB, tableName)
}

func (c *HTTPTarget) AsyncPush(input base.EventBatch) chan error {
Expand All @@ -70,8 +74,8 @@ func (c *HTTPTarget) AsyncPush(input base.EventBatch) chan error {
}

blob := []byte(fmt.Sprintf(
"INSERT INTO `%s` (%s) %s FORMAT %s\n",
c.toAltName(table.Name),
"INSERT INTO %s (%s) %s FORMAT %s\n",
c.toAltName(table.Schema, table.Name),
strings.Join(escapedColumnNames, ","),
c.config.InsertSettings().AsQueryPart(),
batch.Format,
Expand Down Expand Up @@ -156,7 +160,7 @@ func (c *HTTPTarget) AsyncPush(input base.EventBatch) chan error {
}

err := c.execDDL(func(distributed bool) error {
q := fmt.Sprintf(ddl, c.tableReferenceForDDL(c.toAltName(event.Name), distributed))
q := fmt.Sprintf(ddl, c.tableReferenceForDDL(c.toAltName(event.Namespace, event.Name), distributed))
return c.client.Exec(context.Background(), c.logger, c.HostByPart(nil), q)
})
if err != nil {
Expand Down Expand Up @@ -262,7 +266,7 @@ func (c *HTTPTarget) tableReferenceForDDL(name string, distributed bool) string
if distributed {
cluster = fmt.Sprintf(" ON CLUSTER `%s`", c.cluster.Name())
}
return fmt.Sprintf("`%s`.`%s`%s", c.config.Database(), name, cluster)
return fmt.Sprintf("%s%s", name, cluster)
}

func (c *HTTPTarget) resolveCluster() error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ func (s *ChSource) fulfilledIncludesImpl(tID abstract.TableID, firstIncludeOnly
strings.Join([]string{tID.Namespace, ".", "*"}, ""),
}
tIDNameVariant := strings.Join([]string{"\"", tID.Name, "\""}, tID.Name)

if s.Database == "*" {
return []string{tID.Fqtn()}
}
for _, table := range s.ExcludeTables {
for _, variant := range tIDVariants {
if table == variant {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ func (w connConfigWrapper) ResolvePassword() (string, error) {
}

func (w connConfigWrapper) Database() string {
if w.p.Database == "*" {
return ""
}
return w.p.Database
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ func MakeDistributedDDL(sql, cluster string) string {
}

func SetTargetDatabase(ddl string, sourceDB, targetDB string) string {
if targetDB == "" {
return ddl
}
switch {
case strings.Contains(ddl, fmt.Sprintf("CREATE TABLE %v.", sourceDB)):
ddl = strings.Replace(ddl, fmt.Sprintf("CREATE TABLE %v.", sourceDB), fmt.Sprintf("CREATE TABLE `%v`.", targetDB), 1)
Expand Down
5 changes: 5 additions & 0 deletions transfer_manager/go/pkg/providers/clickhouse/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
)

var (
systemDBs = util.NewSet("INFORMATION_SCHEMA", "information_schema", "system")
nonTransferableTypes = []string{
// For now we do not support next types:
// Array(Date)
Expand Down Expand Up @@ -504,6 +505,10 @@ func (s *Storage) listTables(schema, name string) ([]table, error) {
if err := tablesRes.Scan(&database, &name, &totalRows, &pkeys, &engine); err != nil {
return nil, xerrors.Errorf("unable to parse table list query result: %w", err)
}

if systemDBs.Contains(database) {
continue
}
if totalRows != nil {
rows = *totalRows
}
Expand Down
40 changes: 40 additions & 0 deletions transfer_manager/go/tests/e2e/ch2ch/multi_db/check_db_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package snapshot

import (
"context"
"os"
"testing"

"github.com/doublecloud/transfer/library/go/core/metrics/solomon"
"github.com/doublecloud/transfer/transfer_manager/go/pkg/abstract"
"github.com/doublecloud/transfer/transfer_manager/go/pkg/abstract/coordinator"
chrecipe "github.com/doublecloud/transfer/transfer_manager/go/pkg/providers/clickhouse/recipe"
"github.com/doublecloud/transfer/transfer_manager/go/pkg/worker/tasks"
"github.com/doublecloud/transfer/transfer_manager/go/tests/helpers"
"github.com/stretchr/testify/require"
)

var (
TransferType = abstract.TransferTypeSnapshotOnly
Source = *chrecipe.MustSource(chrecipe.WithInitFile("dump/src.sql"), chrecipe.WithDatabase("*"))
Target = *chrecipe.MustTarget(chrecipe.WithInitFile("dump/dst.sql"), chrecipe.WithDatabase(""), chrecipe.WithPrefix("DB0_"))
)

func init() {
_ = os.Setenv("YC", "1") // to not go to vanga
helpers.InitSrcDst(helpers.TransferID, &Source, &Target, TransferType) // to WithDefaults() & FillDependentFields(): IsHomo, helpers.TransferID, IsUpdateable
}

func TestSnapshot(t *testing.T) {
defer func() {
require.NoError(t, helpers.CheckConnections(
helpers.LabeledPort{Label: "CH source", Port: Source.NativePort},
helpers.LabeledPort{Label: "CH target", Port: Target.NativePort},
))
}()

transfer := helpers.MakeTransfer("fake", &Source, &Target, abstract.TransferTypeSnapshotOnly)
require.NoError(t, tasks.ActivateDelivery(context.Background(), nil, coordinator.NewFakeClient(), *transfer, solomon.NewRegistry(solomon.NewRegistryOpts())))
Target.Database = "*" // to force read all db-s
require.NoError(t, helpers.CompareStorages(t, Source, Target, helpers.NewCompareStorageParams()))
}
3 changes: 3 additions & 0 deletions transfer_manager/go/tests/e2e/ch2ch/multi_db/dump/dst.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
CREATE DATABASE IF NOT EXISTS db1;
CREATE DATABASE IF NOT EXISTS db2;
CREATE DATABASE IF NOT EXISTS db3;
51 changes: 51 additions & 0 deletions transfer_manager/go/tests/e2e/ch2ch/multi_db/dump/src.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
CREATE DATABASE IF NOT EXISTS db1;

CREATE TABLE db1.long_line
(
`id` UInt64,
`value` String
)
ENGINE = MergeTree()
ORDER BY (id)
SETTINGS index_granularity = 8192;

INSERT INTO db1.long_line
(`id`, `value`)
VALUES
(1, repeat('a', 5000))
;

CREATE DATABASE IF NOT EXISTS db2;

CREATE TABLE db2.long_line
(
`id` UInt64,
`value` String
)
ENGINE = MergeTree()
ORDER BY (id)
SETTINGS index_granularity = 8192;

INSERT INTO db1.long_line
(`id`, `value`)
VALUES
(1, repeat('b', 5000))
;


CREATE DATABASE IF NOT EXISTS db3;

CREATE TABLE db3.long_line
(
`id` UInt64,
`value` String
)
ENGINE = MergeTree()
ORDER BY (id)
SETTINGS index_granularity = 8192;

INSERT INTO db1.long_line
(`id`, `value`)
VALUES
(1, repeat('c', 5000))
;

0 comments on commit dbc3616

Please sign in to comment.