diff --git a/pkg/promhttputil/merge.go b/pkg/promhttputil/merge.go index 5ebcd1389..d30f8f198 100644 --- a/pkg/promhttputil/merge.go +++ b/pkg/promhttputil/merge.go @@ -121,8 +121,13 @@ func MergeValues(antiAffinityBuffer model.Time, a, b model.Value, preferMax bool if index, ok := fingerPrintMap[finger]; ok { // Only replace if we have no value (which seems reasonable) // Or we prefer max value and there is a bigger value - if newValue[index].Value == model.SampleValue(0) || preferMax && newValue[index].Value < item.Value { - newValue[index].Value = item.Value + existing := newValue[index] + if preferMax { + if item.Value > existing.Value { + newValue[index].Value = item.Value + } + } else { + newValue[index].Value += item.Value } } else { newValue = append(newValue, item) @@ -148,10 +153,14 @@ func MergeValues(antiAffinityBuffer model.Time, a, b model.Value, preferMax bool addStream := func(stream *model.SampleStream) { finger := stream.Metric.Fingerprint() - // If we've seen this fingerPrint before, lets make sure that a value exists + // If we've seen this fingerPrint before, lets make sure that a value exists and merge the streams if index, ok := fingerPrintMap[finger]; ok { - // TODO: check this error? For now the only one is sig collision, which we check - newValue[index], _ = MergeSampleStream(antiAffinityBuffer, newValue[index], stream, preferMax) + mergedStream, err := MergeSampleStream(antiAffinityBuffer, newValue[index], stream, preferMax) + if err != nil { + // Handle merging errors (e.g., fingerprint collisions) + return + } + newValue[index] = mergedStream } else { newValue = append(newValue, stream) fingerPrintMap[finger] = len(newValue) - 1 @@ -251,7 +260,27 @@ func MergeSampleStream(antiAffinityBuffer model.Time, a, b *model.SampleStream, } if !preferMax { - newValues = append(newValues, aValue) + // Summation logic + done := false + + for i := lastOffset; i < len(b.Values); i++ { + bValue := b.Values[i] + // Check if bValue falls within antiAffinityBuffer + if bValue.Timestamp > (aValue.Timestamp+antiAffinityBuffer) || bValue.Timestamp < (aValue.Timestamp-antiAffinityBuffer) { + break + } + if bValue.Timestamp == aValue.Timestamp { + aValue.Value += bValue.Value // Sum overlapping values + done = true + break + } + } + + if !done { + newValues = append(newValues, aValue) + } else { + newValues = append(newValues, aValue) + } } else { done := false