From b941785994626f5e9baebedb2a31fbbeca7b878e Mon Sep 17 00:00:00 2001 From: Yingrong Zhao <22300958+VinozzZ@users.noreply.github.com> Date: Tue, 11 Jun 2024 17:35:12 -0400 Subject: [PATCH] feat: add metadata to kept traces for state changes (#1195) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Which problem is this PR solving? In order to understand how long it takes for a trace to go through each state of Refinery's state machine, we need to add instrumentation that gives us that insight. ## Short description of the changes - store the timestamp of state change for a trace on its trace status object - store each timestamp as metadata for kept traces --- centralstore/centralstore.go | 16 ++++---- centralstore/local_store.go | 6 +-- centralstore/redis_store.go | 61 +++++++++++++++++++++---------- centralstore/redis_store_test.go | 21 ++++++----- centralstore/smartwrapper.go | 2 +- collect/central_collector.go | 10 ++++- collect/central_collector_test.go | 7 +++- redis/redis.go | 6 --- 8 files changed, 81 insertions(+), 48 deletions(-) diff --git a/centralstore/centralstore.go b/centralstore/centralstore.go index ef51f76199..5e785546e1 100644 --- a/centralstore/centralstore.go +++ b/centralstore/centralstore.go @@ -73,10 +73,11 @@ type CentralTraceStatus struct { KeepReason string SamplerSelector string reasonIndex uint // this is the cache ID for the reason - Timestamp time.Time // this is the last time the trace state was changed - Count uint32 // number of spans in the trace - EventCount uint32 // number of span events in the trace - LinkCount uint32 // number of span links in the trace + LastTimestamp time.Time // this is the last time the trace state was changed + StateTimestamps map[CentralTraceState]time.Time + Count uint32 // number of spans in the trace + EventCount uint32 // number of span events in the trace + LinkCount uint32 // number of span links in the trace } // ensure that CentralTraceStatus implements 
KeptTrace @@ -92,10 +93,9 @@ func (s *CentralTraceStatus) Clone() *CentralTraceStatus { } type CentralTrace struct { - TraceID string - Timestamp uint64 - Root *CentralSpan - Spans []*CentralSpan + TraceID string + Root *CentralSpan + Spans []*CentralSpan } func (t *CentralTrace) ID() string { diff --git a/centralstore/local_store.go b/centralstore/local_store.go index 4d1a58f647..b75de75602 100644 --- a/centralstore/local_store.go +++ b/centralstore/local_store.go @@ -90,7 +90,7 @@ func (lrs *LocalStore) cleanup() { lrs.mutex.RLock() deletes := make([]string, 0) for traceID, status := range lrs.states[DecisionKeep] { - if lrs.Clock.Since(status.Timestamp) > cleanupTTL { + if lrs.Clock.Since(status.LastTimestamp) > cleanupTTL { deletes = append(deletes, traceID) } } @@ -141,7 +141,7 @@ func (lrs *LocalStore) changeTraceState(traceID string, fromState, toState Centr status.State = toState lrs.states[toState][traceID] = status - status.Timestamp = time.Now() + status.LastTimestamp = time.Now() delete(lrs.states[fromState], traceID) return true } @@ -191,7 +191,7 @@ spanLoop: lrs.states[Collecting][span.TraceID] = &CentralTraceStatus{ TraceID: span.TraceID, State: Collecting, - Timestamp: lrs.Clock.Now(), + LastTimestamp: lrs.Clock.Now(), Metadata: make(map[string]interface{}), SamplerSelector: span.samplerSelector, } diff --git a/centralstore/redis_store.go b/centralstore/redis_store.go index e03408fe8d..b27e1fb30e 100644 --- a/centralstore/redis_store.go +++ b/centralstore/redis_store.go @@ -466,15 +466,6 @@ func (r *RedisBasicStore) KeepTraces(ctx context.Context, statuses []*CentralTra defer span.End() otelutil.AddSpanField(span, "num_traces", len(statuses)) - conn := r.RedisClient.Get() - defer conn.Close() - - // store keep reason in status - err := r.traces.keepTrace(ctx, conn, statuses) - if err != nil { - return err - } - traceIDs := make([]string, 0, len(statuses)) for _, status := range statuses { traceIDs = append(traceIDs, status.TraceID) @@ -484,6 +475,9 
@@ func (r *RedisBasicStore) KeepTraces(ctx context.Context, statuses []*CentralTra return nil } + conn := r.RedisClient.Get() + defer conn.Close() + succeed, err := r.states.toNextState(ctx, conn, newTraceStateChangeEvent(AwaitingDecision, DecisionKeep), traceIDs...) if err != nil { return err @@ -497,6 +491,12 @@ func (r *RedisBasicStore) KeepTraces(ctx context.Context, statuses []*CentralTra } } + // store keep reason in status + err = r.traces.keepTrace(ctx, conn, statuses) + if err != nil { + return err + } + // remove span list spanListKeys := make([]string, 0, len(traceIDs)) for _, traceID := range traceIDs { @@ -569,13 +569,18 @@ type centralTraceStatusReason struct { } type centralTraceStatusRedis struct { - TraceID string - State string - Count uint32 - EventCount uint32 - LinkCount uint32 - SamplerKey string - Timestamp int64 + TraceID string + State string + Count uint32 + EventCount uint32 + LinkCount uint32 + SamplerKey string + LastTimestamp int64 + // the redis key format should match with the string format of CentralTraceState + CollectingTimestamp int64 `redis:"collecting"` + DecisionDelayTimestamp int64 `redis:"decision_delay"` + ReadyToDecideTimestamp int64 `redis:"ready_to_decide"` + AwaitingDecisionTimestamp int64 `redis:"awaiting_decision"` KeepRecord []byte } @@ -593,6 +598,23 @@ func normalizeCentralTraceStatusRedis(status *centralTraceStatusRedis) (*Central reason.Metadata = make(map[string]interface{}) } + stateTimestamps := make(map[CentralTraceState]time.Time, 0) + if status.CollectingTimestamp != 0 { + stateTimestamps[Collecting] = time.UnixMicro(status.CollectingTimestamp) + } + + if status.DecisionDelayTimestamp != 0 { + stateTimestamps[DecisionDelay] = time.UnixMicro(status.DecisionDelayTimestamp) + } + + if status.ReadyToDecideTimestamp != 0 { + stateTimestamps[ReadyToDecide] = time.UnixMicro(status.ReadyToDecideTimestamp) + } + + if status.AwaitingDecisionTimestamp != 0 { + stateTimestamps[AwaitingDecision] = 
time.UnixMicro(status.AwaitingDecisionTimestamp) + } + return &CentralTraceStatus{ TraceID: status.TraceID, State: CentralTraceState(status.State), @@ -600,7 +622,8 @@ func normalizeCentralTraceStatusRedis(status *centralTraceStatusRedis) (*Central Count: status.Count, EventCount: status.EventCount, LinkCount: status.LinkCount, - Timestamp: time.UnixMicro(status.Timestamp), + LastTimestamp: time.UnixMicro(status.LastTimestamp), + StateTimestamps: stateTimestamps, KeepReason: reason.KeepReason, Rate: reason.Rate, Metadata: reason.Metadata, @@ -794,7 +817,7 @@ func (t *tracesStore) getTraceStatuses(ctx context.Context, client redis.Client, status := &centralTraceStatusRedis{} status.TraceID = traceIDs[i] status.State = Unknown.String() - status.Timestamp = t.clock.Now().UnixMicro() + status.LastTimestamp = t.clock.Now().UnixMicro() v, err := normalizeCentralTraceStatusRedis(status) if err != nil { @@ -1293,7 +1316,7 @@ const traceStateChangeScript = ` local removed = redis.call('ZREM', string.format("%s:traces", currentState), traceID) - local status = redis.call("HSET", string.format("%s:status", traceID), "State", nextState, "Timestamp", timestamp) + local status = redis.call("HSET", string.format("%s:status", traceID), "State", nextState, "LastTimestamp", timestamp, nextState, timestamp) ` const validStateChangeEventsKey = "valid-state-change-events" diff --git a/centralstore/redis_store_test.go b/centralstore/redis_store_test.go index 080ad8c1fa..bdcfc75b1a 100644 --- a/centralstore/redis_store_test.go +++ b/centralstore/redis_store_test.go @@ -105,13 +105,16 @@ func TestRedisBasicStore_TraceStatus(t *testing.T) { require.NotNil(t, status[0]) assert.Equal(t, tc.expectedState, status[0].State) if i == 0 { - initialTimestamp = status[0].Timestamp + initialTimestamp = status[0].LastTimestamp } - assert.NotNil(t, status[0].Timestamp) + assert.NotNil(t, status[0].LastTimestamp) if i > 0 { - assert.Equal(t, initialTimestamp, status[0].Timestamp) + assert.Equal(t,
initialTimestamp, status[0].LastTimestamp) } + assert.NotNil(t, status[0].StateTimestamps) + assert.NotEmpty(t, status[0].StateTimestamps[tc.expectedState]) + assert.Equal(t, tc.expectedEventCount, status[0].SpanEventCount()) assert.Equal(t, tc.expectedLinkCount, status[0].SpanLinkCount()) assert.Equal(t, tc.expectedSpanCount, status[0].SpanCount()) @@ -274,7 +277,7 @@ func TestRedisBasicStore_ChangeTraceStatus(t *testing.T) { require.NoError(t, err) require.Len(t, waitingStatus, 1) assert.Equal(t, DecisionDelay, waitingStatus[0].State) - assert.True(t, waitingStatus[0].Timestamp.After(collectingStatus[0].Timestamp)) + assert.True(t, waitingStatus[0].LastTimestamp.After(collectingStatus[0].LastTimestamp)) store.clock.Advance(time.Duration(1 * time.Second)) require.NoError(t, store.ChangeTraceStatus(ctx, []string{span.TraceID}, DecisionDelay, ReadyToDecide)) @@ -283,7 +286,7 @@ func TestRedisBasicStore_ChangeTraceStatus(t *testing.T) { require.NoError(t, err) require.Len(t, readyStatus, 1) assert.Equal(t, ReadyToDecide, readyStatus[0].State) - assert.True(t, readyStatus[0].Timestamp.After(waitingStatus[0].Timestamp)) + assert.True(t, readyStatus[0].LastTimestamp.After(waitingStatus[0].LastTimestamp)) store.clock.Advance(time.Duration(1 * time.Second)) require.NoError(t, store.ChangeTraceStatus(ctx, []string{span.TraceID}, ReadyToDecide, AwaitingDecision)) @@ -292,7 +295,7 @@ func TestRedisBasicStore_ChangeTraceStatus(t *testing.T) { require.NoError(t, err) require.Len(t, awaitingStatus, 1) assert.Equal(t, AwaitingDecision, awaitingStatus[0].State) - assert.True(t, awaitingStatus[0].Timestamp.After(readyStatus[0].Timestamp)) + assert.True(t, awaitingStatus[0].LastTimestamp.After(readyStatus[0].LastTimestamp)) store.clock.Advance(time.Duration(1 * time.Second)) require.NoError(t, store.ChangeTraceStatus(ctx, []string{span.TraceID}, AwaitingDecision, ReadyToDecide)) @@ -301,7 +304,7 @@ func TestRedisBasicStore_ChangeTraceStatus(t *testing.T) { require.NoError(t, err) 
require.Len(t, readyStatus, 1) assert.Equal(t, ReadyToDecide, readyStatus[0].State) - assert.True(t, readyStatus[0].Timestamp.After(awaitingStatus[0].Timestamp)) + assert.True(t, readyStatus[0].LastTimestamp.After(awaitingStatus[0].LastTimestamp)) store.clock.Advance(time.Duration(1 * time.Second)) require.NoError(t, store.ChangeTraceStatus(ctx, []string{span.TraceID}, ReadyToDecide, AwaitingDecision)) @@ -398,7 +401,7 @@ func TestRedisBasicStore_ConcurrentStateChange(t *testing.T) { require.NoError(t, err) require.Len(t, status, 1) require.Equal(t, Collecting, status[0].State) - initialTimestamp := status[0].Timestamp + initialTimestamp := status[0].LastTimestamp store.clock.Advance(1 * time.Second) var wg sync.WaitGroup @@ -438,7 +441,7 @@ func TestRedisBasicStore_ConcurrentStateChange(t *testing.T) { require.NoError(t, err) require.Len(t, status, 1) assert.Equal(t, AwaitingDecision, status[0].State) - assert.True(t, status[0].Timestamp.After(initialTimestamp)) + assert.True(t, status[0].LastTimestamp.After(initialTimestamp)) } func TestRedisBasicStore_Cleanup(t *testing.T) { diff --git a/centralstore/smartwrapper.go b/centralstore/smartwrapper.go index ea183e40d8..99b6737f60 100644 --- a/centralstore/smartwrapper.go +++ b/centralstore/smartwrapper.go @@ -174,7 +174,7 @@ func (w *SmartWrapper) manageTimeouts(ctx context.Context, timeout time.Duration } traceIDsToChange := make([]string, 0) for _, status := range statuses { - if !status.Timestamp.IsZero() && w.Clock.Since(status.Timestamp) > timeout { + if !status.LastTimestamp.IsZero() && w.Clock.Since(status.LastTimestamp) > timeout { if status.TraceID == "" { w.Logger.Warn().Logf("Attempted to change state from %s to %s of empty trace id", fromState, toState) } else { diff --git a/collect/central_collector.go b/collect/central_collector.go index 27744e9c76..475f52dea8 100644 --- a/collect/central_collector.go +++ b/collect/central_collector.go @@ -1177,7 +1177,7 @@ func (c *CentralCollector) sendSpans(status 
*centralstore.CentralTraceStatus) { reason = status.KeepReason } var sendReason string - if sp.ArrivalTime.After(status.Timestamp) { + if sp.ArrivalTime.After(status.LastTimestamp) { if reason == "" { reason = "late arriving span" } else { @@ -1211,6 +1211,14 @@ func (c *CentralCollector) sendSpans(status *centralstore.CentralTraceStatus) { sp.Data[k] = v } + stateDurPrefix := "meta.refinery.since_prev_state_ms." + sp.Data["meta.refinery.trace_arrival_micro"] = trace.ArrivalTime.UnixMicro() + sp.Data[stateDurPrefix+centralstore.Collecting.String()] = status.StateTimestamps[centralstore.Collecting].Sub(trace.ArrivalTime).Milliseconds() + sp.Data[stateDurPrefix+centralstore.DecisionDelay.String()] = status.StateTimestamps[centralstore.DecisionDelay].Sub(status.StateTimestamps[centralstore.Collecting]).Milliseconds() + sp.Data[stateDurPrefix+centralstore.ReadyToDecide.String()] = status.StateTimestamps[centralstore.ReadyToDecide].Sub(status.StateTimestamps[centralstore.DecisionDelay]).Milliseconds() + sp.Data[stateDurPrefix+centralstore.AwaitingDecision.String()] = status.StateTimestamps[centralstore.AwaitingDecision].Sub(status.StateTimestamps[centralstore.ReadyToDecide]).Milliseconds() + sp.Data[stateDurPrefix+centralstore.DecisionKeep.String()] = status.LastTimestamp.Sub(status.StateTimestamps[centralstore.AwaitingDecision]).Milliseconds() + if c.hostname != "" && c.Config.GetAddHostMetadataToTrace() { sp.Data["meta.refinery.sender.host.name"] = c.hostname } diff --git a/collect/central_collector_test.go b/collect/central_collector_test.go index 3c900b64d2..2b5a9181b2 100644 --- a/collect/central_collector_test.go +++ b/collect/central_collector_test.go @@ -119,7 +119,7 @@ func TestCentralCollector_AddSpan(t *testing.T) { } } -func TestCentralCollector_ProcessTraces(t *testing.T) { +func TestCentralCollector_Sender(t *testing.T) { for _, storeType := range storeTypes { t.Run(storeType, func(t *testing.T) { conf := &config.MockConfig{ @@ -190,6 +190,11 @@ func 
TestCentralCollector_ProcessTraces(t *testing.T) { assert.Equal(t, "test", transmission.Events[0].Environment) assert.Equal(t, TraceSendGotRoot, transmission.Events[0].Data["meta.refinery.send_reason"]) assert.Equal(t, "deterministic/always", transmission.Events[0].Data["meta.refinery.reason"]) + assert.GreaterOrEqual(t, transmission.Events[0].Data["meta.refinery.since_prev_state_ms.collecting"], int64(0), transmission.Events[0].Data["meta.refinery.since_prev_state_ms.collecting"]) + assert.NotEmpty(t, transmission.Events[0].Data["meta.refinery.since_prev_state_ms.decision_delay"]) + assert.NotEmpty(t, transmission.Events[0].Data["meta.refinery.since_prev_state_ms.ready_to_decide"]) + assert.NotEmpty(t, transmission.Events[0].Data["meta.refinery.since_prev_state_ms.awaiting_decision"]) + assert.NotEmpty(t, transmission.Events[0].Data["meta.refinery.since_prev_state_ms.decision_keep"]) transmission.Mux.RUnlock() }, 5*time.Second, 100*time.Millisecond) }) diff --git a/redis/redis.go b/redis/redis.go index da1e8d8491..14d3d53f33 100644 --- a/redis/redis.go +++ b/redis/redis.go @@ -731,12 +731,6 @@ func (c *DefaultConn) receive(n int, converter func(reply any, err error) error) } func (c *DefaultConn) Do(commandString string, args ...any) (any, error) { - now := c.Clock.Now() - defer func() { - duration := c.Clock.Since(now) - c.metrics.Histogram("redis_request_latency", duration) - }() - return c.conn.Do(commandString, args...) }