From 472d3cedae401260400a786125d224c8598d1f89 Mon Sep 17 00:00:00 2001 From: Mike Terhar Date: Fri, 27 Oct 2023 14:33:50 +0000 Subject: [PATCH] add the decision-time link override --- collect/collect.go | 175 ++++++++++++++++++++++++++++++++++++++++----- route/route.go | 21 +++--- types/event.go | 12 ++++ 3 files changed, 181 insertions(+), 27 deletions(-) diff --git a/collect/collect.go b/collect/collect.go index 8960570e6d..f60038abb7 100644 --- a/collect/collect.go +++ b/collect/collect.go @@ -2,9 +2,12 @@ package collect import ( "errors" + "io" + "net/http" "os" "runtime" "sort" + "strings" "sync" "time" @@ -13,6 +16,7 @@ import ( "github.com/honeycombio/refinery/logger" "github.com/honeycombio/refinery/metrics" "github.com/honeycombio/refinery/sample" + "github.com/honeycombio/refinery/sharder" "github.com/honeycombio/refinery/transmit" "github.com/honeycombio/refinery/types" "github.com/sirupsen/logrus" @@ -29,7 +33,7 @@ type Collector interface { Stressed() bool GetStressedSampleRate(traceID string) (rate uint, keep bool, reason string) ProcessSpanImmediately(sp *types.Span, keep bool, sampleRate uint, reason string) - AlreadySeen(traceID string) (keep bool, timeout uint, err error) + AlreadySeen(traceID string) (keep bool, timeout time.Duration, err error) } func GetCollectorImplementation(c config.Config) Collector { @@ -51,6 +55,7 @@ type InMemCollector struct { Transmission transmit.Transmission `inject:"upstreamTransmission"` Metrics metrics.Metrics `inject:"genericMetrics"` SamplerFactory *sample.SamplerFactory `inject:""` + Sharder sharder.Sharder `inject:""` StressRelief StressReliever `inject:"stressRelief"` // For test use only @@ -263,19 +268,21 @@ func (i *InMemCollector) AddSpan(sp *types.Span) error { } // AlreadySeen(traceID string) (keep bool, timeout uint) -func (i *InMemCollector) AlreadySeen(traceID string) (keep bool, timeout uint, err error) { - sr, kept := i.sampleTraceCache.CheckTraceID(traceID) - if sr != nil { // sr is only nil if it's never been found. - return kept, 0, nil +// if timeout and error are 0 and nil, that means kept or not-kept are the answer +// if timeout is greater than zero, it's still waiting to send +// if error is not nil, it didn't get an answer. +func (i *InMemCollector) AlreadySeen(traceID string) (keep bool, timeout_ms time.Duration, err error) { + _, kept := i.sampleTraceCache.CheckTraceID(traceID) + if kept { + return true, 0, nil } trace := i.cache.Get(traceID) if trace != nil { // trace is in the cache. How long is left? plzwait := time.Until(trace.SendBy) - return false, uint(plzwait.Milliseconds()), nil + return false, plzwait, nil } - - return false, 0, errors.New("problem looking up trace") + return false, 0, errors.New("trace not found") } // AddSpan accepts the incoming span to a queue and returns immediately @@ -382,8 +389,10 @@ func (i *InMemCollector) collect() { func (i *InMemCollector) sendTracesInCache(now time.Time) { traces := i.cache.TakeExpiredTraces(now) + for _, t := range traces { if t.RootSpan != nil { + // if the root span has a link i.send(t, TraceSendGotRoot) } else { i.send(t, TraceSendExpired) @@ -455,6 +464,19 @@ func (i *InMemCollector) processSpan(sp *types.Span) { trace.SendBy = time.Now().Add(timeout) trace.RootSpan = sp } + + // if there are links, throw them in the pile + if i.isSpanLink(sp) { + parentId := i.getParentId(sp) + // root spans shouldn't be able to have links in OTEL land because links are + // turned into a separate event when they arrive at Refinery. + // if someone wants to do a link this way, we should allow it. + if len(parentId) < 1 { + parentId = "*It's*The*Root*Span*" + } + // this should include a reference to the span, not a copy of the span + trace.AddSpanLink(parentId, sp) + } } // ProcessSpanImmediately is an escape hatch used under stressful conditions -- @@ -569,6 +591,41 @@ func (i *InMemCollector) isRootSpan(sp *types.Span) bool { return true } +func (i *InMemCollector) getParentId(sp *types.Span) string { + for _, parentIdFieldName := range i.Config.GetParentIdFieldNames() { + parentId := sp.Data[parentIdFieldName] + if _, ok := parentId.(string); ok { + return parentId.(string) + } + } + return "" +} + +func (i *InMemCollector) isSpanLink(sp *types.Span) bool { + // do we even bother checking for the meta.annotation_type? + // if sp.Data["meta.annotation_type"] = "link" + // Not going to include this today because I don't think it matters. + // if someone wants to use the spanlink functionality with their own fields, that should be fine. + for _, linkIdFieldName := range i.Config.GetLinkFieldNames() { + linkId := sp.Data[linkIdFieldName] + if _, ok := linkId.(string); ok { + return true + } + } + return false +} + +func (i *InMemCollector) getSpanLinkId(sp *types.Span) string { + for _, linkIdFieldName := range i.Config.GetLinkFieldNames() { + linkId := sp.Data[linkIdFieldName] + if _, ok := linkId.(string); ok { + // return the first one we find. If someone is using multiple link field names in a single span, eek. + return linkId.(string) + } + } + return "" +} + func (i *InMemCollector) send(trace *types.Trace, sendReason string) { if trace.Sent { // someone else already sent this so we shouldn't also send it. @@ -609,15 +666,102 @@ func (i *InMemCollector) send(trace *types.Trace, sendReason string) { if i.Config.GetAddSpanCountToRoot() && trace.RootSpan != nil { trace.RootSpan.Data["meta.span_count"] = int64(trace.DescendantCount()) } - - // use sampler key to find sampler; create and cache if not found - if sampler, found = i.datasetSamplers[samplerKey]; !found { - sampler = i.SamplerFactory.GetSamplerImplementationForKey(samplerKey, isLegacyKey) - i.datasetSamplers[samplerKey] = sampler + rate, shouldSend, reason, key := uint(1), true, "unprocessed", "" + keep_override := false + linkstrat, err := i.Config.GetLinkStrategy() + if trace.SpanLinks != nil && err == nil && linkstrat == "RootLinkOverride" && trace.RootSpan != nil { + // If the root span has a link, let's check the proper shard + var link_sp *types.Span + var ok bool + // for RootLinkOverride, we just want to look at the root span and its children (because links are children) + if link_sp, ok = trace.SpanLinks[trace.RootSpan.Data["trace.span_id"].(string)]; !ok { + link_sp, ok = trace.SpanLinks["*It's*The*Root*Span*"] + } + // Another mode like AnyLinkOverride would scan the entire trace for links and then return an array of spans to check + if ok { + rate, shouldSend, reason, key = uint(1), true, linkstrat, "" + linked_trace_id := i.getSpanLinkId(link_sp) + linked_shard := i.Sharder.WhichShard(linked_trace_id) + plzwait, retries := 5, 5 + if linked_shard.Equals(i.Sharder.MyShard()) { + for plzwait > 0 && retries > 0 { + keep, plzwait, error := i.AlreadySeen(linked_trace_id) + if err != nil { + i.Logger.Debug().WithField("link.trace_id", linked_trace_id).Logf("local cache check error: %v") + } else if keep && error == nil { + i.Logger.Debug().WithField("link.trace_id", linked_trace_id).Logf("local linked trace found") + keep_override = true + plzwait = 0 + } + if plzwait > 0 { + i.Logger.Debug().WithField("link.trace_id", linked_trace_id).Logf("local linked trace timenot not reached: %v", plzwait) + time.Sleep(time.Duration(plzwait+20) * time.Millisecond) + } + retries = retries - 1 + } + } else { + // we store other shard's linked trace IDs for a bit so let's look + for plzwait > 0 && retries > 0 { + keep, plzwait, error := i.AlreadySeen(linked_trace_id) + if keep && error == nil { + i.Logger.Debug().WithField("link.trace_id", linked_trace_id).Logf("remote linked trace found in local queue") + keep_override = true + plzwait = 0 + } + if plzwait > 0 { + i.Logger.Debug().WithField("link.trace_id", linked_trace_id).Logf("remote linked trace timeout in local queue not reached: %v", plzwait) + time.Sleep(time.Duration(plzwait+20) * time.Millisecond) + } + retries = retries - 1 + } + if !keep_override { + i.Logger.Debug().WithField("link.trace_id", linked_trace_id).Logf("linked trace not found in local cache: %v", err) + // we have to ask the proper shard + retries := 5 + for retries > 0 { + url := linked_shard.GetAddress() + "/decision/" + linked_trace_id + resp, err := http.Get(url) + i.Logger.Debug().WithField("trace_id", trace.TraceID).Logf("checking %v for trace decision", url) + // parse json someday. + retries = retries - 1 + if err == nil { + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + i.Logger.Debug().WithField("trace_id", trace.TraceID).Logf("Failed to get decision, retries left: %d", retries) + } else { + if strings.Contains(string(body), "\"decision\":\"kept\"") { + i.Logger.Debug().WithField("trace_id", trace.TraceID).Logf("received decision, keep") + i.sampleTraceCache.Record(&types.Trace{TraceID: linked_trace_id, SampleRate: 1}, true) + keep_override = true + retries = 0 + } else if strings.Contains(string(body), "\"decision\":\"dropped\"") { + keep_override = false + retries = 0 + } else if strings.Contains(string(body), "\"decision\":\"wait\"") { + i.Logger.Debug().WithField("trace_id", trace.TraceID).Logf("Waiting for decision, retries left: %d", retries) + time.Sleep(200) + } else { + i.Logger.Debug().WithField("trace_id", trace.TraceID).Logf("something else happened, retries left: %d\n%v", retries, string(body)) + } + } + } + } + } + } + } } - // make sampling decision and update the trace - rate, shouldSend, reason, key := sampler.GetSampleRate(trace) + if !keep_override { + // use sampler key to find sampler; create and cache if not found + if sampler, found = i.datasetSamplers[samplerKey]; !found { + sampler = i.SamplerFactory.GetSamplerImplementationForKey(samplerKey, isLegacyKey) + i.datasetSamplers[samplerKey] = sampler + } + + // make sampling decision and update the trace + rate, shouldSend, reason, key = sampler.GetSampleRate(trace) + } trace.SampleRate = rate trace.KeepSample = shouldSend logFields["reason"] = reason @@ -626,7 +770,6 @@ func (i *InMemCollector) send(trace *types.Trace, sendReason string) { } // This will observe sample rate attempts even if the trace is dropped i.Metrics.Histogram("trace_aggregate_sample_rate", float64(rate)) - i.sampleTraceCache.Record(trace, shouldSend) // if we're supposed to drop this trace, and dry run mode is not enabled, then we're done. diff --git a/route/route.go b/route/route.go index f61f0235c6..548d1964c3 100644 --- a/route/route.go +++ b/route/route.go @@ -313,28 +313,27 @@ func (r *Router) getTraceDecision(w http.ResponseWriter, req *http.Request) { if !shard.Equals(r.Sharder.MyShard()) { w.Write([]byte(fmt.Sprintf("traceid %v is not on this shard, try %v", traceID, shard.GetAddress()))) w.WriteHeader(http.StatusNotFound) // maybe better to use `http.StatusMisdirectedRequest` - } // fallthrough to active cache + return + } // look in the decided traces pile and cache kept, timeout, err := r.Collector.AlreadySeen(traceID) if err != nil { - w.Write([]byte(fmt.Sprintf("traceid %v cache lookup failed", traceID))) - w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(fmt.Sprintf("traceid %v cache lookup failed: %v", traceID, err))) + w.WriteHeader(http.StatusNotFound) return } if timeout > 0 { - w.Write([]byte(fmt.Sprintf("traceid %v has not ended yet, wait %v ms", traceID, timeout))) - w.WriteHeader(http.StatusContinue) + w.Write([]byte(fmt.Sprintf(`{"traceID":"%s","decision":"wait","timeout_ms":%d}`, traceID, timeout))) + w.WriteHeader(http.StatusOK) return } + decision := "dropped" if kept { - w.Write([]byte(fmt.Sprintf("traceid %v has been sampled (kept)", traceID))) - w.WriteHeader(http.StatusOK) + decision = "kept" } - - // return traceID has never been seen - w.Write([]byte(fmt.Sprintf("traceid %v is unknown", traceID))) - w.WriteHeader(http.StatusNotFound) + w.Write([]byte(fmt.Sprintf(`{"traceID":"%s","decision":"%v"}`, traceID, decision))) + w.WriteHeader(http.StatusOK) } func (r *Router) marshalToFormat(w http.ResponseWriter, obj interface{}, format string) { diff --git a/types/event.go b/types/event.go index d7f37cd7a3..b30a399859 100644 --- a/types/event.go +++ b/types/event.go @@ -54,6 +54,10 @@ type Trace struct { RootSpan *Span + // SpanLinks contains a map of spans that are indexed by the trace.parent_id + // so that they can be recalled by using th RootSpan.span_id in rootoverride. + SpanLinks map[string]*Span + // DataSize is the sum of the DataSize of spans that are added. // It's used to help expire the most expensive traces. DataSize int @@ -120,6 +124,14 @@ func (t *Trace) GetSamplerKey() (string, bool) { return env, false } +func (t *Trace) AddSpanLink(parentId string, sp *Span) error { + if t.SpanLinks == nil { + t.SpanLinks = map[string]*Span{} + } + t.SpanLinks[parentId] = sp + return nil +} + // Span is an event that shows up with a trace ID, so will be part of a Trace type Span struct { Event