From 293d372fc560ee38af9bc5402e0be7bca2ea5db9 Mon Sep 17 00:00:00 2001 From: denisevdkmv Date: Thu, 29 Aug 2024 10:52:02 +0300 Subject: [PATCH] Fix PG fallback fix: fallback for sharded events b9df19035018a8843ccbaac4cd3f4527a3e38019 --- .../postgres/fallback_date_as_string.go | 2 +- .../yt_static/pg_scripts/create_tables.sql | 13 +++++++++++- .../e2e/pg2yt/yt_static/yt_static_test.go | 21 +++++++++++++++++++ 3 files changed, 34 insertions(+), 2 deletions(-) diff --git a/transfer_manager/go/pkg/providers/postgres/fallback_date_as_string.go b/transfer_manager/go/pkg/providers/postgres/fallback_date_as_string.go index dd8fb737..6249425a 100644 --- a/transfer_manager/go/pkg/providers/postgres/fallback_date_as_string.go +++ b/transfer_manager/go/pkg/providers/postgres/fallback_date_as_string.go @@ -14,7 +14,7 @@ func init() { Function: func(ci *abstract.ChangeItem) (*abstract.ChangeItem, error) { if !ci.IsRowEvent() { switch ci.Kind { - case abstract.InitTableLoad, abstract.DoneTableLoad: + case abstract.InitShardedTableLoad, abstract.InitTableLoad, abstract.DoneTableLoad, abstract.DoneShardedTableLoad: // perform fallback default: return ci, typesystem.FallbackDoesNotApplyErr diff --git a/transfer_manager/go/tests/e2e/pg2yt/yt_static/pg_scripts/create_tables.sql b/transfer_manager/go/tests/e2e/pg2yt/yt_static/pg_scripts/create_tables.sql index ef872894..c6353807 100644 --- a/transfer_manager/go/tests/e2e/pg2yt/yt_static/pg_scripts/create_tables.sql +++ b/transfer_manager/go/tests/e2e/pg2yt/yt_static/pg_scripts/create_tables.sql @@ -5,4 +5,15 @@ CREATE TABLE test_table INSERT INTO test_table SELECT id -FROM generate_series(1, 100) AS t(id); \ No newline at end of file +FROM generate_series(1, 100) AS t(id); + +CREATE TABLE test_timestamp( + id integer primary key, + tsz timestamp with time zone, + ts timestamp without time zone, + t timestamp not null +); + +INSERT INTO test_timestamp VALUES + (1, '2004-10-19 10:23:54+02', '2004-10-19 10:23:54', '2004-10-19 10:23:54'), + (2, '2004-10-19 10:23:54+02', '2004-10-19 10:23:54', '2004-10-19 10:23:54'); diff --git a/transfer_manager/go/tests/e2e/pg2yt/yt_static/yt_static_test.go b/transfer_manager/go/tests/e2e/pg2yt/yt_static/yt_static_test.go index 38316ed8..1b16f040 100644 --- a/transfer_manager/go/tests/e2e/pg2yt/yt_static/yt_static_test.go +++ b/transfer_manager/go/tests/e2e/pg2yt/yt_static/yt_static_test.go @@ -130,4 +130,25 @@ WHERE id >= 101 AND id <= 200; } require.False(t, table.Next()) }) + + t.Run("upload_with_old_type_system_ver", func(t *testing.T) { + transferWithOldVer := transfer + transferWithOldVer.TypeSystemVersion = 1 + tables := []abstract.TableDescription{{Name: "test_timestamp", Schema: "public"}} + snapshotLoader := tasks.NewSnapshotLoader(coordinator.NewStatefulFakeClient(), "test-operation1", transferWithOldVer, helpers.EmptyRegistry()) + require.NoError(t, snapshotLoader.UploadTables(ctx, tables, true)) + table, err := ytEnv.YT.ReadTable(ctx, ypath.Path("//home/cdc/tests/e2e/pg2yt/yt_static/test_timestamp"), nil) + require.NoError(t, err) + defer func(table yt.TableReader) { + err := table.Close() + require.NoError(t, err) + }(table) + for id := 1; id <= 2; id++ { + require.Truef(t, table.Next(), "no row for id %v", id) + var row testTableRow + require.NoErrorf(t, table.Scan(&row), "unable to scan row for id %v", id) + require.Equal(t, id, row.ID) + } + require.False(t, table.Next()) + }) }