Skip to content

[stats] stop receiving from libs #1301

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/data_model/mapped_metric_header.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type HandlerArgs struct {
Description string
ScrapeInterval int
MapCallback MapCallbackFunc
FromStatsHouse bool // mark metric which is produced by agent but used our common metric flow (host metrics)
}

type MapCallbackFunc func(tlstatshouse.MetricBytes, MappedMetricHeader)
Expand Down
27 changes: 15 additions & 12 deletions internal/format/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ const (
TagValueIDSrcIngestionStatusWarnOldCounterSemantic = 51 // never written, for historic data
TagValueIDSrcIngestionStatusWarnMapInvalidRawTagValue = 52
TagValueIDSrcIngestionStatusWarnMapTagNameFoundDraft = 53
TagValueIDSrcIngestionStatusErrReservedMetricName = 54

TagValueIDPacketFormatLegacy = 1
TagValueIDPacketFormatTL = 2
Expand Down Expand Up @@ -606,6 +607,7 @@ This metric uses sampling budgets of metric it refers to, so flooding by errors
TagValueIDSrcIngestionStatusWarnOldCounterSemantic: "warn_deprecated_counter_semantic",
TagValueIDSrcIngestionStatusWarnMapInvalidRawTagValue: "warn_map_invalid_raw_tag_value",
TagValueIDSrcIngestionStatusWarnMapTagNameFoundDraft: "warn_tag_draft_found",
TagValueIDSrcIngestionStatusErrReservedMetricName: "err_reserved_metric_name",
}),
}, {
Description: "tag_id",
Expand Down Expand Up @@ -1852,18 +1854,17 @@ Value is delta between second value and time it was inserted.`,
Name: BuiltinMetricNameAPICacheHit,
Kind: MetricKindValue,
Description: `API cache hit rate`,
Tags: []MetricMetaTag{
{Description: "environment"}, {
Description: "source",
}, {
Description: "metric",
IsMetric: true,
Raw: true,
}, {
Description: "table",
}, {
Description: "kind",
}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}},
Tags: []MetricMetaTag{{
Description: "source",
}, {
Description: "metric",
IsMetric: true,
Raw: true,
}, {
Description: "table",
}, {
Description: "kind",
}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}},
},
BuiltinMetricIDAggScrapeTargetDispatch: {
Name: "__agg_scrape_target_dispatch",
Expand Down Expand Up @@ -1977,6 +1978,7 @@ Value is delta between second value and time it was inserted.`,
BuiltinMetricIDUIErrors: true,
BuiltinMetricIDStatsHouseErrors: true,
BuiltinMetricIDPromQLEngineTime: true,
BuiltinMetricIDAPICacheHit: true,
}

builtinMetricsNoSamplingAgent = map[int32]bool{
Expand Down Expand Up @@ -2178,6 +2180,7 @@ func init() {
}
}
for k, v := range hostMetrics {
v.IsHostMetric = true
v.Tags = append([]MetricMetaTag{{Name: "hostname"}}, v.Tags...)
v.Resolution = 5
v.GroupID = BuiltinGroupIDHost
Expand Down
1 change: 1 addition & 0 deletions internal/format/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ type MetricMetaValue struct {
ShardUniqueValues bool `json:"-"` // Experimental, set if magic word in description is found
NoSampleAgent bool `json:"-"` // Built-in metrics with fixed/limited # of rows on agent
HistorgamBuckets []float32 `json:"-"` // Prometheus histogram buckets
IsHostMetric bool `json:"-"`

GroupID int32 `json:"-"`

Expand Down
7 changes: 7 additions & 0 deletions internal/mapping/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,13 @@ func (mp *mapPipeline) doMap(args data_model.HandlerArgs, h *data_model.MappedMe
h.IngestionStatus = format.TagValueIDSrcIngestionStatusErrMetricNotFound
return true
}
if h.MetricInfo.IsHostMetric && !args.FromStatsHouse {
validName, _ := format.AppendValidStringValue(metric.Name[:0], metric.Name)
h.Key.Metric = h.MetricInfo.MetricID
metric.Name = validName
h.IngestionStatus = format.TagValueIDSrcIngestionStatusErrReservedMetricName
return true
}
}
h.Key.Metric = h.MetricInfo.MetricID
if !h.MetricInfo.Visible {
Expand Down
7 changes: 7 additions & 0 deletions internal/stats/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,10 @@ func diff[A float64 | uint64](new, old A) float64 {
}
return 0
}

func nonNegative[A float64 | int64](v A) float64 {
if v < 0 {
return 0
}
return float64(v)
}
64 changes: 32 additions & 32 deletions internal/stats/net_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,15 +171,15 @@ func calcNetDev(old, new_ *procfs.NetDevLine) (stat *devStat) {
if old == nil {
return nil
}
rxBytes := float64(new_.RxBytes) - float64(old.RxBytes)
rxPackets := float64(new_.RxPackets) - float64(old.RxPackets)
rxErrors := float64(new_.RxErrors) - float64(old.RxErrors)
rxDropped := float64(new_.RxDropped) - float64(old.RxDropped)
rxBytes := diff(new_.RxBytes, old.RxBytes)
rxPackets := diff(new_.RxPackets, old.RxPackets)
rxErrors := diff(new_.RxErrors, old.RxErrors)
rxDropped := diff(new_.RxDropped, old.RxDropped)

txBytes := float64(new_.TxBytes) - float64(old.TxBytes)
txPackets := float64(new_.TxPackets) - float64(old.TxPackets)
txErrors := float64(new_.TxErrors) - float64(old.TxErrors)
txDropped := float64(new_.TxDropped) - float64(old.TxDropped)
txBytes := diff(new_.TxBytes, old.TxBytes)
txPackets := diff(new_.TxPackets, old.TxPackets)
txErrors := diff(new_.TxErrors, old.TxErrors)
txDropped := diff(new_.TxDropped, old.TxDropped)

return &devStat{
RxBytes: rxBytes,
Expand Down Expand Up @@ -264,17 +264,17 @@ func (c *NetStats) writeSNMP(nowUnix int64) error {
}

func (c *NetStats) writePackets(nowUnix int64, stat netStat) {
tcpR := stat.tcp.InSegs - c.oldNetStat.tcp.InSegs
tcpO := stat.tcp.OutSegs - c.oldNetStat.tcp.OutSegs
tcpR := diff(stat.tcp.InSegs, c.oldNetStat.tcp.InSegs)
tcpO := diff(stat.tcp.OutSegs, c.oldNetStat.tcp.OutSegs)

ipR := stat.ip.InReceives - c.oldNetStat.ip.InReceives
ipO := stat.ip.OutRequests - c.oldNetStat.ip.OutRequests
ipR := diff(stat.ip.InReceives, c.oldNetStat.ip.InReceives)
ipO := diff(stat.ip.OutRequests, c.oldNetStat.ip.OutRequests)

udpR := stat.udp.InDatagrams - c.oldNetStat.udp.InDatagrams
udpO := stat.udp.OutDatagrams - c.oldNetStat.udp.OutDatagrams
udpR := diff(stat.udp.InDatagrams, c.oldNetStat.udp.InDatagrams)
udpO := diff(stat.udp.OutDatagrams, c.oldNetStat.udp.OutDatagrams)

icmpR := stat.icmp.InMsgs - c.oldNetStat.icmp.InMsgs
icmpO := stat.icmp.OutMsgs - c.oldNetStat.icmp.OutMsgs
icmpR := diff(stat.icmp.InMsgs, c.oldNetStat.icmp.InMsgs)
icmpO := diff(stat.icmp.OutMsgs, c.oldNetStat.icmp.OutMsgs)

if c.oldNetStat.tcp.isSuccess {
c.writer.WriteSystemMetricCount(nowUnix, format.BuiltinMetricNameNetPacket, tcpR, format.RawIDTagReceived, format.RawIDTagTCP)
Expand All @@ -289,8 +289,8 @@ func (c *NetStats) writePackets(nowUnix int64, stat netStat) {
c.writer.WriteSystemMetricCount(nowUnix, format.BuiltinMetricNameNetPacket, icmpO, format.RawIDTagSent, format.RawIDTagICMP)
}
if c.oldNetStat.ip.isSuccess {
c.writer.WriteSystemMetricCount(nowUnix, format.BuiltinMetricNameNetPacket, ipR-tcpR-udpR-icmpR, format.RawIDTagReceived, format.RawIDTagOther)
c.writer.WriteSystemMetricCount(nowUnix, format.BuiltinMetricNameNetPacket, ipO-tcpO-udpO-icmpO, format.RawIDTagSent, format.RawIDTagOther)
c.writer.WriteSystemMetricCount(nowUnix, format.BuiltinMetricNameNetPacket, nonNegative(ipR-tcpR-udpR-icmpR), format.RawIDTagReceived, format.RawIDTagOther)
c.writer.WriteSystemMetricCount(nowUnix, format.BuiltinMetricNameNetPacket, nonNegative(ipO-tcpO-udpO-icmpO), format.RawIDTagSent, format.RawIDTagOther)
}
}

Expand All @@ -299,22 +299,22 @@ func (c *NetStats) writeIP(nowUnix int64, stat netStat) {
return
}

inHdrErrs := stat.ip.InHdrErrors - c.oldNetStat.ip.InHdrErrors
inHdrErrs := diff(stat.ip.InHdrErrors, c.oldNetStat.ip.InHdrErrors)
c.writer.WriteSystemMetricCount(nowUnix, format.BuiltinMetricNameNetError, inHdrErrs, format.RawIDTagInHdrError, format.RawIDTagIP)

inDiscards := stat.ip.InDiscards - c.oldNetStat.ip.InDiscards
inDiscards := diff(stat.ip.InDiscards, c.oldNetStat.ip.InDiscards)
c.writer.WriteSystemMetricCount(nowUnix, format.BuiltinMetricNameNetError, inDiscards, format.RawIDTagInDiscard, format.RawIDTagIP)

outDiscard := stat.ip.OutDiscards - c.oldNetStat.ip.OutDiscards
outDiscard := diff(stat.ip.OutDiscards, c.oldNetStat.ip.OutDiscards)
c.writer.WriteSystemMetricCount(nowUnix, format.BuiltinMetricNameNetError, outDiscard, format.RawIDTagOutDiscard, format.RawIDTagIP)

outNoRoutes := stat.ip.OutNoRoutes - c.oldNetStat.ip.OutNoRoutes
outNoRoutes := diff(stat.ip.OutNoRoutes, c.oldNetStat.ip.OutNoRoutes)
c.writer.WriteSystemMetricCount(nowUnix, format.BuiltinMetricNameNetError, outNoRoutes, format.RawIDTagOutNoRoute, format.RawIDTagIP)

inAddrErrors := stat.ip.InAddrErrors - c.oldNetStat.ip.InAddrErrors
inAddrErrors := diff(stat.ip.InAddrErrors, c.oldNetStat.ip.InAddrErrors)
c.writer.WriteSystemMetricCount(nowUnix, format.BuiltinMetricNameNetError, inAddrErrors, format.RawIDTagInAddrError, format.RawIDTagIP)

inUnknownProtos := stat.ip.InUnknownProtos - c.oldNetStat.ip.InUnknownProtos
inUnknownProtos := diff(stat.ip.InUnknownProtos, c.oldNetStat.ip.InUnknownProtos)
c.writer.WriteSystemMetricCount(nowUnix, format.BuiltinMetricNameNetError, inUnknownProtos, format.RawIDTagInUnknownProto, format.RawIDTagIP)
}

Expand All @@ -323,27 +323,27 @@ func (c *NetStats) writeTCP(nowUnix int64, stat netStat) {
return
}

inErrs := stat.tcp.InErrs - c.oldNetStat.tcp.InErrs
inErrs := diff(stat.tcp.InErrs, c.oldNetStat.tcp.InErrs)
c.writer.WriteSystemMetricCount(nowUnix, format.BuiltinMetricNameNetError, inErrs, format.RawIDTagInErr, format.RawIDTagTCP)
inCsumError := stat.tcp.InCsumErrors - c.oldNetStat.tcp.InCsumErrors
inCsumError := diff(stat.tcp.InCsumErrors, c.oldNetStat.tcp.InCsumErrors)
c.writer.WriteSystemMetricCount(nowUnix, format.BuiltinMetricNameNetError, inCsumError, format.RawIDTagInCsumErr, format.RawIDTagTCP)
retransSegs := stat.tcp.RetransSegs - c.oldNetStat.tcp.RetransSegs
retransSegs := diff(stat.tcp.RetransSegs, c.oldNetStat.tcp.RetransSegs)
c.writer.WriteSystemMetricCount(nowUnix, format.BuiltinMetricNameNetError, retransSegs, format.RawIDTagRetransSeg, format.RawIDTagTCP)
}

func (c *NetStats) writeUDP(nowUnix int64, stat netStat) {
if !c.oldNetStat.udp.isSuccess {
return
}
inErrs := stat.udp.InErrors - c.oldNetStat.udp.InErrors
inErrs := diff(stat.udp.InErrors, c.oldNetStat.udp.InErrors)
c.writer.WriteSystemMetricCount(nowUnix, format.BuiltinMetricNameNetError, inErrs, format.RawIDTagInErrors, format.RawIDTagUDP)
inCsumError := stat.udp.InCsumErrors - c.oldNetStat.udp.InCsumErrors
inCsumError := diff(stat.udp.InCsumErrors, c.oldNetStat.udp.InCsumErrors)
c.writer.WriteSystemMetricCount(nowUnix, format.BuiltinMetricNameNetError, inCsumError, format.RawIDTagInCsumErrors, format.RawIDTagUDP)
rcvbufErrors := stat.udp.RcvbufErrors - c.oldNetStat.udp.RcvbufErrors
rcvbufErrors := diff(stat.udp.RcvbufErrors, c.oldNetStat.udp.RcvbufErrors)
c.writer.WriteSystemMetricCount(nowUnix, format.BuiltinMetricNameNetError, rcvbufErrors, format.RawIDTagRcvbufErrors, format.RawIDTagUDP)
sndbufErrors := stat.udp.SndbufErrors - c.oldNetStat.udp.SndbufErrors
sndbufErrors := diff(stat.udp.SndbufErrors, c.oldNetStat.udp.SndbufErrors)
c.writer.WriteSystemMetricCount(nowUnix, format.BuiltinMetricNameNetError, sndbufErrors, format.RawIDTagSndbufErrors, format.RawIDTagUDP)
noPorts := stat.udp.NoPorts - c.oldNetStat.udp.NoPorts
noPorts := diff(stat.udp.NoPorts, c.oldNetStat.udp.NoPorts)
c.writer.WriteSystemMetricCount(nowUnix, format.BuiltinMetricNameNetError, noPorts, format.RawIDTagNoPorts, format.RawIDTagUDP)
}

Expand Down
2 changes: 1 addition & 1 deletion internal/stats/vmstat_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (c *VMStatStats) WriteMetrics(nowUnix int64) error {
for name, value := range m {
oldValue, ok := c.old[name]
if ok {
c.pushMetric(nowUnix, name, value-oldValue, m)
c.pushMetric(nowUnix, name, diff(value, oldValue), m)
}
c.old[name] = value
}
Expand Down
12 changes: 6 additions & 6 deletions internal/stats/write_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,42 +176,42 @@ func (p *MetricWriterSHImpl) WriteSystemMetricValue(nowUnix int64, name string,
m := p.metric
fillCommonMetric(p, m, true, name, nowUnix, tagsList...)
m.Value = append(m.Value, value)
_, _ = p.handler.HandleMetrics(data_model.HandlerArgs{MetricBytes: m})
_, _ = p.handler.HandleMetrics(data_model.HandlerArgs{MetricBytes: m, FromStatsHouse: true})
}

func (p *MetricWriterSHImpl) WriteSystemMetricCountValue(nowUnix int64, name string, count, value float64, tagsList ...int32) {
m := p.metric
fillCommonMetric(p, m, true, name, nowUnix, tagsList...)
m.Counter = count
m.Value = append(m.Value, value)
_, _ = p.handler.HandleMetrics(data_model.HandlerArgs{MetricBytes: m})
_, _ = p.handler.HandleMetrics(data_model.HandlerArgs{MetricBytes: m, FromStatsHouse: true})
}

func (p *MetricWriterSHImpl) WriteSystemMetricValueWithoutHost(nowUnix int64, name string, value float64, tagsList ...int32) {
m := p.metric
fillCommonMetric(p, m, false, name, nowUnix, tagsList...)
m.Value = append(m.Value, value)
_, _ = p.handler.HandleMetrics(data_model.HandlerArgs{MetricBytes: m})
_, _ = p.handler.HandleMetrics(data_model.HandlerArgs{MetricBytes: m, FromStatsHouse: true})
}

func (p *MetricWriterSHImpl) WriteSystemMetricCount(nowUnix int64, name string, count float64, tagsList ...int32) {
m := p.metric
fillCommonMetric(p, m, true, name, nowUnix, tagsList...)
m.Counter = count
_, _ = p.handler.HandleMetrics(data_model.HandlerArgs{MetricBytes: m})
_, _ = p.handler.HandleMetrics(data_model.HandlerArgs{MetricBytes: m, FromStatsHouse: true})
}

func (p *MetricWriterSHImpl) WriteSystemMetricCountValueExtendedTag(nowUnix int64, name string, count, value float64, tagsList ...Tag) {
m := p.metric
fillCommonMetric(p, m, true, name, nowUnix, tagsList...)
m.Counter = count
m.Value = append(m.Value, value)
_, _ = p.handler.HandleMetrics(data_model.HandlerArgs{MetricBytes: m})
_, _ = p.handler.HandleMetrics(data_model.HandlerArgs{MetricBytes: m, FromStatsHouse: true})
}

func (p *MetricWriterSHImpl) WriteSystemMetricCountExtendedTag(nowUnix int64, name string, count float64, tagsList ...Tag) {
m := p.metric
fillCommonMetric(p, m, true, name, nowUnix, tagsList...)
m.Counter = count
_, _ = p.handler.HandleMetrics(data_model.HandlerArgs{MetricBytes: m})
_, _ = p.handler.HandleMetrics(data_model.HandlerArgs{MetricBytes: m, FromStatsHouse: true})
}
Loading