Skip to content

Commit

Permalink
fix: make sure send_reason is attached to span metadata (#1133)
Browse files Browse the repository at this point in the history
  • Loading branch information
VinozzZ authored May 8, 2024
1 parent 5e285b8 commit 1351cd8
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 19 deletions.
5 changes: 5 additions & 0 deletions centralstore/redis_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,11 @@ func (t *tracesStore) addStatuses(ctx context.Context, conn redis.Conn, cspans [

commands := make([]redis.Command, 0, 3*len(cspans))
for _, span := range cspans {
// Skip signaling spans sent by the central collector so they are never stored.
// Every real span carries a SpanID, so an empty SpanID marks a signaling span.
if span.SpanID == "" {
continue
}

trace := &centralTraceStatusInit{
TraceID: span.TraceID,
Expand Down
29 changes: 18 additions & 11 deletions collect/central_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ var ErrWouldBlock = errors.New("not adding span, channel buffer is full")
// Send-reason labels. In the code visible alongside this block, values from
// this set are stored under the "meta.refinery.send_reason" key of a trace's
// status metadata and of each span's data.
const (
// TraceSendGotRoot is the send reason recorded when a decision is made for a
// trace whose root span is present (trace.Root != nil).
TraceSendGotRoot = "trace_send_got_root"
// TraceSendExpired is the default send reason recorded when a decision is
// made for a trace that has no root span.
TraceSendExpired = "trace_send_expired"
// NOTE(review): not referenced in the visible code; the surrounding diff
// appears to drop one entry from this block, presumably this one. Confirm
// before relying on it.
TraceSendEjectedFull = "trace_send_ejected_full"
// NOTE(review): presumably marks traces ejected because of memory pressure;
// no use is visible from here. Confirm against the callers.
TraceSendEjectedMemsize = "trace_send_ejected_memsize"
// TraceSendLateSpan appears to be the send reason given to a span whose
// ArrivalTime is after the trace status Timestamp, i.e. a late-arriving span.
TraceSendLateSpan = "trace_send_late_span"
)
Expand Down Expand Up @@ -606,13 +605,11 @@ func (c *CentralCollector) makeDecisions(ctx context.Context) error {
// so that it's synced across all refinery instances
if c.Config.GetAddRuleReasonToTrace() {
status.Metadata["meta.refinery.reason"] = reason
if status.Metadata["meta.refinery.send_reason"] == "" {
sendReason := TraceSendExpired
if trace.Root != nil {
sendReason = TraceSendGotRoot
}
status.Metadata["meta.refinery.send_reason"] = sendReason
sendReason := TraceSendExpired
if trace.Root != nil {
sendReason = TraceSendGotRoot
}
status.Metadata["meta.refinery.send_reason"] = sendReason
if key != "" {
status.Metadata["meta.refinery.sample_key"] = key
}
Expand Down Expand Up @@ -734,7 +731,10 @@ func (c *CentralCollector) checkAlloc() {
}
totalDataSizeSent += trace.DataSize
numOfTracesSent++
err := c.Store.WriteSpan(ctx, &centralstore.CentralSpan{TraceID: id})
// in order to eject a trace from refinery's cache, we pretend that its root span has
// arrived. This will force the trace to enter the decision-making process. Once a decision
// is made, the trace will be removed from the cache.
err := c.Store.WriteSpan(ctx, &centralstore.CentralSpan{TraceID: id, IsRoot: true})
if err != nil {
c.Logger.Error().WithField("trace_id", id).Logf("error sending trace for decision: %s", err)
}
Expand Down Expand Up @@ -799,7 +799,7 @@ func (c *CentralCollector) sendSpans(status *centralstore.CentralTraceStatus) {
if !ok {
reason = status.KeepReason
}
sendReason := status.Metadata["meta.refinery.send_reason"]
var sendReason string
if sp.ArrivalTime.After(status.Timestamp) {
if reason == "" {
reason = "late arriving span"
Expand All @@ -809,9 +809,16 @@ func (c *CentralCollector) sendSpans(status *centralstore.CentralTraceStatus) {
sendReason = TraceSendLateSpan
}
sp.Data["meta.refinery.reason"] = reason
if sendReason != nil {
sp.Data["meta.refinery.send_reason"] = sendReason
if sendReason == "" {
val, ok := status.Metadata["meta.refinery.send_reason"]
if ok {
stringVal, ok := val.(string)
if ok {
sendReason = stringVal
}
}
}
sp.Data["meta.refinery.send_reason"] = sendReason
}
sp.Data["meta.span_event_count"] = int(status.SpanEventCount())
sp.Data["meta.span_link_count"] = int(status.SpanLinkCount())
Expand Down
33 changes: 25 additions & 8 deletions collect/central_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,11 @@ func TestCentralCollector_ProcessTraces(t *testing.T) {
for _, storeType := range storeTypes {
t.Run(storeType, func(t *testing.T) {
conf := &config.MockConfig{
GetSamplerTypeVal: &config.DeterministicSamplerConfig{SampleRate: 1},
SendTickerVal: 2 * time.Millisecond,
ParentIdFieldNames: []string{"trace.parent_id", "parentId"},
GetParallelismVal: 10,
GetSamplerTypeVal: &config.DeterministicSamplerConfig{SampleRate: 1},
SendTickerVal: 2 * time.Millisecond,
ParentIdFieldNames: []string{"trace.parent_id", "parentId"},
AddRuleReasonToTrace: true,
GetParallelismVal: 10,
GetCollectionConfigVal: config.CollectionConfig{
CacheCapacity: 100,
SenderCycleDuration: config.Duration(1 * time.Second),
Expand Down Expand Up @@ -177,6 +178,16 @@ func TestCentralCollector_ProcessTraces(t *testing.T) {
count, ok := collector.Metrics.Get("trace_send_kept")
require.True(t, ok)
assert.Equal(t, float64(numberOfTraces), count)

require.EventuallyWithT(t, func(collect *assert.CollectT) {
transmission.Mux.RLock()
assert.Equal(t, numberOfTraces*10, len(transmission.Events))
assert.Equal(t, "aoeu", transmission.Events[0].Dataset)
assert.Equal(t, "test", transmission.Events[0].Environment)
assert.Equal(t, TraceSendGotRoot, transmission.Events[0].Data["meta.refinery.send_reason"])
assert.Equal(t, "deterministic/always", transmission.Events[0].Data["meta.refinery.reason"])
transmission.Mux.RUnlock()
}, 5*time.Second, 100*time.Millisecond)
})
}
}
Expand All @@ -185,10 +196,11 @@ func TestCentralCollector_Decider(t *testing.T) {
for _, storeType := range storeTypes {
t.Run(storeType, func(t *testing.T) {
conf := &config.MockConfig{
GetSamplerTypeVal: &config.DeterministicSamplerConfig{SampleRate: 1},
SendTickerVal: 2 * time.Millisecond,
ParentIdFieldNames: []string{"trace.parent_id", "parentId"},
GetParallelismVal: 10,
GetSamplerTypeVal: &config.DeterministicSamplerConfig{SampleRate: 1},
SendTickerVal: 2 * time.Millisecond,
AddRuleReasonToTrace: true,
ParentIdFieldNames: []string{"trace.parent_id", "parentId"},
GetParallelismVal: 10,
GetCollectionConfigVal: config.CollectionConfig{
IncomingQueueSize: 100,
SenderCycleDuration: config.Duration(1 * time.Second),
Expand Down Expand Up @@ -244,6 +256,8 @@ func TestCentralCollector_Decider(t *testing.T) {
require.Equal(t, numberOfTraces, len(traces))
for _, trace := range traces {
assert.Equal(t, centralstore.DecisionKeep, trace.State)
assert.Equal(t, TraceSendGotRoot, trace.Metadata["meta.refinery.send_reason"])
assert.Equal(t, "deterministic/always", trace.Metadata["meta.refinery.reason"])
}
})
}
Expand Down Expand Up @@ -290,6 +304,7 @@ func TestCentralCollector_OriginalSampleRateIsNotedInMetaField(t *testing.T) {
for i := 0; i < 10; i++ {
span := &types.Span{
TraceID: fmt.Sprintf("trace-%v", i),
ID: fmt.Sprintf("span%d", i),
Event: types.Event{
Dataset: "aoeu",
APIKey: legacyAPIKey,
Expand Down Expand Up @@ -322,6 +337,7 @@ func TestCentralCollector_OriginalSampleRateIsNotedInMetaField(t *testing.T) {
traceID := fmt.Sprintf("trace-%v", 1000)
err := collector.AddSpan(&types.Span{
TraceID: traceID,
ID: "span1000",
Event: types.Event{
Dataset: "no-upstream-sampling",
APIKey: legacyAPIKey,
Expand Down Expand Up @@ -385,6 +401,7 @@ func TestCentralCollector_TransmittedSpansShouldHaveASampleRateOfAtLeastOne(t *t

span := &types.Span{
TraceID: fmt.Sprintf("trace-%v", 1),
ID: "span1",
Event: types.Event{
Dataset: "aoeu",
APIKey: legacyAPIKey,
Expand Down

0 comments on commit 1351cd8

Please sign in to comment.