Skip to content

Commit 647d928

Browse files
1 parent ba2270b commit 647d928

File tree

1 file changed

+40
-15
lines changed

1 file changed

+40
-15
lines changed

trace/backend_collector.go

Lines changed: 40 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,12 @@ type witnessWithInfo struct {
5555
requestEnd time.Time
5656
responseStart time.Time
5757

58+
// Mutex protecting witness while it is being processed and/or flushed.
59+
witnessMutex sync.Mutex
60+
61+
// Whether the witness has been flushed to the backend.
62+
witnessFlushed bool
63+
5864
witness *pb.Witness
5965
}
6066

@@ -210,22 +216,28 @@ func (c *BackendCollector) Process(t akinet.ParsedNetworkTraffic) error {
210216
if val, ok := c.pairCache.LoadAndDelete(partial.PairKey); ok {
211217
pair := val.(*witnessWithInfo)
212218

213-
// Combine the pair, merging the result into the existing item
214-
// rather than the new partial.
215-
learn.MergeWitness(pair.witness, partial.Witness)
216-
pair.computeProcessingLatency(isRequest, t)
217-
218-
// If partial is the request, flip the src/dst in the pair before
219-
// reporting.
220-
if isRequest {
221-
pair.srcIP, pair.dstIP = pair.dstIP, pair.srcIP
222-
pair.srcPort, pair.dstPort = pair.dstPort, pair.srcPort
223-
}
224-
225-
c.queueUpload(pair)
226-
printer.Debugf("Completed witness %v at %v -- %v\n",
227-
partial.PairKey, t.ObservationTime, t.FinalPacketTime)
219+
func() {
220+
// Lock the witness while it is being processed and flushed
221+
// and unlock it after it is flushed
222+
pair.witnessMutex.Lock()
223+
defer pair.witnessMutex.Unlock()
224+
225+
// Combine the pair, merging the result into the existing item
226+
// rather than the new partial.
227+
learn.MergeWitness(pair.witness, partial.Witness)
228+
pair.computeProcessingLatency(isRequest, t)
229+
230+
// If partial is the request, flip the src/dst in the pair before
231+
// reporting.
232+
if isRequest {
233+
pair.srcIP, pair.dstIP = pair.dstIP, pair.srcIP
234+
pair.srcPort, pair.dstPort = pair.dstPort, pair.srcPort
235+
}
228236

237+
c.queueUpload(pair)
238+
printer.Debugf("Completed witness %v at %v -- %v\n",
239+
partial.PairKey, t.ObservationTime, t.FinalPacketTime)
240+
}()
229241
} else {
230242
// Store the partial witness for now, waiting for its pair or a
231243
// flush timeout.
@@ -335,6 +347,14 @@ func excludeWitnessFromReproMode(w *pb.Witness) bool {
335347
}
336348

337349
func (c *BackendCollector) queueUpload(w *witnessWithInfo) {
350+
if w.witnessFlushed {
351+
printer.Debugf("Witness %v already flushed.\n", w.id)
352+
return
353+
}
354+
defer func() {
355+
w.witnessFlushed = true
356+
}()
357+
338358
// Mark the method as not obfuscated.
339359
w.witness.GetMethod().GetMeta().GetHttp().Obfuscation = pb.HTTPMethodMeta_NONE
340360

@@ -398,6 +418,11 @@ func (c *BackendCollector) flushPairCache(cutoffTime time.Time) {
398418
c.pairCache.Range(func(k, v interface{}) bool {
399419
e := v.(*witnessWithInfo)
400420
if e.observationTime.Before(cutoffTime) {
421+
// Lock the witness while it is being flushed
422+
// and unlock it after it is deleted from pairCache
423+
e.witnessMutex.Lock()
424+
defer e.witnessMutex.Unlock()
425+
401426
c.queueUpload(e)
402427
c.pairCache.Delete(k)
403428
}

0 commit comments

Comments
 (0)