Skip to content


add the decision-time link override
Browse files Browse the repository at this point in the history
  • Loading branch information
mterhar committed Nov 2, 2023
1 parent 989d736 commit 472d3ce
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 27 deletions.
175 changes: 159 additions & 16 deletions collect/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package collect

import (

Expand All @@ -13,6 +16,7 @@ import (
Expand All @@ -29,7 +33,7 @@ type Collector interface {
Stressed() bool
GetStressedSampleRate(traceID string) (rate uint, keep bool, reason string)
ProcessSpanImmediately(sp *types.Span, keep bool, sampleRate uint, reason string)
AlreadySeen(traceID string) (keep bool, timeout uint, err error)
AlreadySeen(traceID string) (keep bool, timeout time.Duration, err error)

func GetCollectorImplementation(c config.Config) Collector {
Expand All @@ -51,6 +55,7 @@ type InMemCollector struct {
Transmission transmit.Transmission `inject:"upstreamTransmission"`
Metrics metrics.Metrics `inject:"genericMetrics"`
SamplerFactory *sample.SamplerFactory `inject:""`
Sharder sharder.Sharder `inject:""`
StressRelief StressReliever `inject:"stressRelief"`

// For test use only
Expand Down Expand Up @@ -263,19 +268,21 @@ func (i *InMemCollector) AddSpan(sp *types.Span) error {

// AlreadySeen(traceID string) (keep bool, timeout uint)
func (i *InMemCollector) AlreadySeen(traceID string) (keep bool, timeout uint, err error) {
sr, kept := i.sampleTraceCache.CheckTraceID(traceID)
if sr != nil { // sr is only nil if it's never been found.
return kept, 0, nil
// if timeout and error are 0 and nil, that means kept or not-kept are the answer
// if timeout is greater than zero, it's still waiting to send
// if error is not nil, it didn't get an answer.
func (i *InMemCollector) AlreadySeen(traceID string) (keep bool, timeout_ms time.Duration, err error) {
_, kept := i.sampleTraceCache.CheckTraceID(traceID)
if kept {
return true, 0, nil

trace := i.cache.Get(traceID)
if trace != nil { // trace is in the cache. How long is left?
plzwait := time.Until(trace.SendBy)
return false, uint(plzwait.Milliseconds()), nil
return false, plzwait, nil

return false, 0, errors.New("problem looking up trace")
return false, 0, errors.New("trace not found")

// AddSpan accepts the incoming span to a queue and returns immediately
Expand Down Expand Up @@ -382,8 +389,10 @@ func (i *InMemCollector) collect() {

func (i *InMemCollector) sendTracesInCache(now time.Time) {
traces := i.cache.TakeExpiredTraces(now)

for _, t := range traces {
if t.RootSpan != nil {
// if the root span has a link
i.send(t, TraceSendGotRoot)
} else {
i.send(t, TraceSendExpired)
Expand Down Expand Up @@ -455,6 +464,19 @@ func (i *InMemCollector) processSpan(sp *types.Span) {
trace.SendBy = time.Now().Add(timeout)
trace.RootSpan = sp

// if there are links, throw them in the pile
if i.isSpanLink(sp) {
parentId := i.getParentId(sp)
// root spans shouldn't be able to have links in OTEL land because links are
// turned into a separate event when they arrive at Refinery.
// if someone wants to do a link this way, we should allow it.
if len(parentId) < 1 {
parentId = "*It's*The*Root*Span*"
// this should include a reference to the span, not a copy of the span
trace.AddSpanLink(parentId, sp)

// ProcessSpanImmediately is an escape hatch used under stressful conditions --
Expand Down Expand Up @@ -569,6 +591,41 @@ func (i *InMemCollector) isRootSpan(sp *types.Span) bool {
return true

func (i *InMemCollector) getParentId(sp *types.Span) string {
for _, parentIdFieldName := range i.Config.GetParentIdFieldNames() {
parentId := sp.Data[parentIdFieldName]
if _, ok := parentId.(string); ok {
return parentId.(string)
return ""

func (i *InMemCollector) isSpanLink(sp *types.Span) bool {
// do we even bother checking for the meta.annotation_type?
// if sp.Data["meta.annotation_type"] = "link"
// Not going to include this today because I don't think it matters.
// if someone wants to use the spanlink functionality with their own fields, that should be fine.
for _, linkIdFieldName := range i.Config.GetLinkFieldNames() {
linkId := sp.Data[linkIdFieldName]
if _, ok := linkId.(string); ok {
return true
return false

func (i *InMemCollector) getSpanLinkId(sp *types.Span) string {
for _, linkIdFieldName := range i.Config.GetLinkFieldNames() {
linkId := sp.Data[linkIdFieldName]
if _, ok := linkId.(string); ok {
// return the first one we find. If someone is using multiple link field names in a single span, eek.
return linkId.(string)
return ""

func (i *InMemCollector) send(trace *types.Trace, sendReason string) {
if trace.Sent {
// someone else already sent this so we shouldn't also send it.
Expand Down Expand Up @@ -609,15 +666,102 @@ func (i *InMemCollector) send(trace *types.Trace, sendReason string) {
if i.Config.GetAddSpanCountToRoot() && trace.RootSpan != nil {
trace.RootSpan.Data["meta.span_count"] = int64(trace.DescendantCount())

// use sampler key to find sampler; create and cache if not found
if sampler, found = i.datasetSamplers[samplerKey]; !found {
sampler = i.SamplerFactory.GetSamplerImplementationForKey(samplerKey, isLegacyKey)
i.datasetSamplers[samplerKey] = sampler
rate, shouldSend, reason, key := uint(1), true, "unprocessed", ""
keep_override := false
linkstrat, err := i.Config.GetLinkStrategy()
if trace.SpanLinks != nil && err == nil && linkstrat == "RootLinkOverride" && trace.RootSpan != nil {
// If the root span has a link, let's check the proper shard
var link_sp *types.Span
var ok bool
// for RootLinkOverride, we just want to look at the root span and its children (because links are children)
if link_sp, ok = trace.SpanLinks[trace.RootSpan.Data["trace.span_id"].(string)]; !ok {
link_sp, ok = trace.SpanLinks["*It's*The*Root*Span*"]
// Another mode like AnyLinkOverride would scan the entire trace for links and then return an array of spans to check
if ok {
rate, shouldSend, reason, key = uint(1), true, linkstrat, ""
linked_trace_id := i.getSpanLinkId(link_sp)
linked_shard := i.Sharder.WhichShard(linked_trace_id)
plzwait, retries := 5, 5
if linked_shard.Equals(i.Sharder.MyShard()) {
for plzwait > 0 && retries > 0 {
keep, plzwait, error := i.AlreadySeen(linked_trace_id)
if err != nil {
i.Logger.Debug().WithField("link.trace_id", linked_trace_id).Logf("local cache check error: %v")
} else if keep && error == nil {
i.Logger.Debug().WithField("link.trace_id", linked_trace_id).Logf("local linked trace found")
keep_override = true
plzwait = 0
if plzwait > 0 {
i.Logger.Debug().WithField("link.trace_id", linked_trace_id).Logf("local linked trace timenot not reached: %v", plzwait)
time.Sleep(time.Duration(plzwait+20) * time.Millisecond)
retries = retries - 1
} else {
// we store other shard's linked trace IDs for a bit so let's look
for plzwait > 0 && retries > 0 {
keep, plzwait, error := i.AlreadySeen(linked_trace_id)
if keep && error == nil {
i.Logger.Debug().WithField("link.trace_id", linked_trace_id).Logf("remote linked trace found in local queue")
keep_override = true
plzwait = 0
if plzwait > 0 {
i.Logger.Debug().WithField("link.trace_id", linked_trace_id).Logf("remote linked trace timeout in local queue not reached: %v", plzwait)
time.Sleep(time.Duration(plzwait+20) * time.Millisecond)
retries = retries - 1
if !keep_override {
i.Logger.Debug().WithField("link.trace_id", linked_trace_id).Logf("linked trace not found in local cache: %v", err)
// we have to ask the proper shard
retries := 5
for retries > 0 {
url := linked_shard.GetAddress() + "/decision/" + linked_trace_id
resp, err := http.Get(url)
i.Logger.Debug().WithField("trace_id", trace.TraceID).Logf("checking %v for trace decision", url)
// parse json someday.
retries = retries - 1
if err == nil {
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
i.Logger.Debug().WithField("trace_id", trace.TraceID).Logf("Failed to get decision, retries left: %d", retries)
} else {
if strings.Contains(string(body), "\"decision\":\"kept\"") {
i.Logger.Debug().WithField("trace_id", trace.TraceID).Logf("received decision, keep")
i.sampleTraceCache.Record(&types.Trace{TraceID: linked_trace_id, SampleRate: 1}, true)
keep_override = true
retries = 0
} else if strings.Contains(string(body), "\"decision\":\"dropped\"") {
keep_override = false
retries = 0
} else if strings.Contains(string(body), "\"decision\":\"wait\"") {
i.Logger.Debug().WithField("trace_id", trace.TraceID).Logf("Waiting for decision, retries left: %d", retries)
} else {
i.Logger.Debug().WithField("trace_id", trace.TraceID).Logf("something else happened, retries left: %d\n%v", retries, string(body))

// make sampling decision and update the trace
rate, shouldSend, reason, key := sampler.GetSampleRate(trace)
if !keep_override {
// use sampler key to find sampler; create and cache if not found
if sampler, found = i.datasetSamplers[samplerKey]; !found {
sampler = i.SamplerFactory.GetSamplerImplementationForKey(samplerKey, isLegacyKey)
i.datasetSamplers[samplerKey] = sampler

// make sampling decision and update the trace
rate, shouldSend, reason, key = sampler.GetSampleRate(trace)
trace.SampleRate = rate
trace.KeepSample = shouldSend
logFields["reason"] = reason
Expand All @@ -626,7 +770,6 @@ func (i *InMemCollector) send(trace *types.Trace, sendReason string) {
// This will observe sample rate attempts even if the trace is dropped
i.Metrics.Histogram("trace_aggregate_sample_rate", float64(rate))

i.sampleTraceCache.Record(trace, shouldSend)

// if we're supposed to drop this trace, and dry run mode is not enabled, then we're done.
Expand Down
21 changes: 10 additions & 11 deletions route/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,28 +313,27 @@ func (r *Router) getTraceDecision(w http.ResponseWriter, req *http.Request) {
if !shard.Equals(r.Sharder.MyShard()) {
w.Write([]byte(fmt.Sprintf("traceid %v is not on this shard, try %v", traceID, shard.GetAddress())))
w.WriteHeader(http.StatusNotFound) // maybe better to use `http.StatusMisdirectedRequest`
} // fallthrough to active cache

// look in the decided traces pile and cache
kept, timeout, err := r.Collector.AlreadySeen(traceID)
if err != nil {
w.Write([]byte(fmt.Sprintf("traceid %v cache lookup failed", traceID)))
w.Write([]byte(fmt.Sprintf("traceid %v cache lookup failed: %v", traceID, err)))
if timeout > 0 {
w.Write([]byte(fmt.Sprintf("traceid %v has not ended yet, wait %v ms", traceID, timeout)))
w.Write([]byte(fmt.Sprintf(`{"traceID":"%s","decision":"wait","timeout_ms":%d}`, traceID, timeout)))
decision := "dropped"
if kept {
w.Write([]byte(fmt.Sprintf("traceid %v has been sampled (kept)", traceID)))
decision = "kept"

// return traceID has never been seen
w.Write([]byte(fmt.Sprintf("traceid %v is unknown", traceID)))
w.Write([]byte(fmt.Sprintf(`{"traceID":"%s","decision":"%v"}`, traceID, decision)))

func (r *Router) marshalToFormat(w http.ResponseWriter, obj interface{}, format string) {
Expand Down
12 changes: 12 additions & 0 deletions types/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ type Trace struct {

RootSpan *Span

// SpanLinks contains a map of spans that are indexed by the trace.parent_id
// so that they can be recalled by using th RootSpan.span_id in rootoverride.
SpanLinks map[string]*Span

// DataSize is the sum of the DataSize of spans that are added.
// It's used to help expire the most expensive traces.
DataSize int
Expand Down Expand Up @@ -120,6 +124,14 @@ func (t *Trace) GetSamplerKey() (string, bool) {
return env, false

func (t *Trace) AddSpanLink(parentId string, sp *Span) error {
if t.SpanLinks == nil {
t.SpanLinks = map[string]*Span{}
t.SpanLinks[parentId] = sp
return nil

// Span is an event that shows up with a trace ID, so will be part of a Trace
type Span struct {
Expand Down

0 comments on commit 472d3ce

Please sign in to comment.