Skip to content

Commit

Permalink
Use map converter for transformer
Browse files Browse the repository at this point in the history
Actually Fixes: #225

---

Pull Request resolved: #228

Co-authored-by: tserakhau <tserakhau@double.cloud>
Co-authored-by: tserakhau <tserakhau@double.cloud>
Co-authored-by: tserakhau <tserakhau@double.cloud>
commit_hash:99aa6fd3dd559c07877756e3f92c2a3fb9b68b7f
  • Loading branch information
laskoviymishka authored and robot-piglet committed Feb 27, 2025
1 parent a47e5ed commit 79914a2
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 1 deletion.
25 changes: 25 additions & 0 deletions cmd/trcli/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,34 @@ func ParseTransferYaml(rawData []byte) (*TransferYamlView, error) {
if err == nil {
transfer.Dst.Params = string(res)
}
if transfer.Transformation != nil {
for _, tr := range transfer.Transformation.Transformers {
for k, v := range tr {
tr[k] = convertMap(v)
}
}
}
return &transfer, nil
}

func convertMap(input interface{}) interface{} {
switch value := input.(type) {
case map[interface{}]interface{}:
newMap := make(map[string]interface{})
for k, v := range value {
if key, ok := k.(string); ok {
newMap[key] = convertMap(v)
}
}
return newMap
case []interface{}:
for i, v := range value {
value[i] = convertMap(v)
}
}
return input
}

func TablesFromYaml(tablesParams *string) (*UploadTables, error) {
rawData, err := os.ReadFile(*tablesParams)
if err != nil {
Expand Down
73 changes: 73 additions & 0 deletions cmd/trcli/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/doublecloud/transfer/pkg/providers/mongo"
_ "github.com/doublecloud/transfer/pkg/transformer/registry"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -122,3 +123,75 @@ dst:
require.True(t, ok)
require.Equal(t, msrc.BatchingParams.BatchFlushInterval, 10*time.Second)
}

func TestTransformer(t *testing.T) {
t.Run("valid", func(t *testing.T) {
transfer, err := ParseTransferYaml([]byte(`
src:
type: mongo
params:
BatchingParams:
BatchFlushInterval: 10s
dst:
type: stdout
params:
ShowData: false
regular_snapshot:
enabled: false
interval: 0s
cron_expression: ""
increment_delay_seconds: 0
incremental: []
transformation:
debugmode: false
transformers:
- renameTables:
renameTables:
- newName:
name: a
nameSpace: ""
originalName:
name: a
nameSpace: a_namespace
transformerId: ""
errorsoutput: null
data_objects:
include_objects:
- sgd6096.order
type_system_version: 9
`))
require.NoError(t, err)
require.Len(t, transfer.Transformation.Transformers, 1)
require.NoError(t, transfer.Validate())
})
t.Run("invalid", func(t *testing.T) {
transfer, err := ParseTransferYaml([]byte(`
src:
type: mango
params:
BatchingParams:
BatchFlushInterval: 10s
dst:
type: stdout
params:
ShowData: false
transformation:
transformers:
- boboTables:
renameTables:
- newName:
name: a
nameSpace: ""
originalName:
name: a
nameSpace: a_namespace
transformerId: ""
data_objects:
include_objects:
- sgd6096.order
type_system_version: 9
`))
require.NoError(t, err)
require.Error(t, transfer.Validate())
})
}
22 changes: 21 additions & 1 deletion cmd/trcli/config/model.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package config

import (
"github.com/doublecloud/transfer/internal/logger"
"github.com/doublecloud/transfer/library/go/core/xerrors"
"github.com/doublecloud/transfer/library/go/core/xerrors/multierr"
"github.com/doublecloud/transfer/pkg/abstract"
"github.com/doublecloud/transfer/pkg/abstract/model"
"github.com/doublecloud/transfer/pkg/transformer"
"gopkg.in/yaml.v2"
"gopkg.in/yaml.v3"
)

type Endpoint struct {
Expand Down Expand Up @@ -49,3 +52,20 @@ type TransferYamlView struct {
DataObjects *model.DataObjects `yaml:"data_objects"`
TypeSystemVersion int `yaml:"type_system_version"`
}

func (v TransferYamlView) Validate() error {
if v.Transformation == nil || v.Transformation.Transformers == nil {
return nil
}
var errs error
for _, tr := range v.Transformation.Transformers {
_, err := transformer.New(tr.Type(), tr.Config(), logger.Log, abstract.TransformationRuntimeOpts{JobIndex: 0})
if err != nil {
errs = multierr.Append(errs, xerrors.Errorf("unable to construct %s(%s): %w", tr.Type(), tr.ID(), err))
}
}
if errs != nil {
return xerrors.Errorf("transformers invalid: %w", errs)
}
return nil
}

0 comments on commit 79914a2

Please sign in to comment.