feat: Add tracing support (#253)
## Which problem is this PR solving?
Adds tracing support to Husky that can be configured by library users
(e.g. beeline or OTel). This is done by exposing a new global var
`AddTelemetryAttributeFunc` that husky calls internally. The new func
receives a context and a telemetry attribute (key and value), with the
library responsible for setting that telemetry on whatever telemetry
client is being used.

For example, a library user may configure the beeline and then set
`AddTelemetryAttributeFunc` to add the attributes to the current beeline
span like this:

```golang
husky.AddTelemetryAttributeFunc = func(ctx context.Context, key string, value any) {
	beeline.AddField(ctx, key, value)
}
```

I would like to make husky just use OTel for tracing, but some consumers
are still using beeline and I don’t want to add beeline as a dependency.
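
In the meantime, the same hook can forward these fields to an OTel span for consumers already on OTel. A minimal sketch (not part of this PR), assuming the standard `go.opentelemetry.io/otel/trace` and `go.opentelemetry.io/otel/attribute` packages are imported alongside `context` and `fmt`:

```golang
husky.AddTelemetryAttributeFunc = func(ctx context.Context, key string, value any) {
	// SpanFromContext returns a no-op span if none is active, so this is safe everywhere.
	span := trace.SpanFromContext(ctx)
	switch v := value.(type) {
	case string:
		span.SetAttributes(attribute.String(key, v))
	case bool:
		span.SetAttributes(attribute.Bool(key, v))
	case int:
		span.SetAttributes(attribute.Int(key, v))
	case int64:
		span.SetAttributes(attribute.Int64(key, v))
	case float64:
		span.SetAttributes(attribute.Float64(key, v))
	default:
		// Fall back to a string representation for other value types.
		span.SetAttributes(attribute.String(key, fmt.Sprintf("%v", v)))
	}
}
```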

## Short description of the changes
- Adds a new `AddTelemetryAttributeFunc` that library users can set to
receive telemetry fields about what husky is doing (see the sketch after
this list)
- Updates all public funcs to take a ctx and pass it along to internal
funcs (e.g. `addAttributesToMap`)
- Sets telemetry attributes when handling array, bytes, or kvlist
attributes
- Records the max depth of a kvlist as it is traversed
- Adds tests to verify the telemetry hook is called with the expected
attributes
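
The root-package helpers `husky.AddTelemetryAttribute` and `husky.AddTelemetryAttributes` called from `otlp/common.go` below are not part of the rendered diff; here is a minimal sketch of how such helpers could dispatch to the user-provided hook. Only the names and call sites appear in this diff, so the nil-check and iteration details are assumptions:

```golang
package husky

import "context"

// AddTelemetryAttributeFunc, when set by a library user, receives telemetry
// attributes describing what husky is doing. When nil, telemetry is a no-op.
var AddTelemetryAttributeFunc func(ctx context.Context, key string, value any)

// AddTelemetryAttribute forwards a single attribute to the configured hook.
func AddTelemetryAttribute(ctx context.Context, key string, value any) {
	if AddTelemetryAttributeFunc != nil {
		AddTelemetryAttributeFunc(ctx, key, value)
	}
}

// AddTelemetryAttributes forwards a map of attributes, one key at a time.
func AddTelemetryAttributes(ctx context.Context, attrs map[string]interface{}) {
	for k, v := range attrs {
		AddTelemetryAttribute(ctx, k, v)
	}
}
```
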
MikeGoldsmith authored Mar 15, 2024
1 parent 43f7f89 commit 99db9ba
Showing 7 changed files with 181 additions and 48 deletions.
28 changes: 19 additions & 9 deletions otlp/common.go
@@ -14,6 +14,8 @@ import (
"strings"
"time"

"github.com/honeycombio/husky"

jsoniter "github.com/json-iterator/go"
"github.com/klauspost/compress/zstd"
collectorlogs "go.opentelemetry.io/proto/otlp/collector/logs/v1"
@@ -277,25 +279,25 @@ func getValueFromMetadata(md metadata.MD, key string) string {
// Supported types are string, bool, double, int, bytes, array, and kvlist.
// kvlist attributes are flattened to a depth of (maxDepth), if the depth is exceeded, the attribute is added as a JSON string.
// Bytes and array values are always added as JSON strings.
func AddAttributesToMap(attrs map[string]interface{}, attributes []*common.KeyValue) {
func AddAttributesToMap(ctx context.Context, attrs map[string]interface{}, attributes []*common.KeyValue) {
for _, attr := range attributes {
// ignore entries if the key is empty or value is nil
if attr.Key == "" || attr.Value == nil {
continue
}
addAttributeToMap(attrs, attr.Key, attr.Value, 0)
addAttributeToMap(ctx, attrs, attr.Key, attr.Value, 0)
}
}

func getResourceAttributes(resource *resource.Resource) map[string]interface{} {
func getResourceAttributes(ctx context.Context, resource *resource.Resource) map[string]interface{} {
attrs := map[string]interface{}{}
if resource != nil {
AddAttributesToMap(attrs, resource.Attributes)
AddAttributesToMap(ctx, attrs, resource.Attributes)
}
return attrs
}

func getScopeAttributes(scope *common.InstrumentationScope) map[string]interface{} {
func getScopeAttributes(ctx context.Context, scope *common.InstrumentationScope) map[string]interface{} {
attrs := map[string]interface{}{}
if scope != nil {
if scope.Name != "" {
@@ -307,7 +309,7 @@ func getScopeAttributes(scope *common.InstrumentationScope) map[string]interface
if scope.Version != "" {
attrs["library.version"] = scope.Version
}
AddAttributesToMap(attrs, scope.Attributes)
AddAttributesToMap(ctx, attrs, scope.Attributes)
}
return attrs
}
@@ -419,7 +421,7 @@ func getMarshallableValue(value *common.AnyValue) interface{} {
// Supported types are string, bool, double, int, bytes, array, and kvlist.
// kvlist attributes are flattened to a depth of (maxDepth), if the depth is exceeded, the attribute is added as a JSON string.
// Bytes and array values are always added as JSON strings.
func addAttributeToMap(result map[string]interface{}, key string, value *common.AnyValue, depth int) {
func addAttributeToMap(ctx context.Context, result map[string]interface{}, key string, value *common.AnyValue, depth int) {
switch value.Value.(type) {
case *common.AnyValue_StringValue:
result[key] = value.GetStringValue()
@@ -429,13 +431,21 @@ func addAttributeToMap(result map[string]interface{}, key string, value *common.
result[key] = value.GetDoubleValue()
case *common.AnyValue_IntValue:
result[key] = value.GetIntValue()
case *common.AnyValue_BytesValue, *common.AnyValue_ArrayValue:
case *common.AnyValue_BytesValue:
husky.AddTelemetryAttribute(ctx, "received_bytes_attr_type", true)
addAttributeToMapAsJson(result, key, value)
case *common.AnyValue_ArrayValue:
husky.AddTelemetryAttribute(ctx, "received_array_attr_type", true)
addAttributeToMapAsJson(result, key, value)
case *common.AnyValue_KvlistValue:
husky.AddTelemetryAttributes(ctx, map[string]interface{}{
"received_kvlist_attr_type": true,
"kvlist_max_depth": depth,
})
for _, entry := range value.GetKvlistValue().Values {
k := key + "." + entry.Key
if depth < maxDepth {
addAttributeToMap(result, k, entry.Value, depth+1)
addAttributeToMap(ctx, result, k, entry.Value, depth+1)
} else {
addAttributeToMapAsJson(result, k, entry.Value)
}
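
As a usage illustration of the updated `AddAttributesToMap` above, here is a minimal sketch that flattens a kvlist attribute while printing what husky reports through the hook. The proto types come from `go.opentelemetry.io/proto/otlp/common/v1`, and the `github.com/honeycombio/husky/otlp` import path is inferred from the file names in this diff:

```golang
package main

import (
	"context"
	"fmt"

	"github.com/honeycombio/husky"
	"github.com/honeycombio/husky/otlp"
	common "go.opentelemetry.io/proto/otlp/common/v1"
)

func main() {
	// Print whatever husky reports instead of sending it to a tracer.
	husky.AddTelemetryAttributeFunc = func(ctx context.Context, key string, value any) {
		fmt.Printf("telemetry: %s=%v\n", key, value)
	}

	// A top-level kvlist attribute: user -> {name: "alice"}.
	kv := &common.KeyValue{
		Key: "user",
		Value: &common.AnyValue{Value: &common.AnyValue_KvlistValue{
			KvlistValue: &common.KeyValueList{Values: []*common.KeyValue{
				{Key: "name", Value: &common.AnyValue{Value: &common.AnyValue_StringValue{StringValue: "alice"}}},
			}},
		}},
	}

	attrs := map[string]interface{}{}
	otlp.AddAttributesToMap(context.Background(), attrs, []*common.KeyValue{kv})

	// attrs now holds the flattened key "user.name" = "alice"; the hook is
	// called with received_kvlist_attr_type=true and kvlist_max_depth=0
	// (the depth at which this kvlist was encountered).
	fmt.Println(attrs)
}
```
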
4 changes: 2 additions & 2 deletions otlp/common_test.go
@@ -107,7 +107,7 @@ func TestAddAttributesToMap(t *testing.T) {

for _, tc := range testCases {
attrs := map[string]interface{}{}
AddAttributesToMap(attrs, []*common.KeyValue{tc.attribute})
AddAttributesToMap(context.Background(), attrs, []*common.KeyValue{tc.attribute})
assert.Equal(t, tc.expected, attrs)
}
}
@@ -393,7 +393,7 @@ func Test_getValue(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
attrs := map[string]interface{}{}
addAttributeToMap(attrs, "body", tt.value, 0)
addAttributeToMap(context.Background(), attrs, "body", tt.value, 0)
assert.Equal(t, tt.want, attrs)
})
}
15 changes: 8 additions & 7 deletions otlp/logs.go
@@ -1,6 +1,7 @@
package otlp

import (
"context"
"io"
"time"

@@ -11,31 +12,31 @@ import (

// TranslateLogsRequestFromReader translates an OTLP log request into Honeycomb-friendly structure from a reader (eg HTTP body)
// RequestInfo is the parsed information from the gRPC metadata
func TranslateLogsRequestFromReader(body io.ReadCloser, ri RequestInfo) (*TranslateOTLPRequestResult, error) {
func TranslateLogsRequestFromReader(ctx context.Context, body io.ReadCloser, ri RequestInfo) (*TranslateOTLPRequestResult, error) {
if err := ri.ValidateLogsHeaders(); err != nil {
return nil, err
}
request := &collectorLogs.ExportLogsServiceRequest{}
if err := parseOtlpRequestBody(body, ri.ContentType, ri.ContentEncoding, request); err != nil {
return nil, ErrFailedParseBody
}
return TranslateLogsRequest(request, ri)
return TranslateLogsRequest(ctx, request, ri)
}

// TranslateLogsRequest translates an OTLP proto log request into Honeycomb-friendly structure
// RequestInfo is the parsed information from the gRPC metadata
func TranslateLogsRequest(request *collectorLogs.ExportLogsServiceRequest, ri RequestInfo) (*TranslateOTLPRequestResult, error) {
func TranslateLogsRequest(ctx context.Context, request *collectorLogs.ExportLogsServiceRequest, ri RequestInfo) (*TranslateOTLPRequestResult, error) {
if err := ri.ValidateLogsHeaders(); err != nil {
return nil, err
}
batches := []Batch{}
for _, resourceLog := range request.ResourceLogs {
var events []Event
resourceAttrs := getResourceAttributes(resourceLog.Resource)
resourceAttrs := getResourceAttributes(ctx, resourceLog.Resource)
dataset := getLogsDataset(ri, resourceAttrs)

for _, scopeLog := range resourceLog.ScopeLogs {
scopeAttrs := getScopeAttributes(scopeLog.Scope)
scopeAttrs := getScopeAttributes(ctx, scopeLog.Scope)

for _, log := range scopeLog.GetLogRecords() {
attrs := map[string]interface{}{
@@ -57,7 +58,7 @@ func TranslateLogsRequest(request *collectorLogs.ExportLogsServiceRequest, ri Re
}
if log.Body != nil {
// convert the log body to attributes, includes flattening kv pairs into multiple attributes
addAttributeToMap(attrs, "body", log.Body, 0)
addAttributeToMap(ctx, attrs, "body", log.Body, 0)
// if the body attribute is not set, add the whole body as a json string
if _, ok := attrs["body"]; !ok {
addAttributeToMapAsJson(attrs, "body", log.Body)
@@ -72,7 +73,7 @@ func TranslateLogsRequest(request *collectorLogs.ExportLogsServiceRequest, ri Re
attrs[k] = v
}
if log.Attributes != nil {
AddAttributesToMap(attrs, log.Attributes)
AddAttributesToMap(ctx, attrs, log.Attributes)
}

// Now we need to wrap the eventAttrs in an event so we can specify the timestamp
19 changes: 10 additions & 9 deletions otlp/logs_test.go
@@ -2,6 +2,7 @@ package otlp

import (
"bytes"
"context"
"encoding/hex"
"io"
"strings"
@@ -53,7 +54,7 @@ func TestTranslateLogsRequest(t *testing.T) {
}
for _, tC := range testCases {
t.Run(tC.Name, func(t *testing.T) {
result, err := TranslateLogsRequest(req, tC.ri)
result, err := TranslateLogsRequest(context.Background(), req, tC.ri)
assert.Nil(t, err)
assert.Equal(t, proto.Size(req), result.RequestSize)
assert.Equal(t, 1, len(result.Batches))
@@ -124,7 +125,7 @@ func TestTranslateHttpLogsRequest(t *testing.T) {
body, err := prepareOtlpRequestHttpBody(req, testCaseContentType, testCaseContentEncoding)
require.NoError(t, err, "Womp womp. Ought to have been able to turn the OTLP log request into an HTTP body.")

result, err := TranslateLogsRequestFromReader(io.NopCloser(strings.NewReader(body)), tC.ri)
result, err := TranslateLogsRequestFromReader(context.Background(), io.NopCloser(strings.NewReader(body)), tC.ri)
require.NoError(t, err)
assert.Equal(t, proto.Size(req), result.RequestSize)
assert.Equal(t, 1, len(result.Batches))
@@ -238,13 +239,13 @@ func TestLogs_DetermineDestinationDataset(t *testing.T) {

switch protocol {
case "GRPC":
result, err = TranslateLogsRequest(req, ri)
result, err = TranslateLogsRequest(context.Background(), req, ri)
require.NoError(t, err, "Wasn't able to translate that OTLP logs request.")
case "HTTP":
body, err := prepareOtlpRequestHttpBody(req, ri.ContentType, "")
require.NoError(t, err, "Womp womp. Ought to have been able to turn the OTLP log request into an HTTP body.")

result, err = TranslateLogsRequestFromReader(io.NopCloser(strings.NewReader(body)), ri)
result, err = TranslateLogsRequestFromReader(context.Background(), io.NopCloser(strings.NewReader(body)), ri)
require.NoError(t, err, "Wasn't able to translate that OTLP logs request.")
default:
t.Errorf("lolwut - What kind of protocol is %v?", protocol)
@@ -308,7 +309,7 @@ func TestCanDetectLogSeverity(t *testing.T) {
}},
}

result, err := TranslateLogsRequest(req, ri)
result, err := TranslateLogsRequest(context.Background(), req, ri)
assert.NotNil(t, result)
assert.Nil(t, err)
assert.Equal(t, tc.name, result.Batches[0].Events[0].Attributes["severity"])
@@ -421,7 +422,7 @@ func TestCanExtractBody(t *testing.T) {
}},
}

result, err := TranslateLogsRequest(req, ri)
result, err := TranslateLogsRequest(context.Background(), req, ri)
assert.NotNil(t, result)
assert.Nil(t, err)
assert.Equal(t, tc.expectedValue, result.Batches[0].Events[0].Attributes)
@@ -436,7 +437,7 @@ func TestLogsRequestWithInvalidContentTypeReturnsError(t *testing.T) {
ContentType: "application/binary",
}

result, err := TranslateLogsRequest(req, ri)
result, err := TranslateLogsRequest(context.Background(), req, ri)
assert.Nil(t, result)
assert.Equal(t, ErrInvalidContentType, err)
}
@@ -449,7 +450,7 @@ func TestLogsRequestWithInvalidBodyReturnsError(t *testing.T) {
ContentType: "application/protobuf",
}

result, err := TranslateLogsRequestFromReader(body, ri)
result, err := TranslateLogsRequestFromReader(context.Background(), body, ri)
assert.Nil(t, result)
assert.Equal(t, ErrFailedParseBody, err)
}
Expand Down Expand Up @@ -482,7 +483,7 @@ func TestLogsWithoutTraceIdDoesNotGetAnnotationType(t *testing.T) {
}},
}

result, err := TranslateLogsRequest(req, ri)
result, err := TranslateLogsRequest(context.Background(), req, ri)
assert.Nil(t, err)
assert.Equal(t, proto.Size(req), result.RequestSize)
assert.Equal(t, 1, len(result.Batches))
17 changes: 9 additions & 8 deletions otlp/traces.go
@@ -1,6 +1,7 @@
package otlp

import (
"context"
"encoding/hex"
"io"
"math"
@@ -22,31 +23,31 @@ const (

// TranslateTraceRequestFromReader translates an OTLP/HTTP request into Honeycomb-friendly structure
// RequestInfo is the parsed information from the HTTP headers
func TranslateTraceRequestFromReader(body io.ReadCloser, ri RequestInfo) (*TranslateOTLPRequestResult, error) {
func TranslateTraceRequestFromReader(ctx context.Context, body io.ReadCloser, ri RequestInfo) (*TranslateOTLPRequestResult, error) {
if err := ri.ValidateTracesHeaders(); err != nil {
return nil, err
}
request := &collectorTrace.ExportTraceServiceRequest{}
if err := parseOtlpRequestBody(body, ri.ContentType, ri.ContentEncoding, request); err != nil {
return nil, ErrFailedParseBody
}
return TranslateTraceRequest(request, ri)
return TranslateTraceRequest(ctx, request, ri)
}

// TranslateTraceRequest translates an OTLP/gRPC request into Honeycomb-friendly structure
// RequestInfo is the parsed information from the gRPC metadata
func TranslateTraceRequest(request *collectorTrace.ExportTraceServiceRequest, ri RequestInfo) (*TranslateOTLPRequestResult, error) {
func TranslateTraceRequest(ctx context.Context, request *collectorTrace.ExportTraceServiceRequest, ri RequestInfo) (*TranslateOTLPRequestResult, error) {
if err := ri.ValidateTracesHeaders(); err != nil {
return nil, err
}
var batches []Batch
for _, resourceSpan := range request.ResourceSpans {
var events []Event
resourceAttrs := getResourceAttributes(resourceSpan.Resource)
resourceAttrs := getResourceAttributes(ctx, resourceSpan.Resource)
dataset := getDataset(ri, resourceAttrs)

for _, scopeSpan := range resourceSpan.ScopeSpans {
scopeAttrs := getScopeAttributes(scopeSpan.Scope)
scopeAttrs := getScopeAttributes(ctx, scopeSpan.Scope)

for _, span := range scopeSpan.GetSpans() {
traceID := BytesToTraceID(span.TraceId)
@@ -91,7 +92,7 @@ func TranslateTraceRequest(request *collectorTrace.ExportTraceServiceRequest, ri
eventAttrs[k] = v
}
if span.Attributes != nil {
AddAttributesToMap(eventAttrs, span.Attributes)
AddAttributesToMap(ctx, eventAttrs, span.Attributes)
}

// get sample rate after resource and scope attributes have been added
@@ -121,7 +122,7 @@ func TranslateTraceRequest(request *collectorTrace.ExportTraceServiceRequest, ri
}

if sevent.Attributes != nil {
AddAttributesToMap(attrs, sevent.Attributes)
AddAttributesToMap(ctx, attrs, sevent.Attributes)
}
if isError {
attrs["error"] = true
@@ -177,7 +178,7 @@ func TranslateTraceRequest(request *collectorTrace.ExportTraceServiceRequest, ri
}

if slink.Attributes != nil {
AddAttributesToMap(attrs, slink.Attributes)
AddAttributesToMap(ctx, attrs, slink.Attributes)
}
if isError {
attrs["error"] = true
(Diffs for the remaining two of the seven changed files are not shown in this view.)
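
Taken together, a caller that already carries a span in its request context simply threads that context through the new signatures, and the attributes recorded above reach its span via the configured hook. A rough sketch under assumptions: `handleTraces` is a hypothetical handler name, and the `RequestInfo` parsing from headers (API key, dataset, content type, content encoding) is omitted:

```golang
package main

import (
	"net/http"

	"github.com/honeycombio/husky/otlp"
)

func handleTraces(w http.ResponseWriter, r *http.Request) {
	// ri is assumed to be populated from r.Header elsewhere; only the
	// context threading introduced by this PR is shown here.
	var ri otlp.RequestInfo

	// Passing r.Context() lets husky's telemetry hook attach its
	// attributes to the span already active for this request.
	result, err := otlp.TranslateTraceRequestFromReader(r.Context(), r.Body, ri)
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	// Forward result.Batches to the event pipeline...
	_ = result
	w.WriteHeader(http.StatusAccepted)
}
```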
