Skip to content

Commit

Permalink
Add private flag for empty table YT dyn Sink
Browse files Browse the repository at this point in the history
Add private flag for transfering even empty tables in YT dyn dst
commit_hash:f6cbb2f58bec24b39dd7116675ac1eb88b595ef8
  • Loading branch information
DenisEvd committed Jan 17, 2025
1 parent 228daa1 commit 2acfc61
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .mapping.json
Original file line number Diff line number Diff line change
Expand Up @@ -3017,6 +3017,8 @@
"tests/e2e/pg2yt/snapshot/dump/type_check.sql":"transfer_manager/go/tests/e2e/pg2yt/snapshot/dump/type_check.sql",
"tests/e2e/pg2yt/snapshot_and_replication/check_db_test.go":"transfer_manager/go/tests/e2e/pg2yt/snapshot_and_replication/check_db_test.go",
"tests/e2e/pg2yt/snapshot_and_replication/dump/dump.sql":"transfer_manager/go/tests/e2e/pg2yt/snapshot_and_replication/dump/dump.sql",
"tests/e2e/pg2yt/snapshot_empty_tables/check_db_test.go":"transfer_manager/go/tests/e2e/pg2yt/snapshot_empty_tables/check_db_test.go",
"tests/e2e/pg2yt/snapshot_empty_tables/dump/type_check.sql":"transfer_manager/go/tests/e2e/pg2yt/snapshot_empty_tables/dump/type_check.sql",
"tests/e2e/pg2yt/snapshot_incremental/check_db_test.go":"transfer_manager/go/tests/e2e/pg2yt/snapshot_incremental/check_db_test.go",
"tests/e2e/pg2yt/snapshot_incremental/dump/type_check.sql":"transfer_manager/go/tests/e2e/pg2yt/snapshot_incremental/dump/type_check.sql",
"tests/e2e/pg2yt/snapshot_incremental_sharded/check_db_test.go":"transfer_manager/go/tests/e2e/pg2yt/snapshot_incremental_sharded/check_db_test.go",
Expand Down
8 changes: 8 additions & 0 deletions pkg/providers/yt/model_yt_destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ type YtDestinationModel interface {
// with the priority to the latter one
// It guarantees to keep unchanged both the argument and custom attributes map in the model
MergeAttributes(tableSettings map[string]any) map[string]any

// If is true, creates empty tables
CreateEmptyTables() bool
}

type YtDestination struct {
Expand Down Expand Up @@ -127,6 +130,7 @@ type YtDestination struct {
Ordered bool
TransformerConfig map[string]string
UseStaticTableOnSnapshot bool // optional.Optional[bool] breaks compatibility
CreateEmptyTables bool
AltNames map[string]string
Cleanup dp_model.CleanupType
Spec YTSpec
Expand Down Expand Up @@ -503,6 +507,10 @@ func (d *YtDestinationWrapper) LegacyModel() interface{} {
return d.Model
}

func (d *YtDestinationWrapper) CreateEmptyTables() bool {
return d.Model.UseStaticTableOnSnapshot && d.Model.CreateEmptyTables && d.Model.Rotation == nil
}

func NewYtDestinationV1(model YtDestination) YtDestinationModel {
return &YtDestinationWrapper{
Model: &model,
Expand Down
7 changes: 7 additions & 0 deletions pkg/providers/yt/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,13 @@ func (s *sinker) Push(input []abstract.ChangeItem) error {
s.logger.Info("static table snapshot is initiated because policy UseStaticTableOnSnapshot is on")
// delay handler application until schema is unknown
s.staticSnapshotState[rotatedName] = StaticTableSnapshotInitialized

if s.config.CreateEmptyTables() {
s.logger.Info("create empty table", log.String("name", name))
if err := s.checkTable(item.TableSchema.Columns(), name); err != nil {
return xerrors.Errorf("unable to create table on init table load (CreateEmptyTables is true): %w", err)
}
}
} else {
s.logger.Info("static table snapshot is not initiated because policy UseStaticTableOnSnapshot is off")
}
Expand Down
63 changes: 63 additions & 0 deletions tests/e2e/pg2yt/snapshot_empty_tables/check_db_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package snapshot

import (
"os"
"testing"

"github.com/doublecloud/transfer/pkg/abstract"
"github.com/doublecloud/transfer/pkg/abstract/model"
"github.com/doublecloud/transfer/pkg/providers/postgres"
yt_provider "github.com/doublecloud/transfer/pkg/providers/yt"
"github.com/doublecloud/transfer/tests/helpers"
yt_helpers "github.com/doublecloud/transfer/tests/helpers/yt"
"github.com/stretchr/testify/require"
)

var (
Source = postgres.PgSource{
ClusterID: os.Getenv("PG_CLUSTER_ID"),
Hosts: []string{"localhost"},
User: os.Getenv("PG_LOCAL_USER"),
Password: model.SecretString(os.Getenv("PG_LOCAL_PASSWORD")),
Database: os.Getenv("PG_LOCAL_DATABASE"),
Port: helpers.GetIntFromEnv("PG_LOCAL_PORT"),
DBTables: []string{"public.__test_empty"},
}
Target = yt_helpers.RecipeYtTarget("//home/cdc/test/pg2yt_e2e").(*yt_provider.YtDestinationWrapper)
)

func init() {
_ = os.Setenv("YC", "1") // to not go to vanga
Source.WithDefaults()
}

func TestMain(m *testing.M) {
yt_provider.InitExe()
os.Exit(m.Run())
}

func TestGroup(t *testing.T) {
targetPort, err := helpers.GetPortFromStr(Target.Cluster())
require.NoError(t, err)
defer func() {
require.NoError(t, helpers.CheckConnections(
helpers.LabeledPort{Label: "PG source", Port: Source.Port},
helpers.LabeledPort{Label: "YT target", Port: targetPort},
))
}()

t.Run("Group after port check", func(t *testing.T) {
t.Run("Snapshot", Snapshot)
})
}

func Snapshot(t *testing.T) {
Source.PreSteps.Constraint = true
Target.Model.UseStaticTableOnSnapshot = true
Target.Model.CreateEmptyTables = true
transfer := helpers.MakeTransfer(helpers.TransferID, &Source, Target, abstract.TransferTypeSnapshotOnly)

_ = helpers.Activate(t, transfer)

require.NoError(t, helpers.CompareStorages(t, Source, Target.LegacyModel(), helpers.NewCompareStorageParams()))
}
4 changes: 4 additions & 0 deletions tests/e2e/pg2yt/snapshot_empty_tables/dump/type_check.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
create table __test_empty (
id SERIAL PRIMARY KEY,
name TEXT
);

0 comments on commit 2acfc61

Please sign in to comment.