Skip to content

Commit

Permalink
TRANSFER-783: Use generic parser for JSON-s
Browse files Browse the repository at this point in the history
  • Loading branch information
laskoviymishka committed Aug 15, 2024
1 parent 37c95fe commit 975ef3a
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 104 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ replace github.com/insomniacslk/dhcp => github.com/insomniacslk/dhcp v0.0.0-2021

replace cloud.google.com/go/pubsub => cloud.google.com/go/pubsub v1.30.0

replace google.golang.org/grpc => google.golang.org/grpc v1.56.3
replace google.golang.org/grpc => google.golang.org/grpc v1.61.2

replace github.com/grpc-ecosystem/grpc-gateway/v2 => github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1

Expand Down
14 changes: 10 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,6 @@ cloud.google.com/go/compute v1.13.0/go.mod h1:5aPTS0cUNMIc1CE546K+Th6weJUNQErARy
cloud.google.com/go/compute v1.14.0/go.mod h1:YfLtxrj9sU4Yxv+sXzZkyPjEyPBZfXHUvjxega5vAdo=
cloud.google.com/go/compute v1.18.0/go.mod h1:1X7yHxec2Ga+Ss6jPyjxRxpu2uu7PLgsOVXvgU0yacs=
cloud.google.com/go/compute v1.19.0/go.mod h1:rikpw2y+UMidAe9tISo04EHNOIf42RLYF/q8Bs93scU=
cloud.google.com/go/compute v1.19.1/go.mod h1:6ylj3a05WF8leseCdIf77NK0g1ey+nj5IKd5/kvShxE=
cloud.google.com/go/compute v1.19.3/go.mod h1:qxvISKp/gYnXkSAD1ppcSOveRAmzxicEv/JlizULFrI=
cloud.google.com/go/compute v1.20.1/go.mod h1:4tCnrn48xsqlwSAiLf1HXMQk8CONslYbdiEZc9FEIbM=
cloud.google.com/go/compute v1.23.0/go.mod h1:4tCnrn48xsqlwSAiLf1HXMQk8CONslYbdiEZc9FEIbM=
Expand Down Expand Up @@ -1421,6 +1420,7 @@ github.com/cncf/xds/go v0.0.0-20230105202645-06c439db220b/go.mod h1:eXthEFrGJvWH
github.com/cncf/xds/go v0.0.0-20230310173818-32f1caf87195/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20230428030218-4003588d1b74/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20231109132714-523115ebc101/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I=
github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
github.com/cockroachdb/apd/v2 v2.0.2 h1:weh8u7Cneje73dDh+2tEVLUvyBc89iwepWCD8b8034E=
Expand Down Expand Up @@ -1758,7 +1758,7 @@ github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOW
github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/glog v1.1.0/go.mod h1:pfYeQZ3JWZoXTV5sFc986z3HTpwQs9At6P4ImfuP3NQ=
github.com/golang/glog v1.1.2/go.mod h1:zR+okUeTbrL6EL3xHUDxZuEtGv04p5shwip1+mL/rLQ=
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
Expand Down Expand Up @@ -2707,6 +2707,7 @@ golang.org/x/crypto v0.10.0/go.mod h1:o4eNf7Ede1fv+hwOwZsTHl9EsPFO6q6ZvYR8vYfY45
golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw=
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/crypto v0.15.0/go.mod h1:4ChreQoLWfG3xLDer1WdlH5NdlQ3+mwnQq1YTKY+72g=
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
Expand Down Expand Up @@ -2860,6 +2861,7 @@ golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI=
golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
golang.org/x/net v0.16.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/net v0.18.0/go.mod h1:/czyP5RqHAH4odGYxBJ1qz0+CE5WZ+2j1YgoEo8F2jQ=
golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
Expand Down Expand Up @@ -2897,6 +2899,7 @@ golang.org/x/oauth2 v0.8.0/go.mod h1:yr7u4HXZRm1R1kBWqr/xKNqewf0plRYoB7sla+BCIXE
golang.org/x/oauth2 v0.11.0/go.mod h1:LdF7O/8bLR/qWK9DrpXmbHLTouvRHK0SgJl0GmDBchk=
golang.org/x/oauth2 v0.12.0/go.mod h1:A74bZ3aGXgCY0qaIC9Ahg6Lglin4AMAco8cIv9baba4=
golang.org/x/oauth2 v0.13.0/go.mod h1:/JMhi4ZRXAf4HG9LiNmxvk+45+96RUlVThiH8FzNBn0=
golang.org/x/oauth2 v0.14.0/go.mod h1:lAtNWgaWfL4cm7j2OV8TxGi9Qb7ECORx8DktCY74OwM=
golang.org/x/oauth2 v0.16.0/go.mod h1:hqZ+0LWXsiVoZpeld6jVt06P3adbS2Uu911W1SsJv2o=
golang.org/x/oauth2 v0.20.0 h1:4mQdhULixXKP1rwYBW0vAijoXnkTG0BLCDRzfe1idMo=
golang.org/x/oauth2 v0.20.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
Expand All @@ -2919,6 +2922,7 @@ golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down Expand Up @@ -3057,6 +3061,7 @@ golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
Expand All @@ -3079,6 +3084,7 @@ golang.org/x/term v0.9.0/go.mod h1:M6DEAAIenWoTxdKrOltXcmDY3rSplQUkrvaDU5FcQyo=
golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU=
golang.org/x/term v0.12.0/go.mod h1:owVbMEjm3cBLCHdkQu9b1opXd4ETQWc3BhuQGKgXgvU=
golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
golang.org/x/term v0.14.0/go.mod h1:TySc+nGkYR6qt8km8wUhuFRTVSMIX3XPR58y2lC8vww=
golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY=
golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk=
golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58=
Expand Down Expand Up @@ -3501,8 +3507,8 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20240116215550-a9fa1716bcac/go.
google.golang.org/genproto/googleapis/rpc v0.0.0-20240125205218-1f4bbc51befe/go.mod h1:PAREbraiVEVGVdTZsVWjSbbTtSyGbAgIIvni8a8CD5s=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 h1:BwIjyKYGsK9dMCBOorzRri8MQwmi7mT9rGHsCEinZkA=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY=
google.golang.org/grpc v1.56.3 h1:8I4C0Yq1EjstUzUJzpcRVbuYA2mODtEmpWiQoN/b2nc=
google.golang.org/grpc v1.56.3/go.mod h1:I9bI3vqKfayGqPUAwGdOSu7kt6oIJLixfffKrpXqQ9s=
google.golang.org/grpc v1.61.2 h1:TzJay21lXCf7BiNFKl7mSskt5DlkKAumAYTs52SpJeo=
google.golang.org/grpc v1.61.2/go.mod h1:VUbo7IFqmF1QtCAstipjG0GIoq49KvMe9+h1jFLBNJs=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
Expand Down
7 changes: 6 additions & 1 deletion transfer_manager/go/pkg/providers/s3/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,12 @@ func appendSystemColsTableSchema(cols []abstract.ColSchema) *abstract.TableSchem
return abstract.NewTableSchema(cols)
}

func New(src *s3.S3Source, lgr log.Logger, sess *session.Session, metrics *stats.SourceStats) (Reader, error) {
func New(
src *s3.S3Source,
lgr log.Logger,
sess *session.Session,
metrics *stats.SourceStats,
) (Reader, error) {
switch src.InputFormat {
case server.ParsingFormatPARQUET:
reader, err := NewParquet(src, lgr, sess, metrics)
Expand Down
133 changes: 36 additions & 97 deletions transfer_manager/go/pkg/providers/s3/reader/reader_json_line.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,18 @@ import (
"fmt"
"io"
"math"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
aws_s3 "github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/doublecloud/tross/kikimr/public/sdk/go/persqueue"
"github.com/doublecloud/tross/library/go/core/xerrors"
"github.com/doublecloud/tross/library/go/slices"
"github.com/doublecloud/tross/transfer_manager/go/pkg/abstract"
"github.com/doublecloud/tross/transfer_manager/go/pkg/abstract/changeitem/strictify"
"github.com/doublecloud/tross/transfer_manager/go/pkg/parsers"
jsonparser "github.com/doublecloud/tross/transfer_manager/go/pkg/parsers/registry/json"
"github.com/doublecloud/tross/transfer_manager/go/pkg/parsers/scanner"
"github.com/doublecloud/tross/transfer_manager/go/pkg/providers/s3"
chunk_pusher "github.com/doublecloud/tross/transfer_manager/go/pkg/providers/s3/pusher"
Expand Down Expand Up @@ -54,6 +55,8 @@ type JSONLineReader struct {
pathPattern string
metrics *stats.SourceStats
unparsedPolicy s3.UnparsedPolicy

parser parsers.Parser
}

func (r *JSONLineReader) openReader(ctx context.Context, filePath string) (*S3Reader, error) {
Expand Down Expand Up @@ -151,18 +154,27 @@ func (r *JSONLineReader) Read(ctx context.Context, filePath string, pusher chunk
var buff []abstract.ChangeItem
var currentSize int64
for _, line := range lines {
ci, err := r.doParse(line, filePath, s3Reader.LastModified(), lineCounter)
if err != nil {
unparsedCI, err := handleParseError(r.table, r.unparsedPolicy, filePath, int(lineCounter), err)
if err != nil {
return err
}
buff = append(buff, *unparsedCI)
continue
cis := r.parser.Do(persqueue.ReadMessage{
Offset: 0,
SeqNo: 0,
SourceID: nil,
CreateTime: s3Reader.LastModified(),
WriteTime: s3Reader.LastModified(),
IP: "",
Data: []byte(line),
Codec: 0,
ExtraFields: nil,
}, abstract.NewPartition(filePath, 0))
for i := range cis {
cis[i].Table = r.table.Name
cis[i].Schema = r.table.Namespace
cis[i].ColumnValues[0] = filePath
cis[i].ColumnValues[1] = lineCounter
}
currentSize += int64(ci.Size.Values)
buff = append(buff, cis...)
currentSize += int64(len(line))
lineCounter++
buff = append(buff, *ci)

if len(buff) > r.batchSize {
if err := pusher.Push(ctx, chunk_pusher.Chunk{
Items: buff,
Expand Down Expand Up @@ -196,96 +208,11 @@ func (r *JSONLineReader) Read(ctx context.Context, filePath string, pusher chunk
return nil
}

func (r *JSONLineReader) doParse(line string, filePath string, lastModified time.Time, lineCounter uint64) (*abstract.ChangeItem, error) {
row := make(map[string]any)
if err := json.Unmarshal([]byte(line), &row); err != nil {
return nil, xerrors.Errorf("failed to unmarshal json line: %w", err)
}

ci, err := r.constructCI(row, filePath, lastModified, lineCounter)
if err != nil {
return nil, xerrors.Errorf("unable to construct change item: %w", err)
}

if err := strictify.Strictify(ci, r.fastCols); err != nil {
return nil, xerrors.Errorf("failed to convert value to the expected data type: %w", err)
}
return ci, nil
}

func (r *JSONLineReader) ParsePassthrough(chunk chunk_pusher.Chunk) []abstract.ChangeItem {
// the most complex and useful method in the world
return chunk.Items
}

func (r *JSONLineReader) constructCI(row map[string]any, fname string, lastModified time.Time, idx uint64) (*abstract.ChangeItem, error) {
vals := make([]interface{}, len(r.tableSchema.Columns()))
rest := make(map[string]any)
for key, val := range row {
known := false
for _, col := range r.tableSchema.Columns() {
if col.ColumnName == key {
known = true
break
}
}
if !known {
if r.unexpectedFieldBehavior == s3.Infer {
rest[key] = val
} else if r.unexpectedFieldBehavior == s3.Ignore {
continue
} else {
return nil, xerrors.NewSentinel("unexpected json field found in jsonline file")
}
}
}
// TODO: add support for col.Path
for i, col := range r.tableSchema.Columns() {
if col.PrimaryKey {
if r.hideSystemCols {
continue
}
switch col.ColumnName {
case FileNameSystemCol:
vals[i] = fname
case RowIndexSystemCol:
vals[i] = idx
default:
continue
}
continue
}
val, ok := row[col.ColumnName]
if !ok {
if col.ColumnName == RestColumnName && r.unexpectedFieldBehavior == s3.Infer {
vals[i] = abstract.Restore(col, rest)
} else {
vals[i] = nil
}
continue
}
vals[i] = val
}

return &abstract.ChangeItem{
CommitTime: uint64(lastModified.UnixNano()),
Kind: abstract.InsertKind,
Table: r.table.Name,
Schema: r.table.Namespace,
ColumnNames: r.colNames,
ColumnValues: vals,
TableSchema: r.tableSchema,
PartID: fname,
ID: 0,
LSN: 0,
Counter: 0,
OldKeys: abstract.EmptyOldKeys(),
TxID: "",
Query: "",
Size: abstract.RawEventSize(util.DeepSizeof(vals)),
}, nil
}

func (r *JSONLineReader) ResolveSchema(ctx context.Context) (*abstract.TableSchema, error) {
if r.tableSchema != nil && len(r.tableSchema.Columns()) != 0 {
return r.tableSchema, nil
Expand Down Expand Up @@ -512,6 +439,7 @@ func NewJSONLineReader(src *s3.S3Source, lgr log.Logger, sess *session.Session,
colNames: nil,
metrics: metrics,
unparsedPolicy: src.UnparsedPolicy,
parser: nil,
}

if len(reader.tableSchema.Columns()) == 0 {
Expand All @@ -528,6 +456,17 @@ func NewJSONLineReader(src *s3.S3Source, lgr log.Logger, sess *session.Session,
reader.tableSchema = appendSystemColsTableSchema(cols)
}

cfg := new(jsonparser.ParserConfigJSONCommon)
cfg.AddRest = false
cfg.NullKeysAllowed = true
cfg.Fields = reader.tableSchema.Columns()
cfg.AddDedupeKeys = false
p, err := jsonparser.NewParserJSON(cfg, false, lgr, metrics)
if err != nil {
return nil, xerrors.Errorf("unable to construct generic parser: %w", err)
}

reader.parser = p
reader.colNames = slices.Map(reader.tableSchema.Columns(), func(t abstract.ColSchema) string { return t.ColumnName })
reader.fastCols = reader.tableSchema.FastColumns() // need to cache it, so we will not construct it for every line
return reader, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestCanonParquet(t *testing.T) {

func TestCanonJsonline(t *testing.T) {
testCasePath := "test_jsonline_files"
cfg := s3.PrepareCfg(t, "jsonline_canon", server.ParsingFormatJSONLine)
cfg := s3.PrepareCfg(t, "jsonlinecanon", server.ParsingFormatJSONLine)
cfg.PathPrefix = testCasePath
if os.Getenv("S3MDS_PORT") != "" { // for local recipe we need to upload test case to internet
s3.PrepareTestCase(t, cfg, cfg.PathPrefix)
Expand Down

0 comments on commit 975ef3a

Please sign in to comment.