Skip to content

Commit

Permalink
fix: decrease network traffic from redis (#1149)
Browse files Browse the repository at this point in the history
<!--
Thank you for contributing to the project! 💜
Please make sure to:
- Chat with us first if this is a big change
  - Open a new issue (or comment on an existing one)
- We want to make sure you don't spend time implementing something we
might have to say No to
- Add unit tests
- Mention any relevant issues in the PR description (e.g. "Fixes #123")

Please see our [OSS process
document](https://github.com/honeycombio/home/blob/main/honeycomb-oss-lifecycle-and-practices.md#)
to get an idea of how we operate.
-->

## Which problem is this PR solving?

Turns out transmitting trace IDs back and forth between redis and
refinery is very expensive.

## Short description of the changes

- Use `ZCOUNT` to get the amount of traces in a given state list instead
of `ZRANGE` so that we are only returning a integer value from redis
- Use lua script for deleting expired traces in state lists to avoid
returning trace IDs from redis to refinery during cleanup
  • Loading branch information
VinozzZ authored May 16, 2024
1 parent b735ab4 commit 283066e
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 73 deletions.
81 changes: 31 additions & 50 deletions centralstore/redis_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,11 @@ func (r *RedisBasicStore) Start() error {

stateProcessorCfg := traceStateProcessorConfig{
reaperRunInterval: time.Duration(opt.ReaperRunInterval),
reaperBatchSize: opt.ReaperBatchSize,
maxTraceRetention: time.Duration(opt.TraceTimeout * 10),
changeState: r.RedisClient.NewScript(stateChangeKey, stateChangeScript),
getTraceNeedingDecision: r.RedisClient.NewScript(tracesNeedingDecisionScriptKey, tracesNeedingDecisionScript),
removeExpiredTraces: r.RedisClient.NewScript(removeExpiredTracesKey, removeExpiredTracesScript),
}

stateProcessor := newTraceStateProcessor(stateProcessorCfg, r.Clock, r.Tracer)
Expand Down Expand Up @@ -158,11 +160,11 @@ func (r *RedisBasicStore) RecordMetrics(ctx context.Context) error {

for _, state := range r.states.states {
// get the state counts
traceIDs, err := r.states.traceIDsByState(ctx, conn, state, time.Time{}, time.Time{}, -1)
count, err := conn.ZCount(r.states.stateNameKey(state), 0, -1)
if err != nil {
return err
}
r.Metrics.Gauge(metricsPrefixCount+string(state), len(traceIDs))
r.Metrics.Gauge(metricsPrefixCount+string(state), count)
}

count, err := r.traces.count(ctx, conn)
Expand Down Expand Up @@ -970,10 +972,13 @@ func addToSpanHash(span *CentralSpan) (redis.Command, error) {
// on it's current state.

type traceStateProcessorConfig struct {
reaperRunInterval time.Duration
maxTraceRetention time.Duration
reaperRunInterval time.Duration
reaperBatchSize int
maxTraceRetention time.Duration

changeState redis.Script
getTraceNeedingDecision redis.Script
removeExpiredTraces redis.Script
}

type traceStateProcessor struct {
Expand All @@ -989,6 +994,9 @@ func newTraceStateProcessor(cfg traceStateProcessorConfig, clock clockwork.Clock
if cfg.reaperRunInterval == 0 {
cfg.reaperRunInterval = 10 * time.Second
}
if cfg.reaperBatchSize == 0 {
cfg.reaperBatchSize = 500
}
if cfg.maxTraceRetention < defaultTraceRetention {
cfg.maxTraceRetention = defaultTraceRetention
}
Expand Down Expand Up @@ -1071,40 +1079,6 @@ func (t *traceStateProcessor) randomTraceIDsByState(ctx context.Context, conn re
return ids, err
}

// traceIDsByState returns the traceIDs that are in a given trace state.
// If startTime is not zero, it will return traceIDs that have been in the state since startTime.
// If endTime is not zero, it will return traceIDs that have been in the state until endTime.
// If n is not zero, it will return at most n traceIDs.
func (t *traceStateProcessor) traceIDsByState(ctx context.Context, conn redis.Conn, state CentralTraceState, startTime time.Time, endTime time.Time, n int) ([]string, error) {
_, span := t.tracer.Start(ctx, "traceIDsByState")
defer span.End()

start := startTime.UnixMicro()
if startTime.IsZero() {
start = 0
}

end := endTime.UnixMicro()
if endTime.IsZero() {
end = 0
}

results, err := conn.ZRangeByScoreString(t.stateNameKey(state), start, end, n, 0)
if err != nil {
span.RecordError(err)
return nil, err
}
otelutil.AddSpanFields(span, map[string]interface{}{
"cmd": "ZRANGEBYSCORE",
"state": state,
"num_ids": len(results),
"start": start,
"end": end,
"n": n,
})
return results, nil
}

func (t *traceStateProcessor) exists(ctx context.Context, conn redis.Conn, state CentralTraceState, traceID string) bool {
_, span := otelutil.StartSpanMulti(ctx, t.tracer, "exists", map[string]interface{}{
"trace_id": traceID,
Expand Down Expand Up @@ -1171,6 +1145,7 @@ func (t *traceStateProcessor) cleanupExpiredTraces(redis redis.Client) {
case <-ticker.Chan():
// cannot defer here!
ctx, span := t.tracer.Start(context.Background(), "cleanupExpiredTraces")
otelutil.AddSpanField(span, "interval", t.config.reaperRunInterval.String())
t.removeExpiredTraces(ctx, redis)
span.End()
}
Expand All @@ -1180,7 +1155,6 @@ func (t *traceStateProcessor) cleanupExpiredTraces(redis redis.Client) {
func (t *traceStateProcessor) removeExpiredTraces(ctx context.Context, client redis.Client) {
ctx, span := otelutil.StartSpanMulti(ctx, t.tracer, "removeExpiredTraces", map[string]interface{}{
"num_states": len(t.states),
"cmd": "ZRANGEBYSCORE",
})
defer span.End()

Expand All @@ -1189,22 +1163,16 @@ func (t *traceStateProcessor) removeExpiredTraces(ctx context.Context, client re

// get the traceIDs that have been in the state for longer than the expiration time
for _, state := range t.states {
traceIDs, err := conn.ZRangeByScoreString(t.stateNameKey(state), 0, t.clock.Now().Add(-t.config.maxTraceRetention).UnixMicro(), defaultReaperBatchSize, 0)
if err != nil {
span.RecordError(err)
otelutil.AddSpanFields(span, map[string]interface{}{
"state": state,
"error": err.Error(),
})
return
}
replies, err := t.config.removeExpiredTraces.DoInt(ctx, conn, t.stateNameKey(state),
t.clock.Now().Add(-t.config.maxTraceRetention).UnixMicro(),
t.config.reaperBatchSize)

// remove the traceIDs from the state map
err = t.remove(ctx, conn, state, traceIDs...)
if err != nil {
span.RecordError(err)
continue
}

otelutil.AddSpanField(span, state.String(), replies)
}

}
Expand Down Expand Up @@ -1430,3 +1398,16 @@ const keepTraceScript = `
return 1
`

const removeExpiredTracesKey = 1
const removeExpiredTracesScript = `
local stateKey = KEYS[1]
local expirationTime = ARGV[1]
local batchSize = ARGV[2]
local traces = redis.call('ZRANGE', stateKey,
"-inf", expirationTime, "byscore", "limit", 0, batchSize)
local result = redis.call('ZREM', stateKey, unpack(traces))
return result
`
4 changes: 3 additions & 1 deletion centralstore/redis_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,9 @@ func newTestTraceStateProcessor(_ *testing.T, redisClient redis.Client, clock cl
}
ts := &testTraceStateProcessor{
traceStateProcessor: newTraceStateProcessor(traceStateProcessorConfig{
changeState: redisClient.NewScript(stateChangeKey, stateChangeScript),
changeState: redisClient.NewScript(stateChangeKey, stateChangeScript),
getTraceNeedingDecision: redisClient.NewScript(tracesNeedingDecisionScriptKey, tracesNeedingDecisionScript),
removeExpiredTraces: redisClient.NewScript(removeExpiredTracesKey, removeExpiredTracesScript),
}, clock, tracer),
clock: clock,
}
Expand Down
3 changes: 2 additions & 1 deletion config/file_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,8 @@ type SmartWrapperOptions struct {
SendDelay Duration `yaml:"SendDelay" default:"2s"`
TraceTimeout Duration `yaml:"TraceTimeout" default:"60s"`
DecisionTimeout Duration `yaml:"DecisionTimeout" default:"3s"`
ReaperRunInterval Duration `yaml:"ReaperRunInterval" default:"1h"`
ReaperRunInterval Duration `yaml:"ReaperRunInterval" default:"10s"`
ReaperBatchSize int `yaml:"ReaperBatchSize" default:"500"`
}

func (c CollectionConfig) GetShutdownDelay() time.Duration {
Expand Down
20 changes: 15 additions & 5 deletions config/metadata/configMeta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ groups:
After the trace decision has been made, Refinery retains a record of
that decision for a period of time. When additional spans (including
the root span) arrive, they will be kept or dropped based on the
original decision.
original decision.
When running Refinery with Redis as the central store, this timer is
also used to determine how long to keep trace decisions in Redis.
Expand Down Expand Up @@ -1105,7 +1105,6 @@ groups:
high-throughput environments. By setting this value to 0, the metrics
retrieval will be disabled.
- name: Strategy
v1group: PeerManagement
v1name: Strategy
Expand Down Expand Up @@ -1958,7 +1957,7 @@ groups:
type: duration
valuetype: nondefault
default: 3s
reload: true
reload: false
summary: is the time the central store will wait to receive a trace decision from Refinery
description: >
This value controls how long the central store will wait for a trace
Expand All @@ -1970,9 +1969,20 @@ groups:
firstVersion: v2.6
type: duration
valuetype: nondefault
default: 1h
reload: true
default: 10s
reload: false
summary: is the interval at which the reaper runs to clean up expired traces.
description: >
This value determines how often the reaper runs to clean up expired traces
in the central store. TODO: don't expose this?
- name: ReaperBatchSize
firstVersion: v3.0
type: duration
valuetype: nondefault
default: 500
reload: false
summary: is the maximum number of traces to be included for deleting expired traces in a single request.
description: >
This value determines how many traces the reaper will delete in a single
request.
37 changes: 21 additions & 16 deletions redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Script interface {
Load(conn Conn) error
Do(ctx context.Context, conn Conn, keysAndArgs ...any) (any, error)
DoStrings(ctx context.Context, conn Conn, keysAndArgs ...any) ([]string, error)
DoInt(ctx context.Context, conn Conn, keysAndArgs ...any) (int, error)
SendHash(ctx context.Context, conn Conn, keysAndArgs ...any) error
Send(ctx context.Context, conn Conn, keysAndArgs ...any) error
}
Expand Down Expand Up @@ -83,13 +84,13 @@ type Conn interface {

ZAdd(string, []any) error
ZRange(string, int, int) ([]string, error)
ZRangeByScoreString(key string, minScore int64, maxScore int64, count, offset int) ([]string, error)
ZScore(string, string) (int64, error)
ZMScore(string, []string) ([]int64, error)
ZCard(string) (int64, error)
ZExist(string, string) (bool, error)
ZRemove(string, []string) error
ZRandom(string, int) ([]string, error)
ZCount(string, int64, int64) (int64, error)
TTL(string) (int64, error)

ReceiveStrings(int) ([]string, error)
Expand Down Expand Up @@ -661,21 +662,6 @@ func (c *DefaultConn) ZRange(key string, start, stop int) ([]string, error) {
return redis.Strings(c.conn.Do("ZRANGE", key, start, stop))
}

func (c *DefaultConn) ZRangeByScoreString(key string, minScore int64, maxScore int64, count, offset int) ([]string, error) {
start := strconv.FormatInt(minScore, 10)
if minScore == 0 {
start = "-inf"
}
stop := strconv.FormatInt(maxScore, 10)
if maxScore == 0 {
stop = "+inf"
}

// return all members with scores between start and stop excluding stop
// "(" is used to exclude the stop value
return redis.Strings(c.conn.Do("ZRANGE", key, start, "("+stop, "BYSCORE", "LIMIT", offset, count))
}

func (c *DefaultConn) ZScore(key string, member string) (int64, error) {
return redis.Int64(c.conn.Do("ZSCORE", key, member))
}
Expand Down Expand Up @@ -911,6 +897,12 @@ func (s *DefaultScript) DoStrings(ctx context.Context, conn Conn, keysAndArgs ..
return redis.Strings(result, err)
}

func (s *DefaultScript) DoInt(ctx context.Context, conn Conn, keysAndArgs ...any) (int, error) {
defaultConn := conn.(*DefaultConn)
result, err := s.script.Do(defaultConn.conn, keysAndArgs...)
return redis.Int(result, err)
}

func (s *DefaultScript) Do(ctx context.Context, conn Conn, keysAndArgs ...any) (any, error) {
defaultConn := conn.(*DefaultConn)
return s.script.DoContext(ctx, defaultConn.conn, keysAndArgs...)
Expand Down Expand Up @@ -1002,6 +994,19 @@ func NewGetHashCommand(key string, field string) command {
}
}

func (c *DefaultConn) ZCount(key string, start int64, stop int64) (int64, error) {
startArg := strconv.FormatInt(start, 10)
stopArg := strconv.FormatInt(stop, 10)
if start == 0 {
startArg = "-inf"
}

if stop == -1 {
stopArg = "+inf"
}
return redis.Int64(c.conn.Do("ZCOUNT", key, startArg, stopArg))
}

func (c *DefaultConn) RPushTTL(key string, member string, expiration time.Duration) (bool, error) {
if err := c.conn.Send("MULTI"); err != nil {
return false, err
Expand Down

0 comments on commit 283066e

Please sign in to comment.