diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md
new file mode 100644
index 0000000000..8db8c26235
--- /dev/null
+++ b/.github/copilot-instructions.md
@@ -0,0 +1,117 @@
+# OpenTelemetry Code Analysis & Instrumentation
+
+Analyze code and provide specific instrumentation recommendations optimized for Honeycomb, with a preference for using OpenTelemetry tracing if possible.
+
+Support direct instructions, like "instrument this file" or "add tracing to this function."
+
+## Rules to follow
+
+Ignore metrics code, if it exists, for now.
+
+Focus on enhancing any existing logs, then suggest span instrumentation if requested.
+
+If there are no existing logging calls, suggest using OpenTelemetry spans instead, unless explicitly asked to add logs.
+
+## Logging Enhancements
+
+If code has logging, recommend improvements to:
+
+1. Add proper log levels for different types of operations:
+   ```
+   # Instead of:
+   print("Processing order")
+   logger.info(f"Order {id} status: {status}")
+
+   # Better as:
+   logger.info("Starting order processing", {"app.order_id": id})
+   logger.error("Order processing failed", {"app.order_id": id, "app.error": str(e)})
+   ```
+
+2. Convert print statements to structured logs:
+   ```
+   # Instead of:
+   print(f"Processing order {id} for customer {customer}")
+
+   # Better as:
+   logger.info("Processing order", {
+       "app.order_id": id,
+       "app.customer_id": customer.id,
+       "app.items_count": len(items)
+   })
+   ```
+
+3. Consolidate related logs into single, rich events:
+   ```
+   # Instead of multiple logs:
+   logger.info(f"Processing order {id}")
+   items = process_order(id)
+   logger.info(f"Found {len(items)} items")
+   discount = apply_discount(items)
+   logger.info(f"Applied discount: {discount}")
+
+   # Better as one structured log:
+   logger.info("Processing order", {
+       "app.order_id": id,
+       "app.items_count": len(items),
+       "app.discount_applied": discount,
+       "app.customer_tier": customer.tier
+   })
+   ```
+
+4. Capture high-cardinality data and data useful for debugging from function parameters in structured fields, such as:
+   - `app.user_id`
+   - `app.request_id`
+   - `app.order_id`, `app.product_id`, etc.
+   - Operation parameters
+   - State information
+
+In particular, focus on consolidating related logs that can be combined into a single, rich event.
+
+## Span Instrumentation (If Requested)
+
+If instrumenting with spans, recommend instrumentation that:
+
+1. Adds important high-cardinality data, request context, and data useful for debugging from function parameters to the current span:
+   ```
+   current_span.set_attributes({
+       "app.customer_id": request.customer_id,
+       "app.order_type": request.type,
+       "app.items_count": len(request.items)
+   })
+   ```
+
+2. Creates spans for meaningful operations:
+   ```
+   with tracer.start_as_current_span("create_order") as order_span:
+       order = create_order(request)
+       order_span.set_attributes({
+           "app.order_id": order.id,
+           "app.total_amount": order.total
+       })
+   ```
+
+3. Handles errors properly:
+   ```
+   try:
+       # operation code
+   except Exception as e:
+       span.set_attributes({
+           "error": True,
+           "error.type": type(e).__name__,
+           "error.message": str(e)
+       })
+       raise
+   ```
+
+## Additional considerations
+
+Always use well-defined, specific, and namespaced keys for structured logs and span attributes.
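+
+For example (an illustrative sketch following the `app.` namespace convention used above):
+
+```
+# Too generic: "id" is ambiguous and likely to collide
+span.set_attribute("id", order.id)
+
+# Well-defined, specific, and namespaced
+span.set_attribute("app.order_id", order.id)
+span.set_attribute("app.order_status", order.status)
+```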
+ +Consider deeply if clarification is needed on: + +- The purpose or context of specific code sections +- Which operations are most important to instrument +- Whether to focus on logging improvements or span creation, especially if both are present +- The meaning of domain-specific terms or variables + +Ask for more information before providing recommendations if necessary. \ No newline at end of file diff --git a/cmd/refinery/main.go b/cmd/refinery/main.go index 3194df7ad8..578a2ba78b 100644 --- a/cmd/refinery/main.go +++ b/cmd/refinery/main.go @@ -48,12 +48,18 @@ var BuildID string var version string type graphLogger struct { + logger logger.Logger } -// TODO: make this log properly -func (g graphLogger) Debugf(format string, v ...interface{}) { - fmt.Printf(format, v...) - fmt.Println() +// Initialize the graph logger +func newGraphLogger(l logger.Logger) *graphLogger { + return &graphLogger{logger: l} +} + +func (g *graphLogger) Debugf(format string, v ...interface{}) { + if g.logger != nil { + g.logger.Debug().WithField("component", "graph").Logf(format, v...) + } } func main() { @@ -252,7 +258,7 @@ func main() { // we need to include all the metrics types so we can inject them in case they're needed var g inject.Graph if opts.Debug { - g.Logger = graphLogger{} + g.Logger = newGraphLogger(lgr) } objects := []*inject.Object{ {Value: c}, diff --git a/collect/collect.go b/collect/collect.go index 317318d298..98a4f05ba7 100644 --- a/collect/collect.go +++ b/collect/collect.go @@ -2,667 +2,23 @@ package collect import ( "context" - "errors" "fmt" - "os" - "runtime" - "sort" - "sync" "time" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" - "golang.org/x/sync/errgroup" - "github.com/honeycombio/refinery/collect/cache" - "github.com/honeycombio/refinery/config" - "github.com/honeycombio/refinery/generics" - "github.com/honeycombio/refinery/internal/health" "github.com/honeycombio/refinery/internal/otelutil" - "github.com/honeycombio/refinery/internal/peer" - "github.com/honeycombio/refinery/logger" - "github.com/honeycombio/refinery/metrics" - "github.com/honeycombio/refinery/pubsub" "github.com/honeycombio/refinery/sample" "github.com/honeycombio/refinery/sharder" - "github.com/honeycombio/refinery/transmit" "github.com/honeycombio/refinery/types" - "github.com/jonboulle/clockwork" - "github.com/sirupsen/logrus" -) - -const ( - keptTraceDecisionTopic = "trace_decision_kept" - dropTraceDecisionTopic = "trace_decision_dropped" - decisionMessageBufferSize = 10_000 - defaultDropDecisionTickerInterval = 1 * time.Second - defaultKeptDecisionTickerInterval = 1 * time.Second -) - -var ErrWouldBlock = errors.New("Dropping span as channel buffer is full. Span will not be processed and will be lost.") -var CollectorHealthKey = "collector" - -type Collector interface { - // AddSpan adds a span to be collected, buffered, and merged into a trace. - // Once the trace is "complete", it'll be passed off to the sampler then - // scheduled for transmission. - AddSpan(*types.Span) error - AddSpanFromPeer(*types.Span) error - Stressed() bool - GetStressedSampleRate(traceID string) (rate uint, keep bool, reason string) - ProcessSpanImmediately(sp *types.Span) (processed bool, keep bool) -} - -func GetCollectorImplementation(c config.Config) Collector { - return &InMemCollector{} -} - -// These are the names of the metrics we use to track our send decisions. 
-const ( - TraceSendGotRoot = "trace_send_got_root" - TraceSendExpired = "trace_send_expired" - TraceSendSpanLimit = "trace_send_span_limit" - TraceSendEjectedFull = "trace_send_ejected_full" - TraceSendEjectedMemsize = "trace_send_ejected_memsize" - TraceSendLateSpan = "trace_send_late_span" + "go.opentelemetry.io/otel/attribute" ) -type sendableTrace struct { - *types.Trace - reason string - sendReason string - sampleKey string - shouldSend bool -} - -// InMemCollector is a single threaded collector. -type InMemCollector struct { - Config config.Config `inject:""` - Logger logger.Logger `inject:""` - Clock clockwork.Clock `inject:""` - Tracer trace.Tracer `inject:"tracer"` - Health health.Recorder `inject:""` - Sharder sharder.Sharder `inject:""` - - Transmission transmit.Transmission `inject:"upstreamTransmission"` - PeerTransmission transmit.Transmission `inject:"peerTransmission"` - PubSub pubsub.PubSub `inject:""` - Metrics metrics.Metrics `inject:"genericMetrics"` - SamplerFactory *sample.SamplerFactory `inject:""` - StressRelief StressReliever `inject:"stressRelief"` - Peers peer.Peers `inject:""` - - // For test use only - BlockOnAddSpan bool - - // mutex must be held whenever non-channel internal fields are accessed. - // This exists to avoid data races in tests and startup/shutdown. - mutex sync.RWMutex - cache cache.Cache - - datasetSamplers map[string]sample.Sampler - - sampleTraceCache cache.TraceSentCache - - incoming chan *types.Span - fromPeer chan *types.Span - outgoingTraces chan sendableTrace - reload chan struct{} - done chan struct{} - redistributeTimer *redistributeNotifier - - dropDecisionMessages chan string - keptDecisionMessages chan string - - dropDecisionBuffer chan TraceDecision - keptDecisionBuffer chan TraceDecision - hostname string -} - -var inMemCollectorMetrics = []metrics.Metadata{ - {Name: "trace_duration_ms", Type: metrics.Histogram, Unit: metrics.Milliseconds, Description: "time taken to process a trace from arrival to send"}, - {Name: "trace_span_count", Type: metrics.Histogram, Unit: metrics.Dimensionless, Description: "number of spans in a trace"}, - {Name: "collector_incoming_queue", Type: metrics.Histogram, Unit: metrics.Dimensionless, Description: "number of spans currently in the incoming queue"}, - {Name: "collector_peer_queue_length", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "number of spans in the peer queue"}, - {Name: "collector_incoming_queue_length", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "number of spans in the incoming queue"}, - {Name: "collector_peer_queue", Type: metrics.Histogram, Unit: metrics.Dimensionless, Description: "number of spans currently in the peer queue"}, - {Name: "collector_cache_size", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "number of traces currently stored in the trace cache"}, - {Name: "memory_heap_allocation", Type: metrics.Gauge, Unit: metrics.Bytes, Description: "current heap allocation"}, - {Name: "span_received", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of spans received by the collector"}, - {Name: "span_processed", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of spans processed by the collector"}, - {Name: "spans_waiting", Type: metrics.UpDown, Unit: metrics.Dimensionless, Description: "number of spans waiting to be processed by the collector"}, - {Name: "trace_sent_cache_hit", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of late spans received for 
traces that have already been sent"}, - {Name: "trace_accepted", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of new traces received by the collector"}, - {Name: "trace_send_kept", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of traces that has been kept"}, - {Name: "trace_send_dropped", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of traces that has been dropped"}, - {Name: "trace_send_has_root", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of kept traces that have a root span"}, - {Name: "trace_send_no_root", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of kept traces that do not have a root span"}, - {Name: "trace_forwarded_on_peer_change", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "number of traces forwarded due to peer membership change"}, - {Name: "trace_redistribution_count", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "number of traces redistributed due to peer membership change"}, - {Name: "trace_send_on_shutdown", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of traces sent during shutdown"}, - {Name: "trace_forwarded_on_shutdown", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of traces forwarded during shutdown"}, - - {Name: TraceSendGotRoot, Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of traces that are ready for decision due to root span arrival"}, - {Name: TraceSendExpired, Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of traces that are ready for decision due to TraceTimeout or SendDelay"}, - {Name: TraceSendSpanLimit, Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of traces that are ready for decision due to span limit"}, - {Name: TraceSendEjectedFull, Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of traces that are ready for decision due to cache capacity overrun"}, - {Name: TraceSendEjectedMemsize, Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of traces that are ready for decision due to memory overrun"}, - {Name: TraceSendLateSpan, Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of spans that are sent due to late span arrival"}, - - {Name: "dropped_from_stress", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of traces dropped due to stress relief"}, - {Name: "trace_kept_sample_rate", Type: metrics.Histogram, Unit: metrics.Dimensionless, Description: "sample rate of kept traces"}, - {Name: "trace_aggregate_sample_rate", Type: metrics.Histogram, Unit: metrics.Dimensionless, Description: "aggregate sample rate of both kept and dropped traces"}, - {Name: "collector_redistribute_traces_duration_ms", Type: metrics.Histogram, Unit: metrics.Milliseconds, Description: "duration of redistributing traces to peers"}, - {Name: "collector_collect_loop_duration_ms", Type: metrics.Histogram, Unit: metrics.Milliseconds, Description: "duration of the collect loop, the primary event processing goroutine"}, - {Name: "collector_outgoing_queue", Type: metrics.Histogram, Unit: metrics.Dimensionless, Description: "number of traces waiting to be send to upstream"}, - {Name: "collector_drop_decision_batch_count", Type: metrics.Histogram, Unit: metrics.Dimensionless, Description: "number of drop decisions sent in a batch"}, - {Name: "collector_expired_traces_missing_decisions", Type: 
metrics.Gauge, Unit: metrics.Dimensionless, Description: "number of decision spans forwarded for expired traces missing trace decision"}, - {Name: "collector_expired_traces_orphans", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "number of expired traces missing trace decision when they are sent"}, - {Name: "drop_decision_batches_received", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of drop decision batches received"}, - {Name: "kept_decision_batches_received", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of kept decision batches received"}, - {Name: "drop_decisions_received", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "total number of drop decisions received"}, - {Name: "kept_decisions_received", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "total number of kept decisions received"}, - {Name: "collector_kept_decisions_queue_full", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of times kept trace decision queue is full"}, - {Name: "collector_drop_decisions_queue_full", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of times drop trace decision queue is full"}, -} - -func (i *InMemCollector) Start() error { - i.Logger.Debug().Logf("Starting InMemCollector") - defer func() { i.Logger.Debug().Logf("Finished starting InMemCollector") }() - imcConfig := i.Config.GetCollectionConfig() - i.cache = cache.NewInMemCache(imcConfig.CacheCapacity, i.Metrics, i.Logger) - i.StressRelief.UpdateFromConfig() - - // listen for config reloads - i.Config.RegisterReloadCallback(i.sendReloadSignal) - - i.Health.Register(CollectorHealthKey, time.Duration(imcConfig.HealthCheckTimeout)) - - for _, metric := range inMemCollectorMetrics { - i.Metrics.Register(metric) - } - - sampleCacheConfig := i.Config.GetSampleCacheConfig() - var err error - i.sampleTraceCache, err = cache.NewCuckooSentCache(sampleCacheConfig, i.Metrics) - if err != nil { - return err - } - - i.incoming = make(chan *types.Span, imcConfig.GetIncomingQueueSize()) - i.fromPeer = make(chan *types.Span, imcConfig.GetPeerQueueSize()) - i.outgoingTraces = make(chan sendableTrace, 100_000) - i.Metrics.Store("INCOMING_CAP", float64(cap(i.incoming))) - i.Metrics.Store("PEER_CAP", float64(cap(i.fromPeer))) - i.reload = make(chan struct{}, 1) - i.done = make(chan struct{}) - i.datasetSamplers = make(map[string]sample.Sampler) - i.done = make(chan struct{}) - i.redistributeTimer = newRedistributeNotifier(i.Logger, i.Metrics, i.Clock, time.Duration(i.Config.GetCollectionConfig().RedistributionDelay)) - - if i.Config.GetAddHostMetadataToTrace() { - if hostname, err := os.Hostname(); err == nil && hostname != "" { - i.hostname = hostname - } - } - - if !i.Config.GetCollectionConfig().DisableRedistribution { - i.Peers.RegisterUpdatedPeersCallback(i.redistributeTimer.Reset) - } - - if !i.Config.GetCollectionConfig().TraceLocalityEnabled() { - i.keptDecisionMessages = make(chan string, decisionMessageBufferSize) - i.dropDecisionMessages = make(chan string, decisionMessageBufferSize) - i.PubSub.Subscribe(context.Background(), keptTraceDecisionTopic, i.signalKeptTraceDecisions) - i.PubSub.Subscribe(context.Background(), dropTraceDecisionTopic, i.signalDroppedTraceDecisions) - - i.dropDecisionBuffer = make(chan TraceDecision, i.Config.GetCollectionConfig().MaxDropDecisionBatchSize*5) - i.keptDecisionBuffer = make(chan TraceDecision, i.Config.GetCollectionConfig().MaxKeptDecisionBatchSize*5) - } - - // 
spin up one collector because this is a single threaded collector - go i.collect() - go i.sendTraces() - // spin up a drop decision batch sender - go i.sendDropDecisions() - go i.sendKeptDecisions() - - return nil -} - -// sendReloadSignal will trigger the collector reloading its config, eventually. -func (i *InMemCollector) sendReloadSignal(cfgHash, ruleHash string) { - // non-blocking insert of the signal here so we don't leak goroutines - select { - case i.reload <- struct{}{}: - i.Logger.Debug().Logf("sending collect reload signal") - default: - i.Logger.Debug().Logf("collect already waiting to reload; skipping additional signal") - } -} - -func (i *InMemCollector) reloadConfigs() { - i.Logger.Debug().Logf("reloading in-mem collect config") - - i.sampleTraceCache.Resize(i.Config.GetSampleCacheConfig()) - - i.StressRelief.UpdateFromConfig() - - // clear out any samplers that we have previously created - // so that the new configuration will be propagated - i.datasetSamplers = make(map[string]sample.Sampler) - // TODO add resizing the LRU sent trace cache on config reload -} - -func (i *InMemCollector) checkAlloc(ctx context.Context) { - _, span := otelutil.StartSpan(ctx, i.Tracer, "checkAlloc") - defer span.End() - - inMemConfig := i.Config.GetCollectionConfig() - maxAlloc := inMemConfig.GetMaxAlloc() - i.Metrics.Store("MEMORY_MAX_ALLOC", float64(maxAlloc)) - - var mem runtime.MemStats - runtime.ReadMemStats(&mem) - i.Metrics.Gauge("memory_heap_allocation", int64(mem.Alloc)) - if maxAlloc == 0 || mem.Alloc < uint64(maxAlloc) { - return - } - - // Figure out what fraction of the total cache we should remove. We'd like it to be - // enough to get us below the max capacity, but not TOO much below. - // Because our impact numbers are only the data size, reducing by enough to reach - // max alloc will actually do more than that. - totalToRemove := mem.Alloc - uint64(maxAlloc) - - // The size of the cache exceeds the user's intended allocation, so we're going to - // remove the traces from the cache that have had the most impact on allocation. - // To do this, we sort the traces by their CacheImpact value and then remove traces - // until the total size is less than the amount to which we want to shrink. - allTraces := i.cache.GetAll() - span.SetAttributes(attribute.Int("cache_size", len(allTraces))) - - timeout := i.Config.GetTracesConfig().GetTraceTimeout() - if timeout == 0 { - timeout = 60 * time.Second - } // Sort traces by CacheImpact, heaviest first - sort.Slice(allTraces, func(i, j int) bool { - return allTraces[i].CacheImpact(timeout) > allTraces[j].CacheImpact(timeout) - }) - - // Now start removing the biggest traces, by summing up DataSize for - // successive traces until we've crossed the totalToRemove threshold - // or just run out of traces to delete. - - cacheSize := len(allTraces) - i.Metrics.Gauge("collector_cache_size", cacheSize) - - totalDataSizeSent := 0 - tracesSent := generics.NewSet[string]() - // Send the traces we can't keep. 
- traceTimeout := i.Config.GetTracesConfig().GetTraceTimeout() - for _, trace := range allTraces { - // only eject traces that belong to this peer or the trace is an orphan - if _, ok := i.IsMyTrace(trace.ID()); !ok && !trace.IsOrphan(traceTimeout, i.Clock.Now()) { - i.Logger.Debug().WithFields(map[string]interface{}{ - "trace_id": trace.ID(), - }).Logf("cannot eject trace that does not belong to this peer") - - continue - } - td, err := i.makeDecision(ctx, trace, TraceSendEjectedMemsize) - if err != nil { - continue - } - tracesSent.Add(trace.TraceID) - totalDataSizeSent += trace.DataSize - i.send(ctx, trace, td) - if totalDataSizeSent > int(totalToRemove) { - break - } - } - i.cache.RemoveTraces(tracesSent) - - // Treat any MaxAlloc overage as an error so we know it's happening - i.Logger.Warn(). - WithField("cache_size", cacheSize). - WithField("alloc", mem.Alloc). - WithField("num_traces_sent", len(tracesSent)). - WithField("datasize_sent", totalDataSizeSent). - WithField("new_trace_count", i.cache.GetCacheEntryCount()). - Logf("Making some trace decisions early due to memory overrun.") - - // Manually GC here - without this we can easily end up evicting more than we - // need to, since total alloc won't be updated until after a GC pass. - runtime.GC() - return -} - -// AddSpan accepts the incoming span to a queue and returns immediately -func (i *InMemCollector) AddSpan(sp *types.Span) error { - return i.add(sp, i.incoming) -} - -// AddSpan accepts the incoming span to a queue and returns immediately -func (i *InMemCollector) AddSpanFromPeer(sp *types.Span) error { - return i.add(sp, i.fromPeer) -} - -// Stressed returns true if the collector is undergoing significant stress -func (i *InMemCollector) Stressed() bool { - return i.StressRelief.Stressed() -} - -func (i *InMemCollector) GetStressedSampleRate(traceID string) (rate uint, keep bool, reason string) { - return i.StressRelief.GetSampleRate(traceID) -} - -func (i *InMemCollector) add(sp *types.Span, ch chan<- *types.Span) error { - if i.BlockOnAddSpan { - ch <- sp - i.Metrics.Increment("span_received") - i.Metrics.Up("spans_waiting") - return nil - } - - select { - case ch <- sp: - i.Metrics.Increment("span_received") - i.Metrics.Up("spans_waiting") - return nil - default: - return ErrWouldBlock - } -} - -// collect handles both accepting spans that have been handed to it and sending -// the complete traces. These are done with channels in order to keep collecting -// single threaded so we don't need any locks. Actions taken from this select -// block is the only place we are allowed to modify any running data -// structures. -func (i *InMemCollector) collect() { - tickerDuration := i.Config.GetTracesConfig().GetSendTickerValue() - ticker := time.NewTicker(tickerDuration) - defer ticker.Stop() - - // mutex is normally held by this goroutine at all times. - // It is unlocked once per ticker cycle for tests. - i.mutex.Lock() - defer i.mutex.Unlock() - - for { - ctx, span := otelutil.StartSpan(context.Background(), i.Tracer, "collect") - startTime := time.Now() - - i.Health.Ready(CollectorHealthKey, true) - // record channel lengths as histogram but also as gauges - i.Metrics.Histogram("collector_incoming_queue", float64(len(i.incoming))) - i.Metrics.Histogram("collector_peer_queue", float64(len(i.fromPeer))) - i.Metrics.Gauge("collector_incoming_queue_length", float64(len(i.incoming))) - i.Metrics.Gauge("collector_peer_queue_length", float64(len(i.fromPeer))) - - // Always drain peer channel before doing anything else. 
By processing peer - // traffic preferentially we avoid the situation where the cluster essentially - // deadlocks because peers are waiting to get their events handed off to each - // other. - select { - case <-i.done: - span.End() - return - case <-i.redistributeTimer.Notify(): - i.redistributeTraces(ctx) - case sp, ok := <-i.fromPeer: - if !ok { - // channel's been closed; we should shut down. - span.End() - return - } - i.processSpan(ctx, sp, "peer") - default: - select { - case msg, ok := <-i.dropDecisionMessages: - if !ok { - // channel's been closed; we should shut down. - span.End() - return - } - i.processTraceDecisions(msg, dropDecision) - case msg, ok := <-i.keptDecisionMessages: - if !ok { - // channel's been closed; we should shut down. - span.End() - return - } - i.processTraceDecisions(msg, keptDecision) - case <-ticker.C: - i.sendExpiredTracesInCache(ctx, i.Clock.Now()) - i.checkAlloc(ctx) - - // maybe only do this if in test mode? - // Briefly unlock the cache, to allow test access. - _, goSchedSpan := otelutil.StartSpan(ctx, i.Tracer, "Gosched") - i.mutex.Unlock() - runtime.Gosched() - i.mutex.Lock() - goSchedSpan.End() - case sp, ok := <-i.incoming: - if !ok { - // channel's been closed; we should shut down. - span.End() - return - } - i.processSpan(ctx, sp, "incoming") - case sp, ok := <-i.fromPeer: - if !ok { - // channel's been closed; we should shut down. - span.End() - return - } - i.processSpan(ctx, sp, "peer") - case <-i.reload: - i.reloadConfigs() - } - } - - i.Metrics.Histogram("collector_collect_loop_duration_ms", float64(time.Now().Sub(startTime).Milliseconds())) - span.End() - } -} - -func (i *InMemCollector) redistributeTraces(ctx context.Context) { - ctx, span := otelutil.StartSpan(ctx, i.Tracer, "redistributeTraces") - redistrubutionStartTime := i.Clock.Now() - - defer func() { - i.Metrics.Histogram("collector_redistribute_traces_duration_ms", i.Clock.Now().Sub(redistrubutionStartTime).Milliseconds()) - span.End() - }() - - // loop through eveything in the cache of live traces - // if it doesn't belong to this peer, we should forward it to the correct peer - peers, err := i.Peers.GetPeers() - if err != nil { - i.Logger.Error().Logf("unable to get peer list with error %s", err.Error()) - return - } - numOfPeers := len(peers) - span.SetAttributes(attribute.Int("num_peers", numOfPeers)) - if numOfPeers == 0 { - return - } - - traces := i.cache.GetAll() - span.SetAttributes(attribute.Int("num_traces_to_redistribute", len(traces))) - forwardedTraces := generics.NewSetWithCapacity[string](len(traces) / numOfPeers) - for _, trace := range traces { - if trace == nil { - continue - } - _, redistributeTraceSpan := otelutil.StartSpanWith(ctx, i.Tracer, "distributeTrace", "num_spans", trace.DescendantCount()) - - newTarget := i.Sharder.WhichShard(trace.TraceID) - - redistributeTraceSpan.SetAttributes(attribute.String("shard", newTarget.GetAddress())) - - if newTarget.Equals(i.Sharder.MyShard()) { - redistributeTraceSpan.SetAttributes(attribute.Bool("self", true)) - redistributeTraceSpan.End() - continue - } - - // if the ownership of the trace hasn't changed, we don't need to forward new decision spans - if newTarget.GetAddress() == trace.DeciderShardAddr { - redistributeTraceSpan.End() - continue - } - - // Trace doesn't belong to us and its ownership has changed. 
We should forward - // decision spans to its new owner - trace.DeciderShardAddr = newTarget.GetAddress() - // Remove decision spans from the trace that no longer belongs to the current node - trace.RemoveDecisionSpans() - - for _, sp := range trace.GetSpans() { - - if !i.Config.GetCollectionConfig().TraceLocalityEnabled() { - dc := i.createDecisionSpan(sp, trace, newTarget) - i.PeerTransmission.EnqueueEvent(dc) - continue - } - - sp.APIHost = newTarget.GetAddress() - - if sp.Data == nil { - sp.Data = make(map[string]interface{}) - } - if v, ok := sp.Data["meta.refinery.forwarded"]; ok { - sp.Data["meta.refinery.forwarded"] = fmt.Sprintf("%s,%s", v, i.hostname) - } else { - sp.Data["meta.refinery.forwarded"] = i.hostname - } - - i.PeerTransmission.EnqueueSpan(sp) - } - - forwardedTraces.Add(trace.TraceID) - redistributeTraceSpan.End() - } - - otelutil.AddSpanFields(span, map[string]interface{}{ - "forwarded_trace_count": len(forwardedTraces.Members()), - "total_trace_count": len(traces), - "hostname": i.hostname, - }) - - i.Metrics.Gauge("trace_forwarded_on_peer_change", len(forwardedTraces)) - if len(forwardedTraces) > 0 { - i.cache.RemoveTraces(forwardedTraces) - } -} - -func (i *InMemCollector) sendExpiredTracesInCache(ctx context.Context, now time.Time) { - ctx, span := otelutil.StartSpan(ctx, i.Tracer, "sendExpiredTracesInCache") - startTime := time.Now() - defer func() { - i.Metrics.Histogram("collector_send_expired_traces_in_cache_dur_ms", time.Since(startTime).Milliseconds()) - span.End() - }() - - expiredTraces := make([]*types.Trace, 0) - traceTimeout := i.Config.GetTracesConfig().GetTraceTimeout() - var orphanTraceCount int - traces := i.cache.TakeExpiredTraces(now, int(i.Config.GetTracesConfig().MaxExpiredTraces), func(t *types.Trace) bool { - if _, ok := i.IsMyTrace(t.ID()); ok { - return true - } - - // if the trace is an orphan trace, we should just make a decision for it - // instead of waiting for the decider node - if t.IsOrphan(traceTimeout, now) { - orphanTraceCount++ - return true - } - - // if a trace has expired more than 2 times the trace timeout, we should forward it to its decider - // and wait for the decider to publish the trace decision again - // only retry it once - if now.Sub(t.SendBy) > traceTimeout*2 && !t.Retried { - expiredTraces = append(expiredTraces, t) - t.Retried = true - } - - // by returning false we will not remove the trace from the cache - // the trace will be removed from the cache when the peer receives the trace decision - return false - }) - - dur := time.Now().Sub(startTime) - i.Metrics.Gauge("collector_expired_traces_missing_decisions", len(expiredTraces)) - i.Metrics.Gauge("collector_expired_traces_orphans", orphanTraceCount) - - span.SetAttributes(attribute.Int("num_traces_to_expire", len(traces)), attribute.Int64("take_expired_traces_duration_ms", dur.Milliseconds())) - - spanLimit := uint32(i.Config.GetTracesConfig().SpanLimit) - - var totalSpansSent int64 - - for _, t := range traces { - ctx, sendExpiredTraceSpan := otelutil.StartSpan(ctx, i.Tracer, "sendExpiredTrace") - totalSpansSent += int64(t.DescendantCount()) - - if t.RootSpan != nil { - td, err := i.makeDecision(ctx, t, TraceSendGotRoot) - if err != nil { - sendExpiredTraceSpan.End() - continue - } - i.send(ctx, t, td) - } else { - if spanLimit > 0 && t.DescendantCount() > spanLimit { - td, err := i.makeDecision(ctx, t, TraceSendSpanLimit) - if err != nil { - sendExpiredTraceSpan.End() - continue - } - i.send(ctx, t, td) - } else { - td, err := i.makeDecision(ctx, t, 
TraceSendExpired)
-				if err != nil {
-					sendExpiredTraceSpan.End()
-					continue
-				}
-				i.send(ctx, t, td)
-			}
-		}
-		sendExpiredTraceSpan.End()
-	}
-
-	for _, trace := range expiredTraces {
-		// if a trace has expired and it doesn't belong to this peer, we should ask its decider to
-		// publish the trace decision again
-		dc := i.createDecisionSpan(&types.Span{
-			TraceID: trace.ID(),
-			Event: types.Event{
-				Context: trace.GetSpans()[0].Context,
-				APIKey:  trace.APIKey,
-				Dataset: trace.Dataset,
-			},
-		}, trace, i.Sharder.WhichShard(trace.ID()))
-		dc.Data["meta.refinery.expired_trace"] = true
-		i.PeerTransmission.EnqueueEvent(dc)
-	}
-	span.SetAttributes(attribute.Int64("total_spans_sent", totalSpansSent))
-}
-
-// processSpan does all the stuff necessary to take an incoming span and add it
-// to (or create a new placeholder for) a trace.
 func (i *InMemCollector) processSpan(ctx context.Context, sp *types.Span, source string) {
-	ctx, span := otelutil.StartSpan(ctx, i.Tracer, "processSpan")
-	defer func() {
-		i.Metrics.Increment("span_processed")
-		i.Metrics.Down("spans_waiting")
-		span.End()
-	}()
+	ctx, span := otelutil.StartSpanMulti(ctx, i.Tracer, "collector.processSpan", map[string]interface{}{
+		"trace_id": sp.TraceID,
+		"source":   source,
+		"dataset":  sp.Dataset,
+	})
+	defer span.End()
 
 	var (
 		targetShard sharder.Shard
@@ -764,9 +120,7 @@ func (i *InMemCollector) processSpan(ctx context.Context, sp *types.Span, source
 			WithString("peer", targetShard.GetAddress()).
 			Logf("Sending span to peer")
 
-		dc := i.createDecisionSpan(sp, trace, targetShard)
-
-		i.PeerTransmission.EnqueueEvent(dc)
+		i.forwardSpanToPeer(ctx, sp, trace, targetShard)
 
 		spanForwarded = true
 	}
@@ -797,24 +151,25 @@ func (i *InMemCollector) processSpan(ctx context.Context, sp *types.Span, source
 	}
 }
 
-// ProcessSpanImmediately is an escape hatch used under stressful conditions --
-// it submits a span for immediate transmission without enqueuing it for normal
-// processing. This means it ignores dry run mode and doesn't build a complete
-// trace context or cache the trace in the active trace buffer. It only gets
-// called on the first span for a trace under stressful conditions; we got here
-// because the StressRelief system detected that this is a new trace AND that it
-// is being sampled. Therefore, we also put the traceID into the sent traces
-// cache as "kept".
-// It doesn't do any logging and barely touches metrics; this is about as
-// minimal as we can make it.
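+// ProcessSpanImmediately is a stress-relief escape hatch: it transmits the span
+// right away, recording the trace decision in the sent-trace cache, instead of
+// enqueuing it for normal collection and caching.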
 func (i *InMemCollector) ProcessSpanImmediately(sp *types.Span) (processed bool, keep bool) {
-	_, span := otelutil.StartSpanWith(context.Background(), i.Tracer, "collector.ProcessSpanImmediately", "trace_id", sp.TraceID)
+	ctx, span := otelutil.StartSpanMulti(context.Background(), i.Tracer, "collector.ProcessSpanImmediately", map[string]interface{}{
+		"trace_id": sp.TraceID,
+		"dataset":  sp.Dataset,
+	})
 	defer span.End()
 
 	var rate uint
 	record, reason, found := i.sampleTraceCache.CheckSpan(sp)
 	if !found {
+		_, srSpan := otelutil.StartSpanWith(ctx, i.Tracer, "collector.checkStressRelief", "trace_id", sp.TraceID)
 		rate, keep, reason = i.StressRelief.GetSampleRate(sp.TraceID)
+		otelutil.AddSpanFields(srSpan, map[string]interface{}{
+			"sample.rate":   rate,
+			"sample.keep":   keep,
+			"sample.reason": reason,
+		})
+		srSpan.End()
+
 		now := i.Clock.Now()
 		trace := &types.Trace{
 			APIHost: sp.APIHost,
@@ -825,868 +180,164 @@ func (i *InMemCollector) ProcessSpanImmediately(sp *types.Span) (processed bool,
 			SendBy:  now,
 		}
 		trace.SetSampleRate(rate)
-		// we do want a record of how we disposed of traces in case more come in after we've
-		// turned off stress relief (if stress relief is on we'll keep making the same decisions)
 		i.sampleTraceCache.Record(trace, keep, reason)
 	} else {
 		rate = record.Rate()
 		keep = record.Kept()
 	}
 
-	if !keep {
-		i.Metrics.Increment("dropped_from_stress")
-		return true, false
-	}
-
-	i.Metrics.Increment("kept_from_stress")
-	// ok, we're sending it, so decorate it first
-	sp.Data["meta.stressed"] = true
-	if i.Config.GetAddRuleReasonToTrace() {
-		sp.Data["meta.refinery.reason"] = reason
-	}
-	if i.hostname != "" {
-		sp.Data["meta.refinery.local_hostname"] = i.hostname
+	if keep {
+		_, sendSpan := otelutil.StartSpan(ctx, i.Tracer, "collector.sendImmediately")
+		if i.Config.GetAddHostMetadataToTrace() && i.hostname != "" {
+			sp.Data["meta.refinery.local_hostname"] = i.hostname
+		}
+		sp.SampleRate = rate
+		// EnqueueSpan does not return an error; transmission failures are
+		// handled and counted inside the transmission layer.
+		i.Transmission.EnqueueSpan(sp)
+		sendSpan.End()
 	}
-	i.addAdditionalAttributes(sp)
-	mergeTraceAndSpanSampleRates(sp, rate, i.Config.GetIsDryRun())
-	i.Transmission.EnqueueSpan(sp)
-
-	return true, true
+	otelutil.AddSpanFields(span, map[string]interface{}{
+		"sample.found_in_cache": found,
+		"sample.keep":           keep,
+		"sample.rate":           rate,
+		"sample.reason":         reason,
+	})
+	return true, keep
 }
 
-// dealWithSentTrace handles a span that has arrived after the sampling decision
-// on the trace has already been made, and it obeys that decision by either
-// sending the span immediately or dropping it.
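+// dealWithSentTrace handles a span that arrives after the sampling decision for
+// its trace has already been made, and obeys that decision by either sending
+// the span immediately or dropping it.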
 func (i *InMemCollector) dealWithSentTrace(ctx context.Context, tr cache.TraceSentRecord, keptReason string, sp *types.Span) {
-	_, span := otelutil.StartSpanMulti(ctx, i.Tracer, "dealWithSentTrace", map[string]interface{}{
-		"trace_id":    sp.TraceID,
-		"kept_reason": keptReason,
-		"hostname":    i.hostname,
+	ctx, span := otelutil.StartSpanMulti(ctx, i.Tracer, "collector.dealWithSentTrace", map[string]interface{}{
+		"trace_id":           sp.TraceID,
+		"sample.keep":        tr.Kept(),
+		"sample.rate":        tr.Rate(),
+		"sample.kept_reason": keptReason,
 	})
 	defer span.End()
 
-	// if we receive a proxy span after a trace decision has been made,
-	// we should just broadcast the decision again
-	if sp.IsDecisionSpan() {
-		// late span in this case won't get HasRoot
-		td := TraceDecision{
-			TraceID:    sp.TraceID,
-			Kept:       tr.Kept(),
-			Reason:     keptReason,
-			SendReason: TraceSendLateSpan,
-			Rate:       tr.Rate(),
-			Count:      uint32(tr.SpanCount()),
-			EventCount: uint32(tr.SpanEventCount()),
-			LinkCount:  uint32(tr.SpanLinkCount()),
-		}
-		i.publishTraceDecision(ctx, td)
-		return
-	}
-
-	if i.Config.GetAddRuleReasonToTrace() {
-		var metaReason string
-		if len(keptReason) > 0 {
-			metaReason = fmt.Sprintf("%s - late arriving span", keptReason)
-		} else {
-			metaReason = "late arriving span"
-		}
-		sp.Data["meta.refinery.reason"] = metaReason
-		sp.Data["meta.refinery.send_reason"] = TraceSendLateSpan
-
-	}
-	if i.hostname != "" {
-		sp.Data["meta.refinery.local_hostname"] = i.hostname
-	}
-	isDryRun := i.Config.GetIsDryRun()
-	keep := tr.Kept()
-	otelutil.AddSpanFields(span, map[string]interface{}{
-		"keep":      keep,
-		"is_dryrun": isDryRun,
-	})
-
-	if isDryRun {
-		// if dry run mode is enabled, we keep all traces and mark the spans with the sampling decision
-		sp.Data[config.DryRunFieldName] = keep
-		if !keep {
-			i.Logger.Debug().WithField("trace_id", sp.TraceID).Logf("Sending span that would have been dropped, but dry run mode is enabled")
-			i.Metrics.Increment(TraceSendLateSpan)
-			i.addAdditionalAttributes(sp)
-			i.Transmission.EnqueueSpan(sp)
-			return
-		}
-	}
-	if keep {
-		i.Logger.Debug().WithField("trace_id", sp.TraceID).Logf("Sending span because of previous decision to send trace")
-		mergeTraceAndSpanSampleRates(sp, tr.Rate(), isDryRun)
-		// if this span is a late root span, possibly update it with our current span count
-		if sp.IsRoot {
-			if i.Config.GetAddCountsToRoot() {
-				sp.Data["meta.span_event_count"] = int64(tr.SpanEventCount())
-				sp.Data["meta.span_link_count"] = int64(tr.SpanLinkCount())
-				sp.Data["meta.span_count"] = int64(tr.SpanCount())
-				sp.Data["meta.event_count"] = int64(tr.DescendantCount())
-			} else if i.Config.GetAddSpanCountToRoot() {
-				sp.Data["meta.span_count"] = int64(tr.DescendantCount())
-			}
-		}
-		otelutil.AddSpanField(span, "is_root_span", sp.IsRoot)
-		i.Metrics.Increment(TraceSendLateSpan)
-		i.addAdditionalAttributes(sp)
-		i.Transmission.EnqueueSpan(sp)
-		return
+	if tr.Kept() {
+		if i.Config.GetAddHostMetadataToTrace() && i.hostname != "" {
+			sp.Data["meta.refinery.local_hostname"] = i.hostname
+		}
+		// use the trace's sample rate for this span
+		sp.SampleRate = tr.Rate()
+		// EnqueueSpan does not return an error; transmission failures are
+		// handled and counted inside the transmission layer.
+		i.Transmission.EnqueueSpan(sp)
+		i.Metrics.Increment(TraceSendLateSpan)
 	}
-	i.Logger.Debug().WithField("trace_id", sp.TraceID).Logf("Dropping span because of previous decision to drop trace")
 }
 
-func mergeTraceAndSpanSampleRates(sp *types.Span, traceSampleRate uint, dryRunMode bool) {
- tempSampleRate := sp.SampleRate - if sp.SampleRate != 0 { - // Write down the original sample rate so that that information - // is more easily recovered - sp.Data["meta.refinery.original_sample_rate"] = sp.SampleRate - } - - if tempSampleRate < 1 { - // See https://docs.honeycomb.io/manage-data-volume/sampling/ - // SampleRate is the denominator of the ratio of sampled spans - // HoneyComb treats a missing or 0 SampleRate the same as 1, but - // behaves better/more consistently if the SampleRate is explicitly - // set instead of inferred - tempSampleRate = 1 - } - - // if spans are already sampled, take that into account when computing - // the final rate - if dryRunMode { - sp.Data["meta.dryrun.sample_rate"] = tempSampleRate * traceSampleRate - sp.SampleRate = tempSampleRate - } else { - sp.SampleRate = tempSampleRate * traceSampleRate - } -} - -// this is only called when a trace decision is received -func (i *InMemCollector) send(ctx context.Context, trace *types.Trace, td *TraceDecision) { - if trace.Sent { - // someone else already sent this so we shouldn't also send it. - i.Logger.Debug(). - WithString("trace_id", trace.TraceID). - WithString("dataset", trace.Dataset). - Logf("skipping send because someone else already sent trace to dataset") - return - } - trace.Sent = true - _, span := otelutil.StartSpan(ctx, i.Tracer, "send") +func (i *InMemCollector) determineSamplingDecision(ctx context.Context, trace *types.Trace) (uint, string) { + ctx, span := otelutil.StartSpanMulti(ctx, i.Tracer, "collector.determineSampling", map[string]interface{}{ + "trace_id": trace.TraceID, + "dataset": trace.Dataset, + }) defer span.End() - traceDur := i.Clock.Since(trace.ArrivalTime) - i.Metrics.Histogram("trace_duration_ms", float64(traceDur.Milliseconds())) - - logFields := logrus.Fields{ - "trace_id": td.TraceID, - } - // if we're supposed to drop this trace, and dry run mode is not enabled, then we're done. 
- if !td.Kept && !i.Config.GetIsDryRun() { - i.Metrics.Increment("trace_send_dropped") - i.Logger.Info().WithFields(logFields).Logf("Dropping trace because of sampling decision") - return - } - - if td.HasRoot { - rs := trace.RootSpan - if rs != nil { - if i.Config.GetAddCountsToRoot() { - rs.Data["meta.span_event_count"] = int64(td.EventCount) - rs.Data["meta.span_link_count"] = int64(td.LinkCount) - rs.Data["meta.span_count"] = int64(td.Count) - rs.Data["meta.event_count"] = int64(td.DescendantCount()) - } else if i.Config.GetAddSpanCountToRoot() { - rs.Data["meta.span_count"] = int64(td.DescendantCount()) - } - } - } - - i.Metrics.Increment(td.SendReason) - if types.IsLegacyAPIKey(trace.APIKey) { - logFields["dataset"] = td.SamplerSelector - } else { - logFields["environment"] = td.SamplerSelector - } - logFields["reason"] = td.KeptReason - if td.SamplerKey != "" { - logFields["sample_key"] = td.SamplerKey - } - - i.Metrics.Increment("trace_send_kept") - // This will observe sample rate decisions only if the trace is kept - i.Metrics.Histogram("trace_kept_sample_rate", float64(td.Rate)) - - // ok, we're not dropping this trace; send all the spans - if i.Config.GetIsDryRun() && !td.Kept { - i.Logger.Info().WithFields(logFields).Logf("Trace would have been dropped, but sending because dry run mode is enabled") - } else { - i.Logger.Info().WithFields(logFields).Logf("Sending trace") - } - i.Logger.Info().WithFields(logFields).Logf("Sending trace") - i.outgoingTraces <- sendableTrace{ - Trace: trace, - reason: td.Reason, - sendReason: td.SendReason, - sampleKey: td.SamplerKey, - shouldSend: td.Kept, - } -} - -func (i *InMemCollector) Stop() error { - i.redistributeTimer.Stop() - close(i.done) - // signal the health system to not be ready and - // stop liveness check - // so that no new traces are accepted - i.Health.Unregister(CollectorHealthKey) - - i.mutex.Lock() - - if !i.Config.GetCollectionConfig().DisableRedistribution { - peers, err := i.Peers.GetPeers() - if err != nil { - i.Logger.Error().Logf("unable to get peer list with error %s", err.Error()) - } - if len(peers) > 0 { - i.sendTracesOnShutdown() - } - } - - if i.Transmission != nil { - i.Transmission.Flush() - } - - i.sampleTraceCache.Stop() - i.mutex.Unlock() - - close(i.incoming) - close(i.fromPeer) - close(i.outgoingTraces) - - if !i.Config.GetCollectionConfig().TraceLocalityEnabled() { - close(i.dropDecisionBuffer) - close(i.keptDecisionBuffer) - } - - return nil -} - -// sentRecord is a struct that holds a span and the record of the trace decision made. -type sentRecord struct { - span *types.Span - record cache.TraceSentRecord - reason string -} - -// sendTracesInCache sends all traces in the cache to their final destination. -// This is done on shutdown to ensure that all traces are sent before the collector -// is stopped. -// It does this by pulling spans out of both the incoming queue and the peer queue so that -// any spans that are still in the queues when the collector is stopped are also sent. -// It also pulls traces out of the cache and sends them to their final destination. 
-func (i *InMemCollector) sendTracesOnShutdown() { - wg := &sync.WaitGroup{} - sentChan := make(chan sentRecord, len(i.incoming)) - forwardChan := make(chan *types.Span, 100_000) - - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(i.Config.GetCollectionConfig().ShutdownDelay)) - defer cancel() - - // start a goroutine that will pull spans off of the channels passed in - // and send them to their final destination - wg.Add(1) - go func() { - defer wg.Done() - i.sendSpansOnShutdown(ctx, sentChan, forwardChan) - }() - - // start a goroutine that will pull spans off of the incoming queue - // and place them on the sentChan or forwardChan - wg.Add(1) - go func() { - defer wg.Done() - for { - select { - case <-ctx.Done(): - return - case sp, ok := <-i.incoming: - if !ok { - return - } - - i.distributeSpansOnShutdown(sentChan, forwardChan, sp) - } - } - }() - - // start a goroutine that will pull spans off of the peer queue - // and place them on the sentChan or forwardChan - wg.Add(1) - go func() { - defer wg.Done() - for { - select { - case <-ctx.Done(): - return - case sp, ok := <-i.fromPeer: - if !ok { - return - } - - i.distributeSpansOnShutdown(sentChan, forwardChan, sp) - } - } - }() - - // pull traces from the trace cache and place them on the sentChan or forwardChan - if i.cache != nil { - traces := i.cache.GetAll() - for _, trace := range traces { - i.distributeSpansOnShutdown(sentChan, forwardChan, trace.GetSpans()...) - } - } - - wg.Wait() - - close(sentChan) - close(forwardChan) - -} - -// distributeSpansInCache takes a list of spans and sends them to the appropriate channel based on the state of the trace. -func (i *InMemCollector) distributeSpansOnShutdown(sentSpanChan chan sentRecord, forwardSpanChan chan *types.Span, spans ...*types.Span) { - for _, sp := range spans { - // if the span is a decision span, we don't need to do anything with it - if sp != nil && !sp.IsDecisionSpan() { - - // first check if there's a trace decision - record, reason, found := i.sampleTraceCache.CheckSpan(sp) - if found { - sentSpanChan <- sentRecord{sp, record, reason} - continue - } - - // if there's no trace decision, then we need to forward the trace to its new home - forwardSpanChan <- sp - } - } -} - -// sendSpansOnShutdown is a helper function that sends span to their final destination -// on shutdown. 
-func (i *InMemCollector) sendSpansOnShutdown(ctx context.Context, sentSpanChan <-chan sentRecord, forwardSpanChan <-chan *types.Span) { - sentTraces := make(map[string]struct{}) - forwardedTraces := make(map[string]struct{}) - - for { - select { - case <-ctx.Done(): - i.Logger.Info().Logf("Timed out waiting for traces to send") - return - - case r, ok := <-sentSpanChan: - if !ok { - return - } - - ctx, span := otelutil.StartSpanMulti(ctx, i.Tracer, "shutdown_sent_span", map[string]interface{}{"trace_id": r.span.TraceID, "hostname": i.hostname}) - r.span.Data["meta.refinery.shutdown.send"] = true - - i.dealWithSentTrace(ctx, r.record, r.reason, r.span) - _, exist := sentTraces[r.span.TraceID] - if !exist { - sentTraces[r.span.TraceID] = struct{}{} - i.Metrics.Count("trace_send_on_shutdown", 1) - - } - - span.End() - - case sp, ok := <-forwardSpanChan: - if !ok { - return - } - - _, span := otelutil.StartSpanMulti(ctx, i.Tracer, "shutdown_forwarded_span", map[string]interface{}{"trace_id": sp.TraceID, "hostname": i.hostname}) - - targetShard := i.Sharder.WhichShard(sp.TraceID) - url := targetShard.GetAddress() - - otelutil.AddSpanField(span, "target_shard", url) - - // TODO: we need to decorate the expired traces before forwarding them so that - // the downstream consumers can make decisions based on the metadata without having - // to restart the TraceTimeout or SendDelay - sp.APIHost = url - - if sp.Data == nil { - sp.Data = make(map[string]interface{}) - } - if v, ok := sp.Data["meta.refinery.forwarded"]; ok { - sp.Data["meta.refinery.forwarded"] = fmt.Sprintf("%s,%s", v, i.hostname) - } else { - sp.Data["meta.refinery.forwarded"] = i.hostname - } - - i.PeerTransmission.EnqueueSpan(sp) - _, exist := forwardedTraces[sp.TraceID] - if !exist { - forwardedTraces[sp.TraceID] = struct{}{} - i.Metrics.Count("trace_forwarded_on_shutdown", 1) - - } - - span.End() - } - - } -} - -// Convenience method for tests. 
-func (i *InMemCollector) getFromCache(traceID string) *types.Trace { - i.mutex.Lock() - defer i.mutex.Unlock() - return i.cache.Get(traceID) -} - -func (i *InMemCollector) addAdditionalAttributes(sp *types.Span) { - for k, v := range i.Config.GetAdditionalAttributes() { - sp.Data[k] = v - } -} - -func (i *InMemCollector) createDecisionSpan(sp *types.Span, trace *types.Trace, targetShard sharder.Shard) *types.Event { - selector, isLegacyKey := trace.GetSamplerKey() - if selector == "" { - i.Logger.Error().WithField("trace_id", trace.ID()).Logf("error getting sampler selection key for trace") - } - - sampler, found := i.datasetSamplers[selector] - if !found { - sampler = i.SamplerFactory.GetSamplerImplementationForKey(selector, isLegacyKey) - i.datasetSamplers[selector] = sampler - } - - dc := sp.ExtractDecisionContext() - // extract all key fields from the span - keyFields := sampler.GetKeyFields() - for _, keyField := range keyFields { - if val, ok := sp.Data[keyField]; ok { - dc.Data[keyField] = val - } - } - - dc.APIHost = targetShard.GetAddress() - return dc -} - -func (i *InMemCollector) sendTraces() { - for t := range i.outgoingTraces { - i.Metrics.Histogram("collector_outgoing_queue", float64(len(i.outgoingTraces))) - _, span := otelutil.StartSpanMulti(context.Background(), i.Tracer, "sendTrace", map[string]interface{}{"num_spans": t.DescendantCount(), "outgoingTraces_size": len(i.outgoingTraces)}) - - // if we have a key replacement rule, we should - // replace the key with the new key - keycfg := i.Config.GetAccessKeyConfig() - overwriteWith, err := keycfg.GetReplaceKey(t.APIKey) - if err != nil { - i.Logger.Warn().Logf("error replacing key: %s", err.Error()) - continue - } - if overwriteWith != t.APIKey { - t.APIKey = overwriteWith - } - - for _, sp := range t.GetSpans() { - if sp.IsDecisionSpan() { - continue - } - - if i.Config.GetAddRuleReasonToTrace() { - sp.Data["meta.refinery.reason"] = t.reason - sp.Data["meta.refinery.send_reason"] = t.sendReason - if t.sampleKey != "" { - sp.Data["meta.refinery.sample_key"] = t.sampleKey - } - } - - // update the root span (if we have one, which we might not if the trace timed out) - // with the final total as of our send time - if sp.IsRoot { - if i.Config.GetAddCountsToRoot() { - sp.Data["meta.span_event_count"] = int64(t.SpanEventCount()) - sp.Data["meta.span_link_count"] = int64(t.SpanLinkCount()) - sp.Data["meta.span_count"] = int64(t.SpanCount()) - sp.Data["meta.event_count"] = int64(t.DescendantCount()) - } else if i.Config.GetAddSpanCountToRoot() { - sp.Data["meta.span_count"] = int64(t.DescendantCount()) - } - } - - isDryRun := i.Config.GetIsDryRun() - if isDryRun { - sp.Data[config.DryRunFieldName] = t.shouldSend - } - if i.hostname != "" { - sp.Data["meta.refinery.local_hostname"] = i.hostname - } - mergeTraceAndSpanSampleRates(sp, t.SampleRate(), isDryRun) - i.addAdditionalAttributes(sp) - - sp.APIKey = t.APIKey - i.Transmission.EnqueueSpan(sp) - } - span.End() - } -} - -func (i *InMemCollector) signalKeptTraceDecisions(ctx context.Context, msg string) { - if len(msg) == 0 { - return - } - - peerID, err := i.Peers.GetInstanceID() - if err != nil { - return - } + var sampler sample.Sampler + samplerKey, isLegacy := trace.GetSamplerKey() + span.SetAttributes( + attribute.String("sampler.key", samplerKey), + attribute.Bool("sampler.is_legacy", isLegacy), + ) - if isMyDecision(msg, peerID) { - return - } + i.samplerLock.RLock() + sampler = i.datasetSamplers[samplerKey] + i.samplerLock.RUnlock() + + if sampler == nil { + 
span.SetAttributes(attribute.String("sampler.status", "creating_new"))
+		// no sampler exists yet for this trace's selector; create one and cache it
+		i.Logger.Debug().WithFields(map[string]interface{}{
+			"dataset":     trace.Dataset,
+			"sampler_key": samplerKey,
+		}).Logf("creating new sampler for dataset")
+
+		sampler = i.SamplerFactory.GetSamplerImplementationForKey(samplerKey, isLegacy)
+		i.samplerLock.Lock()
+		i.datasetSamplers[samplerKey] = sampler
+		i.samplerLock.Unlock()
+	}
+
+	rate, keep, reason, key := sampler.GetSampleRate(trace)
+	span.SetAttributes(
+		attribute.Int("sample.rate", int(rate)),
+		attribute.Bool("sample.keep", keep),
+		attribute.String("sampler.sample_key", key),
+	)
 
-	select {
-	case <-i.done:
-		return
-	case <-ctx.Done():
-		return
-	case i.keptDecisionMessages <- msg:
-	default:
-		i.Logger.Warn().Logf("kept trace decision channel is full. Dropping message")
-	}
-}
+	return rate, reason
+}
 
-func (i *InMemCollector) signalDroppedTraceDecisions(ctx context.Context, msg string) {
-	if len(msg) == 0 {
-		return
-	}
-	peerID, err := i.Peers.GetInstanceID()
-	if err != nil {
-		return
-	}
 
+func (i *InMemCollector) forwardSpanToPeer(ctx context.Context, sp *types.Span, trace *types.Trace, targetShard sharder.Shard) {
+	ctx, span := otelutil.StartSpanMulti(ctx, i.Tracer, "collector.forwardSpan", map[string]interface{}{
+		"trace_id":    sp.TraceID,
+		"target_peer": targetShard.GetAddress(),
+		"source":      "peer_forward",
+		"dataset":     sp.Dataset,
+	})
+	defer span.End()
 
-	if isMyDecision(msg, peerID) {
-		return
-	}
+	i.Metrics.Increment("peer_router_forward")
 
-	select {
-	case <-i.done:
-		return
-	case <-ctx.Done():
-		return
-	case i.dropDecisionMessages <- msg:
-	default:
-		i.Logger.Warn().Logf("dropped trace decision channel is full. Dropping message")
-	}
-}
+	// Create a decision span and hand it to the peer transmission queue.
+	// EnqueueEvent does not return an error; transmission failures are
+	// handled and counted inside the transmission layer.
+	dc := i.createDecisionSpan(sp, trace, targetShard)
+	i.PeerTransmission.EnqueueEvent(dc)
 
-func (i *InMemCollector) processTraceDecisions(msg string, decisionType decisionType) {
-	i.Metrics.Increment(fmt.Sprintf("%s_decision_batches_received", decisionType.String()))
-	if len(msg) == 0 {
-		return
-	}
-
-	peerID, err := i.Peers.GetInstanceID()
-	if err != nil {
-		i.Logger.Error().Logf("Failed to get peer ID. %s", err)
-		return
-	}
-
-	// Deserialize the message into trace decisions
-	decisions := make([]TraceDecision, 0)
-	switch decisionType {
-	case keptDecision:
-		decisions, err = newKeptTraceDecision(msg, peerID)
-		if err != nil {
-			i.Logger.Error().Logf("Failed to unmarshal kept trace decision message. %s", err)
-			return
-		}
-	case dropDecision:
-		decisions, err = newDroppedTraceDecision(msg, peerID)
-		if err != nil {
-			i.Logger.Error().Logf("Failed to unmarshal drop trace decision message.
%s", err) - return - } - default: - i.Logger.Error().Logf("unknown decision type %s while processing trace decisions", decisionType) - return - } - - i.Metrics.Count(fmt.Sprintf("%s_decisions_received", decisionType.String()), len(decisions)) - - if len(decisions) == 0 { - return - } - - toDelete := generics.NewSet[string]() - for _, decision := range decisions { - // Assume TraceDecision implements a common interface like TraceID - trace := i.cache.Get(decision.TraceID) - if trace == nil { - i.Logger.Debug().Logf("trace not found in cache for %s decision", decisionType.String()) - continue - } - toDelete.Add(decision.TraceID) - - if _, _, ok := i.sampleTraceCache.CheckTrace(decision.TraceID); !ok { - if decisionType == keptDecision { - trace.SetSampleRate(decision.Rate) - trace.KeepSample = decision.Kept - } - - i.sampleTraceCache.Record(&decision, decision.Kept, decision.Reason) - } - - i.send(context.Background(), trace, &decision) - } - - i.cache.RemoveTraces(toDelete) + span.SetAttributes(attribute.Bool("forward.success", true)) } -func (i *InMemCollector) makeDecision(ctx context.Context, trace *types.Trace, sendReason string) (*TraceDecision, error) { - if trace.Sent { - return nil, errors.New("trace already sent") - } - - ctx, span := otelutil.StartSpan(ctx, i.Tracer, "makeDecision") - defer span.End() - i.Metrics.Histogram("trace_span_count", float64(trace.DescendantCount())) - - otelutil.AddSpanFields(span, map[string]interface{}{ - "trace_id": trace.ID(), - "root": trace.RootSpan, - "send_by": trace.SendBy, - "arrival": trace.ArrivalTime, - }) - - var sampler sample.Sampler - var found bool - // get sampler key (dataset for legacy keys, environment for new keys) - samplerSelector, isLegacyKey := trace.GetSamplerKey() - - // use sampler key to find sampler; create and cache if not found - if sampler, found = i.datasetSamplers[samplerSelector]; !found { - sampler = i.SamplerFactory.GetSamplerImplementationForKey(samplerSelector, isLegacyKey) - i.datasetSamplers[samplerSelector] = sampler - } - - startGetSampleRate := i.Clock.Now() - // make sampling decision and update the trace - rate, shouldSend, reason, key := sampler.GetSampleRate(trace) - i.Metrics.Histogram("get_sample_rate_duration_ms", float64(time.Since(startGetSampleRate).Milliseconds())) - trace.SetSampleRate(rate) - trace.KeepSample = shouldSend - // This will observe sample rate attempts even if the trace is dropped - i.Metrics.Histogram("trace_aggregate_sample_rate", float64(rate)) - - i.sampleTraceCache.Record(trace, shouldSend, reason) - - var hasRoot bool - if trace.RootSpan != nil { - i.Metrics.Increment("trace_send_has_root") - hasRoot = true - } else { - i.Metrics.Increment("trace_send_no_root") - } - - otelutil.AddSpanFields(span, map[string]interface{}{ - "kept": shouldSend, - "reason": reason, - "sampler": key, - "selector": samplerSelector, - "rate": rate, - "send_reason": sendReason, - "hasRoot": hasRoot, +func (i *InMemCollector) updateTraceCache(ctx context.Context, trace *types.Trace, sp *types.Span) { + ctx, span := otelutil.StartSpanMulti(ctx, i.Tracer, "collector.updateCache", map[string]interface{}{ + "trace_id": trace.TraceID, + "trace.span_count": trace.SpanCount(), + "trace.span_event_count": trace.SpanEventCount(), + "trace.span_link_count": trace.SpanLinkCount(), + "cache.operation": "update", }) - i.Logger.Debug().WithField("key", key).Logf("making decision for trace") - td := TraceDecision{ - TraceID: trace.ID(), - Kept: shouldSend, - Reason: reason, - SamplerKey: key, - SamplerSelector: 
samplerSelector, - Rate: rate, - SendReason: sendReason, - Count: trace.SpanCount(), - EventCount: trace.SpanEventCount(), - LinkCount: trace.SpanLinkCount(), - HasRoot: hasRoot, - } - - if !i.Config.GetCollectionConfig().TraceLocalityEnabled() { - i.publishTraceDecision(ctx, td) - } - - return &td, nil -} - -func (i *InMemCollector) IsMyTrace(traceID string) (sharder.Shard, bool) { - if i.Config.GetCollectionConfig().TraceLocalityEnabled() { - return i.Sharder.MyShard(), true - } - - // if trace locality is disabled, we should only process - // traces that belong to the current refinery - targeShard := i.Sharder.WhichShard(traceID) - - return targeShard, i.Sharder.MyShard().Equals(targeShard) - -} - -func (i *InMemCollector) publishTraceDecision(ctx context.Context, td TraceDecision) { - start := time.Now() - defer func() { - i.Metrics.Histogram("collector_publish_trace_decision_dur_ms", time.Since(start).Milliseconds()) - }() - - _, span := otelutil.StartSpanWith(ctx, i.Tracer, "publishTraceDecision", "decision", td.Kept) defer span.End() - if td.Kept { - select { - case <-i.done: - case i.keptDecisionBuffer <- td: - default: - i.Metrics.Increment("collector_kept_decisions_queue_full") - i.Logger.Warn().Logf("kept trace decision buffer is full. Dropping message") - } - return - } else { - select { - case <-i.done: - case i.dropDecisionBuffer <- td: - default: - i.Metrics.Increment("collector_drop_decisions_queue_full") - i.Logger.Warn().Logf("drop trace decision buffer is full. Dropping message") - } + // Check cache capacity before update + if i.cache.Size() >= i.cache.Capacity() { + span.SetAttributes( + attribute.String("cache.status", "full"), + attribute.Int("cache.size", i.cache.Size()), + attribute.Int("cache.capacity", i.cache.Capacity()), + ) + i.Metrics.Increment("trace_send_ejected_full") return } -} -func (i *InMemCollector) sendKeptDecisions() { - if i.Config.GetCollectionConfig().TraceLocalityEnabled() { - return + // Update the trace in cache + oldTrace := i.cache.Set(trace) + if oldTrace != nil { + span.SetAttributes( + attribute.Bool("cache.replaced_existing", true), + attribute.Int("cache.old_trace_span_count", int(oldTrace.SpanCount())), + ) } - interval := time.Duration(i.Config.GetCollectionConfig().KeptDecisionSendInterval) - if interval == 0 { - interval = defaultKeptDecisionTickerInterval - } - i.sendDecisions(i.keptDecisionBuffer, interval, i.Config.GetCollectionConfig().MaxKeptDecisionBatchSize, keptDecision) -} -func (i *InMemCollector) sendDropDecisions() { - if i.Config.GetCollectionConfig().TraceLocalityEnabled() { - return - } - interval := time.Duration(i.Config.GetCollectionConfig().DropDecisionSendInterval) - if interval == 0 { - interval = defaultDropDecisionTickerInterval - } - i.sendDecisions(i.dropDecisionBuffer, interval, i.Config.GetCollectionConfig().MaxDropDecisionBatchSize, dropDecision) -} - -// Unified sendDecisions function for batching and processing TraceDecisions -func (i *InMemCollector) sendDecisions(decisionChan <-chan TraceDecision, interval time.Duration, maxBatchSize int, decisionType decisionType) { - timer := i.Clock.NewTimer(interval) - defer timer.Stop() - decisions := make([]TraceDecision, 0, maxBatchSize) - send := false - eg := &errgroup.Group{} - ctx := context.Background() - var createDecisionMessage newDecisionMessage - var metricName, topic string - peerID, err := i.Peers.GetInstanceID() - if err != nil { - i.Logger.Error().Logf("Failed to get peer ID. 
%s", err) - return - } - switch decisionType { - case keptDecision: - metricName = "collector_kept_decisions_batch_size" - topic = keptTraceDecisionTopic - createDecisionMessage = newKeptDecisionMessage - case dropDecision: - metricName = "collector_drop_decisions_batch_size" - topic = dropTraceDecisionTopic - createDecisionMessage = newDroppedDecisionMessage - default: - i.Logger.Error().Logf("Invalid decision type") - return // invalid decision type - } - - for { - select { - case <-i.done: - eg.Wait() - return - case td, ok := <-decisionChan: - if !ok { - eg.Wait() - return - } - // Add TraceDecision to the batch - decisions = append(decisions, td) - if len(decisions) >= maxBatchSize { - send = true - } - case <-timer.Chan(): - send = true - } - - // Send the batch if ready - if send && len(decisions) > 0 { - i.Metrics.Histogram(metricName, len(decisions)) - - // Copy current batch to process - decisionsToProcess := make([]TraceDecision, len(decisions)) - copy(decisionsToProcess, decisions) - decisions = decisions[:0] // Reset the batch - - eg.Go(func() error { - select { - case <-i.done: - return nil - default: - msg, err := createDecisionMessage(decisionsToProcess, peerID) - if err != nil { - i.Logger.Error().WithFields(map[string]interface{}{ - "error": err.Error(), - }).Logf("Failed to create trace decision message") - return nil - } - err = i.PubSub.Publish(ctx, topic, msg) - if err != nil { - i.Logger.Error().WithFields(map[string]interface{}{ - "error": err.Error(), - }).Logf("Failed to publish trace decision") - } - } - return nil - }) - - // Reset timer after send - if !timer.Stop() { - select { - case <-timer.Chan(): - default: - } - } - timer.Reset(interval) - send = false - } - } + span.SetAttributes(attribute.String("cache.status", "updated")) } diff --git a/collect/collector_instrumentation.go b/collect/collector_instrumentation.go new file mode 100644 index 0000000000..dca139d5ab --- /dev/null +++ b/collect/collector_instrumentation.go @@ -0,0 +1,152 @@ +package collect + +// collector_instrumentation.go +// Contains instrumentation methods for the InMemCollector + +import ( + "context" + + "github.com/honeycombio/refinery/collect/cache" + "github.com/honeycombio/refinery/internal/otelutil" + "github.com/honeycombio/refinery/types" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +// CollectorInstrumentation contains helpers for tracing collector operations +type CollectorInstrumentation struct { + tracer trace.Tracer +} + +// NewCollectorInstrumentation creates a new instrumentation helper +func NewCollectorInstrumentation(tracer trace.Tracer) *CollectorInstrumentation { + return &CollectorInstrumentation{ + tracer: tracer, + } +} + +// AddTraceInstrumentation adds trace metadata to the current span +func (ci *CollectorInstrumentation) AddTraceInstrumentation(span trace.Span, tr *types.Trace) { + if span == nil { + return + } + + otelutil.AddSpanFields(span, map[string]interface{}{ + "trace_id": tr.TraceID, + "trace.span_count": tr.SpanCount(), + "trace.span_event_count": tr.SpanEventCount(), + "trace.span_link_count": tr.SpanLinkCount(), + "trace.dataset": tr.Dataset, + "trace.api_key": tr.APIKey, + }) +} + +// AddSpanMetadata adds metadata fields to a span's Data map +func (ci *CollectorInstrumentation) AddSpanMetadata(span *types.Span, tr cache.TraceSentRecord, reason string) { + if reason != "" { + span.Data["meta.refinery.kept_reason"] = reason + } + span.Data["meta.refinery.sample_rate"] = tr.Rate() + span.Data["meta.refinery.kept"] = 
tr.Kept()
+}
+
+// StartSpanWithSample starts a new instrumented span for sampling operations
+func (ci *CollectorInstrumentation) StartSpanWithSample(ctx context.Context, name string, tr cache.TraceSentRecord, reason string) (context.Context, trace.Span) {
+	ctx, span := otelutil.StartSpanMulti(ctx, ci.tracer, name, map[string]interface{}{
+		"sample.keep":   tr.Kept(),
+		"sample.rate":   tr.Rate(),
+		"sample.reason": reason,
+	})
+	return ctx, span
+}
+
+func (i *InMemCollector) addInstrumentation(ctx context.Context, tr cache.TraceSentRecord, keptReason string, sp *types.Span) {
+	ctx, span := otelutil.StartSpanMulti(ctx, i.Tracer, "collector.dealWithSentTrace", map[string]interface{}{
+		"trace_id":           sp.TraceID,
+		"sample.keep":        tr.Kept(),
+		"sample.rate":        tr.Rate(),
+		"sample.kept_reason": keptReason,
+	})
+	defer span.End()
+
+	if tr.Kept() {
+		if i.Config.GetAddHostMetadataToTrace() {
+			sp.AddField("meta.refinery.local_hostname", i.hostname)
+		}
+		sp.SampleRate = tr.Rate()
+		err := i.Transmission.EnqueueSpan(sp)
+		if err != nil {
+			span.SetAttributes(
+				attribute.Bool("error", true),
+				attribute.String("error.message", err.Error()),
+			)
+			i.Logger.Error().WithField("error", err).Logf("failed to enqueue late span")
+		}
+		i.Metrics.Increment("trace_send_late_span")
+	}
+}
+
+func (i *InMemCollector) addSamplingInstrumentation(ctx context.Context, tr *types.Trace, rate uint, reason string) {
+	if span := trace.SpanFromContext(ctx); span != nil {
+		span.SetAttributes(
+			attribute.String("sampler.key", tr.Dataset),
+			attribute.Int("sample.rate", int(rate)),
+			attribute.String("sample.reason", reason),
+		)
+	}
+}
+
+func addSpanInstrumentation(ctx context.Context, sp *types.Span, tr *types.Trace) {
+	if span := trace.SpanFromContext(ctx); span != nil {
+		otelutil.AddSpanFields(span, map[string]interface{}{
+			"trace_id":         sp.TraceID,
+			"span.dataset":     sp.Dataset,
+			"span.data_size":   sp.DataSize,
+			"trace.span_count": tr.SpanCount(),
+		})
+	}
+}
+
+// addSpanMetadata adds metadata fields to a span's Data map
+func addSpanMetadata(sp *types.Span, hostname string, tr cache.TraceSentRecord) {
+	sp.Data["meta.refinery.local_hostname"] = hostname
+	sp.Data["meta.refinery.sample_rate"] = tr.Rate()
+	if reason := tr.GetKeptReason(); reason != "" {
+		sp.Data["meta.refinery.kept_reason"] = reason
+	}
+}
+
+// addTraceInstrumentation adds trace-level attributes to the span
+func addTraceInstrumentation(span trace.Span, tr *types.Trace) {
+	if span == nil {
+		return
+	}
+
+	otelutil.AddSpanFields(span, map[string]interface{}{
+		"trace_id":          tr.TraceID,
+		"trace.span_count":  tr.SpanCount(),
+		"trace.event_count": tr.SpanEventCount(),
+		"trace.link_count":  tr.SpanLinkCount(),
+		"trace.dataset":     tr.Dataset,
+	})
+}
+
+// addSampleInstrumentation adds sampling-related attributes to the span
+func addSampleInstrumentation(span trace.Span, tr cache.TraceSentRecord, keptReason string) {
+	if span == nil {
+		return
+	}
+
+	otelutil.AddSpanFields(span, map[string]interface{}{
+		"sample.keep":   tr.Kept(),
+		"sample.rate":   tr.Rate(),
+		"sample.reason": keptReason,
+	})
+}
+
+func addStressInstrumentation(span trace.Span, stressLevel uint, reason string) {
+	otelutil.AddSpanFields(span, map[string]interface{}{
+		"stress.level":  stressLevel,
+		"stress.reason": reason,
+	})
+}
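+
+// exampleSendInstrumentation is an illustrative sketch only (it is not yet
+// called from the collector): it shows how the helpers above are meant to
+// compose around a send decision. The span name "collector.send" and the
+// call site are assumptions, not part of the collector's current wiring.
+func exampleSendInstrumentation(ctx context.Context, tracer trace.Tracer, tr *types.Trace, record cache.TraceSentRecord, keptReason string) {
+	ci := NewCollectorInstrumentation(tracer)
+	ctx, span := ci.StartSpanWithSample(ctx, "collector.send", record, keptReason)
+	defer span.End()
+
+	// Attach trace-level context first, then the sampling outcome.
+	ci.AddTraceInstrumentation(span, tr)
+	addSampleInstrumentation(span, record, keptReason)
+}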
"github.com/dgryski/go-wyhash" "github.com/facebookgo/startstop" "github.com/honeycombio/refinery/config" "github.com/honeycombio/refinery/internal/health" + "github.com/honeycombio/refinery/internal/otelutil" "github.com/honeycombio/refinery/internal/peer" "github.com/honeycombio/refinery/logger" "github.com/honeycombio/refinery/metrics" "github.com/honeycombio/refinery/pubsub" "github.com/jonboulle/clockwork" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) const stressReliefTopic = "refinery-stress-relief" @@ -103,6 +107,8 @@ type StressRelief struct { stressLevels map[string]stressReport // only used in tests disableStressLevelReport bool + + Tracer trace.Tracer } const StressReliefHealthKey = "stress_relief" @@ -221,32 +227,57 @@ func (msg *stressReliefMessage) String() string { func unmarshalStressReliefMessage(msg string) (*stressReliefMessage, error) { if len(msg) < 2 { - return nil, fmt.Errorf("empty message") + return nil, fmt.Errorf("message too short (length=%d)", len(msg)) } separatorIdx := strings.IndexRune(msg, rune(stressReliefMessageSeparator[0])) - if separatorIdx == -1 { - return nil, fmt.Errorf("invalid stress relief message") + if (separatorIdx == -1) { + return nil, fmt.Errorf("invalid message format: missing separator '%s'", stressReliefMessageSeparator) } level, err := strconv.Atoi(msg[separatorIdx+1:]) if err != nil { - return nil, err + return nil, fmt.Errorf("invalid level format: %w", err) } return newStressReliefMessage(uint(level), msg[:separatorIdx]), nil } func (s *StressRelief) onStressLevelUpdate(ctx context.Context, msg string) { + ctx, span := otelutil.StartSpanWith(ctx, s.Tracer, "stressRelief.onUpdate") + defer span.End() + stressMsg, err := unmarshalStressReliefMessage(msg) if err != nil { - s.Logger.Error().Logf("failed to unmarshal stress relief message: %s", err) + s.Logger.Error().WithFields(map[string]interface{}{ + "error": err.Error(), + "error.type": fmt.Sprintf("%T", err), + "message.raw": msg, + "component": "stress_relief", + }).Logf("failed to unmarshal stress relief message") + span.SetAttributes( + "error", true, + "error.message", err.Error(), + "error.type", fmt.Sprintf("%T", err), + ) return } s.lock.Lock() defer s.lock.Unlock() + s.Logger.Debug().WithFields(map[string]interface{}{ + "stress.peer_id": stressMsg.peerID, + "stress.level": stressMsg.level, + "stress.timestamp": s.Clock.Now().String(), + "component": "stress_relief", + }).Logf("updating peer stress level") + + span.SetAttributes( + "stress.peer_id", stressMsg.peerID, + "stress.level", stressMsg.level, + ) + s.stressLevels[stressMsg.peerID] = stressReport{ key: stressMsg.peerID, level: stressMsg.level, @@ -364,7 +395,7 @@ func (s *StressRelief) square(num, denom string) float64 { stress := r * r s.Logger.Debug(). WithField("algorithm", "square"). - WithField("result", stress). + .WithField("result", stress). Logf("stress recalc: result") return stress } @@ -444,7 +475,7 @@ func (s *StressRelief) Recalc() uint { s.stressed = true case Monitor: // If it's off, should we activate it? 
+
+func (s *StressRelief) calculateClusterStress(ctx context.Context) uint {
+	ctx, span := otelutil.StartSpanWith(ctx, s.Tracer, "stressRelief.calculateClusterStress", "stress.type", "cluster")
+	defer span.End()
+
+	now := s.Clock.Now()
+	expirationTime := now.Add(-s.Config.StressReliefReportExpiration)
+	var totalStressLevel uint
+	var validReports uint
+
+	for _, report := range s.stressLevels {
+		if report.timestamp.After(expirationTime) {
+			totalStressLevel += report.level
+			validReports++
+		}
+	}
+
+	var clusterStress uint
+	if validReports > 0 {
+		clusterStress = totalStressLevel / validReports
+	}
+
+	span.SetAttributes(
+		attribute.Int("stress.valid_reports", int(validReports)),
+		attribute.Int("stress.total_level", int(totalStressLevel)),
+		attribute.Int("stress.cluster_level", int(clusterStress)),
+	)
+
+	s.Metrics.Gauge("cluster_stress_level", float64(clusterStress))
+	return clusterStress
 }
diff --git a/collect/stress_relief_instrumentation.go b/collect/stress_relief_instrumentation.go
new file mode 100644
index 0000000000..be5373cea4
--- /dev/null
+++ b/collect/stress_relief_instrumentation.go
@@ -0,0 +1,132 @@
+package collect
+
+import (
+	"context"
+
+	"github.com/honeycombio/refinery/config"
+	"github.com/honeycombio/refinery/internal/otelutil"
+	"github.com/honeycombio/refinery/metrics"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"
+)
+
+// StressReliefInstrumentation contains helpers for tracing stress relief operations
+type StressReliefInstrumentation struct {
+	metrics metrics.Metrics
+	config  config.StressReliefConfig
+}
+
+// NewStressReliefInstrumentation creates a new instrumentation helper
+func NewStressReliefInstrumentation(m metrics.Metrics, cfg config.StressReliefConfig) *StressReliefInstrumentation {
+	return &StressReliefInstrumentation{
+		metrics: m,
+		config:  cfg,
+	}
+}
+
+// addStressLevelInstrumentation adds stress level information to span and metrics
+func (sri *StressReliefInstrumentation) addStressLevelInstrumentation(span trace.Span, level uint) {
+	if span == nil {
+		return
+	}
+
+	otelutil.AddSpanFields(span, map[string]interface{}{
+		"stress.level":     level,
+		"stress.type":      "cluster",
+		"stress.max_level":
sri.config.MaxLevel, + }) + + if sri.metrics != nil { + sri.metrics.Gauge("cluster_stress_level", float64(level)) + } +} + +// addStressDecisionInstrumentation adds sampling decision information to span +func (sri *StressReliefInstrumentation) addStressDecisionInstrumentation(span trace.Span, traceID string, keep bool, rate uint, reason string) { + if span == nil { + return + } + + otelutil.AddSpanFields(span, map[string]interface{}{ + "trace_id": traceID, + "sample.keep": keep, + "sample.rate": rate, + "sample.reason": reason, + "sample.source": "stress_relief", + }) +} + +func (s *StressRelief) calculateClusterStressWithInstrumentation(ctx context.Context) uint { + var span trace.Span + if s.Tracer != nil { + _, span = otelutil.StartSpanWith(ctx, s.Tracer, "stressRelief.calculateClusterStress", "stress.type", "cluster") + defer span.End() + } + + now := s.Clock.Now() + expirationTime := now.Add(-s.Config.StressReliefReportExpiration) + var totalStressLevel uint + var validReports uint + + for _, report := range s.stressLevels { + if report.timestamp.After(expirationTime) { + totalStressLevel += report.level + validReports++ + } + } + + var clusterStress uint + if validReports > 0 { + clusterStress = totalStressLevel / validReports + } + + if span != nil { + span.SetAttributes( + attribute.Int("stress.valid_reports", int(validReports)), + attribute.Int("stress.total_level", int(totalStressLevel)), + attribute.Int("stress.cluster_level", int(clusterStress)), + ) + } + + s.Metrics.Gauge("cluster_stress_level", float64(clusterStress)) + return clusterStress +} + +func (s *StressRelief) addSamplingDecisionInstrumentation(traceID string, keep bool, stressLevel uint, span trace.Span) { + if span == nil { + return + } + + span.SetAttributes( + attribute.String("trace_id", traceID), + attribute.Bool("sample.keep", keep), + attribute.Int("stress.level", int(stressLevel)), + ) +} + +// updateStressLevel adds stress level instrumentation to a span +func (s *StressRelief) updateStressLevel(ctx context.Context, level uint) { + if span := trace.SpanFromContext(ctx); span != nil { + otelutil.AddSpanFields(span, map[string]interface{}{ + "stress.level": level, + "stress.timestamp": s.Clock.Now().String(), + }) + } + + s.Metrics.Gauge("cluster_stress_level", float64(level)) + s.Logger.Debug().WithField("stress_level", level).Logf("updated stress level") +} + +// addStressDecisionInstrumentation adds sampling decision instrumentation +func (s *StressRelief) addStressDecisionInstrumentation(span trace.Span, traceID string, keep bool, rate uint, reason string) { + if span == nil { + return + } + + otelutil.AddSpanFields(span, map[string]interface{}{ + "trace_id": traceID, + "sample.keep": keep, + "sample.rate": rate, + "sample.reason": reason, + }) +} diff --git a/collect/trace_decision.go b/collect/trace_decision.go index 2ac4d70e9d..5b45e7a9d3 100644 --- a/collect/trace_decision.go +++ b/collect/trace_decision.go @@ -2,6 +2,7 @@ package collect import ( "bytes" + "context" "encoding/gob" "fmt" "strings" @@ -9,6 +10,8 @@ import ( "github.com/golang/snappy" "github.com/honeycombio/refinery/collect/cache" + "github.com/honeycombio/refinery/internal/otelutil" + "go.opentelemetry.io/otel/trace" ) type decisionType int @@ -37,98 +40,6 @@ var ( type newDecisionMessage func(tds []TraceDecision, senderID string) (string, error) -func newDroppedDecisionMessage(tds []TraceDecision, senderID string) (string, error) { - if len(tds) == 0 { - return "", fmt.Errorf("no dropped trace decisions provided") - } - if senderID 
== "" { - return "", fmt.Errorf("no sender ID provided") - } - - payload := make([]string, 0, len(tds)) - for _, td := range tds { - if td.TraceID != "" { - payload = append(payload, td.TraceID) - } - } - - compressed, err := compress(strings.Join(payload, ",")) - if err != nil { - return "", err - } - return senderID + decisionMessageSeparator + string(compressed), nil -} - -func newDroppedTraceDecision(msg string, senderID string) ([]TraceDecision, error) { - // Use IndexRune here since it's faster than SplitN and requires less allocation - separatorIdx := strings.IndexRune(msg, rune(decisionMessageSeparator[0])) - if separatorIdx == -1 { - return nil, fmt.Errorf("invalid dropped decision message") - } - - // If the sender ID is the same as the current service, ignore the message - if msg[:separatorIdx] == senderID { - return nil, nil - } - - ids, err := decompressDropDecisions([]byte(msg[separatorIdx+1:])) - if err != nil { - return nil, err - } - - traceIDs := strings.Split(ids, ",") - decisions := make([]TraceDecision, 0, len(traceIDs)) - for _, traceID := range traceIDs { - decisions = append(decisions, TraceDecision{ - TraceID: traceID, - }) - } - return decisions, nil -} - -func newKeptDecisionMessage(tds []TraceDecision, senderID string) (string, error) { - if len(tds) == 0 { - return "", fmt.Errorf("no kept trace decisions provided") - } - - if senderID == "" { - return "", fmt.Errorf("no sender ID provided") - } - - compressed, err := compress(tds) - if err != nil { - return "", err - } - return senderID + decisionMessageSeparator + string(compressed), nil -} - -func newKeptTraceDecision(msg string, senderID string) ([]TraceDecision, error) { - // Use IndexRune here since it's faster than SplitN and requires less allocation - separatorIdx := strings.IndexRune(msg, rune(decisionMessageSeparator[0])) - if separatorIdx == -1 { - return nil, fmt.Errorf("invalid dropped decision message") - } - - // If the sender ID is the same as the current service, ignore the message - if msg[:separatorIdx] == senderID { - return nil, nil - } - - compressed, err := decompressKeptDecisions([]byte(msg[separatorIdx+1:])) - if err != nil { - return nil, err - } - return compressed, nil -} - -func isMyDecision(msg string, senderID string) bool { - if senderID == "" { - return false - } - - return strings.HasPrefix(msg, senderID+decisionMessageSeparator) -} - var _ cache.KeptTrace = &TraceDecision{} type TraceDecision struct { @@ -190,63 +101,178 @@ var snappyWriterPool = sync.Pool{ New: func() any { return snappy.NewBufferedWriter(nil) }, } -func compress(data any) ([]byte, error) { - // Get a buffer from the pool and reset it +// Add tracer as a field +type TraceDecisionHandler struct { + tracer trace.Tracer +} + +func (h *TraceDecisionHandler) processDecision(ctx context.Context, msg string, senderID string, isKeptDecision bool) ([]TraceDecision, error) { + ctx, span := otelutil.StartSpanMulti(ctx, h.tracer, "traceDecision.process", map[string]interface{}{ + "sender.id": senderID, + "decision.type": map[bool]string{true: "kept", false: "dropped"}[isKeptDecision], + "message.length": len(msg), + }) + defer span.End() + + separatorIdx := strings.IndexRune(msg, rune(decisionMessageSeparator[0])) + if separatorIdx == -1 { + otelutil.AddException(span, fmt.Errorf("invalid decision message format")) + return nil, fmt.Errorf("invalid decision message format") + } + + // If the sender ID is the same as the current service, ignore the message + if msg[:separatorIdx] == senderID { + 
+
+func (h *TraceDecisionHandler) newDroppedDecisionMessage(ctx context.Context, tds []TraceDecision, senderID string) (string, error) {
+	ctx, span := otelutil.StartSpanMulti(ctx, h.tracer, "traceDecision.newDroppedMessage", map[string]interface{}{
+		"sender.id":       senderID,
+		"decisions.count": len(tds),
+	})
+	defer span.End()
+
+	if len(tds) == 0 {
+		err := fmt.Errorf("no dropped trace decisions provided")
+		otelutil.AddException(span, err)
+		return "", err
+	}
+	if senderID == "" {
+		err := fmt.Errorf("no sender ID provided")
+		otelutil.AddException(span, err)
+		return "", err
+	}
+
+	payload := make([]string, 0, len(tds))
+	for _, td := range tds {
+		if td.TraceID != "" {
+			payload = append(payload, td.TraceID)
+		}
+	}
+
+	compressed, err := h.compress(ctx, strings.Join(payload, ","))
+	if err != nil {
+		otelutil.AddException(span, err)
+		return "", err
+	}
+
+	span.SetAttributes(otelutil.Attributes(map[string]interface{}{
+		"compressed.size_bytes": len(compressed),
+	})...)
+
+	return senderID + decisionMessageSeparator + string(compressed), nil
+}
+
+func (h *TraceDecisionHandler) compress(ctx context.Context, data any) ([]byte, error) {
+	ctx, span := otelutil.StartSpan(ctx, h.tracer, "traceDecision.compress")
+	defer span.End()
+
 	buf := bufferPool.Get().(*bytes.Buffer)
 	buf.Reset()
 	defer bufferPool.Put(buf)
 
-	// Get a snappy writer from the pool, set it to write to the buffer, and reset it
 	compr := snappyWriterPool.Get().(*snappy.Writer)
 	compr.Reset(buf)
 	defer snappyWriterPool.Put(compr)
 
 	enc := gob.NewEncoder(compr)
 	if err := enc.Encode(data); err != nil {
+		otelutil.AddException(span, err)
 		return nil, err
 	}
 
-	// Flush snappy writer
 	if err := compr.Close(); err != nil {
+		otelutil.AddException(span, err)
 		return nil, err
 	}
 
-	// Copy the buffer’s bytes to avoid reuse issues when returning
-	return bytes.Clone(buf.Bytes()), nil
+	result := bytes.Clone(buf.Bytes())
+	span.SetAttributes(otelutil.Attributes(map[string]interface{}{
+		"compression.output_size": len(result),
+	})...)
+ + return result, nil } -func decompressKeptDecisions(data []byte) ([]TraceDecision, error) { - // Get a buffer from the pool and set it up with data +func (h *TraceDecisionHandler) decompressKeptDecisions(ctx context.Context, data []byte) ([]TraceDecision, error) { + ctx, span := otelutil.StartSpanMulti(ctx, h.tracer, "traceDecision.decompressKept", map[string]interface{}{ + "compressed.size_bytes": len(data), + }) + defer span.End() + buf := bufferPool.Get().(*bytes.Buffer) defer bufferPool.Put(buf) buf.Reset() buf.Write(data) - // Snappy reader to decompress data in buffer reader := snappy.NewReader(buf) dec := gob.NewDecoder(reader) - var tds []TraceDecision if err := dec.Decode(&tds); err != nil { + otelutil.AddException(span, err) return nil, err } + + span.SetAttributes(otelutil.Attributes(map[string]interface{}{ + "decisions.count": len(tds), + })...) + return tds, nil } -func decompressDropDecisions(data []byte) (string, error) { - // Get a buffer from the pool and set it up with data +func (h *TraceDecisionHandler) decompressDropDecisions(ctx context.Context, data []byte) (string, error) { + ctx, span := otelutil.StartSpanMulti(ctx, h.tracer, "traceDecision.decompressDrop", map[string]interface{}{ + "compressed.size_bytes": len(data), + }) + defer span.End() + buf := bufferPool.Get().(*bytes.Buffer) defer bufferPool.Put(buf) buf.Reset() buf.Write(data) - // Snappy reader to decompress data in buffer reader := snappy.NewReader(buf) dec := gob.NewDecoder(reader) - var traceIDs string if err := dec.Decode(&traceIDs); err != nil { + otelutil.AddException(span, err) return "", err } + + span.SetAttributes(otelutil.Attributes(map[string]interface{}{ + "decompressed.size_bytes": len(traceIDs), + })...) + return traceIDs, nil } diff --git a/route/errors.go b/route/errors.go index 71cd0f3b34..335e345b3c 100644 --- a/route/errors.go +++ b/route/errors.go @@ -49,18 +49,19 @@ func (r *Router) handlerReturnWithError(w http.ResponseWriter, he handlerError, "error.err": he.err.Error(), "error.msg": he.msg, "error.status_code": he.status, + "error.detailed": he.detailed, + "error.friendly": he.friendly, + "error.type": fmt.Sprintf("%T", he.err), + "http.status_code": he.status, + "http.response_msg": he.msg, } - // this is a little jank but should work for now, we might want to rethink - // how this section of the code works to make this nicer - if he.msg == ErrCaughtPanic.msg { - fields["error.stack_trace"] = string(debug.Stack()) - } + // Add stack trace for all errors, not just panics + fields["error.stack_trace"] = string(debug.Stack()) - r.Logger.Error().WithFields(fields).Logf("handler returning error") + r.Logger.Error().WithFields(fields).Logf("handler returning error: %s", he.msg) w.WriteHeader(he.status) - errmsg := he.msg if he.detailed { @@ -72,7 +73,6 @@ func (r *Router) handlerReturnWithError(w http.ResponseWriter, he handlerError, } jsonErrMsg := []byte(`{"source":"refinery","error":"` + errmsg + `"}`) - w.Write(jsonErrMsg) } diff --git a/route/route.go b/route/route.go index ff88051420..0335869eeb 100644 --- a/route/route.go +++ b/route/route.go @@ -453,16 +453,21 @@ func (r *Router) batch(w http.ResponseWriter, req *http.Request) { ctx := req.Context() reqID := ctx.Value(types.RequestIDContextKey{}) - debugLog := r.iopLogger.Debug().WithField("request_id", reqID) + debugLog := r.iopLogger.Debug(). + WithField("request_id", reqID). + WithField("path", req.URL.Path). 
+ WithField("method", req.Method) bodyReader, err := r.getMaybeCompressedBody(req) if err != nil { + debugLog.WithField("error", err.Error()).Logf("failed to get compressed body") r.handlerReturnWithError(w, ErrPostBody, err) return } reqBod, err := io.ReadAll(bodyReader) if err != nil { + debugLog.WithField("error", err.Error()).Logf("failed to read request body") r.handlerReturnWithError(w, ErrPostBody, err) return } @@ -470,64 +475,58 @@ func (r *Router) batch(w http.ResponseWriter, req *http.Request) { batchedEvents := make([]batchedEvent, 0) err = unmarshal(req, bytes.NewReader(reqBod), &batchedEvents) if err != nil { - debugLog.WithField("error", err.Error()).WithField("request.url", req.URL).WithField("json_body", string(reqBod)).Logf("error parsing json") + debugLog.WithFields(map[string]interface{}{ + "error": err.Error(), + "request.url": req.URL, + "request.content_type": req.Header.Get("Content-Type"), + "request.content_length": req.ContentLength, + "batch.size": len(reqBod), + }).Logf("error parsing batch json") r.handlerReturnWithError(w, ErrJSONFailed, err) return } - dataset, err := getDatasetFromRequest(req) - if err != nil { - r.handlerReturnWithError(w, ErrReqToEvent, err) - } - apiHost := r.Config.GetHoneycombAPI() + debugLog.WithField("batch.events_count", len(batchedEvents)).Logf("processing batch request") - apiKey := req.Header.Get(types.APIKeyHeader) - if apiKey == "" { - apiKey = req.Header.Get(types.APIKeyHeaderShort) - } + processedEvents := 0 + for _, bev := range batchedEvents { + // Convert the event data to JSON bytes for requestToEvent + eventData, err := json.Marshal(bev.Data) + if err != nil { + debugLog.WithField("error", err.Error()).Logf("failed to marshal event data") + continue + } - // get environment name - will be empty for legacy keys - environment, err := r.getEnvironmentName(apiKey) - if err != nil { - r.handlerReturnWithError(w, ErrReqToEvent, err) - } + ev, err := r.requestToEvent(ctx, req, eventData) + if err != nil { + debugLog.WithField("error", err.Error()).Logf("failed to convert batched event to event") + continue + } - userAgent := getUserAgentFromRequest(req) - batchedResponses := make([]*BatchResponse, 0, len(batchedEvents)) - for _, bev := range batchedEvents { - ev := &types.Event{ - Context: ctx, - APIHost: apiHost, - APIKey: apiKey, - Dataset: dataset, - Environment: environment, - SampleRate: bev.getSampleRate(), - Timestamp: bev.getEventTime(), - Data: bev.Data, + // Parse and set the timestamp if present + if bev.Timestamp != "" { + ts, err := time.Parse(time.RFC3339Nano, bev.Timestamp) + if err != nil { + debugLog.WithField("error", err.Error()).Logf("failed to parse event timestamp") + } else { + ev.Timestamp = ts + } } - addIncomingUserAgent(ev, userAgent) err = r.processEvent(ev, reqID) - - var resp BatchResponse - switch { - case errors.Is(err, collect.ErrWouldBlock): - resp.Status = http.StatusTooManyRequests - resp.Error = err.Error() - case err != nil: - resp.Status = http.StatusBadRequest - resp.Error = err.Error() - default: - resp.Status = http.StatusAccepted + if err != nil { + debugLog.WithField("error", err.Error()).Logf("failed to process batched event") + continue } - batchedResponses = append(batchedResponses, &resp) + processedEvents++ } - response, err := json.Marshal(batchedResponses) - if err != nil { - r.handlerReturnWithError(w, ErrJSONBuildFailed, err) - return - } - w.Write(response) + + debugLog.WithFields(map[string]interface{}{ + "batch.events_processed": processedEvents, + "batch.events_failed": 
len(batchedEvents) - processedEvents, + }).Logf("finished processing batch request") + + w.WriteHeader(http.StatusOK) } func (router *Router) processOTLPRequest(