Commit
TRANSFER-783: Use generic parser for JSON-s
laskoviymishka committed Aug 16, 2024
1 parent bb3d39c commit 886a3c1
Showing 3 changed files with 52 additions and 98 deletions.
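
This commit removes the JSONLine reader's hand-rolled parsing path (doParse and constructCI plus a strictify pass) and routes every line through the shared generic JSON parser from pkg/parsers: each raw line is wrapped in a synthetic persqueue.ReadMessage, the parser returns ChangeItems, and rows it cannot parse come back as unparsed items that either abort the transfer (UnparsedPolicyFail) or are forwarded as-is. A canonization test also renames its case from jsonline_canon to jsonlinecanon. Short sketches after the relevant hunks below illustrate the new flow.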
7 changes: 6 additions & 1 deletion transfer_manager/go/pkg/providers/s3/reader/reader.go
@@ -105,7 +105,12 @@ func appendSystemColsTableSchema(cols []abstract.ColSchema) *abstract.TableSchem
     return abstract.NewTableSchema(cols)
 }
 
-func New(src *s3.S3Source, lgr log.Logger, sess *session.Session, metrics *stats.SourceStats) (Reader, error) {
+func New(
+    src *s3.S3Source,
+    lgr log.Logger,
+    sess *session.Session,
+    metrics *stats.SourceStats,
+) (Reader, error) {
     switch src.InputFormat {
     case server.ParsingFormatPARQUET:
         reader, err := NewParquet(src, lgr, sess, metrics)
141 changes: 45 additions & 96 deletions transfer_manager/go/pkg/providers/s3/reader/reader_json_line.go
@@ -7,17 +7,18 @@ import (
     "fmt"
     "io"
     "math"
-    "time"
 
     "github.com/aws/aws-sdk-go/aws"
     "github.com/aws/aws-sdk-go/aws/session"
     aws_s3 "github.com/aws/aws-sdk-go/service/s3"
     "github.com/aws/aws-sdk-go/service/s3/s3iface"
     "github.com/aws/aws-sdk-go/service/s3/s3manager"
+    "github.com/doublecloud/tross/kikimr/public/sdk/go/persqueue"
     "github.com/doublecloud/tross/library/go/core/xerrors"
     "github.com/doublecloud/tross/library/go/slices"
     "github.com/doublecloud/tross/transfer_manager/go/pkg/abstract"
-    "github.com/doublecloud/tross/transfer_manager/go/pkg/abstract/changeitem/strictify"
+    "github.com/doublecloud/tross/transfer_manager/go/pkg/parsers"
+    jsonparser "github.com/doublecloud/tross/transfer_manager/go/pkg/parsers/registry/json"
     "github.com/doublecloud/tross/transfer_manager/go/pkg/parsers/scanner"
     "github.com/doublecloud/tross/transfer_manager/go/pkg/providers/s3"
     chunk_pusher "github.com/doublecloud/tross/transfer_manager/go/pkg/providers/s3/pusher"
@@ -54,6 +55,8 @@ type JSONLineReader struct {
     pathPattern    string
     metrics        *stats.SourceStats
     unparsedPolicy s3.UnparsedPolicy
+
+    parser parsers.Parser
 }
 
 func (r *JSONLineReader) openReader(ctx context.Context, filePath string) (*S3Reader, error) {
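
The new parser field is the only parsing state the reader keeps. A minimal sketch of the contract it relies on, inferred from how this commit calls it; the real interface lives in transfer_manager/go/pkg/parsers and may carry more methods:

package reader

import (
    "github.com/doublecloud/tross/kikimr/public/sdk/go/persqueue"
    "github.com/doublecloud/tross/transfer_manager/go/pkg/abstract"
)

// Parser sketches the behavior this diff exercises: Do turns one raw
// message into zero or more ChangeItems. Rows that fail to parse are not
// returned as errors; they come back as "unparsed" items that the caller
// detects with parsers.IsUnparsed.
type Parser interface {
    Do(msg persqueue.ReadMessage, partition abstract.Partition) []abstract.ChangeItem
}

Because Do never returns an error, failure is encoded per item, which is what lets the reader push good and bad rows through the same batching path below.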
@@ -151,18 +154,37 @@ func (r *JSONLineReader) Read(ctx context.Context, filePath string, pusher chunk
     var buff []abstract.ChangeItem
     var currentSize int64
     for _, line := range lines {
-        ci, err := r.doParse(line, filePath, s3Reader.LastModified(), lineCounter)
-        if err != nil {
-            unparsedCI, err := handleParseError(r.table, r.unparsedPolicy, filePath, int(lineCounter), err)
-            if err != nil {
-                return err
+        cis := r.parser.Do(persqueue.ReadMessage{
+            Offset:      0,
+            SeqNo:       0,
+            SourceID:    nil,
+            CreateTime:  s3Reader.LastModified(),
+            WriteTime:   s3Reader.LastModified(),
+            IP:          "",
+            Data:        []byte(line),
+            Codec:       0,
+            ExtraFields: nil,
+        }, abstract.NewPartition(filePath, 0))
+        for i := range cis {
+            if parsers.IsUnparsed(cis[i]) {
+                if r.unparsedPolicy == s3.UnparsedPolicyFail {
+                    return abstract.NewFatalError(xerrors.Errorf("unable to parse line: %s: %w", line, err))
+                }
+                buff = append(buff, cis[i])
+                continue
             }
-            buff = append(buff, *unparsedCI)
-            continue
+            cis[i].Table = r.table.Name
+            cis[i].Schema = r.table.Namespace
+            cis[i].PartID = filePath
+            cis[i].ColumnValues[0] = filePath
+            cis[i].ColumnValues[1] = lineCounter
+            cis[i].Size = abstract.RawEventSize(util.DeepSizeof(cis[i].ColumnValues))
+            buff = append(buff, cis[i])
         }
-        currentSize += int64(ci.Size.Values)
+
+        currentSize += int64(len(line))
         lineCounter++
-        buff = append(buff, *ci)
+
        if len(buff) > r.batchSize {
            if err := pusher.Push(ctx, chunk_pusher.Chunk{
                Items: buff,
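
Distilled from the loop above, a hedged sketch of the per-line flow; parseLine is an illustrative name, not part of the commit. The reader fakes a queue message around each file line, with offsets, codec, and source left at their zero values, so the generic parser can treat an S3 line like any other message:

package reader

import (
    "time"

    "github.com/doublecloud/tross/kikimr/public/sdk/go/persqueue"
    "github.com/doublecloud/tross/transfer_manager/go/pkg/abstract"
    "github.com/doublecloud/tross/transfer_manager/go/pkg/parsers"
)

// parseLine wraps one raw line as a synthetic queue message, lets the
// generic parser emit ChangeItems, and splits off unparsed rows so the
// caller can apply its UnparsedPolicy.
func parseLine(
    p parsers.Parser,
    line string,
    filePath string,
    lastModified time.Time,
) (parsed, unparsed []abstract.ChangeItem) {
    items := p.Do(persqueue.ReadMessage{
        CreateTime: lastModified, // the file's mtime stands in for message timestamps
        WriteTime:  lastModified,
        Data:       []byte(line),
    }, abstract.NewPartition(filePath, 0))
    for _, item := range items {
        if parsers.IsUnparsed(item) {
            unparsed = append(unparsed, item)
            continue
        }
        parsed = append(parsed, item)
    }
    return parsed, unparsed
}

The real loop then stamps Table, Schema, PartID, and the two leading system columns (file name and row index) onto every parsed item before batching, as the hunk above shows.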
@@ -196,96 +218,11 @@ func (r *JSONLineReader) Read(ctx context.Context, filePath string, pusher chunk
     return nil
 }
 
-func (r *JSONLineReader) doParse(line string, filePath string, lastModified time.Time, lineCounter uint64) (*abstract.ChangeItem, error) {
-    row := make(map[string]any)
-    if err := json.Unmarshal([]byte(line), &row); err != nil {
-        return nil, xerrors.Errorf("failed to unmarshal json line: %w", err)
-    }
-
-    ci, err := r.constructCI(row, filePath, lastModified, lineCounter)
-    if err != nil {
-        return nil, xerrors.Errorf("unable to construct change item: %w", err)
-    }
-
-    if err := strictify.Strictify(ci, r.fastCols); err != nil {
-        return nil, xerrors.Errorf("failed to convert value to the expected data type: %w", err)
-    }
-    return ci, nil
-}
-
 func (r *JSONLineReader) ParsePassthrough(chunk chunk_pusher.Chunk) []abstract.ChangeItem {
     // the most complex and useful method in the world
     return chunk.Items
 }
 
-func (r *JSONLineReader) constructCI(row map[string]any, fname string, lastModified time.Time, idx uint64) (*abstract.ChangeItem, error) {
-    vals := make([]interface{}, len(r.tableSchema.Columns()))
-    rest := make(map[string]any)
-    for key, val := range row {
-        known := false
-        for _, col := range r.tableSchema.Columns() {
-            if col.ColumnName == key {
-                known = true
-                break
-            }
-        }
-        if !known {
-            if r.unexpectedFieldBehavior == s3.Infer {
-                rest[key] = val
-            } else if r.unexpectedFieldBehavior == s3.Ignore {
-                continue
-            } else {
-                return nil, xerrors.NewSentinel("unexpected json field found in jsonline file")
-            }
-        }
-    }
-    // TODO: add support for col.Path
-    for i, col := range r.tableSchema.Columns() {
-        if col.PrimaryKey {
-            if r.hideSystemCols {
-                continue
-            }
-            switch col.ColumnName {
-            case FileNameSystemCol:
-                vals[i] = fname
-            case RowIndexSystemCol:
-                vals[i] = idx
-            default:
-                continue
-            }
-            continue
-        }
-        val, ok := row[col.ColumnName]
-        if !ok {
-            if col.ColumnName == RestColumnName && r.unexpectedFieldBehavior == s3.Infer {
-                vals[i] = abstract.Restore(col, rest)
-            } else {
-                vals[i] = nil
-            }
-            continue
-        }
-        vals[i] = val
-    }
-
-    return &abstract.ChangeItem{
-        CommitTime:   uint64(lastModified.UnixNano()),
-        Kind:         abstract.InsertKind,
-        Table:        r.table.Name,
-        Schema:       r.table.Namespace,
-        ColumnNames:  r.colNames,
-        ColumnValues: vals,
-        TableSchema:  r.tableSchema,
-        PartID:       fname,
-        ID:           0,
-        LSN:          0,
-        Counter:      0,
-        OldKeys:      abstract.EmptyOldKeys(),
-        TxID:         "",
-        Query:        "",
-        Size:         abstract.RawEventSize(util.DeepSizeof(vals)),
-    }, nil
-}
-
 func (r *JSONLineReader) ResolveSchema(ctx context.Context) (*abstract.TableSchema, error) {
     if r.tableSchema != nil && len(r.tableSchema.Columns()) != 0 {
         return r.tableSchema, nil
@@ -512,6 +449,7 @@ func NewJSONLineReader(src *s3.S3Source, lgr log.Logger, sess *session.Session,
         colNames:       nil,
         metrics:        metrics,
         unparsedPolicy: src.UnparsedPolicy,
+        parser:         nil,
     }
 
     if len(reader.tableSchema.Columns()) == 0 {
@@ -528,6 +466,17 @@ func NewJSONLineReader(src *s3.S3Source, lgr log.Logger, sess *session.Session,
         reader.tableSchema = appendSystemColsTableSchema(cols)
     }
 
+    cfg := new(jsonparser.ParserConfigJSONCommon)
+    cfg.AddRest = reader.unexpectedFieldBehavior == s3.Infer
+    cfg.NullKeysAllowed = true
+    cfg.Fields = reader.tableSchema.Columns()
+    cfg.AddDedupeKeys = false
+    p, err := jsonparser.NewParserJSON(cfg, false, lgr, metrics)
+    if err != nil {
+        return nil, xerrors.Errorf("unable to construct generic parser: %w", err)
+    }
+
+    reader.parser = p
     reader.colNames = slices.Map(reader.tableSchema.Columns(), func(t abstract.ColSchema) string { return t.ColumnName })
     reader.fastCols = reader.tableSchema.FastColumns() // need to cache it, so we will not construct it for every line
     return reader, nil
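
The construction added to NewJSONLineReader can be read in isolation. A minimal sketch, with buildJSONParser as an illustrative wrapper (not part of the commit) and the import paths for log and stats assumed to mirror the file's existing ones:

package reader

import (
    "github.com/doublecloud/tross/library/go/core/log" // assumed path
    "github.com/doublecloud/tross/library/go/core/xerrors"
    "github.com/doublecloud/tross/transfer_manager/go/pkg/abstract"
    "github.com/doublecloud/tross/transfer_manager/go/pkg/parsers"
    jsonparser "github.com/doublecloud/tross/transfer_manager/go/pkg/parsers/registry/json"
    "github.com/doublecloud/tross/transfer_manager/go/pkg/stats" // assumed path
)

// buildJSONParser mirrors the snippet above: the schema columns become the
// parser's expected fields, and addRest keeps unknown JSON keys (the
// s3.Infer behavior) instead of rejecting them.
func buildJSONParser(
    schema *abstract.TableSchema,
    addRest bool,
    lgr log.Logger,
    metrics *stats.SourceStats,
) (parsers.Parser, error) {
    cfg := new(jsonparser.ParserConfigJSONCommon)
    cfg.AddRest = addRest
    cfg.NullKeysAllowed = true
    cfg.Fields = schema.Columns()
    cfg.AddDedupeKeys = false // the reader stamps its own system columns
    p, err := jsonparser.NewParserJSON(cfg, false, lgr, metrics)
    if err != nil {
        return nil, xerrors.Errorf("unable to construct generic parser: %w", err)
    }
    return p, nil
}

AddDedupeKeys is disabled, presumably because the reader stamps its own file-name and row-index system columns onto each item, as the Read hunk above shows.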
@@ -89,7 +89,7 @@ func TestCanonParquet(t *testing.T) {
 
 func TestCanonJsonline(t *testing.T) {
     testCasePath := "test_jsonline_files"
-    cfg := s3.PrepareCfg(t, "jsonline_canon", server.ParsingFormatJSONLine)
+    cfg := s3.PrepareCfg(t, "jsonlinecanon", server.ParsingFormatJSONLine)
     cfg.PathPrefix = testCasePath
     if os.Getenv("S3MDS_PORT") != "" { // for local recipe we need to upload test case to internet
         s3.PrepareTestCase(t, cfg, cfg.PathPrefix)
