diff --git a/app/app_test.go b/app/app_test.go index 8264b6fdb0..e6a85dffa4 100644 --- a/app/app_test.go +++ b/app/app_test.go @@ -100,6 +100,7 @@ func newStartedApp( basePort int, redisDB int, enableHostMetadata bool, + storeType string, ) (*App, inject.Graph, func()) { c := &config.MockConfig{ GetTraceTimeoutVal: 10 * time.Millisecond, @@ -120,6 +121,8 @@ func newStartedApp( ShutdownDelay: config.Duration(1 * time.Millisecond), }, GetParallelismVal: 10, + GetRedisMaxActiveVal: 10, + GetRedisMaxIdleVal: 10, AddHostMetadataToTrace: enableHostMetadata, TraceIdFieldNames: []string{"trace.trace_id"}, ParentIdFieldNames: []string{"trace.parent_id"}, @@ -127,7 +130,7 @@ func newStartedApp( SampleCache: config.SampleCacheConfig{KeptSize: 10000, DroppedSize: 100000, SizeCheckInterval: config.Duration(10 * time.Second)}, StoreOptions: config.SmartWrapperOptions{ StateTicker: config.Duration(50 * time.Millisecond), - BasicStoreType: "redis", + BasicStoreType: storeType, SpanChannelSize: 10000, SendDelay: config.Duration(2 * time.Millisecond), DecisionTimeout: config.Duration(100 * time.Millisecond), @@ -137,7 +140,9 @@ func newStartedApp( // give the test a chance to override parts of the config configCallback(c) - fmt.Println("Using Redis database", c.GetRedisDatabaseVal) + if storeType == "redis" { + fmt.Println("Using Redis database", c.GetRedisDatabaseVal) + } var err error a := App{} @@ -161,14 +166,7 @@ func newStartedApp( }) assert.NoError(t, err) - var store centralstore.BasicStorer - if c.StoreOptions.BasicStoreType == "local" { - store = ¢ralstore.LocalStore{} - } else { - store = ¢ralstore.RedisBasicStore{} - } sw := ¢ralstore.SmartWrapper{} - redis := &redis.DefaultClient{} var g inject.Graph err = g.Provide( &inject.Object{Value: c}, @@ -178,8 +176,6 @@ func newStartedApp( &inject.Object{Value: clockwork.NewRealClock()}, &inject.Object{Value: trace.Tracer(noop.Tracer{}), Name: "tracer"}, &inject.Object{Value: &cache.SpanCache_basic{}}, - &inject.Object{Value: redis, Name: "redis"}, - &inject.Object{Value: store}, &inject.Object{Value: sw}, &inject.Object{Value: collector, Name: "collector"}, &inject.Object{Value: &cache.CuckooSentCache{}}, @@ -189,12 +185,27 @@ func newStartedApp( &inject.Object{Value: "test", Name: "version"}, &inject.Object{Value: samplerFactory}, &inject.Object{Value: &health.Health{}}, - &inject.Object{Value: &gossip.GossipRedis{}, Name: "gossip"}, &inject.Object{Value: &stressRelief.StressRelief{}, Name: "stressRelief"}, &inject.Object{Value: &a}, ) require.NoError(t, err) + var red redis.Client + if storeType == "redis" { + red = &redis.DefaultClient{} + err = g.Provide(&inject.Object{Value: red, Name: "redis"}) + require.NoError(t, err) + err = g.Provide(&inject.Object{Value: &gossip.GossipRedis{}, Name: "gossip"}) + require.NoError(t, err) + err = g.Provide(&inject.Object{Value: ¢ralstore.RedisBasicStore{}}) + require.NoError(t, err) + } else { + err = g.Provide(&inject.Object{Value: &gossip.InMemoryGossip{}, Name: "gossip"}) + require.NoError(t, err) + err = g.Provide(&inject.Object{Value: ¢ralstore.LocalStore{}}) + require.NoError(t, err) + } + err = g.Populate() require.NoError(t, err) @@ -204,10 +215,12 @@ func newStartedApp( // Racy: wait just a moment for ListenAndServe to start up. time.Sleep(10 * time.Millisecond) return &a, g, func() { - conn := redis.Get() - _, err := conn.Do("FLUSHDB") - assert.NoError(t, err) - conn.Close() + if storeType == "redis" { + conn := red.Get() + _, err := conn.Do("FLUSHDB") + assert.NoError(t, err) + conn.Close() + } err = startstop.Stop(g.Objects(), nil) assert.NoError(t, err) } @@ -221,206 +234,231 @@ func post(t testing.TB, req *http.Request) { resp.Body.Close() } +var storesToTest = []string{ + "redis", + "local", +} + func TestAppIntegration(t *testing.T) { port := 10500 redisDB := 2 - sender := &transmission.MockSender{} - _, _, stop := newStartedApp(t, sender, port, redisDB, false) - defer stop() - - // Send a root span, it should be sent in short order. - req := httptest.NewRequest( - "POST", - fmt.Sprintf("http://localhost:%d/1/batch/dataset", port), - strings.NewReader(`[{"data":{"trace.trace_id":"1","foo":"bar"}}]`), - ) - req.Header.Set("X-Honeycomb-Team", legacyAPIKey) - req.Header.Set("Content-Type", "application/json") - - resp, err := http.DefaultTransport.RoundTrip(req) - assert.NoError(t, err) - assert.Equal(t, http.StatusOK, resp.StatusCode) - resp.Body.Close() - - require.Eventually(t, func() bool { - events := sender.Events() - return len(events) == 1 - }, 5*time.Second, 100*time.Millisecond) - - require.EventuallyWithT(t, func(collect *assert.CollectT) { - events := sender.Events() - - assert.Equal(collect, "dataset", events[0].Dataset) - assert.Equal(collect, "bar", events[0].Data["foo"]) - assert.Equal(collect, "1", events[0].Data["trace.trace_id"]) - assert.Equal(collect, uint(1), events[0].Data["meta.refinery.original_sample_rate"]) - }, 5*time.Second, 100*time.Millisecond) + for _, storeType := range storesToTest { + t.Run(storeType, func(t *testing.T) { + sender := &transmission.MockSender{} + _, _, stop := newStartedApp(t, sender, port, redisDB, false, storeType) + defer stop() + + // Send a root span, it should be sent in short order. + req := httptest.NewRequest( + "POST", + fmt.Sprintf("http://localhost:%d/1/batch/dataset", port), + strings.NewReader(`[{"data":{"trace.trace_id":"1","foo":"bar"}}]`), + ) + req.Header.Set("X-Honeycomb-Team", legacyAPIKey) + req.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultTransport.RoundTrip(req) + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + resp.Body.Close() + + require.Eventually(t, func() bool { + events := sender.Events() + return len(events) == 1 + }, 5*time.Second, 100*time.Millisecond) + + require.EventuallyWithT(t, func(collect *assert.CollectT) { + events := sender.Events() + + assert.Equal(collect, "dataset", events[0].Dataset) + assert.Equal(collect, "bar", events[0].Data["foo"]) + assert.Equal(collect, "1", events[0].Data["trace.trace_id"]) + assert.Equal(collect, uint(1), events[0].Data["meta.refinery.original_sample_rate"]) + }, 5*time.Second, 100*time.Millisecond) + }) + } } func TestAppIntegrationWithNonLegacyKey(t *testing.T) { port := 10600 redisDB := 3 - sender := &transmission.MockSender{} - a, _, stop := newStartedApp(t, sender, port, redisDB, false) - defer stop() - a.IncomingRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) { return "test", nil }) - - // Send a root span, it should be sent in short order. - traceID := strconv.Itoa(rand.Intn(1000)) - data := `[{"data":{"trace.trace_id":"` + traceID + `","foo":"bar"}}]` - req := httptest.NewRequest( - "POST", - fmt.Sprintf("http://localhost:%d/1/batch/dataset", port), - strings.NewReader(data), - ) - req.Header.Set("X-Honeycomb-Team", nonLegacyAPIKey) - req.Header.Set("Content-Type", "application/json") - - resp, err := http.DefaultTransport.RoundTrip(req) - assert.NoError(t, err) - assert.Equal(t, http.StatusOK, resp.StatusCode) - resp.Body.Close() - - require.Eventually(t, func() bool { - events := sender.Events() - return len(events) == 1 - }, 5*time.Second, 100*time.Millisecond) - - require.EventuallyWithT(t, func(collect *assert.CollectT) { - events := sender.Events() - assert.Equal(t, "dataset", events[0].Dataset) - assert.Equal(t, "bar", events[0].Data["foo"]) - assert.Equal(t, traceID, events[0].Data["trace.trace_id"]) - assert.Equal(t, uint(1), events[0].Data["meta.refinery.original_sample_rate"]) - }, 5*time.Second, 100*time.Millisecond) + for _, storeType := range storesToTest { + t.Run(storeType, func(t *testing.T) { + sender := &transmission.MockSender{} + a, _, stop := newStartedApp(t, sender, port, redisDB, false, storeType) + defer stop() + a.IncomingRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) { return "test", nil }) + + // Send a root span, it should be sent in short order. + traceID := strconv.Itoa(rand.Intn(1000)) + data := `[{"data":{"trace.trace_id":"` + traceID + `","foo":"bar"}}]` + req := httptest.NewRequest( + "POST", + fmt.Sprintf("http://localhost:%d/1/batch/dataset", port), + strings.NewReader(data), + ) + req.Header.Set("X-Honeycomb-Team", nonLegacyAPIKey) + req.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultTransport.RoundTrip(req) + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + resp.Body.Close() + + require.Eventually(t, func() bool { + events := sender.Events() + return len(events) == 1 + }, 5*time.Second, 100*time.Millisecond) + + require.EventuallyWithT(t, func(collect *assert.CollectT) { + events := sender.Events() + assert.Equal(t, "dataset", events[0].Dataset) + assert.Equal(t, "bar", events[0].Data["foo"]) + assert.Equal(t, traceID, events[0].Data["trace.trace_id"]) + assert.Equal(t, uint(1), events[0].Data["meta.refinery.original_sample_rate"]) + }, 5*time.Second, 100*time.Millisecond) + }) + } } func TestAppIntegrationWithUnauthorizedKey(t *testing.T) { port := 10700 redisDB := 4 - sender := &transmission.MockSender{} - a, _, stop := newStartedApp(t, sender, port, redisDB, false) - defer stop() - a.IncomingRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) { return "test", nil }) - - // Send a root span, it should be sent in short order. - traceID := strconv.Itoa(rand.Intn(1000)) - input := fmt.Sprintf(`[{"data":{"trace.trace_id":"%s","foo":"bar"}}]`, traceID) - req := httptest.NewRequest( - "POST", - fmt.Sprintf("http://localhost:%d/v1/traces", port), - strings.NewReader(input), - ) - req.Header.Set("X-Honeycomb-Team", "badkey") - req.Header.Set("Content-Type", "application/json") - - resp, err := http.DefaultTransport.RoundTrip(req) - assert.NoError(t, err) - assert.Equal(t, 401, resp.StatusCode) - data, err := io.ReadAll(resp.Body) - resp.Body.Close() - assert.NoError(t, err) - assert.Contains(t, string(data), "not found in list of authorized keys") + for _, storeType := range storesToTest { + t.Run(storeType, func(t *testing.T) { + sender := &transmission.MockSender{} + a, _, stop := newStartedApp(t, sender, port, redisDB, false, storeType) + defer stop() + a.IncomingRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) { return "test", nil }) + + // Send a root span, it should be sent in short order. + traceID := strconv.Itoa(rand.Intn(1000)) + input := fmt.Sprintf(`[{"data":{"trace.trace_id":"%s","foo":"bar"}}]`, traceID) + req := httptest.NewRequest( + "POST", + fmt.Sprintf("http://localhost:%d/v1/traces", port), + strings.NewReader(input), + ) + req.Header.Set("X-Honeycomb-Team", "badkey") + req.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultTransport.RoundTrip(req) + assert.NoError(t, err) + assert.Equal(t, 401, resp.StatusCode) + data, err := io.ReadAll(resp.Body) + resp.Body.Close() + assert.NoError(t, err) + assert.Contains(t, string(data), "not found in list of authorized keys") + }) + } } func TestHostMetadataSpanAdditions(t *testing.T) { port := 14000 redisDB := 6 - sender := &transmission.MockSender{} - _, _, stop := newStartedApp(t, sender, port, redisDB, true) - defer stop() - - // Send a root span, it should be sent in short order. - req := httptest.NewRequest( - "POST", - fmt.Sprintf("http://localhost:%d/1/batch/dataset", port), - strings.NewReader(`[{"data":{"foo":"bar","trace.trace_id":"2"}}]`), - ) - req.Header.Set("X-Honeycomb-Team", legacyAPIKey) - req.Header.Set("Content-Type", "application/json") - - resp, err := http.DefaultTransport.RoundTrip(req) - assert.NoError(t, err) - assert.Equal(t, http.StatusOK, resp.StatusCode) - resp.Body.Close() - - require.Eventually(t, func() bool { - events := sender.Events() - return len(events) == 1 - }, 5*time.Second, 100*time.Millisecond) - - require.EventuallyWithT(t, func(collect *assert.CollectT) { - events := sender.Events() - - assert.Equal(t, "dataset", events[0].Dataset) - assert.Equal(t, "bar", events[0].Data["foo"]) - assert.Equal(t, "2", events[0].Data["trace.trace_id"]) - assert.Equal(t, uint(1), events[0].Data["meta.refinery.original_sample_rate"]) - hostname, _ := os.Hostname() - assert.Equal(t, hostname, events[0].Data["meta.refinery.decider.host.name"]) - }, 5*time.Second, 100*time.Millisecond) + for _, storeType := range storesToTest { + t.Run(storeType, func(t *testing.T) { + sender := &transmission.MockSender{} + _, _, stop := newStartedApp(t, sender, port, redisDB, true, storeType) + defer stop() + + // Send a root span, it should be sent in short order. + req := httptest.NewRequest( + "POST", + fmt.Sprintf("http://localhost:%d/1/batch/dataset", port), + strings.NewReader(`[{"data":{"foo":"bar","trace.trace_id":"2"}}]`), + ) + req.Header.Set("X-Honeycomb-Team", legacyAPIKey) + req.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultTransport.RoundTrip(req) + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + resp.Body.Close() + + require.Eventually(t, func() bool { + events := sender.Events() + return len(events) == 1 + }, 5*time.Second, 100*time.Millisecond) + + require.EventuallyWithT(t, func(collect *assert.CollectT) { + events := sender.Events() + + assert.Equal(t, "dataset", events[0].Dataset) + assert.Equal(t, "bar", events[0].Data["foo"]) + assert.Equal(t, "2", events[0].Data["trace.trace_id"]) + assert.Equal(t, uint(1), events[0].Data["meta.refinery.original_sample_rate"]) + hostname, _ := os.Hostname() + assert.Equal(t, hostname, events[0].Data["meta.refinery.sender.host.name"]) + }, 5*time.Second, 100*time.Millisecond) + }) + } } func TestSamplerKeys(t *testing.T) { port := 14000 redisDB := 11 - sender := &transmission.MockSender{} - sampler := &config.MockSamplerConfig{ - SampleRate: 2, - FieldList: []string{"path", "status"}, - } - configCallback = func(c *config.MockConfig) { - c.GetSamplerTypeVal = sampler - c.GetSamplerTypeName = "mock" - } - - _, _, stop := newStartedApp(t, sender, port, redisDB, true) - defer stop() - - spandata := `[ - {"data":{"trace.trace_id":"123","trace.span_id":"2","path":"/bar","status":"200","trace.parent_id":"1"}}, - {"data":{"trace.trace_id":"123","trace.span_id":"3","path":"/bar","status":"404","trace.parent_id":"2"}}, - {"data":{"trace.trace_id":"123","trace.span_id":"4","path":"/bazz","status":"200","trace.parent_id":"3"}}, - {"data":{"trace.trace_id":"123","trace.span_id":"5","path":"/buzz","status":"503","trace.parent_id":"4"}}, - {"data":{"trace.trace_id":"123","trace.span_id":"1","path":"/foo","status":"200"}} - ]` - - // send some spans - req := httptest.NewRequest( - "POST", - fmt.Sprintf("http://localhost:%d/1/batch/dataset", port), - strings.NewReader(spandata), - ) - req.Header.Set("X-Honeycomb-Team", legacyAPIKey) - req.Header.Set("Content-Type", "application/json") - - resp, err := http.DefaultTransport.RoundTrip(req) - assert.NoError(t, err) - assert.Equal(t, http.StatusOK, resp.StatusCode) - resp.Body.Close() - - require.Eventually(t, func() bool { - events := sender.Events() - return len(events) == 5 - }, 5*time.Second, 100*time.Millisecond) + for _, storeType := range storesToTest { + t.Run(storeType, func(t *testing.T) { + sender := &transmission.MockSender{} + sampler := &config.MockSamplerConfig{ + SampleRate: 2, + FieldList: []string{"path", "status"}, + } + configCallback = func(c *config.MockConfig) { + c.GetSamplerTypeVal = sampler + c.GetSamplerTypeName = "mock" + } - require.EventuallyWithT(t, func(collect *assert.CollectT) { - events := sender.Events() - - assert.Equal(t, "dataset", events[0].Dataset) - assert.Equal(t, "123", events[0].Data["trace.trace_id"]) - assert.Equal(t, uint(1), events[0].Data["meta.refinery.original_sample_rate"]) - hostname, _ := os.Hostname() - assert.Equal(t, hostname, events[0].Data["meta.refinery.decider.host.name"]) - assert.Equal(t, "/bar•/bazz•/buzz•/foo•,200•404•503•,", events[0].Data["meta.refinery.sample_key"]) - assert.Equal(t, 5, events[0].Data["meta.event_count"]) - assert.Equal(t, 5, events[0].Data["meta.span_count"]) - }, 5*time.Second, 100*time.Millisecond) + _, _, stop := newStartedApp(t, sender, port, redisDB, true, storeType) + defer stop() + + spandata := `[ + {"data":{"trace.trace_id":"123","trace.span_id":"2","path":"/bar","status":"200","trace.parent_id":"1"}}, + {"data":{"trace.trace_id":"123","trace.span_id":"3","path":"/bar","status":"404","trace.parent_id":"2"}}, + {"data":{"trace.trace_id":"123","trace.span_id":"4","path":"/bazz","status":"200","trace.parent_id":"3"}}, + {"data":{"trace.trace_id":"123","trace.span_id":"5","path":"/buzz","status":"503","trace.parent_id":"4"}}, + {"data":{"trace.trace_id":"123","trace.span_id":"1","path":"/foo","status":"200"}} + ]` + + // send some spans + req := httptest.NewRequest( + "POST", + fmt.Sprintf("http://localhost:%d/1/batch/dataset", port), + strings.NewReader(spandata), + ) + req.Header.Set("X-Honeycomb-Team", legacyAPIKey) + req.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultTransport.RoundTrip(req) + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + resp.Body.Close() + + require.Eventually(t, func() bool { + events := sender.Events() + return len(events) == 5 + }, 5*time.Second, 100*time.Millisecond) + + require.EventuallyWithT(t, func(collect *assert.CollectT) { + events := sender.Events() + + assert.Equal(t, "dataset", events[0].Dataset) + assert.Equal(t, "123", events[0].Data["trace.trace_id"]) + assert.Equal(t, uint(1), events[0].Data["meta.refinery.original_sample_rate"]) + hostname, _ := os.Hostname() + assert.Equal(t, hostname, events[0].Data["meta.refinery.decider.host.name"]) + assert.Equal(t, "/bar•/bazz•/buzz•/foo•,200•404•503•,", events[0].Data["meta.refinery.sample_key"]) + assert.Equal(t, 5, events[0].Data["meta.event_count"]) + assert.Equal(t, 5, events[0].Data["meta.span_count"]) + }, 5*time.Second, 100*time.Millisecond) + }) + } } func TestEventsEndpoint(t *testing.T) { @@ -434,7 +472,7 @@ func TestEventsEndpoint(t *testing.T) { var stop func() basePort := 13000 + (i * 2) senders[i] = &transmission.MockSender{} - apps[i], _, stop = newStartedApp(t, senders[i], basePort, redisDB, false) + apps[i], _, stop = newStartedApp(t, senders[i], basePort, redisDB, false, "redis") defer stop() addrs[i] = "localhost:" + strconv.Itoa(basePort) @@ -552,7 +590,6 @@ func TestEventsEndpoint(t *testing.T) { event, ) }, 3*time.Second, 2*time.Millisecond) - } func TestEventsEndpointWithNonLegacyKey(t *testing.T) { @@ -565,7 +602,7 @@ func TestEventsEndpointWithNonLegacyKey(t *testing.T) { for i := range apps { basePort := 15000 + (i * 2) senders[i] = &transmission.MockSender{} - app, _, stop := newStartedApp(t, senders[i], basePort, redisDB, false) + app, _, stop := newStartedApp(t, senders[i], basePort, redisDB, false, "redis") app.IncomingRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) { return "test", nil }) apps[i] = app defer stop() @@ -753,7 +790,7 @@ func BenchmarkTraces(b *testing.B) { W: io.Discard, }, } - _, _, stop := newStartedApp(b, sender, 11000, 11, false) + _, _, stop := newStartedApp(b, sender, 11000, 11, false, "redis") defer stop() req, err := http.NewRequest( @@ -840,7 +877,7 @@ func BenchmarkDistributedTraces(b *testing.B) { for i := range apps { var stop func() basePort := 12000 + (i * 2) - apps[i], _, stop = newStartedApp(b, sender, basePort, 11, false) + apps[i], _, stop = newStartedApp(b, sender, basePort, 11, false, "redis") defer stop() addrs[i] = "localhost:" + strconv.Itoa(basePort) diff --git a/centralstore/local_store.go b/centralstore/local_store.go index 50f5e0b3e8..f3c905e4a7 100644 --- a/centralstore/local_store.go +++ b/centralstore/local_store.go @@ -17,7 +17,11 @@ import ( ) // LocalStore is a basic store that is local to the Refinery process. This is -// used when there is only one refinery in the system. +// used when there is only one refinery in the system. LocalStore keeps trace +// decisions (after they've been made) in the decision cache with much less +// information, and synthesizes the state of those traces from the cache. But +// when a trace is Kept, it *also* keeps it in a local store with a short TTL +// so that all the information is still available while the trace is being sent. type LocalStore struct { Config config.Config `inject:""` DecisionCache cache.TraceSentCache `inject:""` @@ -29,6 +33,7 @@ type LocalStore struct { states map[CentralTraceState]statusMap traces map[string]*CentralTrace mutex sync.RWMutex + done chan struct{} } // ensure that LocalStore implements RemoteStore @@ -55,27 +60,71 @@ func (lrs *LocalStore) Start() error { DecisionDelay, ReadyToDecide, AwaitingDecision, - // DecisionKeep, // these are in the decision cache - // DecisionDrop, // these are in the decision cache + DecisionKeep, // these are also in the decision cache + // DecisionDrop, // these are ONLY in the decision cache } for _, state := range mapStates { // initialize the map for each state lrs.states[state] = make(statusMap) } + lrs.done = make(chan struct{}) + go lrs.cleanup() return nil } func (lrs *LocalStore) Stop() error { + close(lrs.done) return nil } +var cleanupTTL = 5 * time.Minute + +// this is run every minute and deletes traces that have been in the +// DecisionKeep state in the status map for more than 5 minutes. They're only +// there to allow the sender time to process them. This is to prevent the local +// store from growing indefinitely. +func (lrs *LocalStore) cleanup() { + ticker := lrs.Clock.NewTicker(time.Minute) + defer ticker.Stop() + for { + select { + case <-lrs.done: + return + case <-ticker.Chan(): + // make a list of traces to delete + lrs.mutex.RLock() + deletes := make([]string, 0) + for traceID, status := range lrs.states[DecisionKeep] { + if lrs.Clock.Since(status.Timestamp) > cleanupTTL { + deletes = append(deletes, traceID) + } + } + lrs.mutex.RUnlock() + // delete them + if len(deletes) > 0 { + lrs.mutex.Lock() + for _, traceID := range deletes { + delete(lrs.states[DecisionKeep], traceID) + } + lrs.mutex.Unlock() + } + } + } +} + // findTraceStatus returns the state and status of a trace, or Unknown if the trace // wasn't found in any state. If the trace is found, the status will be non-nil. // Only call this if you're holding a Lock. func (lrs *LocalStore) findTraceStatus(traceID string) (CentralTraceState, *CentralTraceStatus) { if tracerec, reason, found := lrs.DecisionCache.Test(traceID); found { - // it was in the decision cache, so we can return the right thing + // it was in the decision cache if tracerec.Kept() { + // but let's look in the local status map to see if we have more information + if status, ok := lrs.states[DecisionKeep][traceID]; ok { + // we have more information, so we return that + return DecisionKeep, status + } + // we don't have more information, so we return what we have status := NewCentralTraceStatus(traceID, DecisionKeep, lrs.Clock.Now()) status.KeepReason = reason return DecisionKeep, status @@ -325,8 +374,14 @@ func (lrs *LocalStore) KeepTraces(ctx context.Context, statuses []*CentralTraceS defer lrs.mutex.Unlock() for _, status := range statuses { if _, ok := lrs.states[AwaitingDecision][status.TraceID]; ok { + // record in the decision cache for the long term lrs.DecisionCache.Record(status, true, status.KeepReason) + // and move it to the DecisionKeep state for the short term + // note that the status we're looking at may have been updated, so we need to + // use the one we have in the statuses list + lrs.states[DecisionKeep][status.TraceID] = status delete(lrs.states[AwaitingDecision], status.TraceID) + // also remove it from the current traces list delete(lrs.traces, status.TraceID) } }