Skip to content

Commit

Permalink
TRANSFER-811: Sync rest of e2e tests
Browse files Browse the repository at this point in the history
  • Loading branch information
laskoviymishka committed Aug 22, 2024
1 parent f5fdb6a commit b6a45cb
Show file tree
Hide file tree
Showing 158 changed files with 28,110 additions and 27 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ require (
github.com/xitongsys/parquet-go v1.6.2
github.com/xitongsys/parquet-go-source v0.0.0-20220315005136-aec0fe3e777c
github.com/ydb-platform/ydb-go-genproto v0.0.0-20240528144234-5d5a685e41f7
github.com/ydb-platform/ydb-go-sdk/v3 v3.76.1
github.com/ydb-platform/ydb-go-sdk/v3 v3.77.0
github.com/ydb-platform/ydb-go-yc-metadata v0.6.1
go.mongodb.org/mongo-driver v1.11.7
go.uber.org/atomic v1.11.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2516,8 +2516,8 @@ github.com/ydb-platform/ydb-go-genproto v0.0.0-20221215182650-986f9d10542f/go.mo
github.com/ydb-platform/ydb-go-genproto v0.0.0-20240528144234-5d5a685e41f7 h1:nL8XwD6fSst7xFUirkaWJmE7kM0CdWRYgu6+YQer1d4=
github.com/ydb-platform/ydb-go-genproto v0.0.0-20240528144234-5d5a685e41f7/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I=
github.com/ydb-platform/ydb-go-sdk/v3 v3.44.0/go.mod h1:oSLwnuilwIpaF5bJJMAofnGgzPJusoI3zWMNb8I+GnM=
github.com/ydb-platform/ydb-go-sdk/v3 v3.76.1 h1:/5F1WtNrTIBLzzi8DPTlmv+ZIfzrnS/VCcmGigKEDDk=
github.com/ydb-platform/ydb-go-sdk/v3 v3.76.1/go.mod h1:QMmT1fMKZnpid73USXLJawh+32bKySSE2WtEnBUIKd8=
github.com/ydb-platform/ydb-go-sdk/v3 v3.77.0 h1:5sBuCxyS0hfILvcT9MZUpsS844/8rIz/NyRo15sOvyo=
github.com/ydb-platform/ydb-go-sdk/v3 v3.77.0/go.mod h1:IHwuXyolaAmGK2Dp7+dlhsnXphG1pwCoaP/OITT3+tU=
github.com/ydb-platform/ydb-go-yc-metadata v0.6.1 h1:9E5q8Nsy2RiJMZDNVy0A3KUrIMBPakJ2VgloeWbcI84=
github.com/ydb-platform/ydb-go-yc-metadata v0.6.1/go.mod h1:NW4LXW2WhY2tLAwCBHBuHAwRUVF5lsscaSPjdAFKldc=
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA=
Expand Down
4 changes: 0 additions & 4 deletions kikimr/public/sdk/python/persqueue/README.md

This file was deleted.

20 changes: 0 additions & 20 deletions kikimr/public/sdk/python/ydb_v3_new_behavior/README.md

This file was deleted.

1 change: 1 addition & 0 deletions transfer_manager/go/tests/e2e/lf2ch/jsonlogs/dump/dst.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE DATABASE mtmobproxy;
97 changes: 97 additions & 0 deletions transfer_manager/go/tests/e2e/lf2ch/jsonlogs/json_logs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package jsonlogs

import (
"strings"
"testing"
"time"

"github.com/doublecloud/transfer/library/go/core/metrics/solomon"
"github.com/doublecloud/transfer/transfer_manager/go/internal/logger"
"github.com/doublecloud/transfer/transfer_manager/go/pkg/abstract"
"github.com/doublecloud/transfer/transfer_manager/go/pkg/abstract/coordinator"
server "github.com/doublecloud/transfer/transfer_manager/go/pkg/abstract/model"
"github.com/doublecloud/transfer/transfer_manager/go/pkg/parsers"
jsonparser "github.com/doublecloud/transfer/transfer_manager/go/pkg/parsers/registry/json"
"github.com/doublecloud/transfer/transfer_manager/go/pkg/providers/clickhouse/model"
"github.com/doublecloud/transfer/transfer_manager/go/pkg/providers/logbroker"
"github.com/doublecloud/transfer/transfer_manager/go/pkg/runtime/local"
"github.com/doublecloud/transfer/transfer_manager/go/tests/helpers"
"github.com/doublecloud/transfer/transfer_manager/go/tests/helpers/lbenv"
"github.com/stretchr/testify/require"
ytschema "go.ytsaurus.tech/yt/go/schema"
)

func TestPushClientLogs(t *testing.T) {
lbEnv, stop := lbenv.NewLbEnv(t)

sourcePort := lbEnv.ConsumerOptions().Port
loggerPort := lbEnv.ProducerOptions().Port
targetNativePort := helpers.GetIntFromEnv("DB0_RECIPE_CLICKHOUSE_NATIVE_PORT")

defer func() {
require.NoError(t, helpers.CheckConnections(
helpers.LabeledPort{Label: "LF source", Port: sourcePort},
helpers.LabeledPort{Label: "Logger LB writer", Port: loggerPort},
helpers.LabeledPort{Label: "CH target", Port: targetNativePort},
))
}()

defer stop()
loggerLbWriter, err := logger.NewLogbrokerLoggerFromConfig(&logger.LogbrokerConfig{
Instance: lbEnv.ProducerOptions().Endpoint,
Port: loggerPort,
Topic: lbEnv.DefaultTopic,
SourceID: "test",
Credentials: lbEnv.ProducerOptions().Credentials,
}, solomon.NewRegistry(solomon.NewRegistryOpts()))
require.NoError(t, err)

parserConfigStruct := &jsonparser.ParserConfigJSONLb{
Fields: []abstract.ColSchema{
{ColumnName: "msg", DataType: ytschema.TypeString.String()},
},
AddRest: false,
}
parserConfigMap, err := parsers.ParserConfigStructToMap(parserConfigStruct)
require.NoError(t, err)

src := &logbroker.LfSource{
Instance: logbroker.LogbrokerInstance(lbEnv.Endpoint),
Topics: []string{lbEnv.DefaultTopic},
Credentials: lbEnv.ConsumerOptions().Credentials,
Consumer: lbEnv.DefaultConsumer,
Port: sourcePort,
ParserConfig: parserConfigMap,
}
dst := &model.ChDestination{
ShardsList: []model.ClickHouseShard{
{
Name: "_",
Hosts: []string{
"localhost",
},
},
},
User: "default",
Password: "",
Database: "mtmobproxy",
HTTPPort: helpers.GetIntFromEnv("DB0_RECIPE_CLICKHOUSE_HTTP_PORT"),
NativePort: targetNativePort,
ProtocolUnspecified: true,
}
dst.WithDefaults()
transfer := &server.Transfer{
ID: "e2e_test",
Src: src,
Dst: dst,
}
// SEND TO LOGBROKER
go func() {
for i := 0; i < 50; i++ {
loggerLbWriter.Infof("line:%v", i)
}
}()
w := local.NewLocalWorker(coordinator.NewFakeClient(), transfer, solomon.NewRegistry(solomon.NewRegistryOpts()), logger.Log)
w.Start()
require.NoError(t, helpers.WaitDestinationEqualRowsCount(dst.Database, strings.ReplaceAll(lbEnv.DefaultTopic, "-", "_"), helpers.GetSampleableStorageByModel(t, dst), 60*time.Second, 50))
}
156 changes: 156 additions & 0 deletions transfer_manager/go/tests/e2e/lf2kafkamock/mirror/mirror_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package main

import (
"encoding/json"
"testing"
"time"

"github.com/doublecloud/transfer/library/go/core/metrics/solomon"
"github.com/doublecloud/transfer/transfer_manager/go/internal/logger"
"github.com/doublecloud/transfer/transfer_manager/go/pkg/abstract"
"github.com/doublecloud/transfer/transfer_manager/go/pkg/abstract/coordinator"
server "github.com/doublecloud/transfer/transfer_manager/go/pkg/abstract/model"
"github.com/doublecloud/transfer/transfer_manager/go/pkg/parsers"
blankparser "github.com/doublecloud/transfer/transfer_manager/go/pkg/parsers/registry/blank"
kafka2 "github.com/doublecloud/transfer/transfer_manager/go/pkg/providers/kafka"
"github.com/doublecloud/transfer/transfer_manager/go/pkg/providers/logbroker"
"github.com/doublecloud/transfer/transfer_manager/go/pkg/runtime/local"
"github.com/doublecloud/transfer/transfer_manager/go/tests/helpers"
"github.com/doublecloud/transfer/transfer_manager/go/tests/helpers/lbenv"
"github.com/golang/mock/gomock"
"github.com/segmentio/kafka-go"
"github.com/stretchr/testify/require"
)

//---------------------------------------------------------------------------------------------------------------------

var index = 0
var tt *testing.T

func callbackFunc(_ interface{}, msgs ...interface{}) error {
for _, el := range msgs {
switch v := el.(type) {
case kafka.Message:
var j map[string]string
err := json.Unmarshal(v.Value, &j)
require.NoError(tt, err)
delete(j, "ts")
delete(j, "caller")
jj, err := json.Marshal(j)
require.NoError(tt, err)

require.Equal(tt, []byte(nil), v.Key)
require.Equal(tt, `{"level":"INFO","msg":"blablabla"}`, string(jj))
default:
require.FailNow(tt, "")
}
}

index++
return nil
}

//---------------------------------------------------------------------------------------------------------------------

func TestReplication(t *testing.T) {
lbEnv, stop := lbenv.NewLbEnv(t)
defer stop()
lbSendingPort := lbEnv.ProducerOptions().Port
lbReceivingPort := lbEnv.Port

defer require.NoError(t, helpers.CheckConnections(
helpers.LabeledPort{Label: "PG source", Port: lbSendingPort},
))

//------------------------------------------------------------------------------

tt = t

// prepare producer

loggerLbWriter, err := logger.NewLogbrokerLoggerFromConfig(&logger.LogbrokerConfig{
Instance: lbEnv.ProducerOptions().Endpoint,
Port: lbSendingPort,
Topic: lbEnv.DefaultTopic,
SourceID: "test",
Credentials: lbEnv.ProducerOptions().Credentials,
}, solomon.NewRegistry(solomon.NewRegistryOpts()))
require.NoError(t, err)

// prepare src

parserConfigMap, err := parsers.ParserConfigStructToMap(&blankparser.ParserConfigBlankLb{})
require.NoError(t, err)

src := logbroker.LfSource{
Instance: logbroker.LogbrokerInstance(lbEnv.Endpoint),
Topics: []string{lbEnv.DefaultTopic},
Credentials: lbEnv.ConsumerOptions().Credentials,
Consumer: lbEnv.DefaultConsumer,
Port: lbReceivingPort,
ParserConfig: parserConfigMap,
}

// prepare dst

dst := kafka2.KafkaDestination{
Connection: &kafka2.KafkaConnectionOptions{
TLS: logbroker.DefaultTLS,
Brokers: []string{"my_broker_0"},
},
Auth: &kafka2.KafkaAuth{
Enabled: true,
Mechanism: "SHA-512",
User: "user1",
Password: "qwert12345",
},
Topic: "foo_bar",
FormatSettings: server.SerializationFormat{
Name: server.SerializationFormatLbMirror,
},
ParralelWriterCount: 10,
}

ctrl := gomock.NewController(t)
defer ctrl.Finish()

producer := kafka2.NewMockwriter(ctrl)
producer.EXPECT().WriteMessages(gomock.Any(), gomock.Any()).AnyTimes().Do(callbackFunc)
producer.EXPECT().Close().AnyTimes()

kafkaMockClient := kafka2.NewMockclient(ctrl)
kafkaMockClient.EXPECT().BuildWriter([]string{"my_broker_0"}, gomock.Any(), gomock.Any(), gomock.Any()).Return(producer)
kafkaMockClient.EXPECT().CreateTopicIfNotExist([]string{"my_broker_0"}, "foo_bar", gomock.Any(), gomock.Any(), gomock.Any())

sink, err := kafka2.NewSinkImpl(
&dst,
solomon.NewRegistry(nil).WithTags(map[string]string{"ts": time.Now().String()}),
logger.Log,
kafkaMockClient,
false,
)
require.NoError(t, err)

target := server.MockDestination{SinkerFactory: func() abstract.Sinker { return sink }}

// activate transfer

helpers.InitSrcDst(helpers.TransferID, &src, &dst, abstract.TransferTypeIncrementOnly)
transfer := helpers.MakeTransfer(helpers.TransferID, &src, &target, abstract.TransferTypeIncrementOnly)

localWorker := local.NewLocalWorker(coordinator.NewFakeClient(), transfer, solomon.NewRegistry(solomon.NewRegistryOpts()), logger.Log)
localWorker.Start()
defer localWorker.Stop()

//-----------------------------------------------------------------------------------------------------------------
// send to logbroker

loggerLbWriter.Infof("blablabla")

for {
if index == 1 {
break
}
time.Sleep(time.Second)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package mirror

import (
"testing"

"github.com/doublecloud/transfer/library/go/core/metrics/solomon"
"github.com/doublecloud/transfer/transfer_manager/go/internal/logger"
"github.com/doublecloud/transfer/transfer_manager/go/pkg/abstract"
server "github.com/doublecloud/transfer/transfer_manager/go/pkg/abstract/model"
"github.com/doublecloud/transfer/transfer_manager/go/pkg/parsers"
"github.com/doublecloud/transfer/transfer_manager/go/pkg/parsers/registry/json"
"github.com/doublecloud/transfer/transfer_manager/go/pkg/providers/logbroker"
"github.com/doublecloud/transfer/transfer_manager/go/pkg/transformer/registry/filter"
"github.com/doublecloud/transfer/transfer_manager/go/tests/helpers"
"github.com/doublecloud/transfer/transfer_manager/go/tests/helpers/lbenv"
"github.com/stretchr/testify/require"
"go.ytsaurus.tech/library/go/core/log"
ytschema "go.ytsaurus.tech/yt/go/schema"
)

func TestPushClientLogs(t *testing.T) {
lbEnv, stop := lbenv.NewLbEnv(t)
defer stop()
lbSendingPort := lbEnv.ProducerOptions().Port
lbReceivingPort := lbEnv.Port

loggerLbWriter, err := logger.NewLogbrokerLoggerFromConfig(&logger.LogbrokerConfig{
Instance: lbEnv.ProducerOptions().Endpoint,
Port: lbSendingPort,
Topic: "src/topic",
SourceID: "test",
Credentials: lbEnv.ProducerOptions().Credentials,
}, solomon.NewRegistry(solomon.NewRegistryOpts()))
require.NoError(t, err)

parserConfigMap, err := parsers.ParserConfigStructToMap(&json.ParserConfigJSONLb{
Fields: []abstract.ColSchema{
{ColumnName: "level", DataType: ytschema.TypeString.String(), PrimaryKey: true}, // PrimaryKey:true - is a workaround to 'CustomFilterColumnsTransformer' work
{ColumnName: "caller", DataType: ytschema.TypeString.String()},
{ColumnName: "ts", DataType: ytschema.TypeTimestamp.String()},
{ColumnName: "array", DataType: ytschema.TypeAny.String()},
},
SkipSystemKeys: true,
AddRest: true,
})
require.NoError(t, err)

src := logbroker.LfSource{
Instance: logbroker.LogbrokerInstance(lbEnv.Endpoint),
Topics: []string{"src/topic"},
Credentials: lbEnv.ConsumerOptions().Credentials,
Consumer: lbEnv.DefaultConsumer,
Port: lbReceivingPort,
ParserConfig: parserConfigMap,
}

dst := logbroker.LbDestination{
Instance: lbEnv.Endpoint,
Topic: "dst/topic",
Credentials: lbEnv.ConsumerOptions().Credentials,
Port: lbReceivingPort,
FormatSettings: server.SerializationFormat{
Name: server.SerializationFormatJSON,
},
TLS: logbroker.DisabledTLS,
}

helpers.InitSrcDst(helpers.TransferID, &src, &dst, abstract.TransferTypeIncrementOnly)
transfer := helpers.MakeTransfer(helpers.TransferID, &src, &dst, abstract.TransferTypeIncrementOnly)
tableF, err := filter.NewFilter(nil, nil)
require.NoError(t, err)
columnF, err := filter.NewFilter([]string{}, []string{"_timestamp", "_partition", "_offset", "_idx"})
require.NoError(t, err)
require.NoError(t, transfer.AddExtraTransformer(filter.NewCustomFilterColumnsTransformer(
tableF,
columnF,
logger.Log,
)))
worker := helpers.Activate(t, transfer)
defer worker.Close(t)

loggerLbWriter.Info("My_message", log.Array("array", []string{}))

dataCmp := func(in string, index int) bool {
logger.Log.Infof("received string: %s\n", in)
return lbenv.EqualLogLineContent(t, `{"_rest":{"msg":"My_message"},"array":[],"level":"INFO"}`, in)
}

lbenv.CheckResult(t, lbEnv.Env, dst.Database, dst.Topic, 1, dataCmp, lbenv.ComparatorStub, lbenv.ComparatorStub, lbenv.ComparatorStub, lbenv.ComparatorStub)
}
Loading

0 comments on commit b6a45cb

Please sign in to comment.