Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: send bytes_received through opamp custom message #1492

Merged
merged 11 commits into from
Mar 4, 2025
119 changes: 103 additions & 16 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package agent
import (
"bytes"
"context"
"crypto/tls"
"errors"
"net/http"
"os"
"runtime"
Expand All @@ -13,6 +13,7 @@ import (
"github.com/honeycombio/refinery/config"
"github.com/honeycombio/refinery/internal/health"
"github.com/honeycombio/refinery/metrics"
"github.com/jonboulle/clockwork"
"github.com/open-telemetry/opamp-go/client"
"github.com/open-telemetry/opamp-go/client/types"
"github.com/open-telemetry/opamp-go/protobufs"
Expand All @@ -24,39 +25,44 @@ const (
)

type Agent struct {
clock clockwork.Clock
agentType string
agentVersion string
instanceId uuid.UUID
hostname string
effectiveConfig config.Config
agentDescription *protobufs.AgentDescription
opampClient client.OpAMPClient
remoteConfigStatus *protobufs.RemoteConfigStatus
remoteConfig *protobufs.AgentRemoteConfig

opampClientCert *tls.Certificate
caCertPath string
certRequested bool
clientPrivateKeyPEM []byte
lastHealth *protobufs.ComponentHealth

logger Logger
ctx context.Context
cancel context.CancelFunc
metrics metrics.Metrics
health health.Reporter
// opampClientCert *tls.Certificate
// caCertPath string
// certRequested bool
// clientPrivateKeyPEM []byte
lastHealth *protobufs.ComponentHealth

logger Logger
ctx context.Context
cancel context.CancelFunc
metrics metrics.Metrics
usageTracker *usageTracker
health health.Reporter
}

func NewAgent(refineryLogger Logger, agentVersion string, currentConfig config.Config, metrics metrics.Metrics, health health.Reporter) *Agent {
ctx, cancel := context.WithCancel(context.Background())
agent := &Agent{
ctx: ctx,
cancel: cancel,
clock: clockwork.NewRealClock(),
logger: refineryLogger,
agentType: serviceName,
agentVersion: agentVersion,
effectiveConfig: currentConfig,
metrics: metrics,
health: health,
usageTracker: newUsageTracker(),
}
agent.createAgentIdentity()
agent.logger.Debugf(context.Background(), "starting opamp client, id=%v", agent.instanceId)
Expand All @@ -74,6 +80,7 @@ func (agent *Agent) createAgentIdentity() {
}
agent.instanceId = uid
hostname, _ := os.Hostname()
agent.hostname = hostname
agent.agentDescription = &protobufs.AgentDescription{
IdentifyingAttributes: []*protobufs.KeyValue{
{
Expand All @@ -99,7 +106,7 @@ func (agent *Agent) createAgentIdentity() {
{
Key: "host.name",
Value: &protobufs.AnyValue{
Value: &protobufs.AnyValue_StringValue{StringValue: hostname},
Value: &protobufs.AnyValue_StringValue{StringValue: agent.hostname},
},
},
},
Expand Down Expand Up @@ -165,6 +172,7 @@ func (agent *Agent) connect() error {
agent.logger.Debugf(context.Background(), "started opamp client")

go agent.healthCheck()
go agent.reportUsagePeriodically()
return nil
}

Expand All @@ -188,20 +196,99 @@ func (agent *Agent) Stop(ctx context.Context) {

func (agent *Agent) healthCheck() {
//TODO: make this ticker configurable
timer := time.NewTicker(15 * time.Second)
timer := agent.clock.NewTicker(15 * time.Second)
for {
select {
case <-agent.ctx.Done():
case <-timer.C:
case <-timer.Chan():
report := agent.calculateHealth()
if report != nil {
agent.lastHealth = report
agent.opampClient.SetHealth(report)
if err := agent.opampClient.SetHealth(report); err != nil {
agent.logger.Errorf(context.Background(), "Could not report health to OpAMP server: %v", err)
}
}

traceUsage, ok := agent.metrics.Get("bytes_received_trace")
if !ok {
agent.logger.Errorf(context.Background(), "unexpected missing trace usage metric")
}
logUsage, ok := agent.metrics.Get("bytes_received_log")
if !ok {
agent.logger.Errorf(context.Background(), "unexpected missing log usage metric")
}

now := agent.clock.Now()
agent.usageTracker.Add(newTraceCumulativeUsage(traceUsage, now))
agent.usageTracker.Add(newLogCumulativeUsage(logUsage, now))
}
}
}

func (agent *Agent) reportUsagePeriodically() {
timer := agent.clock.NewTicker(15 * time.Second)
defer timer.Stop()

for {
select {
case <-agent.ctx.Done():
// TODO: drain the existing reports
return
case <-timer.Chan():
if err := agent.sendUsageReport(); err != nil {
if errors.Is(err, errNoData) {
agent.logger.Debugf(context.Background(), "No data to report")
continue
}
agent.logger.Errorf(context.Background(), "MONEY STEALING. Could not send usage report: %v", err)
}

}
}
}

func (agent *Agent) sendUsageReport() error {
usageReport, err := agent.usageTracker.NewReport(agent.agentType, agent.agentVersion, agent.hostname)
if err != nil {
return err
}

isSent, err := agent.opampClient.SendCustomMessage(&protobufs.CustomMessage{
Capability: sendAgentTelemetryCapability,
Data: usageReport,
})

if err != nil {
if errors.Is(err, types.ErrCustomMessagePending) {
agent.logger.Debugf(context.Background(), "Usage report is pending")
select {
case <-agent.ctx.Done():
// TODO: we probably need to drain the existing reports
return agent.ctx.Err()
case <-isSent:
// Retry sending the message once
isSent, err = agent.opampClient.SendCustomMessage(&protobufs.CustomMessage{
Capability: sendAgentTelemetryCapability,
Data: usageReport,
})
if err != nil {
return err
}
}
} else {
return err
}
}

select {
case <-agent.ctx.Done():
return agent.ctx.Err()
case <-isSent:
agent.usageTracker.completeSend()
return nil
}
}

func (agent *Agent) calculateHealth() *protobufs.ComponentHealth {
lastHealth := agent.lastHealth
report := healthMessage(agent.health.IsAlive())
Expand Down
68 changes: 68 additions & 0 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ import (
"github.com/honeycombio/refinery/internal/health"
"github.com/honeycombio/refinery/logger"
"github.com/honeycombio/refinery/metrics"
"github.com/open-telemetry/opamp-go/client"
"github.com/open-telemetry/opamp-go/client/types"
"github.com/open-telemetry/opamp-go/protobufs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

Expand All @@ -29,6 +31,7 @@ func TestAgentOnMessage_RemoteConfig(t *testing.T) {
},
}
agent := NewAgent(Logger{Logger: &logger.NullLogger{}}, "1.0.0", cfg, &metrics.NullMetrics{}, &health.Health{})
defer agent.Stop(context.Background())

testcases := []struct {
name string
Expand Down Expand Up @@ -116,6 +119,7 @@ func TestAgentOnMessage_RemoteConfig(t *testing.T) {
func TestHealthCheck(t *testing.T) {
healthReporter := &health.MockHealthReporter{}
agent := NewAgent(Logger{Logger: &logger.NullLogger{}}, "1.0.0", &config.MockConfig{}, &metrics.NullMetrics{}, healthReporter)
defer agent.Stop(context.Background())

// health check should start with false
require.False(t, agent.calculateHealth().Healthy)
Expand All @@ -128,3 +132,67 @@ func TestHealthCheck(t *testing.T) {
healthReporter.SetReady(true)
require.True(t, agent.calculateHealth().Healthy)
}

func TestAgentUsageReport(t *testing.T) {
mockClient := &MockOpAMPClient{}
ctx, cancel := context.WithCancel(context.Background())
agent := &Agent{
ctx: ctx,
cancel: cancel,
logger: Logger{&logger.NullLogger{}},
agentType: serviceName,
agentVersion: "1.0.0",
opampClient: mockClient,
effectiveConfig: &config.MockConfig{},
metrics: &metrics.NullMetrics{},
health: &health.MockHealthReporter{},
usageTracker: newUsageTracker(),
}
agent.createAgentIdentity()
agent.hostname = "my-hostname"
defer cancel()

isSent := make(chan struct{})
close(isSent)
mockClient.On("SendCustomMessage", mock.Anything).Return(isSent, nil)

now := time.Now()
agent.usageTracker.Add(newTraceCumulativeUsage(1, now))
agent.usageTracker.Add(newLogCumulativeUsage(2, now))
agent.usageTracker.Add(newTraceCumulativeUsage(3, now))
agent.usageTracker.Add(newLogCumulativeUsage(4, now))

err := agent.sendUsageReport()
require.NoError(t, err)

// Format the timestamp to match the expected format in the payload
timeUnixNano := now.UnixNano()
expectedPayload := fmt.Sprintf(`{"resourceMetrics":[{"resource":{"attributes":[{"key":"service.name","value":{"stringValue":"refinery"}},{"key":"service.version","value":{"stringValue":"1.0.0"}},{"key":"host.name","value":{"stringValue":"my-hostname"}}]},"scopeMetrics":[{"scope":{},"metrics":[{"name":"bytes_received","sum":{"dataPoints":[{"attributes":[{"key":"signal","value":{"stringValue":"traces"}}],"timeUnixNano":"%d","asInt":"1"},{"attributes":[{"key":"signal","value":{"stringValue":"logs"}}],"timeUnixNano":"%d","asInt":"2"},{"attributes":[{"key":"signal","value":{"stringValue":"traces"}}],"timeUnixNano":"%d","asInt":"2"},{"attributes":[{"key":"signal","value":{"stringValue":"logs"}}],"timeUnixNano":"%d","asInt":"2"}],"aggregationTemporality":1}}]}]}]}`,
timeUnixNano, timeUnixNano, timeUnixNano, timeUnixNano)

// Assert that the mock client was called with the expected custom message
mockClient.AssertCalled(t, "SendCustomMessage", &protobufs.CustomMessage{
Capability: sendAgentTelemetryCapability,
Data: []byte(expectedPayload),
})

}

// MockOpAMPClient is a mock implementation of the OpAMPClient interface
type MockOpAMPClient struct {
mock.Mock
client.OpAMPClient
}

func (m *MockOpAMPClient) SendCustomMessage(msg *protobufs.CustomMessage) (chan struct{}, error) {
args := m.Called(msg)
return args.Get(0).(chan struct{}), args.Error(1)
}

func (m *MockOpAMPClient) SetHealth(health *protobufs.ComponentHealth) error {
return nil
}

func (m *MockOpAMPClient) Stop(ctx context.Context) error {
return nil
}
55 changes: 55 additions & 0 deletions agent/otlp_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package agent

import (
"fmt"
"math"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
)

type otlpMetrics struct {
metrics pmetric.Metrics
ms pmetric.Sum
}

func newOTLPMetrics(serviceName, version, hostname string) *otlpMetrics {
metrics := pmetric.NewMetrics()
rm := metrics.ResourceMetrics().AppendEmpty()
resourceAttrs := rm.Resource().Attributes()
resourceAttrs.PutStr("service.name", serviceName)
resourceAttrs.PutStr("service.version", version)
resourceAttrs.PutStr("host.name", hostname)
sm := rm.ScopeMetrics().AppendEmpty()
ms := sm.Metrics().AppendEmpty()
ms.SetName("bytes_received")
sum := ms.SetEmptySum()
sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
return &otlpMetrics{
metrics: metrics,
ms: sum,
}
}

func (om *otlpMetrics) addOTLPSum(timestamp time.Time, value float64, signal usageSignal) error {
intVal, err := convertFloat64ToInt64(value)
if err != nil {
return err
}
d := om.ms.DataPoints().AppendEmpty()
d.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
d.SetIntValue(intVal)
d.Attributes().PutStr("signal", string(signal))
return nil
}

func convertFloat64ToInt64(value float64) (int64, error) {
if value > math.MaxInt64 {
return 0, fmt.Errorf("value %f is too large to convert to int64", value)
}
if value < 0 {
return 0, fmt.Errorf("invalid negative value %f", value)
}
return int64(value), nil
}
Loading