Skip to content

Commit

Permalink
feat: add metadata to kept traces for state changes (#1195)
Browse files Browse the repository at this point in the history
<!--
Thank you for contributing to the project! 💜
Please make sure to:
- Chat with us first if this is a big change
  - Open a new issue (or comment on an existing one)
- We want to make sure you don't spend time implementing something we
might have to say No to
- Add unit tests
- Mention any relevant issues in the PR description (e.g. "Fixes #123")

Please see our [OSS process
document](https://github.com/honeycombio/home/blob/main/honeycomb-oss-lifecycle-and-practices.md#)
to get an idea of how we operate.
-->

## Which problem is this PR solving?

In order to understand how long it takes for a trace to go through each
state of Refinery's state machine, we need to add instrumentation that
gives us that insight.

## Short description of the changes

- store the timestamp of state change for a trace on its trace status
object
- store each timestamp as metadata for kept traces
  • Loading branch information
VinozzZ authored Jun 11, 2024
1 parent 1d3e19a commit b941785
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 48 deletions.
16 changes: 8 additions & 8 deletions centralstore/centralstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,11 @@ type CentralTraceStatus struct {
KeepReason string
SamplerSelector string
reasonIndex uint // this is the cache ID for the reason
Timestamp time.Time // this is the last time the trace state was changed
Count uint32 // number of spans in the trace
EventCount uint32 // number of span events in the trace
LinkCount uint32 // number of span links in the trace
LastTimestamp time.Time // this is the last time the trace state was changed
StateTimestamps map[CentralTraceState]time.Time
Count uint32 // number of spans in the trace
EventCount uint32 // number of span events in the trace
LinkCount uint32 // number of span links in the trace
}

// ensure that CentralTraceStatus implements KeptTrace
Expand All @@ -92,10 +93,9 @@ func (s *CentralTraceStatus) Clone() *CentralTraceStatus {
}

type CentralTrace struct {
TraceID string
Timestamp uint64
Root *CentralSpan
Spans []*CentralSpan
TraceID string
Root *CentralSpan
Spans []*CentralSpan
}

func (t *CentralTrace) ID() string {
Expand Down
6 changes: 3 additions & 3 deletions centralstore/local_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (lrs *LocalStore) cleanup() {
lrs.mutex.RLock()
deletes := make([]string, 0)
for traceID, status := range lrs.states[DecisionKeep] {
if lrs.Clock.Since(status.Timestamp) > cleanupTTL {
if lrs.Clock.Since(status.LastTimestamp) > cleanupTTL {
deletes = append(deletes, traceID)
}
}
Expand Down Expand Up @@ -141,7 +141,7 @@ func (lrs *LocalStore) changeTraceState(traceID string, fromState, toState Centr

status.State = toState
lrs.states[toState][traceID] = status
status.Timestamp = time.Now()
status.LastTimestamp = time.Now()
delete(lrs.states[fromState], traceID)
return true
}
Expand Down Expand Up @@ -191,7 +191,7 @@ spanLoop:
lrs.states[Collecting][span.TraceID] = &CentralTraceStatus{
TraceID: span.TraceID,
State: Collecting,
Timestamp: lrs.Clock.Now(),
LastTimestamp: lrs.Clock.Now(),
Metadata: make(map[string]interface{}),
SamplerSelector: span.samplerSelector,
}
Expand Down
61 changes: 42 additions & 19 deletions centralstore/redis_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,15 +466,6 @@ func (r *RedisBasicStore) KeepTraces(ctx context.Context, statuses []*CentralTra
defer span.End()

otelutil.AddSpanField(span, "num_traces", len(statuses))
conn := r.RedisClient.Get()
defer conn.Close()

// store keep reason in status
err := r.traces.keepTrace(ctx, conn, statuses)
if err != nil {
return err
}

traceIDs := make([]string, 0, len(statuses))
for _, status := range statuses {
traceIDs = append(traceIDs, status.TraceID)
Expand All @@ -484,6 +475,9 @@ func (r *RedisBasicStore) KeepTraces(ctx context.Context, statuses []*CentralTra
return nil
}

conn := r.RedisClient.Get()
defer conn.Close()

succeed, err := r.states.toNextState(ctx, conn, newTraceStateChangeEvent(AwaitingDecision, DecisionKeep), traceIDs...)
if err != nil {
return err
Expand All @@ -497,6 +491,12 @@ func (r *RedisBasicStore) KeepTraces(ctx context.Context, statuses []*CentralTra
}
}

// store keep reason in status
err = r.traces.keepTrace(ctx, conn, statuses)
if err != nil {
return err
}

// remove span list
spanListKeys := make([]string, 0, len(traceIDs))
for _, traceID := range traceIDs {
Expand Down Expand Up @@ -569,13 +569,18 @@ type centralTraceStatusReason struct {
}

type centralTraceStatusRedis struct {
TraceID string
State string
Count uint32
EventCount uint32
LinkCount uint32
SamplerKey string
Timestamp int64
TraceID string
State string
Count uint32
EventCount uint32
LinkCount uint32
SamplerKey string
LastTimestamp int64
// the redis key format should match with the string format of CentralTraceState
CollectingTimestamp int64 `redis:"collecting"`
DecisionDelayTimestamp int64 `redis:"decision_delay"`
ReadyToDecideTimestamp int64 `redis:"ready_to_decide"`
AwaitingDecisionTimestamp int64 `redis:"awaiting_decision"`

KeepRecord []byte
}
Expand All @@ -593,14 +598,32 @@ func normalizeCentralTraceStatusRedis(status *centralTraceStatusRedis) (*Central
reason.Metadata = make(map[string]interface{})
}

stateTimestamps := make(map[CentralTraceState]time.Time, 0)
if status.CollectingTimestamp != 0 {
stateTimestamps[Collecting] = time.UnixMicro(status.CollectingTimestamp)
}

if status.DecisionDelayTimestamp != 0 {
stateTimestamps[DecisionDelay] = time.UnixMicro(status.DecisionDelayTimestamp)
}

if status.ReadyToDecideTimestamp != 0 {
stateTimestamps[ReadyToDecide] = time.UnixMicro(status.ReadyToDecideTimestamp)
}

if status.AwaitingDecisionTimestamp != 0 {
stateTimestamps[AwaitingDecision] = time.UnixMicro(status.AwaitingDecisionTimestamp)
}

return &CentralTraceStatus{
TraceID: status.TraceID,
State: CentralTraceState(status.State),
SamplerSelector: status.SamplerKey,
Count: status.Count,
EventCount: status.EventCount,
LinkCount: status.LinkCount,
Timestamp: time.UnixMicro(status.Timestamp),
LastTimestamp: time.UnixMicro(status.LastTimestamp),
StateTimestamps: stateTimestamps,
KeepReason: reason.KeepReason,
Rate: reason.Rate,
Metadata: reason.Metadata,
Expand Down Expand Up @@ -794,7 +817,7 @@ func (t *tracesStore) getTraceStatuses(ctx context.Context, client redis.Client,
status := &centralTraceStatusRedis{}
status.TraceID = traceIDs[i]
status.State = Unknown.String()
status.Timestamp = t.clock.Now().UnixMicro()
status.LastTimestamp = t.clock.Now().UnixMicro()

v, err := normalizeCentralTraceStatusRedis(status)
if err != nil {
Expand Down Expand Up @@ -1293,7 +1316,7 @@ const traceStateChangeScript = `
local removed = redis.call('ZREM', string.format("%s:traces", currentState), traceID)
local status = redis.call("HSET", string.format("%s:status", traceID), "State", nextState, "Timestamp", timestamp)
local status = redis.call("HSET", string.format("%s:status", traceID), "State", nextState, "LastTimestamp", timestamp, nextState, timestamp)
`

const validStateChangeEventsKey = "valid-state-change-events"
Expand Down
21 changes: 12 additions & 9 deletions centralstore/redis_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,16 @@ func TestRedisBasicStore_TraceStatus(t *testing.T) {
require.NotNil(t, status[0])
assert.Equal(t, tc.expectedState, status[0].State)
if i == 0 {
initialTimestamp = status[0].Timestamp
initialTimestamp = status[0].LastTimestamp
}
assert.NotNil(t, status[0].Timestamp)
assert.NotNil(t, status[0].LastTimestamp)
if i > 0 {
assert.Equal(t, initialTimestamp, status[0].Timestamp)
assert.Equal(t, initialTimestamp, status[0].LastTimestamp)
}

assert.NotNil(t, status[0].StateTimestamps)
assert.NotEmpty(t, status[0].StateTimestamps[tc.expectedState])

assert.Equal(t, tc.expectedEventCount, status[0].SpanEventCount())
assert.Equal(t, tc.expectedLinkCount, status[0].SpanLinkCount())
assert.Equal(t, tc.expectedSpanCount, status[0].SpanCount())
Expand Down Expand Up @@ -274,7 +277,7 @@ func TestRedisBasicStore_ChangeTraceStatus(t *testing.T) {
require.NoError(t, err)
require.Len(t, waitingStatus, 1)
assert.Equal(t, DecisionDelay, waitingStatus[0].State)
assert.True(t, waitingStatus[0].Timestamp.After(collectingStatus[0].Timestamp))
assert.True(t, waitingStatus[0].LastTimestamp.After(collectingStatus[0].LastTimestamp))

store.clock.Advance(time.Duration(1 * time.Second))
require.NoError(t, store.ChangeTraceStatus(ctx, []string{span.TraceID}, DecisionDelay, ReadyToDecide))
Expand All @@ -283,7 +286,7 @@ func TestRedisBasicStore_ChangeTraceStatus(t *testing.T) {
require.NoError(t, err)
require.Len(t, readyStatus, 1)
assert.Equal(t, ReadyToDecide, readyStatus[0].State)
assert.True(t, readyStatus[0].Timestamp.After(waitingStatus[0].Timestamp))
assert.True(t, readyStatus[0].LastTimestamp.After(waitingStatus[0].LastTimestamp))

store.clock.Advance(time.Duration(1 * time.Second))
require.NoError(t, store.ChangeTraceStatus(ctx, []string{span.TraceID}, ReadyToDecide, AwaitingDecision))
Expand All @@ -292,7 +295,7 @@ func TestRedisBasicStore_ChangeTraceStatus(t *testing.T) {
require.NoError(t, err)
require.Len(t, awaitingStatus, 1)
assert.Equal(t, AwaitingDecision, awaitingStatus[0].State)
assert.True(t, awaitingStatus[0].Timestamp.After(readyStatus[0].Timestamp))
assert.True(t, awaitingStatus[0].LastTimestamp.After(readyStatus[0].LastTimestamp))

store.clock.Advance(time.Duration(1 * time.Second))
require.NoError(t, store.ChangeTraceStatus(ctx, []string{span.TraceID}, AwaitingDecision, ReadyToDecide))
Expand All @@ -301,7 +304,7 @@ func TestRedisBasicStore_ChangeTraceStatus(t *testing.T) {
require.NoError(t, err)
require.Len(t, readyStatus, 1)
assert.Equal(t, ReadyToDecide, readyStatus[0].State)
assert.True(t, readyStatus[0].Timestamp.After(awaitingStatus[0].Timestamp))
assert.True(t, readyStatus[0].LastTimestamp.After(awaitingStatus[0].LastTimestamp))

store.clock.Advance(time.Duration(1 * time.Second))
require.NoError(t, store.ChangeTraceStatus(ctx, []string{span.TraceID}, ReadyToDecide, AwaitingDecision))
Expand Down Expand Up @@ -398,7 +401,7 @@ func TestRedisBasicStore_ConcurrentStateChange(t *testing.T) {
require.NoError(t, err)
require.Len(t, status, 1)
require.Equal(t, Collecting, status[0].State)
initialTimestamp := status[0].Timestamp
initialTimestamp := status[0].LastTimestamp

store.clock.Advance(1 * time.Second)
var wg sync.WaitGroup
Expand Down Expand Up @@ -438,7 +441,7 @@ func TestRedisBasicStore_ConcurrentStateChange(t *testing.T) {
require.NoError(t, err)
require.Len(t, status, 1)
assert.Equal(t, AwaitingDecision, status[0].State)
assert.True(t, status[0].Timestamp.After(initialTimestamp))
assert.True(t, status[0].LastTimestamp.After(initialTimestamp))
}

func TestRedisBasicStore_Cleanup(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion centralstore/smartwrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (w *SmartWrapper) manageTimeouts(ctx context.Context, timeout time.Duration
}
traceIDsToChange := make([]string, 0)
for _, status := range statuses {
if !status.Timestamp.IsZero() && w.Clock.Since(status.Timestamp) > timeout {
if !status.LastTimestamp.IsZero() && w.Clock.Since(status.LastTimestamp) > timeout {
if status.TraceID == "" {
w.Logger.Warn().Logf("Attempted to change state from %s to %s of empty trace id", fromState, toState)
} else {
Expand Down
10 changes: 9 additions & 1 deletion collect/central_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -1177,7 +1177,7 @@ func (c *CentralCollector) sendSpans(status *centralstore.CentralTraceStatus) {
reason = status.KeepReason
}
var sendReason string
if sp.ArrivalTime.After(status.Timestamp) {
if sp.ArrivalTime.After(status.LastTimestamp) {
if reason == "" {
reason = "late arriving span"
} else {
Expand Down Expand Up @@ -1211,6 +1211,14 @@ func (c *CentralCollector) sendSpans(status *centralstore.CentralTraceStatus) {
sp.Data[k] = v
}

stateDurPrefix := "meta.refinery.since_prev_state_ms."
sp.Data["meta.refinery.trace_arrival_micro"] = trace.ArrivalTime.UnixMicro()
sp.Data[stateDurPrefix+centralstore.Collecting.String()] = status.StateTimestamps[centralstore.Collecting].Sub(trace.ArrivalTime).Milliseconds()
sp.Data[stateDurPrefix+centralstore.DecisionDelay.String()] = status.StateTimestamps[centralstore.DecisionDelay].Sub(status.StateTimestamps[centralstore.Collecting]).Milliseconds()
sp.Data[stateDurPrefix+centralstore.ReadyToDecide.String()] = status.StateTimestamps[centralstore.ReadyToDecide].Sub(status.StateTimestamps[centralstore.DecisionDelay]).Milliseconds()
sp.Data[stateDurPrefix+centralstore.AwaitingDecision.String()] = status.StateTimestamps[centralstore.AwaitingDecision].Sub(status.StateTimestamps[centralstore.ReadyToDecide]).Milliseconds()
sp.Data[stateDurPrefix+centralstore.DecisionKeep.String()] = status.LastTimestamp.Sub(status.StateTimestamps[centralstore.AwaitingDecision]).Milliseconds()

if c.hostname != "" && c.Config.GetAddHostMetadataToTrace() {
sp.Data["meta.refinery.sender.host.name"] = c.hostname
}
Expand Down
7 changes: 6 additions & 1 deletion collect/central_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func TestCentralCollector_AddSpan(t *testing.T) {
}
}

func TestCentralCollector_ProcessTraces(t *testing.T) {
func TestCentralCollector_Sender(t *testing.T) {
for _, storeType := range storeTypes {
t.Run(storeType, func(t *testing.T) {
conf := &config.MockConfig{
Expand Down Expand Up @@ -190,6 +190,11 @@ func TestCentralCollector_ProcessTraces(t *testing.T) {
assert.Equal(t, "test", transmission.Events[0].Environment)
assert.Equal(t, TraceSendGotRoot, transmission.Events[0].Data["meta.refinery.send_reason"])
assert.Equal(t, "deterministic/always", transmission.Events[0].Data["meta.refinery.reason"])
assert.GreaterOrEqual(t, transmission.Events[0].Data["meta.refinery.since_prev_state_ms.collecting"], int64(0), transmission.Events[0].Data["meta.refinery.since_prev_state_ms.collecting"])
assert.NotEmpty(t, transmission.Events[0].Data["meta.refinery.since_prev_state_ms.decision_delay"])
assert.NotEmpty(t, transmission.Events[0].Data["meta.refinery.since_prev_state_ms.ready_to_decide"])
assert.NotEmpty(t, transmission.Events[0].Data["meta.refinery.since_prev_state_ms.awaiting_decision"])
assert.NotEmpty(t, transmission.Events[0].Data["meta.refinery.since_prev_state_ms.decision_keep"])
transmission.Mux.RUnlock()
}, 5*time.Second, 100*time.Millisecond)
})
Expand Down
6 changes: 0 additions & 6 deletions redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,12 +731,6 @@ func (c *DefaultConn) receive(n int, converter func(reply any, err error) error)
}

func (c *DefaultConn) Do(commandString string, args ...any) (any, error) {
now := c.Clock.Now()
defer func() {
duration := c.Clock.Since(now)
c.metrics.Histogram("redis_request_latency", duration)
}()

return c.conn.Do(commandString, args...)
}

Expand Down

0 comments on commit b941785

Please sign in to comment.