Skip to content

Commit

Permalink
Add datetime parser validation and test for GenericParser
Browse files Browse the repository at this point in the history
commit_hash:5f78134ef29c7448b99fe272d7cc46e5e6c4ffcf
  • Loading branch information
DenisEvd committed Feb 27, 2025
1 parent 79914a2 commit c03ba46
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 2 deletions.
22 changes: 20 additions & 2 deletions pkg/parsers/generic/generic_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,6 @@ type lfResult struct {

// lfResultBatch is a slice of lfResult values.
type lfResultBatch []lfResult

var loc = time.Now().Location()
// UnparsedCols is the result of applying cols to the columns of UnparsedSchema.
var UnparsedCols = cols(UnparsedSchema.Columns())
Expand Down Expand Up @@ -839,7 +838,7 @@ func (p *GenericParser) extractTimeValue(col interface{}, defaultTime time.Time)
}
}

if t, err := dateparse.ParseIn(v, loc); err == nil {
if t, err := dateparse.ParseLocal(v); err == nil {
return &t, true, nil
} else {
if t, err := time.Parse("2006.01.02 15:04:05.999999", v); err == nil {
Expand Down Expand Up @@ -1140,6 +1139,24 @@ func (p *GenericParser) ResultSchema() *abstract.TableSchema {
return p.schema
}

// checkParserCorrectness emits a warning through lgr when the schema sch
// contains at least one column of the datetime type while neither timezone
// inference (opts.InferTimeZone) nor an explicit time field format
// (opts.TimeField.Format) is configured. It only logs; it never modifies its
// arguments and returns nothing.
func checkParserCorrectness(opts AuxParserOpts, sch []abstract.ColSchema, lgr log.Logger) {
	// An explicit, non-empty time field format counts as a configured timezone source.
	timeFormatConfigured := opts.TimeField != nil && opts.TimeField.Format != ""
	if opts.InferTimeZone || timeFormatConfigured {
		return
	}

	datetimeType := schema.TypeDatetime.String()
	for i := range sch {
		if sch[i].DataType != datetimeType {
			continue
		}
		// One warning is enough regardless of how many datetime columns exist.
		lgr.Warn("the datetime fields will be parsed in local timezone, because infer timezone and time field are not configured")
		return
	}
}

func NewGenericParser(cfg ParserConfig, fields []abstract.ColSchema, logger log.Logger, registry *stats.SourceStats) *GenericParser {
var opts *AuxParserOpts
var lfCfg *LogfellerParserConfig
Expand Down Expand Up @@ -1184,6 +1201,7 @@ func NewGenericParser(cfg ParserConfig, fields []abstract.ColSchema, logger log.
}
}
logger.Info("Final schema", log.Any("columns", colNames), log.Any("s", finalSchema))
checkParserCorrectness(*opts, finalSchema, logger)
return &GenericParser{
rawFields: fields,
known: known,
Expand Down
36 changes: 36 additions & 0 deletions pkg/parsers/tests/generic_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/doublecloud/transfer/pkg/parsers"
"github.com/doublecloud/transfer/pkg/parsers/generic"
_ "github.com/doublecloud/transfer/pkg/parsers/registry"
jsonparser "github.com/doublecloud/transfer/pkg/parsers/registry/json"
"github.com/doublecloud/transfer/pkg/parsers/registry/tskv"
"github.com/doublecloud/transfer/pkg/parsers/tests/samples"
"github.com/doublecloud/transfer/pkg/providers/kafka"
Expand Down Expand Up @@ -391,6 +392,41 @@ func TestGenericParser_Parse_vs_Do(t *testing.T) {
}
}

// TestGenericParserDatetimeZone builds a JSON parser with a single datetime
// column and an explicit TimeField format, parses one message, and checks
// that the resulting column value is a time.Time whose location is UTC.
func TestGenericParserDatetimeZone(t *testing.T) {
	const (
		tsColumn = "timestamp"
		tsLayout = "2006-01-02T15:04:05.000Z"
	)

	cfg := &jsonparser.ParserConfigJSONLb{
		Fields: []abstract.ColSchema{
			{ColumnName: tsColumn, DataType: schema.TypeDatetime.String()},
		},
		TimeField: &abstract.TimestampCol{
			Col:    tsColumn,
			Format: tsLayout,
		},
	}

	cfgMap, err := parsers.ParserConfigStructToMap(cfg)
	require.NoError(t, err)

	p, err := parsers.NewParserFromMap(cfgMap, false, logger.Log, stats.NewSourceStats(metrics.NewRegistry()))
	require.NoError(t, err)

	msg := parsers.Message{
		Value: []byte("{\"timestamp\":\"2025-02-19T15:43:30.089Z\"}"),
	}
	partition := abstract.Partition{Partition: 0, Topic: "test/topic"}
	items := p.Do(msg, partition)

	require.NotEmpty(t, items)
	first := items[0]
	require.NotEmpty(t, first.ColumnValues)

	// Locate the timestamp column; if the name occurs more than once the last
	// occurrence wins.
	pos := -1
	for i := range first.ColumnNames {
		if first.ColumnNames[i] == tsColumn {
			pos = i
		}
	}
	require.True(t, pos >= 0)

	parsed, isTime := first.ColumnValues[pos].(time.Time)
	require.True(t, isTime)
	require.Equal(t, time.UTC.String(), parsed.Location().String())
}

func BenchmarkGenericParser(b *testing.B) {
for _, testCase := range []string{
samples.MetrikaSample, // tskv
Expand Down

0 comments on commit c03ba46

Please sign in to comment.