Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support ThroughputLimit in samplers #1300

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/honeycombio/refinery/internal/peer"
"github.com/honeycombio/refinery/logger"
"github.com/honeycombio/refinery/metrics"
"github.com/honeycombio/refinery/pubsub"
"github.com/honeycombio/refinery/sample"
"github.com/honeycombio/refinery/sharder"
"github.com/honeycombio/refinery/transmit"
Expand Down Expand Up @@ -198,6 +199,8 @@ func newStartedApp(
&inject.Object{Value: samplerFactory},
&inject.Object{Value: &health.Health{}},
&inject.Object{Value: clockwork.NewRealClock()},
&inject.Object{Value: &pubsub.LocalPubSub{}},
&inject.Object{Value: &collect.EMAThroughputCalculator{}, Name: "throughputCalculator"},
&inject.Object{Value: &collect.MockStressReliever{}, Name: "stressRelief"},
&inject.Object{Value: &a},
)
Expand Down
1 change: 1 addition & 0 deletions cmd/refinery/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ func main() {
{Value: version, Name: "version"},
{Value: samplerFactory},
{Value: stressRelief, Name: "stressRelief"},
{Value: &collect.EMAThroughputCalculator{}, Name: "throughputCalculator"},
{Value: &health.Health{}},
{Value: &configwatcher.ConfigWatcher{}},
{Value: &a},
Expand Down
19 changes: 13 additions & 6 deletions collect/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,12 @@ type InMemCollector struct {
Health health.Recorder `inject:""`
Sharder sharder.Sharder `inject:""`

Transmission transmit.Transmission `inject:"upstreamTransmission"`
Metrics metrics.Metrics `inject:"genericMetrics"`
SamplerFactory *sample.SamplerFactory `inject:""`
StressRelief StressReliever `inject:"stressRelief"`
Peers peer.Peers `inject:""`
Transmission transmit.Transmission `inject:"upstreamTransmission"`
Metrics metrics.Metrics `inject:"genericMetrics"`
SamplerFactory *sample.SamplerFactory `inject:""`
StressRelief StressReliever `inject:"stressRelief"`
ThroughputCalculator *EMAThroughputCalculator `inject:"throughputCalculator"`
Peers peer.Peers `inject:""`

// For test use only
BlockOnAddSpan bool
Expand Down Expand Up @@ -736,6 +737,8 @@ func (i *InMemCollector) send(trace *types.Trace, sendReason string) {
}
trace.Sent = true

i.ThroughputCalculator.IncrementEventCount(int(trace.DescendantCount()))

traceDur := i.Clock.Since(trace.ArrivalTime)
i.Metrics.Histogram("trace_duration_ms", float64(traceDur.Milliseconds()))
i.Metrics.Histogram("trace_span_count", float64(trace.DescendantCount()))
Expand Down Expand Up @@ -781,7 +784,11 @@ func (i *InMemCollector) send(trace *types.Trace, sendReason string) {
}

// make sampling decision and update the trace
rate, shouldSend, reason, key := sampler.GetSampleRate(trace)
originalRate, reason, key := sampler.GetSampleRate(trace)
sampleRateMultiplier := i.ThroughputCalculator.GetSamplingRateMultiplier()
rate := uint(float64(originalRate) * sampleRateMultiplier)
shouldSend := sampler.MakeSamplingDecision(rate, trace)

trace.SetSampleRate(rate)
trace.KeepSample = shouldSend
logFields["reason"] = reason
Expand Down
33 changes: 22 additions & 11 deletions collect/collect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/honeycombio/refinery/internal/peer"
"github.com/honeycombio/refinery/logger"
"github.com/honeycombio/refinery/metrics"
"github.com/honeycombio/refinery/pubsub"
"github.com/honeycombio/refinery/sample"
"github.com/honeycombio/refinery/sharder"
"github.com/honeycombio/refinery/transmit"
Expand Down Expand Up @@ -50,14 +51,15 @@ func newTestCollector(conf config.Config, transmission transmit.Transmission) *I
healthReporter.Start()

return &InMemCollector{
Config: conf,
Clock: clock,
Logger: &logger.NullLogger{},
Tracer: noop.NewTracerProvider().Tracer("test"),
Health: healthReporter,
Transmission: transmission,
Metrics: &metrics.NullMetrics{},
StressRelief: &MockStressReliever{},
Config: conf,
Clock: clock,
Logger: &logger.NullLogger{},
Tracer: noop.NewTracerProvider().Tracer("test"),
Health: healthReporter,
Transmission: transmission,
Metrics: &metrics.NullMetrics{},
StressRelief: &MockStressReliever{},
ThroughputCalculator: &EMAThroughputCalculator{},
SamplerFactory: &sample.SamplerFactory{
Config: conf,
Metrics: s,
Expand Down Expand Up @@ -423,14 +425,20 @@ func TestDryRunMode(t *testing.T) {
var traceID2 = "def456"
var traceID3 = "ghi789"
// sampling decisions based on trace ID
sampleRate1, keepTraceID1, _, _ := sampler.GetSampleRate(&types.Trace{TraceID: traceID1})
trace1 := &types.Trace{TraceID: traceID1}
sampleRate1, _, _ := sampler.GetSampleRate(trace1)
keepTraceID1 := sampler.MakeSamplingDecision(sampleRate1, trace1)
// would be dropped if dry run mode was not enabled
assert.False(t, keepTraceID1)
assert.Equal(t, uint(10), sampleRate1)
sampleRate2, keepTraceID2, _, _ := sampler.GetSampleRate(&types.Trace{TraceID: traceID2})
trace2 := &types.Trace{TraceID: traceID2}
sampleRate2, _, _ := sampler.GetSampleRate(trace2)
keepTraceID2 := sampler.MakeSamplingDecision(sampleRate2, trace2)
assert.True(t, keepTraceID2)
assert.Equal(t, uint(10), sampleRate2)
sampleRate3, keepTraceID3, _, _ := sampler.GetSampleRate(&types.Trace{TraceID: traceID3})
trace3 := &types.Trace{TraceID: traceID3}
sampleRate3, _, _ := sampler.GetSampleRate(trace3)
keepTraceID3 := sampler.MakeSamplingDecision(sampleRate3, trace3)
// would be dropped if dry run mode was not enabled
assert.False(t, keepTraceID3)
assert.Equal(t, uint(10), sampleRate3)
Expand Down Expand Up @@ -827,8 +835,11 @@ func TestDependencyInjection(t *testing.T) {
&inject.Object{Value: &sharder.SingleServerSharder{}},
&inject.Object{Value: &transmit.MockTransmission{}, Name: "upstreamTransmission"},
&inject.Object{Value: &metrics.NullMetrics{}, Name: "genericMetrics"},
&inject.Object{Value: &metrics.NullMetrics{}, Name: "metrics"},
&inject.Object{Value: &sample.SamplerFactory{}},
&inject.Object{Value: &MockStressReliever{}, Name: "stressRelief"},
&inject.Object{Value: &pubsub.LocalPubSub{}},
&inject.Object{Value: &EMAThroughputCalculator{}, Name: "throughputCalculator"},
&inject.Object{Value: &peer.MockPeers{}},
)
if err != nil {
Expand Down
188 changes: 188 additions & 0 deletions collect/throughput_calculator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
package collect

import (
"context"
"fmt"
"math"
"strconv"
"strings"
"sync"
"time"

"github.com/honeycombio/refinery/config"
"github.com/honeycombio/refinery/internal/peer"
"github.com/honeycombio/refinery/pubsub"
"github.com/jonboulle/clockwork"
)

const emaThroughputTopic = "ema_throughput"

// EMAThroughputCalculator encapsulates the logic to calculate a throughput value using an Exponential Moving Average (EMA).
type EMAThroughputCalculator struct {
Config config.Config `inject:""`
Clock clockwork.Clock `inject:""`
Pubsub pubsub.PubSub `inject:""`
Peer peer.Peers `inject:""`

throughputLimit uint
weight float64 // Smoothing factor for EMA
intervalLength time.Duration // Length of the interval
hostID string

mut sync.RWMutex
throughputs map[string]throughputReport
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about using a generics.SetWithTTL here, which will avoid the explicit timeout checks?

clusterEMA uint
lastEMA uint // Previous EMA value
eventCount int // Internal count of events in the current interval
done chan struct{}
}

// NewEMAThroughputCalculator creates a new instance of EMAThroughputCalculator.
func (c *EMAThroughputCalculator) Start() error {
cfg := c.Config.GetAllSamplerRules().ThroughPutLimit
c.throughputLimit = uint(cfg.Limit)
c.done = make(chan struct{})

// if throughput limit is not set, disable the calculator
if c.throughputLimit == 0 {
return nil
}
c.intervalLength = time.Duration(cfg.AdjustmentInterval)
c.weight = cfg.Weight
c.lastEMA = 0

peerID, err := c.Peer.GetInstanceID()
if err != nil {
return err
}
c.hostID = peerID
c.throughputs = make(map[string]throughputReport)
// Subscribe to the throughput topic so we can react to throughput
// changes in the cluster.
c.Pubsub.Subscribe(context.Background(), stressReliefTopic, c.onThroughputUpdate)

go func() {
ticker := c.Clock.NewTicker(c.intervalLength)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should only publish if the throughput is different from the previous calculation

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed.

for {
select {
case <-c.done:
return
case <-ticker.Chan():
currentEMA := c.updateEMA()
c.Pubsub.Publish(context.Background(), emaThroughputTopic, newThroughputMessage(currentEMA, peerID).String())
}
}

}()

return nil
}

// onThroughputUpdate is the pubsub callback for peer throughput reports. It
// records the sending peer's EMA along with the receipt time, which updateEMA
// later uses to prune stale peers.
func (c *EMAThroughputCalculator) onThroughputUpdate(ctx context.Context, msg string) {
	report, err := unmarshalThroughputMessage(msg)
	if err != nil {
		// Malformed message; nothing useful to do but drop it.
		return
	}

	c.mut.Lock()
	defer c.mut.Unlock()
	c.throughputs[report.peerID] = throughputReport{
		key:        report.peerID,
		throughput: report.throughput,
		timestamp:  c.Clock.Now(),
	}
}

// Stop terminates the background EMA-recalculation goroutine started by Start.
func (c *EMAThroughputCalculator) Stop() {
	close(c.done)
}

// IncrementEventCount increments the internal event count by a specified amount.
func (c *EMAThroughputCalculator) IncrementEventCount(count int) {
	c.mut.Lock()
	defer c.mut.Unlock()
	c.eventCount += count
}

// updateEMA folds the events observed during the interval just ended into
// this host's EMA, stores the result as this host's own report, and
// recomputes the cluster-wide EMA as the sum of all live peer reports.
// It resets the interval event count and returns the new host-local EMA.
func (c *EMAThroughputCalculator) updateEMA() uint {
	c.mut.Lock()
	defer c.mut.Unlock()

	// Raw throughput (events/second) for the interval just ended.
	observed := float64(c.eventCount) / c.intervalLength.Seconds()

	// Standard EMA: weight applies to the new observation, the remainder to
	// the previous smoothed value. Ceil so a nonzero observation never
	// rounds down to zero.
	c.lastEMA = uint(math.Ceil(c.weight*observed + (1-c.weight)*float64(c.lastEMA)))

	c.throughputs[c.hostID] = throughputReport{
		key:        c.hostID,
		throughput: c.lastEMA,
		timestamp:  c.Clock.Now(),
	}

	var total uint
	for key, report := range c.throughputs {
		// Prune peers we haven't heard from recently (deleting during
		// range is safe in Go).
		if c.Clock.Since(report.timestamp) > peer.PeerEntryTimeout {
			delete(c.throughputs, key)
			continue
		}
		// we don't want to include peers that are just starting up
		if report.throughput == 0 {
			continue
		}
		total += report.throughput
	}
	c.clusterEMA = total

	c.eventCount = 0 // Reset the event count for the new interval

	return c.lastEMA
}

// GetSamplingRateMultiplier calculates and returns a sampling rate multiplier
// based on the difference between the configured throughput limit and the
// current cluster throughput.
func (c *EMAThroughputCalculator) GetSamplingRateMultiplier() float64 {
	limit := float64(c.throughputLimit)
	if limit == 0 {
		return 1.0 // No limit set, so no adjustment needed
	}

	c.mut.RLock()
	current := float64(c.clusterEMA)
	c.mut.RUnlock()

	if current <= limit {
		return 1.0 // Throughput is within the limit, no adjustment needed
	}

	// Over the limit: scale sample rates up proportionally.
	return current / limit
}

// throughputReport is one peer's most recent EMA throughput observation,
// keyed by peer ID and timestamped so stale entries can be pruned.
type throughputReport struct {
	key        string
	throughput uint
	timestamp  time.Time
}

// throughputMessage is the wire format for sharing a peer's EMA throughput
// over pubsub. It serializes as "<peerID>|<throughput>".
type throughputMessage struct {
	peerID     string
	throughput uint
}

// newThroughputMessage constructs a throughputMessage for the given peer.
func newThroughputMessage(throughput uint, peerID string) *throughputMessage {
	return &throughputMessage{throughput: throughput, peerID: peerID}
}

// String renders the message in its "<peerID>|<throughput>" wire format.
func (msg *throughputMessage) String() string {
	return msg.peerID + "|" + fmt.Sprint(msg.throughput)
}

// unmarshalThroughputMessage parses the "<peerID>|<throughput>" wire format.
// It returns an error for messages that are too short, are missing the "|"
// separator, or carry a non-numeric throughput.
func unmarshalThroughputMessage(msg string) (*throughputMessage, error) {
	if len(msg) < 2 {
		return nil, fmt.Errorf("empty message")
	}

	// BUG FIX: the original split with SplitN and indexed parts[1] without
	// checking that the separator was present, which panics on any message
	// with no "|". strings.Cut reports the separator's presence explicitly.
	peerID, rawThroughput, found := strings.Cut(msg, "|")
	if !found {
		return nil, fmt.Errorf("invalid throughput message: %q", msg)
	}

	throughput, err := strconv.Atoi(rawThroughput)
	if err != nil {
		return nil, err
	}

	return newThroughputMessage(uint(throughput), peerID), nil
}
Loading
Loading