Skip to content

Commit

Permalink
Use old sink for rotation
Browse files Browse the repository at this point in the history
Use old sink for rotated tables
c4ca10c820cd0eb8c253078cbb8a5e0847ed7a4e
  • Loading branch information
DenisEvd committed Sep 2, 2024
1 parent 8064b3d commit 04b76c2
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 12 deletions.
12 changes: 9 additions & 3 deletions transfer_manager/go/pkg/providers/yt/init/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,15 @@ func (p *Provider) Sink(middlewares.Config) (abstract.Sinker, error) {
if !p.transfer.SnapshotOnly() {
return nil, xerrors.Errorf("failed to create YT (static) sinker: can't make '%s' transfer while sinker is static", p.transfer.Type)
}
s, err = staticsink.NewStaticSink(dst, p.cp, p.transfer.ID, p.registry, p.logger)
if err != nil {
return nil, xerrors.Errorf("failed to create YT (static) sinker: %w", err)

if dst.Rotation() != nil {
if s, err = ytsink.NewRotatedStaticSink(dst, p.registry, p.logger, p.cp, p.transfer.ID); err != nil {
return nil, xerrors.Errorf("failed to create YT (static) sinker: %w", err)
}
} else {
if s, err = staticsink.NewStaticSink(dst, p.cp, p.transfer.ID, p.registry, p.logger); err != nil {
return nil, xerrors.Errorf("failed to create YT (static) sinker: %w", err)
}
}
} else {
jobIndex := getJobIndex(p.transfer)
Expand Down
6 changes: 6 additions & 0 deletions transfer_manager/go/pkg/providers/yt/model_yt_destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ type YtDestinationModel interface {

BuffererConfig() bufferer.BuffererConfig

SupportSharding() bool

CustomAttributes() map[string]any
// MergeAttributes should be used to merge user-defined custom table attributes
// with arbitrary attribute set (usually table settings like medium, ttl, ...)
Expand Down Expand Up @@ -509,6 +511,10 @@ func (d *YtDestinationWrapper) Proxy() string {
return d.Cluster()
}

func (d *YtDestinationWrapper) SupportSharding() bool {
return !(d.Model.Static && d.Rotation() != nil)
}

// this is kusok govna, it here for purpose - backward compatibility and no reuse without backward compatibility
func (d *YtDestinationWrapper) LegacyModel() interface{} {
return d.Model
Expand Down
2 changes: 1 addition & 1 deletion transfer_manager/go/pkg/providers/yt/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -1030,7 +1030,7 @@ func hackTimestamps(cols []abstract.ColSchema) []abstract.ColSchema {
return res
}

func NewStaticSink(cfg yt2.YtDestinationModel, registry metrics.Registry, logger log.Logger, cp coordinator.Coordinator, transferID string) (abstract.Sinker, error) {
func NewRotatedStaticSink(cfg yt2.YtDestinationModel, registry metrics.Registry, logger log.Logger, cp coordinator.Coordinator, transferID string) (abstract.Sinker, error) {
ytClient, err := ytclient.FromConnParams(cfg, logger)
if err != nil {
return nil, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func TestCustomAttributesStaticTable(t *testing.T) {
Namespace: "ns",
Name: "weird_table_2",
}
statTable, err := NewStaticSink(cfg, metrics.NewRegistry(), logger.Log, coordinator.NewFakeClient(), "test_transfer")
statTable, err := NewRotatedStaticSink(cfg, metrics.NewRegistry(), logger.Log, coordinator.NewFakeClient(), "test_transfer")
require.NoError(t, err)
// generate some amount of random change items
var items []abstract.ChangeItem
Expand Down
8 changes: 1 addition & 7 deletions transfer_manager/go/pkg/providers/yt/sink/v2/static_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,7 @@ func (s *sink) getTablePath(item abstract.ChangeItem) ypath.Path {
if s.config == nil {
return yt2.SafeChild(s.dir, tableName)
}

path := yt2.SafeChild(s.dir, s.config.GetTableAltName(tableName))
if s.config.Rotation() != nil {
path = yt2.SafeChild(s.dir, s.config.Rotation().AnnotateWithTimeFromColumn(s.config.GetTableAltName(tableName), item))
}

return path
return yt2.SafeChild(s.dir, s.config.GetTableAltName(tableName))
}

func getNameFromTableID(id abstract.TableID) string {
Expand Down

0 comments on commit 04b76c2

Please sign in to comment.