Skip to content

Commit

Permalink
Fix PG fallback
Browse files Browse the repository at this point in the history
fix: fallback for sharded events
b9df19035018a8843ccbaac4cd3f4527a3e38019
  • Loading branch information
DenisEvd committed Aug 29, 2024
1 parent 43d4d81 commit 293d372
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func init() {
Function: func(ci *abstract.ChangeItem) (*abstract.ChangeItem, error) {
if !ci.IsRowEvent() {
switch ci.Kind {
case abstract.InitTableLoad, abstract.DoneTableLoad:
case abstract.InitShardedTableLoad, abstract.InitTableLoad, abstract.DoneTableLoad, abstract.DoneShardedTableLoad:
// perform fallback
default:
return ci, typesystem.FallbackDoesNotApplyErr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,15 @@ CREATE TABLE test_table

INSERT INTO test_table
SELECT id
FROM generate_series(1, 100) AS t(id);
FROM generate_series(1, 100) AS t(id);

CREATE TABLE test_timestamp(
id integer primary key,
tsz timestamp with time zone,
ts timestamp without time zone,
t timestamp not null
);

INSERT INTO test_timestamp VALUES
(1, '2004-10-19 10:23:54+02', '2004-10-19 10:23:54', '2004-10-19 10:23:54'),
(2, '2004-10-19 10:23:54+02', '2004-10-19 10:23:54', '2004-10-19 10:23:54');
21 changes: 21 additions & 0 deletions transfer_manager/go/tests/e2e/pg2yt/yt_static/yt_static_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,25 @@ WHERE id >= 101 AND id <= 200;
}
require.False(t, table.Next())
})

t.Run("upload_with_old_type_system_ver", func(t *testing.T) {
transferWithOldVer := transfer
transferWithOldVer.TypeSystemVersion = 1
tables := []abstract.TableDescription{{Name: "test_timestamp", Schema: "public"}}
snapshotLoader := tasks.NewSnapshotLoader(coordinator.NewStatefulFakeClient(), "test-operation1", transferWithOldVer, helpers.EmptyRegistry())
require.NoError(t, snapshotLoader.UploadTables(ctx, tables, true))
table, err := ytEnv.YT.ReadTable(ctx, ypath.Path("//home/cdc/tests/e2e/pg2yt/yt_static/test_timestamp"), nil)
require.NoError(t, err)
defer func(table yt.TableReader) {
err := table.Close()
require.NoError(t, err)
}(table)
for id := 1; id <= 2; id++ {
require.Truef(t, table.Next(), "no row for id %v", id)
var row testTableRow
require.NoErrorf(t, table.Scan(&row), "unable to scan row for id %v", id)
require.Equal(t, id, row.ID)
}
require.False(t, table.Next())
})
}

0 comments on commit 293d372

Please sign in to comment.