Skip to content

Commit

Permalink
Enable tests for parsers in github
Browse files Browse the repository at this point in the history
No description

---

Pull Request resolved: #193

Co-authored-by: tserakhau <tserakhau@double.cloud>
commit_hash:ec2cfb6a5fb041730ef4b1845a276fe458f753df
  • Loading branch information
laskoviymishka authored and robot-piglet committed Feb 3, 2025
1 parent 213dec3 commit d69e1f0
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 105 deletions.
1 change: 1 addition & 0 deletions .github/workflows/build_and_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ jobs:
{ group: "pkg", name: "worker", path: "worker" },
{ group: "pkg", name: "schemaregistry", path: "schemaregistry" },
{ group: "pkg", name: "parsers-generic", path: "parsers/generic" },
{ group: "pkg", name: "parsers-tests", path: "parsers/tests" },
{ group: "pkg", name: "parsers-scanner", path: "parsers/scanner" }
]
steps:
Expand Down
5 changes: 4 additions & 1 deletion pkg/parsers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ import (
// parserFactory is the constructor signature that parser implementations hand
// to Register. It takes an untyped config value, a bool flag, a logger and the
// source stats collector, and returns the constructed Parser or an error.
// NOTE(review): the meaning of the bool argument is not visible from this
// declaration — confirm against the concrete factories.
type parserFactory func(interface{}, bool, log.Logger, *stats.SourceStats) (Parser, error)

// parsersRegistry maps a parser name to its factory. Register writes into it,
// using a key derived from the factory function's name: the "NewParser" prefix
// is trimmed and the remainder is passed through util.ToSnakeCase.
var parsersRegistry = make(map[string]parserFactory)

// parserConfigRegistry maps a parser config name to its registered
// AbstractParserConfig value. ParserConfigMapToStruct looks config names up
// here.
var parserConfigRegistry = make(map[string]AbstractParserConfig)

// UnknownParserErr is the sentinel wrapped (via %w) into the error returned by
// ParserConfigMapToStruct when the requested parser config name is absent from
// parserConfigRegistry. Callers can detect this case with xerrors.Is.
var UnknownParserErr = xerrors.NewSentinel("unregistered parser")

func Register(foo parserFactory, configs []AbstractParserConfig) {
parserName := util.ToSnakeCase(strings.TrimPrefix(getFuncName(foo), "NewParser"))
parsersRegistry[parserName] = foo
Expand Down Expand Up @@ -84,7 +87,7 @@ func ParserConfigMapToStruct(in map[string]interface{}) (AbstractParserConfig, e

registeredStruct, ok := parserConfigRegistry[parserConfigName]
if !ok {
return nil, xerrors.Errorf("unregistered parser_config name: %s, known_parsers: %v", parserConfigName, KnownParsers())
return nil, xerrors.Errorf("parser_config name: %s, known_parsers: %v: %w", parserConfigName, KnownParsers(), UnknownParserErr)
}
pointerToRegisteredStructCopy := reflect.New(reflect.ValueOf(registeredStruct).Type().Elem()).Interface()

Expand Down
135 changes: 31 additions & 104 deletions pkg/parsers/tests/generic_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ import (

"github.com/doublecloud/transfer/internal/logger"
"github.com/doublecloud/transfer/internal/metrics"
"github.com/doublecloud/transfer/library/go/core/xerrors"
"github.com/doublecloud/transfer/pkg/abstract"
"github.com/doublecloud/transfer/pkg/base"
"github.com/doublecloud/transfer/pkg/base/events"
"github.com/doublecloud/transfer/pkg/format"
"github.com/doublecloud/transfer/pkg/parsers"
"github.com/doublecloud/transfer/pkg/parsers/generic"
Expand Down Expand Up @@ -195,6 +194,11 @@ func TestGenericParser_DoSensitive(t *testing.T) {
parser, err := parsers.NewParserFromMap(ParserConfigMap(samples.SensitiveSample), false, logger.Log, stats.NewSourceStats(metrics.NewRegistry().WithTags(map[string]string{
"id": "TestGenericParser_DoSensitive",
})))

if xerrors.Is(err, parsers.UnknownParserErr) {
// some parsers tests might be disabled in certain setups
t.Skip()
}
require.NoError(t, err)
resArr := parser.Do(samples.Data[samples.SensitiveSample], abstract.Partition{})
require.Equal(t, 1, len(resArr))
Expand All @@ -215,6 +219,11 @@ func TestGenericParser_DoSensitiveDisabled(t *testing.T) {
parser, err := parsers.NewParserFromMap(ParserConfigMap(samples.SensitiveDisabledSample), false, logger.Log, stats.NewSourceStats(metrics.NewRegistry().WithTags(map[string]string{
"id": "TestGenericParser_DoSensitiveDisabled",
})))

if xerrors.Is(err, parsers.UnknownParserErr) {
// some parsers tests might be disabled in certain setups
t.Skip()
}
require.NoError(t, err)
resArr := parser.Do(samples.Data[samples.SensitiveDisabledSample], abstract.Partition{})
require.Equal(t, 1, len(resArr))
Expand All @@ -235,6 +244,11 @@ func TestGenericParser_DoKikimr(t *testing.T) {
parser, err := parsers.NewParserFromMap(ParserConfigMap(samples.KikimrSample), false, logger.Log, stats.NewSourceStats(metrics.NewRegistry().WithTags(map[string]string{
"id": "TestGenericParser_DoKikimr",
})))

if xerrors.Is(err, parsers.UnknownParserErr) {
// some parsers tests might be disabled in certain setups
t.Skip()
}
require.NoError(t, err)
res := parser.Do(samples.Data[samples.KikimrSample], abstract.Partition{})
require.Equal(t, 213, len(res))
Expand All @@ -244,6 +258,11 @@ func TestGenericParser_DoKikimrNew(t *testing.T) {
parser, err := parsers.NewParserFromMap(ParserConfigMap(samples.KikimrNew), false, logger.Log, stats.NewSourceStats(metrics.NewRegistry().WithTags(map[string]string{
"id": "TestGenericParser_DoKikimrNew",
})))

if xerrors.Is(err, parsers.UnknownParserErr) {
// some parsers tests might be disabled in certain setups
t.Skip()
}
require.NoError(t, err)
res := parser.Do(samples.Data[samples.KikimrNew], abstract.Partition{})
abstract.Dump(res)
Expand Down Expand Up @@ -274,37 +293,6 @@ func TestParser_DoJson(t *testing.T) {
require.True(t, ok, "Should have column 'version'")
}
})
t.Run("events", func(t *testing.T) {
genericParserImpl := GetGenericParserImpl(parser)
res := genericParserImpl.Parse(samples.Data[samples.JSONSample], *new(abstract.Partition))
var evs []base.Event
for res.Next() {
ev, err := res.Event()
require.NoError(t, err)
evs = append(evs, ev)
}
require.Equal(t, 36, len(evs))
offset := uint64(0)
var changes []abstract.ChangeItem
for _, row := range evs {
insEv, ok := row.(events.InsertEvent)
require.True(t, ok)
change, err := insEv.ToOldChangeItem()
require.NoError(t, err)
found := false
changes = append(changes, *change)
for ci, cname := range change.ColumnNames {
if cname == "version" {
require.Equal(t, 89488198116272410+offset, change.ColumnValues[ci])
found = true
break
}
}
offset++
require.True(t, found, "Should have column 'version'")
}
abstract.Dump(changes)
})
}

func TestMdbSample(t *testing.T) {
Expand Down Expand Up @@ -339,45 +327,14 @@ func TestLogfellerTimestampParse(t *testing.T) {
),
),
)

if xerrors.Is(err, parsers.UnknownParserErr) {
// some parsers tests might be disabled in certain setups
t.Skip()
}
require.NoError(t, err)
require.NotNil(t, parser)

//genericParserImpl := GetGenericParserImpl(parser)
//
//{
// // abstract2 parser
// batch := genericParserImpl.Parse(samples.Data[parserName], *new(abstract.Partition))
// var changes []abstract.ChangeItem
// for batch.Next() {
// ev, err := batch.Event()
// require.NoError(t, err)
// require.NotNil(t, ev)
// iev, ok := ev.(events.InsertEvent)
// require.True(t, ok)
// change, err := iev.ToOldChangeItem()
// require.NoError(t, err)
// changes = append(changes, *change)
// fmt.Println(iev.Table())
// }
// fields := changes[0].AsMap()
// // Timestamps
// require.Equal(t, schema.Timestamp(1234567890123456).Time().Local(), fields["default"])
// require.Equal(t, schema.Timestamp(1234567890123456).Time().Local(), fields["microseconds"])
// require.Equal(t, schema.Timestamp(1234567890123000).Time().Local(), fields["milliseconds"])
// require.Equal(t, schema.Timestamp(1234567890000000).Time().Local(), fields["seconds"])
//
// // Datetime (seconds)
// require.Equal(t, schema.Datetime(1234567890).Time().Local(), fields["datetime_default"])
// require.Equal(t, schema.Datetime(1234567890).Time().Local(), fields["datetime_microseconds"])
// require.Equal(t, schema.Datetime(1234567890).Time().Local(), fields["datetime_milliseconds"])
// require.Equal(t, schema.Datetime(1234567890).Time().Local(), fields["datetime_seconds"])
//
// // Intervals
// require.Equal(t, time.Duration(12345678000), fields["interval_default"])
// require.Equal(t, time.Duration(12345678000), fields["interval_microseconds"])
// require.Equal(t, time.Duration(12345000000), fields["interval_milliseconds"])
// require.Equal(t, time.Duration(12000000000), fields["interval_seconds"])
//}
{
// abstract 1 parser
res := parser.Do(samples.Data[parserName], *new(abstract.Partition))
Expand Down Expand Up @@ -415,30 +372,21 @@ func TestGenericParser_Parse_vs_Do(t *testing.T) {
parser, err := parsers.NewParserFromMap(configMap, false, logger.Log, stats.NewSourceStats(metrics.NewRegistry().WithTags(map[string]string{
"id": "TestGenericParser_Parse_vs_Do",
})))
if xerrors.Is(err, parsers.UnknownParserErr) {
// some parsers tests might be disabled in certain setups
t.Skip()
}
require.NoError(t, err)
if parser == nil {
return
}
require.NoError(t, err)
// genericParserImpl := GetGenericParserImpl(parser)
res := parser.Do(samples.Data[k], *new(abstract.Partition))
// batch := genericParserImpl.Parse(samples.Data[k], *new(abstract.Partition))
var changes []abstract.ChangeItem
//for batch.Next() {
// ev, err := batch.Event()
// require.NoError(t, err)
// require.NotNil(t, ev)
// iev, ok := ev.(events.InsertEvent)
// require.True(t, ok)
// change, err := iev.ToOldChangeItem()
// require.NoError(t, err)
// changes = append(changes, *change)
//}
t.Logf("old parser: %v len", len(res))
abstract.Dump(res)
t.Logf("new parser: %v len", len(changes))
abstract.Dump(changes)
// require.Equal(t, len(res), len(changes))
})
}
}
Expand All @@ -452,27 +400,6 @@ func BenchmarkGenericParser(b *testing.B) {
} {
parser, err := parsers.NewParserFromMap(ParserConfigMap(testCase), false, logger.LoggerWithLevel(zapcore.WarnLevel), stats.NewSourceStats(metrics.NewRegistry()))
require.NoError(b, err)
genericParserImpl := GetGenericParserImpl(parser)
for _, size := range []int{1, 5, 10} {
d := samples.Data[testCase]
for i := 1; i < size; i++ {
d.Value = append(d.Value, []byte("\n")...)
d.Value = append(d.Value, d.Value...)
}
b.Run(fmt.Sprintf("Abstract2 %v %v", testCase, size), func(b *testing.B) {
b.ResetTimer()
for n := 0; n < b.N; n++ {
r := genericParserImpl.Parse(d, abstract.Partition{})
cntr := 0
for r.Next() {
cntr++
}
require.True(b, cntr > 0)
}
b.SetBytes(int64(len(d.Value) * b.N))
b.ReportAllocs()
})
}
for _, size := range []int{1, 5, 10} {
d := samples.Data[testCase]
for i := 1; i < size; i++ {
Expand All @@ -485,7 +412,7 @@ func BenchmarkGenericParser(b *testing.B) {
r := parser.Do(d, abstract.Partition{})
require.True(b, len(r) > 0)
}
b.SetBytes(int64(len(d.Value) * b.N))
b.SetBytes(int64(len(d.Value)))
b.ReportAllocs()
})
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/parsers/tests/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@ import (
"github.com/doublecloud/transfer/pkg/parsers/registry/protobuf/protoscanner"
"github.com/doublecloud/transfer/pkg/stats"
confluentsrmock "github.com/doublecloud/transfer/tests/helpers/confluent_schema_registry_mock"
"github.com/doublecloud/transfer/tests/tcrecipes"
"github.com/stretchr/testify/require"
)

// TestCanonizeParserConfigsList
// We need to canonize parserConfig names because, for now, they are dispatched via reflection,
// so production will break if someone renames a parserConfig struct.
func TestCanonizeParserConfigsList(t *testing.T) {
if tcrecipes.Enabled() {
t.Skip()
}
parserConfigs := parsers.KnownParsersConfigs()
parsersConfigsMap := make(map[string]bool)
for _, parserConfig := range parserConfigs {
Expand Down

0 comments on commit d69e1f0

Please sign in to comment.