Skip to content

Commit

Permalink
maint: Refactor cuckoo cache for reusability (#975)
Browse files Browse the repository at this point in the history
  • Loading branch information
VinozzZ authored Jan 25, 2024
1 parent cf0e805 commit 82a4c0d
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 59 deletions.
61 changes: 37 additions & 24 deletions collect/cache/cuckooSentCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,31 @@ type keptTraceCacheEntry struct {
reason uint32 // which rule was used to decide to keep the trace
}

func NewKeptTraceCacheEntry(trace *types.Trace) *keptTraceCacheEntry {
if trace == nil {
// KeptTrace is an interface for a trace that was kept.
// It contains all the information we need to remember about the trace.
type KeptTrace interface {
ID() string
SampleRate() uint
DescendantCount() uint32
SpanEventCount() uint32
SpanLinkCount() uint32
SpanCount() uint32
SetSentReason(uint)
SentReason() uint
}

func NewKeptTraceCacheEntry(t KeptTrace) *keptTraceCacheEntry {
if t == nil {
return &keptTraceCacheEntry{}
}

return &keptTraceCacheEntry{
rate: uint32(trace.SampleRate),
eventCount: trace.DescendantCount(),
spanEventCount: trace.SpanEventCount(),
spanLinkCount: trace.SpanLinkCount(),
spanCount: trace.SpanCount(),
reason: uint32(trace.SentReason),
rate: uint32(t.SampleRate()),
eventCount: t.DescendantCount(),
spanEventCount: t.SpanEventCount(),
spanLinkCount: t.SpanLinkCount(),
spanCount: t.SpanCount(),
reason: uint32(t.SentReason()),
}
}

Expand Down Expand Up @@ -71,10 +84,6 @@ func (t *keptTraceCacheEntry) SpanCount() uint {
return uint(t.spanCount)
}

func (t *keptTraceCacheEntry) Reason() uint {
return uint(t.reason)
}

// Count records additional spans in the cache record.
func (t *keptTraceCacheEntry) Count(s *types.Span) {
t.eventCount++
Expand Down Expand Up @@ -142,7 +151,8 @@ type cuckooSentCache struct {
done chan struct{}

// This mutex is for managing kept traces
keptMut sync.Mutex
keptMut sync.Mutex
sentReasons *SentReasonsCache
}

// Make sure it implements TraceSentCache
Expand All @@ -156,10 +166,11 @@ func NewCuckooSentCache(cfg config.SampleCacheConfig, met metrics.Metrics) (Trac
dropped := NewCuckooTraceChecker(cfg.DroppedSize, met)

cache := &cuckooSentCache{
kept: stc,
dropped: dropped,
cfg: cfg,
done: make(chan struct{}),
kept: stc,
dropped: dropped,
cfg: cfg,
sentReasons: NewSentReasonsCache(met),
done: make(chan struct{}),
}
go cache.monitor()
return cache, nil
Expand All @@ -183,37 +194,39 @@ func (c *cuckooSentCache) Stop() {
close(c.done)
}

func (c *cuckooSentCache) Record(trace *types.Trace, keep bool) {
func (c *cuckooSentCache) Record(trace KeptTrace, keep bool, reason string) {
if keep {
// record this decision in the sent record LRU for future spans
trace.SetSentReason(c.sentReasons.Set(reason))
sentRecord := NewKeptTraceCacheEntry(trace)

c.keptMut.Lock()
defer c.keptMut.Unlock()
c.kept.Add(trace.TraceID, sentRecord)
c.kept.Add(trace.ID(), sentRecord)

return
}
// if we're not keeping it, save it in the dropped trace filter
c.dropped.Add(trace.TraceID)
c.dropped.Add(trace.ID())
}

func (c *cuckooSentCache) Check(span *types.Span) (TraceSentRecord, bool) {
func (c *cuckooSentCache) Check(span *types.Span) (TraceSentRecord, string, bool) {
// was it dropped?
if c.dropped.Check(span.TraceID) {
// we recognize it as dropped, so just say so; there's nothing else to do
return &cuckooDroppedRecord{}, false
return &cuckooDroppedRecord{}, "", false
}
// was it kept?
c.keptMut.Lock()
defer c.keptMut.Unlock()
if sentRecord, found := c.kept.Get(span.TraceID); found {
// if we kept it, then this span being checked needs counting too
sentRecord.Count(span)
return sentRecord, true
reason, _ := c.sentReasons.Get(uint(sentRecord.reason))
return sentRecord, reason, true
}
// we have no memory of this place
return nil, false
return nil, "", false
}

func (c *cuckooSentCache) Resize(cfg config.SampleCacheConfig) error {
Expand Down
2 changes: 1 addition & 1 deletion collect/cache/sent_reason_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestSentReasonCache(t *testing.T) {
keys = append(keys, c.Set(item))
}
for i, key := range keys {
item, ok := c.Get(uint(key))
item, ok := c.Get(key)
assert.True(t, ok, "key %d should exist", key)
assert.Equal(t, entries[i], item)
}
Expand Down
5 changes: 1 addition & 4 deletions collect/cache/sent_reasons_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cache

import (
"math/rand"
"sync"

"github.com/dgryski/go-wyhash"
"github.com/honeycombio/refinery/metrics"
Expand All @@ -12,14 +11,14 @@ import (
// It acts as a mapping between the string representation of send reason
// and a uint.
// This is used to reduce the memory footprint of the trace cache.
// It is not concurrency-safe.

type SentReasonsCache struct {
Metrics metrics.Metrics

data []string
keys map[uint64]uint32

mu sync.Mutex
hashSeed uint64
}

Expand All @@ -37,8 +36,6 @@ func NewSentReasonsCache(metrics metrics.Metrics) *SentReasonsCache {
// Set adds a new reason to the cache, returning the key.
// The key is generated by incrementing a counter.
func (c *SentReasonsCache) Set(key string) uint {
c.mu.Lock()
defer c.mu.Unlock()
// generate a hash
hash := wyhash.Hash([]byte(key), c.hashSeed)

Expand Down
7 changes: 2 additions & 5 deletions collect/cache/traceSentCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,14 @@ type TraceSentRecord interface {
SpanCount() uint
// Count records additional spans in the totals
Count(*types.Span)

// Reason returns the reason the trace was kept or dropped
Reason() uint
}

type TraceSentCache interface {
// Record preserves the record of a trace being sent or not.
Record(trace *types.Trace, keep bool)
Record(trace KeptTrace, keep bool, reason string)
// Check tests if a trace corresponding to the span is in the cache; if found, it returns the appropriate TraceSentRecord and true,
// else nil and false.
Check(span *types.Span) (TraceSentRecord, bool)
Check(span *types.Span) (TraceSentRecord, string, bool)
// Stop halts the cache in preparation for shutdown
Stop()
// Resize adjusts the size of the cache according to the Config passed in
Expand Down
42 changes: 20 additions & 22 deletions collect/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ type InMemCollector struct {
datasetSamplers map[string]sample.Sampler

sampleTraceCache cache.TraceSentCache
sentReasonsCache *cache.SentReasonsCache

incoming chan *types.Span
fromPeer chan *types.Span
Expand Down Expand Up @@ -128,7 +127,6 @@ func (i *InMemCollector) Start() error {
i.Metrics.Store("PEER_CAP", float64(cap(i.fromPeer)))
i.reload = make(chan struct{}, 1)
i.datasetSamplers = make(map[string]sample.Sampler)
i.sentReasonsCache = cache.NewSentReasonsCache(i.Metrics)

if i.Config.GetAddHostMetadataToTrace() {
if hostname, err := os.Hostname(); err == nil && hostname != "" {
Expand Down Expand Up @@ -391,12 +389,12 @@ func (i *InMemCollector) processSpan(sp *types.Span) {
trace := i.cache.Get(sp.TraceID)
if trace == nil {
// if the trace has already been sent, just pass along the span
if sr, found := i.sampleTraceCache.Check(sp); found {
if sr, sentReason, found := i.sampleTraceCache.Check(sp); found {
i.Metrics.Increment("trace_sent_cache_hit")
// bump the count of records on this trace -- if the root span isn't
// the last late span, then it won't be perfect, but it will be better than
// having none at all
i.dealWithSentTrace(sr, sp)
i.dealWithSentTrace(sr, sentReason, sp)
return
}
// trace hasn't already been sent (or this span is really old); let's
Expand All @@ -416,8 +414,8 @@ func (i *InMemCollector) processSpan(sp *types.Span) {
TraceID: sp.TraceID,
ArrivalTime: now,
SendBy: now.Add(timeout),
SampleRate: sp.SampleRate, // if it had a sample rate, we want to keep it
}
trace.SetSampleRate(sp.SampleRate) // if it had a sample rate, we want to keep it
// push this into the cache and if we eject an unsent trace, send it ASAP
ejectedTrace := i.cache.Set(trace)
if ejectedTrace != nil {
Expand All @@ -427,7 +425,15 @@ func (i *InMemCollector) processSpan(sp *types.Span) {
// if the trace we got back from the cache has already been sent, deal with the
// span.
if trace.Sent {
i.dealWithSentTrace(cache.NewKeptTraceCacheEntry(trace), sp)
if sr, reason, found := i.sampleTraceCache.Check(sp); found {
i.Metrics.Increment("trace_sent_cache_hit")
i.dealWithSentTrace(sr, reason, sp)
return
}
// trace has already been sent, but this is not in the sent cache.
// we will just use the default late span reason as the sent reason which is
// set inside the dealWithSentTrace function
i.dealWithSentTrace(cache.NewKeptTraceCacheEntry(trace), "", sp)
}

// great! trace is live. add the span.
Expand Down Expand Up @@ -468,11 +474,7 @@ func (i *InMemCollector) ProcessSpanImmediately(sp *types.Span, keep bool, sampl
}
// we do want a record of how we disposed of traces in case more come in after we've
// turned off stress relief (if stress relief is on we'll keep making the same decisions)
if keep {
reasonKey := i.sentReasonsCache.Set(reason)
trace.SentReason = reasonKey
}
i.sampleTraceCache.Record(trace, keep)
i.sampleTraceCache.Record(trace, keep, reason)
if !keep {
i.Metrics.Increment("dropped_from_stress")
return
Expand All @@ -493,11 +495,11 @@ func (i *InMemCollector) ProcessSpanImmediately(sp *types.Span, keep bool, sampl
// dealWithSentTrace handles a span that has arrived after the sampling decision
// on the trace has already been made, and it obeys that decision by either
// sending the span immediately or dropping it.
func (i *InMemCollector) dealWithSentTrace(tr cache.TraceSentRecord, sp *types.Span) {
func (i *InMemCollector) dealWithSentTrace(tr cache.TraceSentRecord, sentReason string, sp *types.Span) {
if i.Config.GetAddRuleReasonToTrace() {
metaReason, ok := i.sentReasonsCache.Get(tr.Reason())
if ok {
metaReason = fmt.Sprintf("%s - late arriving span", metaReason)
var metaReason string
if len(sentReason) > 0 {
metaReason = fmt.Sprintf("%s - late arriving span", sentReason)
} else {
metaReason = "late arriving span"
}
Expand Down Expand Up @@ -639,7 +641,7 @@ func (i *InMemCollector) send(trace *types.Trace, sendReason string) {

// make sampling decision and update the trace
rate, shouldSend, reason, key := sampler.GetSampleRate(trace)
trace.SampleRate = rate
trace.SetSampleRate(rate)
trace.KeepSample = shouldSend
logFields["reason"] = reason
if key != "" {
Expand All @@ -648,11 +650,7 @@ func (i *InMemCollector) send(trace *types.Trace, sendReason string) {
// This will observe sample rate attempts even if the trace is dropped
i.Metrics.Histogram("trace_aggregate_sample_rate", float64(rate))

if shouldSend {
reasonKey := i.sentReasonsCache.Set(reason)
trace.SentReason = reasonKey
}
i.sampleTraceCache.Record(trace, shouldSend)
i.sampleTraceCache.Record(trace, shouldSend, reason)

// if we're supposed to drop this trace, and dry run mode is not enabled, then we're done.
if !shouldSend && !i.Config.GetIsDryRun() {
Expand Down Expand Up @@ -698,7 +696,7 @@ func (i *InMemCollector) send(trace *types.Trace, sendReason string) {
if i.hostname != "" {
sp.Data["meta.refinery.local_hostname"] = i.hostname
}
mergeTraceAndSpanSampleRates(sp, trace.SampleRate, isDryRun)
mergeTraceAndSpanSampleRates(sp, trace.SampleRate(), isDryRun)
i.addAdditionalAttributes(sp)
i.Transmission.EnqueueSpan(sp)
}
Expand Down
1 change: 0 additions & 1 deletion collect/collect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ func newTestCollector(conf config.Config, transmission transmit.Transmission) *I
Metrics: s,
Logger: &logger.NullLogger{},
},
sentReasonsCache: cache.NewSentReasonsCache(s),
}
}

Expand Down
24 changes: 22 additions & 2 deletions types/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ type Trace struct {
TraceID string

// SampleRate should only be changed if the changer holds the SendSampleLock
SampleRate uint
sampleRate uint
// KeepSample should only be changed if the changer holds the SendSampleLock
KeepSample bool
// Sent should only be changed if the changer holds the SendSampleLock
Sent bool
SentReason uint
sentReason uint

SendBy time.Time

Expand Down Expand Up @@ -100,6 +100,26 @@ func (t *Trace) GetSpans() []*Span {
return t.spans
}

func (t *Trace) ID() string {
return t.TraceID
}

func (t *Trace) SampleRate() uint {
return t.sampleRate
}

func (t *Trace) SetSampleRate(rate uint) {
t.sampleRate = rate
}

func (t *Trace) SentReason() uint {
return t.sentReason
}

func (t *Trace) SetSentReason(reason uint) {
t.sentReason = reason
}

// DescendantCount gets the number of descendants of all kinds currently in this trace
func (t *Trace) DescendantCount() uint32 {
return uint32(len(t.spans))
Expand Down

0 comments on commit 82a4c0d

Please sign in to comment.