Skip to content

Commit

Permalink
added warm-up schema-registry cache
Browse files Browse the repository at this point in the history
commit_hash:45eab2973f7551af2af6ed620bfe839e0984ce52
  • Loading branch information
timmyb32r committed Jan 27, 2025
1 parent b504cf4 commit ddf89c2
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 17 deletions.
1 change: 1 addition & 0 deletions .mapping.json
Original file line number Diff line number Diff line change
Expand Up @@ -1951,6 +1951,7 @@
"pkg/schemaregistry/format/gotest/canondata/result.json":"transfer_manager/go/pkg/schemaregistry/format/gotest/canondata/result.json",
"pkg/schemaregistry/format/json_schema_format.go":"transfer_manager/go/pkg/schemaregistry/format/json_schema_format.go",
"pkg/schemaregistry/format/json_schema_format_test.go":"transfer_manager/go/pkg/schemaregistry/format/json_schema_format_test.go",
"pkg/schemaregistry/warmup/warmup.go":"transfer_manager/go/pkg/schemaregistry/warmup/warmup.go",
"pkg/serializer/batch.go":"transfer_manager/go/pkg/serializer/batch.go",
"pkg/serializer/batch_test.go":"transfer_manager/go/pkg/serializer/batch_test.go",
"pkg/serializer/csv.go":"transfer_manager/go/pkg/serializer/csv.go",
Expand Down
6 changes: 3 additions & 3 deletions pkg/debezium/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

type Receiver struct {
originalTypes map[abstract.TableID]map[string]*debeziumcommon.OriginalTypeInfo // map: table -> [fieldName -> originalType]
unpacker unpacker.Unpacker
Unpacker unpacker.Unpacker
schemaFormat string
tableSchemaCache map[string]tableSchemaCacheItem
tableSchemaCacheMutex *sync.RWMutex
Expand Down Expand Up @@ -139,7 +139,7 @@ func (r *Receiver) convertSchemaFormat(schema []byte) ([]byte, error) {
}

func (r *Receiver) Receive(in string) (*abstract.ChangeItem, error) {
schema, payload, err := r.unpacker.Unpack([]byte(in))
schema, payload, err := r.Unpacker.Unpack([]byte(in))
if err != nil {
return nil, xerrors.Errorf("can't unpack message: %w", err)
}
Expand Down Expand Up @@ -225,7 +225,7 @@ func NewReceiver(originalTypes map[abstract.TableID]map[string]*debeziumcommon.O
}
return &Receiver{
originalTypes: originalTypes,
unpacker: currUnpacker,
Unpacker: currUnpacker,
schemaFormat: schemaFormat,
tableSchemaCache: make(map[string]tableSchemaCacheItem),
tableSchemaCacheMutex: new(sync.RWMutex),
Expand Down
7 changes: 5 additions & 2 deletions pkg/debezium/unpacker/schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@ func (s *SchemaRegistry) Unpack(message []byte) ([]byte, []byte, error) {

schema, _ := backoff.RetryNotifyWithData(func() (*confluent.Schema, error) {
return s.schemaRegistryClient.GetSchema(int(schemaID))
}, backoff.NewConstantBackOff(time.Second), util.BackoffLogger(logger.Log, "getting schema"),
)
}, backoff.NewConstantBackOff(time.Second), util.BackoffLogger(logger.Log, "getting schema"))

return []byte(schema.Schema), message[5:], nil
}

func (s *SchemaRegistry) SchemaRegistryClient() *confluent.SchemaRegistryClient {
return s.schemaRegistryClient
}

func NewSchemaRegistry(srClient *confluent.SchemaRegistryClient) *SchemaRegistry {
return &SchemaRegistry{
schemaRegistryClient: srClient,
Expand Down
21 changes: 13 additions & 8 deletions pkg/parsers/registry/confluentschemaregistry/engine/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package engine
import (
"encoding/binary"
"strings"
"sync"
"time"

"github.com/cenkalti/backoff/v4"
Expand All @@ -11,15 +12,17 @@ import (
"github.com/doublecloud/transfer/pkg/parsers"
genericparser "github.com/doublecloud/transfer/pkg/parsers/generic"
"github.com/doublecloud/transfer/pkg/schemaregistry/confluent"
"github.com/doublecloud/transfer/pkg/schemaregistry/warmup"
"github.com/doublecloud/transfer/pkg/util"
"go.ytsaurus.tech/library/go/core/log"
)

type ConfluentSrImpl struct {
logger log.Logger
SchemaRegistryClient *confluent.SchemaRegistryClient
SendSrNotFoundToUnparsed bool
inMDBuilder *mdBuilder
logger log.Logger
SchemaRegistryClient *confluent.SchemaRegistryClient
schemaRegistryClientMutex sync.Mutex
SendSrNotFoundToUnparsed bool
inMDBuilder *mdBuilder
}

func (p *ConfluentSrImpl) doWithSchema(partition abstract.Partition, schema *confluent.Schema, refs map[string]confluent.Schema, name string, buf []byte, offset uint64, writeTime time.Time, isCloudevents bool) ([]byte, []abstract.ChangeItem) {
Expand Down Expand Up @@ -115,6 +118,7 @@ func (p *ConfluentSrImpl) Do(msg parsers.Message, partition abstract.Partition)
}

func (p *ConfluentSrImpl) DoBatch(batch parsers.MessageBatch) []abstract.ChangeItem {
warmup.WarmUpSRCache(p.logger, &p.schemaRegistryClientMutex, batch, p.SchemaRegistryClient, p.SendSrNotFoundToUnparsed)
result := make([]abstract.ChangeItem, 0, len(batch.Messages))
for _, msg := range batch.Messages {
result = append(result, p.Do(msg, abstract.Partition{Cluster: "", Partition: batch.Partition, Topic: batch.Topic})...)
Expand All @@ -130,9 +134,10 @@ func NewConfluentSchemaRegistryImpl(srURL string, caCert string, username string
}
client.SetCredentials(username, password)
return &ConfluentSrImpl{
logger: logger,
SchemaRegistryClient: client,
SendSrNotFoundToUnparsed: SendSrNotFoundToUnparsed,
inMDBuilder: newMDBuilder(),
logger: logger,
SchemaRegistryClient: client,
schemaRegistryClientMutex: sync.Mutex{},
SendSrNotFoundToUnparsed: SendSrNotFoundToUnparsed,
inMDBuilder: newMDBuilder(),
}
}
29 changes: 25 additions & 4 deletions pkg/parsers/registry/debezium/engine/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import (
"encoding/binary"
"fmt"
"strings"
"sync"
"time"

"github.com/doublecloud/transfer/pkg/abstract"
"github.com/doublecloud/transfer/pkg/debezium"
"github.com/doublecloud/transfer/pkg/parsers"
"github.com/doublecloud/transfer/pkg/parsers/generic"
"github.com/doublecloud/transfer/pkg/schemaregistry/confluent"
"github.com/doublecloud/transfer/pkg/schemaregistry/warmup"
"github.com/doublecloud/transfer/pkg/util"
"github.com/doublecloud/transfer/pkg/util/pool"
"go.ytsaurus.tech/library/go/core/log"
Expand All @@ -21,7 +23,8 @@ type DebeziumImpl struct {
logger log.Logger
debeziumReceiver *debezium.Receiver

threadsNumber uint64
threadsNumber uint64
schemaRegistryClientMutex sync.Mutex
}

// DoOne message with multiple debezium events inside.
Expand Down Expand Up @@ -96,7 +99,24 @@ func (p *DebeziumImpl) Do(msg parsers.Message, partition abstract.Partition) []a
return p.DoBuf(partition, msg.Value, msg.Offset, msg.WriteTime)
}

// It's important to warn-up Schema-Registry cache single-thread, to not to DDoS Schema-Registry
func (p *DebeziumImpl) warmUpSRCache(batch parsers.MessageBatch) {
type SRClient interface {
SchemaRegistryClient() *confluent.SchemaRegistryClient
}

var schemaRegistryClient *confluent.SchemaRegistryClient
if sr, ok := p.debeziumReceiver.Unpacker.(SRClient); ok {
schemaRegistryClient = sr.SchemaRegistryClient()
} else {
return
}

warmup.WarmUpSRCache(p.logger, &p.schemaRegistryClientMutex, batch, schemaRegistryClient, false)
}

func (p *DebeziumImpl) DoBatch(batch parsers.MessageBatch) []abstract.ChangeItem {
p.warmUpSRCache(batch)
if p.threadsNumber > 1 {
return p.doMultiThread(batch)
}
Expand All @@ -109,8 +129,9 @@ func (p *DebeziumImpl) DoBatch(batch parsers.MessageBatch) []abstract.ChangeItem

func NewDebeziumImpl(logger log.Logger, schemaRegistry *confluent.SchemaRegistryClient, threads uint64) *DebeziumImpl {
return &DebeziumImpl{
logger: logger,
debeziumReceiver: debezium.NewReceiver(nil, schemaRegistry),
threadsNumber: threads,
logger: logger,
debeziumReceiver: debezium.NewReceiver(nil, schemaRegistry),
threadsNumber: threads,
schemaRegistryClientMutex: sync.Mutex{},
}
}
58 changes: 58 additions & 0 deletions pkg/schemaregistry/warmup/warmup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package warmup

import (
"bytes"
"encoding/binary"
"strings"
"sync"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/doublecloud/transfer/pkg/parsers"
"github.com/doublecloud/transfer/pkg/schemaregistry/confluent"
"github.com/doublecloud/transfer/pkg/util"
"github.com/doublecloud/transfer/pkg/util/set"
"go.ytsaurus.tech/library/go/core/log"
)

// It's important to warn-up Schema-Registry cache single-thread, to not to DDoS Schema-Registry
// mutex we need not bcs of something is thread-unsafe, but to reduce schema-registry RPS
func WarmUpSRCache(logger log.Logger, mutex *sync.Mutex, batch parsers.MessageBatch, schemaRegistryClient *confluent.SchemaRegistryClient, notFoundIsOk bool) {
extractSchemaID := func(buf []byte) (uint32, []byte) {
msgLen := len(buf)
if len(buf) != 0 {
if buf[0] == 0 {
zeroIndex := bytes.Index(buf[5:], []byte{0})
if zeroIndex != -1 {
msgLen = 5 + zeroIndex
}
}
}
return binary.BigEndian.Uint32(buf[1:5]), buf[msgLen:]
}

schemaIDs := set.New[uint32]()
var schemaID uint32 = 0
for _, currMsg := range batch.Messages {
leastBuf := currMsg.Value
for {
if len(leastBuf) == 0 {
break
}
schemaID, leastBuf = extractSchemaID(leastBuf)
schemaIDs.Add(schemaID)
}
}

mutex.Lock()
defer mutex.Unlock()
for _, currSchemaID := range schemaIDs.Slice() {
_ = backoff.RetryNotify(func() error {
_, err := schemaRegistryClient.GetSchema(int(currSchemaID))
if notFoundIsOk && err != nil && strings.Contains(err.Error(), "Error code: 404") {
return nil
}
return err
}, backoff.NewConstantBackOff(time.Second), util.BackoffLogger(logger, "getting schema (warm-up cache)"))
}
}

0 comments on commit ddf89c2

Please sign in to comment.