From c03ba46862ad78c00d9d7ddf7f1a70138fad82cd Mon Sep 17 00:00:00 2001 From: denisevdkmv Date: Thu, 27 Feb 2025 12:32:51 +0300 Subject: [PATCH] Add datetime parser validation and test for GenericParser commit_hash:5f78134ef29c7448b99fe272d7cc46e5e6c4ffcf --- pkg/parsers/generic/generic_parser.go | 22 +++++++++++++-- pkg/parsers/tests/generic_parser_test.go | 36 ++++++++++++++++++++++++ 2 files changed, 56 insertions(+), 2 deletions(-) diff --git a/pkg/parsers/generic/generic_parser.go b/pkg/parsers/generic/generic_parser.go index 45dca8d4..6dd5ec92 100644 --- a/pkg/parsers/generic/generic_parser.go +++ b/pkg/parsers/generic/generic_parser.go @@ -214,7 +214,6 @@ type lfResult struct { type lfResultBatch []lfResult -var loc = time.Now().Location() var ( UnparsedCols = cols(UnparsedSchema.Columns()) ) @@ -839,7 +838,7 @@ func (p *GenericParser) extractTimeValue(col interface{}, defaultTime time.Time) } } - if t, err := dateparse.ParseIn(v, loc); err == nil { + if t, err := dateparse.ParseLocal(v); err == nil { return &t, true, nil } else { if t, err := time.Parse("2006.01.02 15:04:05.999999", v); err == nil { @@ -1140,6 +1139,24 @@ func (p *GenericParser) ResultSchema() *abstract.TableSchema { return p.schema } +func checkParserCorrectness(opts AuxParserOpts, sch []abstract.ColSchema, lgr log.Logger) { + if opts.InferTimeZone || (opts.TimeField != nil && opts.TimeField.Format != "") { + return + } + + hasDatetime := false + for _, col := range sch { + if col.DataType == schema.TypeDatetime.String() { + hasDatetime = true + break + } + } + + if hasDatetime { + lgr.Warn("the datetime fields will be parsed in local timezone, because infer timezone and time field are not configured") + } +} + func NewGenericParser(cfg ParserConfig, fields []abstract.ColSchema, logger log.Logger, registry *stats.SourceStats) *GenericParser { var opts *AuxParserOpts var lfCfg *LogfellerParserConfig @@ -1184,6 +1201,7 @@ func NewGenericParser(cfg ParserConfig, fields []abstract.ColSchema, logger log. } } logger.Info("Final schema", log.Any("columns", colNames), log.Any("s", finalSchema)) + checkParserCorrectness(*opts, finalSchema, logger) return &GenericParser{ rawFields: fields, known: known, diff --git a/pkg/parsers/tests/generic_parser_test.go b/pkg/parsers/tests/generic_parser_test.go index d7ccb9c2..68a8d100 100644 --- a/pkg/parsers/tests/generic_parser_test.go +++ b/pkg/parsers/tests/generic_parser_test.go @@ -16,6 +16,7 @@ import ( "github.com/doublecloud/transfer/pkg/parsers" "github.com/doublecloud/transfer/pkg/parsers/generic" _ "github.com/doublecloud/transfer/pkg/parsers/registry" + jsonparser "github.com/doublecloud/transfer/pkg/parsers/registry/json" "github.com/doublecloud/transfer/pkg/parsers/registry/tskv" "github.com/doublecloud/transfer/pkg/parsers/tests/samples" "github.com/doublecloud/transfer/pkg/providers/kafka" @@ -391,6 +392,41 @@ func TestGenericParser_Parse_vs_Do(t *testing.T) { } } +func TestGenericParserDatetimeZone(t *testing.T) { + parserConfigStruct := &jsonparser.ParserConfigJSONLb{ + Fields: []abstract.ColSchema{ + {ColumnName: "timestamp", DataType: schema.TypeDatetime.String()}, + }, + TimeField: &abstract.TimestampCol{ + Col: "timestamp", + Format: "2006-01-02T15:04:05.000Z", + }, + } + configMap, err := parsers.ParserConfigStructToMap(parserConfigStruct) + require.NoError(t, err) + parser, err := parsers.NewParserFromMap(configMap, false, logger.Log, stats.NewSourceStats(metrics.NewRegistry())) + require.NoError(t, err) + + res := parser.Do(parsers.Message{ + Value: []byte("{\"timestamp\":\"2025-02-19T15:43:30.089Z\"}"), + }, abstract.Partition{Partition: 0, Topic: "test/topic"}) + + require.NotEmpty(t, res) + require.NotEmpty(t, res[0].ColumnValues) + + tsIdx := -1 + for idx, col := range res[0].ColumnNames { + if col == "timestamp" { + tsIdx = idx + } + } + require.True(t, tsIdx >= 0) + + ts, ok := res[0].ColumnValues[tsIdx].(time.Time) + require.True(t, ok) + require.Equal(t, time.UTC.String(), ts.Location().String()) +} + func BenchmarkGenericParser(b *testing.B) { for _, testCase := range []string{ samples.MetrikaSample, // tskv