Skip to content

Commit

Permalink
e2e tests without init container
Browse files Browse the repository at this point in the history
  • Loading branch information
laskoviymishka committed Sep 8, 2024
1 parent efe84f3 commit d3ece23
Show file tree
Hide file tree
Showing 38 changed files with 546 additions and 664 deletions.
44 changes: 41 additions & 3 deletions .github/workflows/build_and_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ on:

jobs:
build:
name: Build and push image
name: Build
runs-on: ubuntu-latest
steps:
- name: Checkout
Expand All @@ -21,10 +21,48 @@ jobs:
go-version: "1.22.0"
- shell: bash
run: |
go install gotest.tools/gotestsum@latest
make build
test-cli:
needs: build
name: CLI
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Setup Go 1.22.0
uses: actions/setup-go@v5
with:
go-version: "1.22.0"
- shell: bash
run: |
make build
go install gotest.tools/gotestsum@latest
- shell: bash
run: |
make test
e2e-tests:
needs: build
name: e2e
strategy:
continue-on-error: true
matrix:
suite: [pg2pg, pg2ch, kafka2ch]
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Setup Go 1.22.0
uses: actions/setup-go@v5
with:
go-version: "1.22.0"
- shell: bash
run: |
go install gotest.tools/gotestsum@latest
- shell: bash
run: |
curl https://clickhouse.com/ | sh
- shell: bash
run: |
export USE_TESTCONTAINERS=1
gotestsum --rerun-fails --format github-actions --packages="./transfer_manager/go/tests/e2e/${{ matrix.suite }}/..." -- -timeout=30m
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,3 @@ build:

test:
USE_TESTCONTAINERS=1 gotestsum --rerun-fails --format github-actions --packages="./transfer_manager/go/cmd/..." -- -timeout=30m
USE_TESTCONTAINERS=1 gotestsum --rerun-fails --format github-actions --packages="./transfer_manager/go/tests/e2e/pg2pg/..." -- -timeout=30m
9 changes: 9 additions & 0 deletions library/go/core/log/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ const (
FieldTypeContext
// FieldTypeLazyCall wraps function to lazy evaluate it after log level confirm
FieldTypeLazyCall
// FieldTypeStringer is for fmt.Stringer
FieldTypeStringer

fieldTypeLast // service type for testing purposes
)
Expand Down Expand Up @@ -166,6 +168,8 @@ func (f Field) Any() interface{} {
return f.Interface()
case FieldTypeLazyCall:
return f.Interface()
case FieldTypeStringer:
return f.Interface()
default:
// For when new field type is not added to this func
panic(fmt.Sprintf("unknown field type: %d", f.Type()))
Expand All @@ -182,6 +186,11 @@ func String(key, value string) Field {
return Field{key: key, ftype: FieldTypeString, string: value}
}

// Stringer constructs field from fmt.Stringer interface
func Stringer(key string, value fmt.Stringer) Field {
return Field{key: key, ftype: FieldTypeStringer, iface: value}
}

// Sprintf constructs field of string type with formatting
func Sprintf(key, format string, args ...interface{}) Field {
return Field{key: key, ftype: FieldTypeString, string: fmt.Sprintf(format, args...)}
Expand Down
2 changes: 2 additions & 0 deletions library/go/core/log/zap/zapify.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ func zapifyField(field log.Field) zap.Field {
return zap.Reflect(field.Key(), field.Interface())
case log.FieldTypeByteString:
return zap.ByteString(field.Key(), field.Binary())
case log.FieldTypeStringer:
return zap.Stringer(field.Key(), field.Interface().(fmt.Stringer))
case log.FieldTypeContext:
return Context(field.Interface().(context.Context))
case log.FieldTypeLazyCall:
Expand Down
14 changes: 7 additions & 7 deletions transfer_manager/go/tests/e2e/ch2s3/snapshot/check_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ import (
"github.com/stretchr/testify/require"
)

var (
testBucket = s3_provider.EnvOrDefault("TEST_BUCKET", "barrel")
TransferType = abstract.TransferTypeSnapshotOnly
Source = *chrecipe.MustSource(chrecipe.WithInitFile("dump/src.sql"), chrecipe.WithDatabase("clickhouse_test"))
)

func TestSnapshotParquet(t *testing.T) {
var (
testBucket = s3_provider.EnvOrDefault("TEST_BUCKET", "barrel")
TransferType = abstract.TransferTypeSnapshotOnly
Source = *chrecipe.MustSource(chrecipe.WithInitFile("dump/src.sql"), chrecipe.WithDatabase("clickhouse_test"))
)

s3Target := s3_provider.PrepareS3(t, testBucket, server.ParsingFormatPARQUET, s3_provider.GzipEncoding)
s3Target.WithDefaults()

Expand Down Expand Up @@ -77,5 +77,5 @@ func TestSnapshotParquet(t *testing.T) {

data, err := io.ReadAll(obj.Body)
require.NoError(t, err)
logger.Log.Infof("object: %v content:\n%v", *objects.Contents[0].Key, string(data))
logger.Log.Infof("object: %v content:\n%v", *objects.Contents[0].Key, len(data))
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,8 @@ import (

var (
kafkaTopic = "topic1"
source = *kafkasink.MustSourceRecipe()

chDatabase = "public"
target = *chrecipe.MustTarget(chrecipe.WithInitDir("dump/ch"), chrecipe.WithDatabase(chDatabase))
targetAsSource = *chrecipe.MustSource(chrecipe.WithInitDir("dump/ch"), chrecipe.WithDatabase(chDatabase))

timestampToUse = time.Date(2024, 03, 19, 0, 0, 0, 0, time.Local)
)

Expand All @@ -51,6 +47,11 @@ func fixTimestampMiddleware(t *testing.T, items []abstract.ChangeItem) abstract.

func TestReplication(t *testing.T) {
// prepare source
var (
source = *kafkasink.MustSourceRecipe()
target = *chrecipe.MustTarget(chrecipe.WithInitDir("dump/ch"), chrecipe.WithDatabase(chDatabase))
targetAsSource = *chrecipe.MustSource(chrecipe.WithInitDir("dump/ch"), chrecipe.WithDatabase(chDatabase))
)

parserConfigStruct := &jsonparser.ParserConfigJSONCommon{
Fields: []abstract.ColSchema{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,8 @@ import (

var (
kafkaTopic = "topic1"
source = *kafkasink.MustSourceRecipe()

chDatabase = "public"
target = *chrecipe.MustTarget(chrecipe.WithInitDir("dump/ch"), chrecipe.WithDatabase(chDatabase))
targetAsSource = *chrecipe.MustSource(chrecipe.WithInitDir("dump/ch"), chrecipe.WithDatabase(chDatabase))
chDatabase = "public"

timestampToUse = time.Date(2024, 03, 19, 0, 0, 0, 0, time.Local)
)
Expand All @@ -51,6 +48,10 @@ func fixTimestampMiddleware(t *testing.T, items []abstract.ChangeItem) abstract.

func TestReplication(t *testing.T) {
// prepare source
var (
source = *kafkasink.MustSourceRecipe()
target = *chrecipe.MustTarget(chrecipe.WithInitDir("dump/ch"), chrecipe.WithDatabase(chDatabase))
)

target.Cleanup = server.DisabledCleanup
target.InsertParams = model.InsertParams{MaterializedViewsIgnoreErrors: true}
Expand Down
19 changes: 7 additions & 12 deletions transfer_manager/go/tests/e2e/pg2ch/alters/alters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package alters

import (
"context"
"os"
"testing"
"time"

Expand All @@ -18,19 +17,15 @@ import (
"github.com/stretchr/testify/require"
)

var (
databaseName = "public"
TransferType = abstract.TransferTypeSnapshotAndIncrement
Source = *pgrecipe.RecipeSource(pgrecipe.WithInitDir("dump/pg"), pgrecipe.WithPrefix(""))
Target = *chrecipe.MustTarget(chrecipe.WithInitDir("dump/ch"), chrecipe.WithDatabase(databaseName))
)

func init() {
_ = os.Setenv("YC", "1") // to not go to vanga
func TestAlter(t *testing.T) {
var (
databaseName = "public"
TransferType = abstract.TransferTypeSnapshotAndIncrement
Source = *pgrecipe.RecipeSource(pgrecipe.WithInitDir("dump/pg"), pgrecipe.WithPrefix(""))
Target = *chrecipe.MustTarget(chrecipe.WithInitDir("dump/ch"), chrecipe.WithDatabase(databaseName))
)
helpers.InitSrcDst(helpers.TransferID, &Source, &Target, TransferType) // to WithDefaults() & FillDependentFields(): IsHomo, helpers.TransferID, IsUpdateable
}

func TestAlter(t *testing.T) {
defer func() {
require.NoError(t, helpers.CheckConnections(
helpers.LabeledPort{Label: "PG source", Port: Source.Port},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,15 @@ import (
"github.com/stretchr/testify/require"
)

var (
databaseName = "public"
TransferType = abstract.TransferTypeSnapshotAndIncrement
Source = *pgrecipe.RecipeSource(pgrecipe.WithInitDir("dump/pg"), pgrecipe.WithPrefix(""))
Target = *chrecipe.MustTarget(chrecipe.WithInitDir("dump/ch"), chrecipe.WithDatabase(databaseName))
)

func init() {
func TestAlter(t *testing.T) {
var (
databaseName = "public"
TransferType = abstract.TransferTypeSnapshotAndIncrement
Source = *pgrecipe.RecipeSource(pgrecipe.WithInitDir("dump/pg"), pgrecipe.WithPrefix(""))
Target = *chrecipe.MustTarget(chrecipe.WithInitDir("dump/ch"), chrecipe.WithDatabase(databaseName))
)
helpers.InitSrcDst(helpers.TransferID, &Source, &Target, TransferType) // to WithDefaults() & FillDependentFields(): IsHomo, helpers.TransferID, IsUpdateable
}

func TestAlter(t *testing.T) {
defer func() {
require.NoError(t, helpers.CheckConnections(
helpers.LabeledPort{Label: "PG source", Port: Source.Port},
Expand Down
34 changes: 12 additions & 22 deletions transfer_manager/go/tests/e2e/pg2ch/chmapper/check_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
server "github.com/doublecloud/transfer/transfer_manager/go/pkg/abstract/model"
"github.com/doublecloud/transfer/transfer_manager/go/pkg/providers/clickhouse/model"
chrecipe "github.com/doublecloud/transfer/transfer_manager/go/pkg/providers/clickhouse/recipe"
"github.com/doublecloud/transfer/transfer_manager/go/pkg/providers/postgres"
"github.com/doublecloud/transfer/transfer_manager/go/pkg/providers/postgres/pgrecipe"
"github.com/doublecloud/transfer/transfer_manager/go/pkg/transformer"
"github.com/doublecloud/transfer/transfer_manager/go/pkg/transformer/registry/chmapper"
Expand All @@ -18,18 +17,15 @@ import (
"go.ytsaurus.tech/yt/go/schema"
)

var (
databaseName = "public"
TransferType = abstract.TransferTypeSnapshotOnly
Source = pgrecipe.RecipeSource(pgrecipe.WithInitDir("dump/pg"))
Target = *chrecipe.MustTarget(chrecipe.WithInitDir("dump/ch"), chrecipe.WithDatabase(databaseName))
)

func init() {
helpers.InitSrcDst(helpers.TransferID, Source, &Target, TransferType) // to WithDefaults() & FillDependentFields(): IsHomo, helpers.TransferID, IsUpdateable
}
func TestSnapshot(t *testing.T) {
var (
databaseName = "public"
TransferType = abstract.TransferTypeSnapshotOnly
source = pgrecipe.RecipeSource(pgrecipe.WithInitDir("dump/pg"))
target = chrecipe.MustTarget(chrecipe.WithInitDir("dump/ch"), chrecipe.WithDatabase(databaseName))
)
helpers.InitSrcDst(helpers.TransferID, source, target, TransferType) // to WithDefaults() & FillDependentFields(): IsHomo, helpers.TransferID, IsUpdateable

func testSnapshot(t *testing.T, source *postgres.PgSource, target model.ChDestination) {
defer func() {
require.NoError(t, helpers.CheckConnections(
helpers.LabeledPort{Label: "PG source", Port: source.Port},
Expand All @@ -38,8 +34,8 @@ func testSnapshot(t *testing.T, source *postgres.PgSource, target model.ChDestin
))
}()

Target.ForceJSONMode = false
transfer := helpers.MakeTransfer(helpers.TransferID, source, &target, TransferType)
target.ForceJSONMode = false
transfer := helpers.MakeTransfer(helpers.TransferID, source, target, TransferType)
transfer.TypeSystemVersion = 8
transfer.Transformation = &server.Transformation{Transformers: &transformer.Transformers{Transformers: []transformer.Transformer{
{
Expand Down Expand Up @@ -97,15 +93,9 @@ func testSnapshot(t *testing.T, source *postgres.PgSource, target model.ChDestin
"__test",
target.Database,
"remapped_table",
helpers.GetSampleableStorageByModel(t, Source),
helpers.GetSampleableStorageByModel(t, Target),
helpers.GetSampleableStorageByModel(t, source),
helpers.GetSampleableStorageByModel(t, target),
10*time.Second,
),
)
}

func TestSnapshot(t *testing.T) {
target := Target

testSnapshot(t, Source, target)
}
35 changes: 11 additions & 24 deletions transfer_manager/go/tests/e2e/pg2ch/date_overflow/check_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,53 +2,40 @@ package snapshot

import (
"context"
"os"
"testing"

"github.com/doublecloud/transfer/library/go/test/yatest"
"github.com/doublecloud/transfer/transfer_manager/go/pkg/abstract"
client2 "github.com/doublecloud/transfer/transfer_manager/go/pkg/abstract/coordinator"
"github.com/doublecloud/transfer/transfer_manager/go/pkg/providers/clickhouse/model"
chrecipe "github.com/doublecloud/transfer/transfer_manager/go/pkg/providers/clickhouse/recipe"
"github.com/doublecloud/transfer/transfer_manager/go/pkg/providers/postgres"
"github.com/doublecloud/transfer/transfer_manager/go/pkg/providers/postgres/pgrecipe"
"github.com/doublecloud/transfer/transfer_manager/go/pkg/worker/tasks"
"github.com/doublecloud/transfer/transfer_manager/go/tests/helpers"
"github.com/stretchr/testify/require"
)

var (
databaseName = "public"
TransferType = abstract.TransferTypeSnapshotAndIncrement
Source = pgrecipe.RecipeSource(pgrecipe.WithFiles(yatest.SourcePath("transfer_manager/go/tests/e2e/pg2ch/date_overflow/dump/pg/dump.sql")))
Target = *chrecipe.MustTarget(chrecipe.WithInitDir("dump/ch"), chrecipe.WithDatabase(databaseName))
)
func TestSnapshot(t *testing.T) {
var (
TransferType = abstract.TransferTypeSnapshotAndIncrement
databaseName = "public"
Source = pgrecipe.RecipeSource(pgrecipe.WithInitDir("dump/pg"))
Target = *chrecipe.MustTarget(chrecipe.WithInitDir("dump/ch"), chrecipe.WithDatabase(databaseName))
)

func init() {
_ = os.Setenv("YC", "1") // to not go to vanga
helpers.InitSrcDst(helpers.TransferID, Source, &Target, TransferType) // to WithDefaults() & FillDependentFields(): IsHomo, helpers.TransferID, IsUpdateable
}

func testSnapshot(t *testing.T, source *postgres.PgSource, target model.ChDestination) {
defer func() {
require.NoError(t, helpers.CheckConnections(
helpers.LabeledPort{Label: "PG source", Port: source.Port},
helpers.LabeledPort{Label: "CH target Native", Port: target.NativePort},
helpers.LabeledPort{Label: "CH target HTTP", Port: target.HTTPPort},
helpers.LabeledPort{Label: "PG source", Port: Source.Port},
helpers.LabeledPort{Label: "CH target Native", Port: Target.NativePort},
helpers.LabeledPort{Label: "CH target HTTP", Port: Target.HTTPPort},
))
}()

transfer := helpers.MakeTransfer(helpers.TransferID, source, &target, TransferType)
transfer := helpers.MakeTransfer(helpers.TransferID, Source, &Target, TransferType)
tables, err := tasks.ObtainAllSrcTables(transfer, helpers.EmptyRegistry())
require.NoError(t, err)
snapshotLoader := tasks.NewSnapshotLoader(client2.NewFakeClient(), "test-operation", transfer, helpers.EmptyRegistry())
err = snapshotLoader.UploadTables(context.Background(), tables.ConvertToTableDescriptions(), true)
require.Error(t, err)
require.True(t, abstract.IsFatal(err))
}

func TestSnapshot(t *testing.T) {
target := Target

testSnapshot(t, Source, target)
}
Loading

0 comments on commit d3ece23

Please sign in to comment.