Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[OPIK-1005] OpenTelemetry Traces ingestion endpoint #1309

Merged
merged 7 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions apps/opik-backend/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,16 @@
<artifactId>commons-compress</artifactId>
<version>1.27.1</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>4.29.3</version>
</dependency>
<dependency>
<groupId>io.opentelemetry.proto</groupId>
<artifactId>opentelemetry-proto</artifactId>
<version>1.5.0-alpha</version>
</dependency>

<!-- Test -->

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.comet.opik.api.resources.v1.priv;

import com.codahale.metrics.annotation.Timed;
import com.comet.opik.domain.OpenTelemetryService;
import com.comet.opik.domain.ProjectService;
import com.comet.opik.infrastructure.auth.RequestContext;
import com.comet.opik.utils.AsyncUtils;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.inject.Inject;
import jakarta.inject.Provider;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.Response;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.io.InputStream;
import java.util.List;

@Path("/v1/private/otel/v1")
@Timed
@Slf4j
@RequiredArgsConstructor(onConstructor_ = @Inject)
@Tag(name = "OpenTelemetry Ingestion", description = "Resource to ingest Traces and Spans via OpenTelemetry")
public class OpenTelemetryResource {

private final @NonNull OpenTelemetryService openTelemetryService;
private final @NonNull Provider<RequestContext> requestContext;

@Path("/traces")
@POST
@Consumes("application/x-protobuf")
public Response receiveTraces(InputStream in) {
var projectName = requestContext.get().getHeaders()
.getOrDefault(RequestContext.PROJECT_NAME, List.of(ProjectService.DEFAULT_PROJECT))
.getFirst();
var userName = requestContext.get().getUserName();
var workspaceName = requestContext.get().getWorkspaceName();
var workspaceId = requestContext.get().getWorkspaceId();

try {
// Parse the incoming Protobuf message
ExportTraceServiceRequest traceRequest = ExportTraceServiceRequest.parseFrom(in);

log.info("Received spans batch via OpenTelemetry for project '{}' in workspace '{}'", projectName,
workspaceName);

Long stored = openTelemetryService
.parseAndStoreSpans(traceRequest, projectName)
.contextWrite(ctx -> AsyncUtils.setRequestContext(ctx, userName, workspaceName, workspaceId))
.block();

log.info("Stored {} spans via OpenTelemetry for project '{}' in workspace '{}'", stored, projectName,
workspaceName);

// Return a successful HTTP response
return Response.ok().build();
} catch (IOException e) {
// Log the error and return a 400 Bad Request response
log.error("Error parsing Protobuf payload", e);
return Response.status(Response.Status.BAD_REQUEST)
.entity("Invalid Protobuf payload")
.build();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package com.comet.opik.domain;

import com.comet.opik.utils.JsonUtils;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.trace.v1.Span;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.text.StringEscapeUtils;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.UUID;

@UtilityClass
@Slf4j
public class OpenTelemetryMapper {
/**
* Converts an OpenTelemetry Span into an Opik Span. Despite similar conceptually, but require some translation
* of concepts, especially around ids.
*
* @param otelSpan an OpenTelemetry Span
* @return a converted Opik Span
*/
public static com.comet.opik.api.Span toOpikSpan(Span otelSpan) {
var startTimeMs = Duration.ofNanos(otelSpan.getStartTimeUnixNano()).toMillis();
var endTimeMs = Duration.ofNanos(otelSpan.getEndTimeUnixNano()).toMillis();

var otelTraceId = otelSpan.getTraceId();
var opikTraceId = convertOtelIdToUUIDv7(otelTraceId.toByteArray(), startTimeMs, true);

var otelSpanId = otelSpan.getSpanId();
var opikSpanId = convertOtelIdToUUIDv7(otelSpanId.toByteArray(), startTimeMs, true);

var otelParentSpanId = otelSpan.getParentSpanId();
var opikParentSpanId = otelParentSpanId.isEmpty()
? null
: convertOtelIdToUUIDv7(otelParentSpanId.toByteArray(), startTimeMs, true);

var attributes = convertAttributesToJson(otelSpan.getAttributesList());

return com.comet.opik.api.Span.builder()
.id(opikSpanId)
.traceId(opikTraceId)
.parentSpanId(opikParentSpanId)
.name(otelSpan.getName())
.type(SpanType.general)
.startTime(Instant.ofEpochMilli(startTimeMs))
.endTime(Instant.ofEpochMilli(endTimeMs))
.input(attributes)
.build();
}

/**
* Converts a list of protobuf KeyValue into a JsonNode, preserving their types.
*
* @param attributes a list of
* @return
*/
protected static JsonNode convertAttributesToJson(List<KeyValue> attributes) {
ObjectMapper mapper = JsonUtils.MAPPER;
ObjectNode node = mapper.createObjectNode();

// Iterate over each attribute key-value pair
attributes.forEach(attribute -> {
var key = attribute.getKey();
var value = attribute.getValue();

switch (value.getValueCase()) {
case STRING_VALUE -> node.put(key, StringEscapeUtils.unescapeJson(value.getStringValue()));
case INT_VALUE -> node.put(key, value.getIntValue());
case DOUBLE_VALUE -> node.put(key, value.getDoubleValue());
case BOOL_VALUE -> node.put(key, value.getBoolValue());
default -> log.warn("Unsupported attribute: {}", attribute);
}
});

return node;
}

static long DAY_MILLISECONDS = 24 * 60 * 60 * 1000L;

/**
* Uses 64-bit integer OpenTelemetry SpanId and its timestamp to prepare a good UUIDv7 id. This is actually
* a good UUIDv7 (in opposition of the traceId) as its composed from an id and a timestamp, so spans will be
* properly ordered in the span table.
*
* The truncate timestamp option is relevant when you receive non-UUIDs in multiple batches and can't predict
* what's going to be the actual Opik UUID from the Otel integer id you know. So we take the span timestamp truncated
* by time window as form to make it predictable. This works fine as makes UUID predictable and they are stored next
* to each other on Clickhouse, but it has two drawbacks: (1) traces might show up un-ordered in Traces page (a trace
* from Monday can appear as 'newer' than a Friday trace as their UUID have the same timestamp: Sunday at 00:00:00;
* (2) a routine running between Saturday 23:59:30 and Sunday 00:00:30 will be split in 2 traces; both incomplete.
*
* @param otelSpanId a OpenTelemetry 64-bit integer spanId
* @param spanTimestampMs a timestamp for the span in millis
* @param timeTruncate truncates the timestamp on returned UUID by a time window level
* @return a valid UUIDv7
*/
public static UUID convertOtelIdToUUIDv7(byte[] otelSpanId, long spanTimestampMs, boolean timeTruncate) {
// Prepare the 16-byte array for the UUID
byte[] uuidBytes = new byte[16];

long timestampMs = timeTruncate ? (spanTimestampMs / DAY_MILLISECONDS) * DAY_MILLISECONDS : spanTimestampMs;

// Bytes 0-5: 48-bit timestamp (big-endian)
long ts48 = timestampMs & 0xFFFFFFFFFFFFL; // 48 bits
uuidBytes[0] = (byte) ((ts48 >> 40) & 0xFF);
uuidBytes[1] = (byte) ((ts48 >> 32) & 0xFF);
uuidBytes[2] = (byte) ((ts48 >> 24) & 0xFF);
uuidBytes[3] = (byte) ((ts48 >> 16) & 0xFF);
uuidBytes[4] = (byte) ((ts48 >> 8) & 0xFF);
uuidBytes[5] = (byte) (ts48 & 0xFF);

// Bytes 6-15: 80 bits derived from the spanId hash
// Hash the spanId (8 bytes) using SHA-256 and take the first 10 bytes (80 bits)
byte[] hash = DigestUtils.sha256(otelSpanId);
System.arraycopy(hash, 0, uuidBytes, 6, 10);

// Set the version to 7 (stored in the high nibble of byte 6)
uuidBytes[6] = (byte) ((uuidBytes[6] & 0x0F) | 0x70);
// Set the variant (the two most-significant bits of byte 8 should be 10)
uuidBytes[8] = (byte) ((uuidBytes[8] & 0x3F) | 0x80);

// Build the UUID from the byte array
ByteBuffer byteBuffer = ByteBuffer.wrap(uuidBytes);
long mostSigBits = byteBuffer.getLong(); // FYI: it reads and change offset
long leastSigBits = byteBuffer.getLong();
return new UUID(mostSigBits, leastSigBits);
}

/**
* Uses 64-bit integer OpenTelemetry SpanId and its timestamp to prepare a good UUIDv7 id. This is actually
* a good UUIDv7 (in opposition of the traceId) as its composed from an id and a timestamp, so spans will be
* properly ordered in the span table.
*
* @param otelSpanId a OpenTelemetry 64-bit integer spanId
* @param timestampMs a timestamp for the span in millis
* @return a valid UUIDv7
*/
public static UUID convertOtelIdToUUIDv7(byte[] otelSpanId, long timestampMs) throws Exception {
return convertOtelIdToUUIDv7(otelSpanId, timestampMs, false);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.comet.opik.domain;

import com.comet.opik.api.SpanBatch;
import com.comet.opik.api.Trace;
import com.google.inject.ImplementedBy;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@ImplementedBy(OpenTelemetryServiceImpl.class)
public interface OpenTelemetryService {

Mono<Long> parseAndStoreSpans(@NonNull ExportTraceServiceRequest traceRequest, @NonNull String projectName);
}

@Singleton
@RequiredArgsConstructor(onConstructor_ = @Inject)
@Slf4j
class OpenTelemetryServiceImpl implements OpenTelemetryService {

private final @NonNull TraceService traceService;
private final @NonNull SpanService spanService;

@Override
public Mono<Long> parseAndStoreSpans(@NonNull ExportTraceServiceRequest traceRequest, @NonNull String projectName) {

var opikSpans = traceRequest.getResourceSpansList().stream()
.flatMap(resourceSpans -> resourceSpans.getScopeSpansList().stream())
.flatMap(scopeSpans -> scopeSpans.getSpansList().stream())
.map(OpenTelemetryMapper::toOpikSpan)
.map(opikSpan -> opikSpan.toBuilder()
.projectName(projectName)
.build())
.toList();

// check if there spans without parentId: we will use them as a Trace too
return Flux.fromStream(opikSpans.stream().filter(span -> span.parentSpanId() == null))
.flatMap(rootSpan -> {
var trace = Trace.builder()
.id(rootSpan.traceId())
.name(rootSpan.name())
.projectName(rootSpan.projectName())
.startTime(rootSpan.startTime())
.endTime(rootSpan.endTime())
.duration(rootSpan.duration())
.input(rootSpan.input())
.output(rootSpan.output())
.metadata(rootSpan.metadata())
.build();

return traceService.create(trace);
})
.doOnNext(traceId -> log.info("TraceId '{}' created", traceId))
.then(Mono.defer(() -> {
var spanBatch = SpanBatch.builder().spans(opikSpans).build();

log.info("Parsed OpenTelemetry span batch for project '{}' into {} spans", projectName,
opikSpans.size());

return spanService.create(spanBatch);
}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public class RequestContext {
public static final String USER_LIMIT = "Opik-User-Limit";
public static final String USER_REMAINING_LIMIT = "Opik-User-Remaining-Limit";
public static final String USER_LIMIT_REMAINING_TTL = "Opik-User-Remaining-Limit-TTL-Millis";
public static final String PROJECT_NAME = "projectName";

private String userName;
private String workspaceName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hc.core5.http.ContentType;
import org.apache.http.HttpStatus;
import org.assertj.core.api.recursive.comparison.RecursiveComparisonConfiguration;
Expand Down Expand Up @@ -145,6 +146,25 @@ public Span getById(UUID id, String workspaceName, String apiKey) {
return response.readEntity(Span.class);
}

public Span.SpanPage getByTraceIdAndProject(UUID traceId, String projectName, String workspaceName, String apiKey) {
var requestBuilder = client.target(RESOURCE_PATH.formatted(baseURI))
.queryParam("trace_id", traceId.toString());

if (StringUtils.isNotEmpty(projectName)) {
requestBuilder = requestBuilder.queryParam("project", projectName);
}

var response = requestBuilder
.queryParam("project_name", projectName)
.request()
.header(HttpHeaders.AUTHORIZATION, apiKey)
.header(WORKSPACE_HEADER, workspaceName)
.get();

assertThat(response.getStatusInfo().getStatusCode()).isEqualTo(HttpStatus.SC_OK);
return response.readEntity(Span.SpanPage.class);
}

public void deleteSpan(UUID id, String workspaceName, String apiKey) {
try (var actualResponse = client.target(RESOURCE_PATH.formatted(baseURI))
.path(id.toString())
Expand Down
Loading