
Use 48 tags #1901


Draft: wants to merge 3 commits into master
16 changes: 8 additions & 8 deletions internal/agent/agent.go
@@ -715,8 +715,8 @@ func (s *BuiltInItemValue) SetValueCounter(value float64, count float64) {
s.value.AddValueCounter(value, count)
}

-func (s *Agent) shard(key *data_model.Key, metricInfo *format.MetricMetaValue) (shardID uint32, newStrategy bool, weightMul int, legacyKeyHash uint64) {
-return sharding.Shard(key, metricInfo, s.NumShards(), s.shardByMetricCount, s.newShardingByName.Load())
+func (s *Agent) shard(key *data_model.Key, metricInfo *format.MetricMetaValue, scratch *[]byte) (shardID uint32, newStrategy bool, weightMul int, legacyKeyHash uint64) {
+return sharding.Shard(key, metricInfo, s.NumShards(), s.shardByMetricCount, s.newShardingByName.Load(), scratch)
}

// Do not create too many. ShardReplicas will iterate through values before flushing bucket
@@ -750,7 +750,7 @@ func (s *Agent) ApplyMetric(m tlstatshouse.MetricBytes, h data_model.MappedMetri
h.InvalidString, 1)
return
}
-shardId, newStrategy, weightMul, resolutionHash := s.shard(&h.Key, h.MetricMeta)
+shardId, newStrategy, weightMul, resolutionHash := s.shard(&h.Key, h.MetricMeta, scratch)
if shardId >= uint32(len(s.Shards)) {
s.AddCounter(0, format.BuiltinMetricMetaIngestionStatus,
[]int32{h.Key.Tags[0], h.Key.Metric, format.TagValueIDSrcIngestionStatusErrShardingFailed, 0},
@@ -866,7 +866,7 @@ func (s *Agent) AddCounterHostAERA(t uint32, metricInfo *format.MetricMetaValue,
key.Tags[format.RouteTag] = aera.Route
key.Tags[format.BuildArchTag] = aera.BuildArch
}
-shardId, _, weightMul, resolutionHash := s.shard(&key, metricInfo)
+shardId, _, weightMul, resolutionHash := s.shard(&key, metricInfo, nil)
// resolutionHash will be 0 for built-in metrics, we are OK with this
shard := s.Shards[shardId]
shard.AddCounterHost(&key, resolutionHash, count, hostTag, metricInfo, weightMul)
@@ -894,7 +894,7 @@ func (s *Agent) AddCounterHostStringBytesAERA(t uint32, metricInfo *format.Metri
key.Tags[format.RouteTag] = aera.Route
key.Tags[format.BuildArchTag] = aera.BuildArch
}
-shardId, _, weightMul, resolutionHash := s.shard(&key, metricInfo)
+shardId, _, weightMul, resolutionHash := s.shard(&key, metricInfo, nil)
// resolutionHash will be 0 for built-in metrics, we are OK with this
shard := s.Shards[shardId]
shard.AddCounterHostStringBytes(&key, resolutionHash, data_model.TagUnionBytes{S: str, I: 0}, count, hostTag, metricInfo, weightMul)
@@ -925,7 +925,7 @@ func (s *Agent) AddValueCounterHostAERA(t uint32, metricInfo *format.MetricMetaV
key.Tags[format.RouteTag] = aera.Route
key.Tags[format.BuildArchTag] = aera.BuildArch
}
-shardId, _, weightMul, resolutionHash := s.shard(&key, metricInfo)
+shardId, _, weightMul, resolutionHash := s.shard(&key, metricInfo, nil)
// resolutionHash will be 0 for built-in metrics, we are OK with this
shard := s.Shards[shardId]
shard.AddValueCounterHost(&key, resolutionHash, value, counter, hostTag, metricInfo, weightMul)
@@ -952,7 +952,7 @@ func (s *Agent) AddValueCounterStringHostAERA(t uint32, metricInfo *format.Metri
key.Tags[format.RouteTag] = aera.Route
key.Tags[format.BuildArchTag] = aera.BuildArch
}
-shardId, _, weightMul, resolutionHash := s.shard(&key, metricInfo)
+shardId, _, weightMul, resolutionHash := s.shard(&key, metricInfo, nil)
// resolutionHash will be 0 for built-in metrics, we are OK with this
shard := s.Shards[shardId]
shard.AddValueCounterStringHost(&key, resolutionHash, topValue, value, counter, hostTag, metricInfo, weightMul)
@@ -969,7 +969,7 @@ func (s *Agent) MergeItemValue(t uint32, metricInfo *format.MetricMetaValue, tag
key.Tags[format.AggShardTag] = s.AggregatorShardKey
key.Tags[format.AggReplicaTag] = s.AggregatorReplicaKey
}
-shardId, _, weightMul, resolutionHash := s.shard(&key, metricInfo)
+shardId, _, weightMul, resolutionHash := s.shard(&key, metricInfo, nil)
// resolutionHash will be 0 for built-in metrics, we are OK with this
shard := s.Shards[shardId]
shard.MergeItemValue(&key, resolutionHash, item, metricInfo, weightMul)
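The net change in this file: shard now threads a `scratch *[]byte` through to `sharding.Shard`, so the buffer used to serialize the key for the resolution hash can be reused across `ApplyMetric` calls, while the built-in-metric call sites pass `nil` and simply forgo the reuse. A minimal, self-contained sketch of that idiom (every name below is illustrative, not taken from the repository):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// serializeTags stands in for the real sharding code: it serializes into a
// caller-owned buffer passed as *[]byte. The callee may grow the buffer and the
// caller keeps the grown backing array, so steady-state calls stop allocating.
// Passing nil, as the built-in-metric call sites do, simply allocates each time.
func serializeTags(tags []int32, scratch *[]byte) []byte {
	var buf []byte
	if scratch != nil {
		buf = (*scratch)[:0]
	}
	for _, t := range tags {
		buf = binary.LittleEndian.AppendUint32(buf, uint32(t))
	}
	if scratch != nil {
		*scratch = buf // hand the possibly reallocated buffer back to the caller
	}
	return buf
}

func main() {
	var scratch []byte
	fmt.Println(len(serializeTags([]int32{1, 2, 3}, &scratch)))    // 12; scratch keeps the capacity
	fmt.Println(len(serializeTags([]int32{4, 5, 6, 7}, &scratch))) // 16; grows once, then gets reused
	fmt.Println(len(serializeTags([]int32{8}, nil)))               // 4; no reuse
}
```

Passing the slice by pointer lets the callee reallocate while the caller keeps the grown capacity for the next call.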
5 changes: 2 additions & 3 deletions internal/agent/agent_mapping.go
@@ -49,9 +49,8 @@ func (s *Agent) mapAllTags(h *data_model.MappedMetricHeader, metric *tlstatshous
// We could arguably call h.SetKey, but there is very little difference in semantic to care
continue
}
-if tagMeta.Index+1 < format.MaxTags { // TODO - remove after NewMaxTags
-h.SetTag(tagMeta.Index+1, hi, tagIDKey+1) // last tag is never Raw64, checked by RestoreCachedInfo
-}
+h.SetTag(tagMeta.Index+1, hi, tagIDKey+1) // last tag is never Raw64, checked by RestoreCachedInfo
+
tagValue.I = lo
case tagMeta.Raw:
id, ok := format.ContainsRawTagValue(mem.B(v.Value))
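For raw64 tags the mapper stores the low 32 bits in the tag's own slot and the high 32 bits in the next slot. The guard around the second write is dropped here because, per the retained comment, `RestoreCachedInfo` already rejects raw64 on the last tag, so `Index+1` always fits in the 48-tag array. Roughly what the split looks like (an illustrative sketch, not the repository's code, and the exact byte order may differ):

```go
package main

import "fmt"

// splitRaw64 shows the shape of the idea: one 64-bit raw value occupies two
// adjacent 32-bit tag slots, low half at the tag's index, high half at index+1.
func splitRaw64(v uint64) (lo, hi int32) {
	return int32(uint32(v)), int32(uint32(v >> 32))
}

func main() {
	lo, hi := splitRaw64(0x1122334455667788)
	fmt.Printf("lo=%#x hi=%#x\n", uint32(lo), uint32(hi)) // lo=0x55667788 hi=0x11223344
}
```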
11 changes: 0 additions & 11 deletions internal/agent/agent_test.go
@@ -66,17 +66,6 @@ func Benchmark_Original_Marshal(b *testing.B) {
}
}

-func Benchmark_Hash(b *testing.B) {
-var k data_model.Key
-var result uint64
-b.ReportAllocs()
-for i := 0; i < b.N; i++ {
-k.Tags[14]++
-k.Tags[0] = int32(i)
-result += k.Hash()
-}
-}

// cpu: 13th Gen Intel(R) Core(TM) i7-1360P
// Benchmark_XXHash-16 10919467 102.8 ns/op 0 B/op 0 allocs/op
func Benchmark_XXHash(b *testing.B) {
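The deleted `Benchmark_Hash` exercised the siphash-based `Key.Hash`, which this PR removes in `internal/data_model/bucket.go`; `Benchmark_XXHash` remains for the surviving path. If a scratch-reusing variant were wanted, it could look roughly like this sketch, which assumes only the `Key.XXHash(scratch)` signature visible elsewhere in the diff:

```go
// Sketch only, not part of the PR: reuses the scratch buffer across iterations,
// mirroring how the agent now threads scratch through shard().
func Benchmark_XXHash_Scratch(b *testing.B) {
	var k data_model.Key
	var scratch []byte
	var result uint64
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		k.Tags[14]++
		k.Tags[0] = int32(i)
		var h uint64
		scratch, h = k.XXHash(scratch) // returns the possibly reallocated scratch and the hash
		result += h
	}
	_ = result
}
```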
20 changes: 8 additions & 12 deletions internal/aggregator/aggregator_insert.go
@@ -28,16 +28,18 @@ import (
"github.com/vkcom/statshouse/internal/vkgo/srvfunc"
)

+const legacyMaxTags = 16
+
func getTableDesc(v3Format bool) string {
if v3Format {
-keysFieldsNamesVec := make([]string, format.NewMaxTags)
-for i := 0; i < format.NewMaxTags; i++ {
+keysFieldsNamesVec := make([]string, format.MaxTags)
+for i := 0; i < format.MaxTags; i++ {
keysFieldsNamesVec[i] = fmt.Sprintf(`tag%d,stag%d`, i, i)
}
return `statshouse_v3_incoming(index_type,metric,time,` + strings.Join(keysFieldsNamesVec, `,`) + `,count,min,max,sum,sumsquare,percentiles,uniq_state,min_host_legacy,max_host_legacy,min_host,max_host)`
}
-keysFieldsNamesVec := make([]string, format.MaxTags)
-for i := 0; i < format.MaxTags; i++ {
+keysFieldsNamesVec := make([]string, legacyMaxTags)
+for i := 0; i < legacyMaxTags; i++ {
keysFieldsNamesVec[i] = fmt.Sprintf(`key%d`, i)
}
return `statshouse_value_incoming_prekey3(metric,prekey,prekey_set,time,` + strings.Join(keysFieldsNamesVec, `,`) + `,count,min,max,sum,sumsquare,percentiles,uniq_state,skey,min_host,max_host)`
@@ -142,20 +144,14 @@ func appendKeys(res []byte, k *data_model.Key, metricCache *metricIndexCache, v3
res = append(res, byte(0))
}
res = binary.LittleEndian.AppendUint32(res, k.Timestamp)
-tagsN := format.MaxTags
-for ki := 0; ki < tagsN; ki++ {
+for ki := 0; ki < legacyMaxTags; ki++ {
res = appendTag(res, uint32(k.Tags[ki]))
}
return res
}

func appendKeysNewFormat(res []byte, k *data_model.Key, metricCache *metricIndexCache, top data_model.TagUnion, bufferedInsert bool) []byte {
appendTag := func(res []byte, k *data_model.Key, i int) []byte {
-if i >= len(k.Tags) { // temporary while we in transition between 16 and 48 tags
-res = binary.LittleEndian.AppendUint32(res, 0)
-res = rowbinary.AppendString(res, "")
-return res
-}
if len(k.GetSTag(i)) > 0 {
res = binary.LittleEndian.AppendUint32(res, 0)
res = rowbinary.AppendString(res, k.GetSTag(i))
@@ -174,7 +170,7 @@ func appendKeysNewFormat(res []byte, k *data_model.Key, metricCache *metricIndex
// TODO write pretags
_ = metricCache
res = binary.LittleEndian.AppendUint32(res, k.Timestamp)
-for ki := 0; ki < format.NewMaxTags; ki++ {
+for ki := 0; ki < format.MaxTags; ki++ {
if ki == format.StringTopTagIndexV3 {
continue
}
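The new `legacyMaxTags` constant pins the v2 insert path to the 16 `key0..key15` columns of the old table, while the v3 statement and `appendKeysNewFormat` now enumerate all 48 `tagN`/`stagN` pairs (which is also why the transitional `i >= len(k.Tags)` guard could go). A self-contained sketch of the two column lists; the constants are taken from the diff, everything else is illustrative:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	const maxTags, legacyMaxTags = 48, 16

	v3 := make([]string, maxTags)
	for i := range v3 {
		v3[i] = fmt.Sprintf("tag%d,stag%d", i, i) // v3 table: paired int/string columns per tag
	}
	legacy := make([]string, legacyMaxTags)
	for i := range legacy {
		legacy[i] = fmt.Sprintf("key%d", i) // legacy table: fixed 16 integer key columns
	}
	fmt.Println(strings.Join(v3[:2], ","), "...", v3[maxTags-1])               // tag0,stag0,tag1,stag1 ... tag47,stag47
	fmt.Println(strings.Join(legacy[:2], ","), "...", legacy[legacyMaxTags-1]) // key0,key1 ... key15
}
```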
4 changes: 2 additions & 2 deletions internal/api/tscache.go
@@ -35,8 +35,8 @@ type tsSelectRow struct {

// all numeric tags are stored as int32 to save space
type tsTags struct {
-tag [format.NewMaxTags]int64
-stag [format.NewMaxTags]string
+tag [format.MaxTags]int64
+stag [format.MaxTags]string
shardNum uint32
stagCount int
}
29 changes: 0 additions & 29 deletions internal/data_model/bucket.go
@@ -14,7 +14,6 @@ import (

"pgregory.net/rand"

"github.com/dchest/siphash"
"github.com/hrissan/tdigest"
"github.com/zeebo/xxh3"

@@ -95,10 +94,6 @@ func (t TagUnionBytes) Empty() bool {
return t.I == 0 && len(t.S) == 0
}

-// Randomly selected, do not change. Which keys go to which shard depends on this.
-const sipKeyA = 0x3605bf49d8e3adf2
-const sipKeyB = 0xc302580679a8cef2
-
func (s *ItemCounter) Count() float64 { return s.counter }

func (k *Key) SetSTag(i int, s string) {
@@ -152,30 +147,6 @@ func AggKey(t uint32, m int32, k [format.MaxTags]int32, hostTagId int32, shardTa
return &key
}

-func (k *Key) Hash() uint64 {
-var b [4 + 4*format.MaxTags]byte
-// timestamp is not part of shard
-binary.LittleEndian.PutUint32(b[:], uint32(k.Metric))
-binary.LittleEndian.PutUint32(b[4+0*4:], uint32(k.Tags[0]))
-binary.LittleEndian.PutUint32(b[4+1*4:], uint32(k.Tags[1]))
-binary.LittleEndian.PutUint32(b[4+2*4:], uint32(k.Tags[2]))
-binary.LittleEndian.PutUint32(b[4+3*4:], uint32(k.Tags[3]))
-binary.LittleEndian.PutUint32(b[4+4*4:], uint32(k.Tags[4]))
-binary.LittleEndian.PutUint32(b[4+5*4:], uint32(k.Tags[5]))
-binary.LittleEndian.PutUint32(b[4+6*4:], uint32(k.Tags[6]))
-binary.LittleEndian.PutUint32(b[4+7*4:], uint32(k.Tags[7]))
-binary.LittleEndian.PutUint32(b[4+8*4:], uint32(k.Tags[8]))
-binary.LittleEndian.PutUint32(b[4+9*4:], uint32(k.Tags[9]))
-binary.LittleEndian.PutUint32(b[4+10*4:], uint32(k.Tags[10]))
-binary.LittleEndian.PutUint32(b[4+11*4:], uint32(k.Tags[11]))
-binary.LittleEndian.PutUint32(b[4+12*4:], uint32(k.Tags[12]))
-binary.LittleEndian.PutUint32(b[4+13*4:], uint32(k.Tags[13]))
-binary.LittleEndian.PutUint32(b[4+14*4:], uint32(k.Tags[14]))
-binary.LittleEndian.PutUint32(b[4+15*4:], uint32(k.Tags[15]))
-const _ = uint(16 - format.MaxTags) // compile time assert to manually add new keys above
-return siphash.Hash(sipKeyA, sipKeyB, b[:])
-}
-
// returns possibly reallocated scratch
func (k *Key) XXHash(scratch []byte) ([]byte, uint64) {
scratch, _ = k.MarshalAppend(scratch[:0])
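The deleted `Key.Hash` filled a fixed-size buffer with exactly 16 tags and ran siphash over it; its own compile-time assert (`const _ = uint(16 - format.MaxTags)`) stops compiling once MaxTags becomes 48, so the PR keeps only the `XXHash` path, which serializes the key into a reusable scratch buffer and hashes whatever length comes out. The shape of that approach, sketched with xxh3 (this is not the repository's serialization format, so the resulting hashes will not match StatsHouse's):

```go
package main

import (
	"encoding/binary"
	"fmt"

	"github.com/zeebo/xxh3"
)

// hashTags sketches the variable-length approach: serialize, then hash the bytes.
// The real Key.MarshalAppend layout also covers the timestamp, string tags, etc.
func hashTags(metric int32, tags []int32, scratch []byte) ([]byte, uint64) {
	scratch = scratch[:0]
	scratch = binary.LittleEndian.AppendUint32(scratch, uint32(metric))
	for _, t := range tags { // works unchanged for 16, 48, or any future tag count
		scratch = binary.LittleEndian.AppendUint32(scratch, uint32(t))
	}
	return scratch, xxh3.Hash(scratch)
}

func main() {
	var scratch []byte
	tags := make([]int32, 48)
	tags[0], tags[47] = 1, 7
	scratch, h := hashTags(5, tags, scratch)
	fmt.Println(len(scratch), h) // 196 serialized bytes and a deterministic 64-bit hash
}
```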
2 changes: 1 addition & 1 deletion internal/data_model/known_tags.go
@@ -62,7 +62,7 @@ func publishDraftTags(meta *format.MetricMetaValue, knownTags []KnownTag) int {
// pass
}
}
-if x < 1 || format.NewMaxTags <= x || (x < len(meta.Tags) && meta.Tags[x].Name != "") {
+if x < 1 || format.MaxTags <= x || (x < len(meta.Tags) && meta.Tags[x].Name != "") {
continue
}
draftTag.Name = knownTag.Name
2 changes: 1 addition & 1 deletion internal/data_model/mapped_metric_header.go
@@ -35,7 +35,7 @@ type MappedMetricHeader struct {
CheckedTagIndex int // we check tags one by one, remembering position here, between invocations of mapTags
ValuesChecked bool // infs, nans, etc. This might be expensive, so done only once

-OriginalTagValues [format.NewMaxTags][]byte
+OriginalTagValues [format.MaxTags][]byte
// original strings values as sent by user. Hash of those is stable between agents independent of
// mappings, so used as a resolution hash to deterministically place same rows into same resolution buckets

2 changes: 1 addition & 1 deletion internal/data_model/query_filter.go
@@ -22,7 +22,7 @@ type QueryFilter struct {

type TagFilters struct {
Metrics []*format.MetricMetaValue
-Tags [format.NewMaxTags]TagFilter
+Tags [format.MaxTags]TagFilter
}

type TagFilter struct {
2 changes: 1 addition & 1 deletion internal/format/builtin.go
@@ -217,7 +217,7 @@ func init() {
// v.WithAgentEnvRouteArch = false
// v.WithAggregatorID = false
}
-for i := 0; i < NewMaxTags; i++ {
+for i := 0; i < MaxTags; i++ {
name := strconv.Itoa(i)
legacyName := legacyTagIDPrefix + name
tagIDsLegacy = append(tagIDsLegacy, legacyName)
13 changes: 6 additions & 7 deletions internal/format/format.go
@@ -25,8 +25,7 @@ import (
)

const (
-MaxTags = 16
-NewMaxTags = 48
+MaxTags = 48
MaxDraftTags = 128
MaxStringLen = 128 // both for normal tags and _s, _h tags (string tops, hostnames)

@@ -486,9 +485,9 @@ func (m *MetricMetaValue) RestoreCachedInfo() error {
}
m.PreKeyIndex = -1
tags := m.Tags
-if len(tags) > NewMaxTags { // prevent various overflows in code
-tags = tags[:NewMaxTags]
-err = multierr.Append(err, fmt.Errorf("too many tags, limit is: %d", NewMaxTags))
+if len(tags) > MaxTags { // prevent various overflows in code
+tags = tags[:MaxTags]
+err = multierr.Append(err, fmt.Errorf("too many tags, limit is: %d", MaxTags))
}
for name, tag := range m.TagsDraft {
// in compact journal we clear tag.Name, so we must restore
@@ -519,7 +518,7 @@ func (m *MetricMetaValue) RestoreCachedInfo() error {
}
tag.Raw = tag.RawKind != "" // Raw is serialized for v2 agents only
tag.raw64 = IsRaw64Kind(tag.RawKind)
-if tag.raw64 && tag.Index >= NewMaxTags-1 { // for now, to avoid overflows in v2 and v3 mapping
+if tag.raw64 && tag.Index >= MaxTags-1 { // for now, to avoid overflows in v2 and v3 mapping
err = multierr.Append(err, fmt.Errorf("last tag cannot be raw64 kind %q of tag %d", tag.RawKind, i))
tag.raw64 = false
}
@@ -1262,7 +1261,7 @@ func SameCompactMetric(a, b *MetricMetaValue) bool {
a.RoundSampleFactors != b.RoundSampleFactors {
return false
}
-for i := 0; i < NewMaxTags; i++ {
+for i := 0; i < MaxTags; i++ {
ta := a.Name2Tag(TagID(i))
tb := b.Name2Tag(TagID(i))
if !SameCompactTag(ta, tb) {
67 changes: 67 additions & 0 deletions internal/loadgen/generator.go
@@ -32,6 +32,60 @@ type GenericMetric interface {
Ensure(ctx context.Context, c *api.Client)
}

+type manyTagsValueMetric struct {
+name string
+tags statshouse.Tags
+resolution int
+
+value float64
+}
+
+func (m *manyTagsValueMetric) Name() string {
+return m.name
+}
+
+func (m *manyTagsValueMetric) Write(c *statshouse.Client) {
+c.Value(m.name, m.tags, m.value)
+}
+
+func (m *manyTagsValueMetric) Update(now time.Time, rng *rand.Rand) {
+ts := now.Unix()
+switch m.resolution {
+case 1:
+m.value = math.Sin(math.Pi * 2. * float64(ts%300) / 300.)
+case 15:
+m.value = math.Sin(math.Pi * 2. * float64(ts%600) / 600.)
+case 60:
+m.value = math.Sin(math.Pi * 2. * float64(ts%1800) / 1800.)
+default:
+panic("unexpected resolution")
+}
+}
+
+func (m *manyTagsValueMetric) Ensure(ctx context.Context, c *api.Client) {
+metric, err := c.GetMetric(ctx, m.name)
+if err != nil {
+log.Printf("error getting metric: %v", err)
+}
+metric.Metric.Tags = []format.MetricMetaTag{
+{
+Description: "environment",
+},
+}
+for i := 1; i < 48; i++ {
+metric.Metric.Tags = append(metric.Metric.Tags, format.MetricMetaTag{
+Name: fmt.Sprintf("tag_%d", i),
+})
+}
+metric.Metric.Name = m.name
+metric.Metric.Resolution = m.resolution
+metric.Metric.Kind = format.MetricKindValue
+err = c.PostMetric(ctx, metric)
+if err != nil {
+log.Printf("error creating metric: %v", err)
+}
+}
+
type valueMetric struct {
name string
tags statshouse.NamedTags
@@ -276,6 +330,19 @@ func (g *Generator) AddConstValue(resolution int, idx int) {
g.metrics = append(g.metrics, &m)
}

+func (g *Generator) AddConstValueManyTags(resolution int, idx int) {
+tags := statshouse.Tags{}
+for i := range tags {
+tags[i] = fmt.Sprint("tag_", i)
+}
+m := manyTagsValueMetric{
+name: metricPrefixG + "const_val_many_tags_" + fmt.Sprint(resolution),
+tags: tags,
+resolution: resolution,
+}
+g.metrics = append(g.metrics, &m)
+}
+
func (g *Generator) AddConstValueHost(resolution int, idx int, host string) {
m := valueMetric{
name: metricPrefixG + "const_val_host_" + fmt.Sprint(resolution),
1 change: 1 addition & 0 deletions internal/loadgen/loadgen.go
@@ -94,6 +94,7 @@ func addMetrics(g *Generator, resolution int, idx int) {
g.AddConstPercentile(resolution, idx)
g.AddConstValueHost(resolution, idx, "host_1")
g.AddConstValueHost(resolution, idx, "host_2")
+g.AddConstValueManyTags(resolution, idx)
// metrics with changing tag values
g.AddChangingCounter(resolution, idx)
g.AddChangingValue(resolution, idx)
2 changes: 1 addition & 1 deletion internal/mapping/pipeline_v2.go
@@ -93,7 +93,7 @@ func (mp *mapPipelineV2) mapTags(h *data_model.MappedMetricHeader, metric *tlsta
// We could arguably call h.SetKey, but there is very little difference in semantic to care
continue
}
-if tagMeta.Index+1 < format.MaxTags { // TODO - remove after NewMaxTags
+if tagMeta.Index+1 < format.MaxTags { // TODO - remove after MaxTags
h.SetTag(tagMeta.Index+1, hi, tagIDKey+1) // last tag is never Raw64, checked by RestoreCachedInfo
}
h.SetTag(tagMeta.Index, lo, tagIDKey)
2 changes: 1 addition & 1 deletion internal/promql/engine.go
@@ -1131,7 +1131,7 @@ func (ev *evaluator) buildSeriesQuery(ctx context.Context, sel *parser.VectorSel
}
// filtering
var (
-emptyCount [format.NewMaxTags]int // number of "MatchEqual" or "MatchRegexp" filters which are guaranteed to yield empty response
+emptyCount [format.MaxTags]int // number of "MatchEqual" or "MatchRegexp" filters which are guaranteed to yield empty response
)
for _, matcher := range sel.LabelMatchers {
if strings.HasPrefix(matcher.Name, "__") {