diff --git a/requirements-base.txt b/requirements-base.txt index 6bb4d126c14716..c65394497517ca 100644 --- a/requirements-base.txt +++ b/requirements-base.txt @@ -65,7 +65,7 @@ rfc3339-validator>=0.1.2 rfc3986-validator>=0.1.1 # [end] jsonschema format validators sentry-arroyo>=2.21.0 -sentry-kafka-schemas>=1.3.2 +sentry-kafka-schemas>=1.3.6 sentry-ophio>=1.1.3 sentry-protos==0.2.0 sentry-redis-tools>=0.5.0 diff --git a/requirements-dev-frozen.txt b/requirements-dev-frozen.txt index 9fa3f791811105..50aa1b84b2b29e 100644 --- a/requirements-dev-frozen.txt +++ b/requirements-dev-frozen.txt @@ -185,7 +185,7 @@ sentry-devenv==1.21.0 sentry-forked-django-stubs==5.2.0.post3 sentry-forked-djangorestframework-stubs==3.16.0.post1 sentry-forked-email-reply-parser==0.5.12.post1 -sentry-kafka-schemas==1.3.2 +sentry-kafka-schemas==1.3.6 sentry-ophio==1.1.3 sentry-protos==0.2.0 sentry-redis-tools==0.5.0 diff --git a/requirements-frozen.txt b/requirements-frozen.txt index 7bb8144d55aa12..0e28dbe7cb77c1 100644 --- a/requirements-frozen.txt +++ b/requirements-frozen.txt @@ -123,7 +123,7 @@ rsa==4.8 s3transfer==0.10.0 sentry-arroyo==2.21.0 sentry-forked-email-reply-parser==0.5.12.post1 -sentry-kafka-schemas==1.3.2 +sentry-kafka-schemas==1.3.6 sentry-ophio==1.1.3 sentry-protos==0.2.0 sentry-redis-tools==0.5.0 diff --git a/src/sentry/spans/consumers/process_segments/enrichment.py b/src/sentry/spans/consumers/process_segments/enrichment.py index 962111986a710a..5f82e52de7ec7b 100644 --- a/src/sentry/spans/consumers/process_segments/enrichment.py +++ b/src/sentry/spans/consumers/process_segments/enrichment.py @@ -1,6 +1,8 @@ +from collections import defaultdict from typing import Any, cast -from sentry.spans.consumers.process_segments.types import Span +from sentry.models.project import Project +from sentry.spans.consumers.process_segments.types import MeasurementValue, Span # Keys in `sentry_tags` that are shared across all spans in a segment. This list # is taken from `extract_shared_tags` in Relay. @@ -127,7 +129,7 @@ def set_exclusive_time(spans: list[Span]) -> None: span_map: dict[str, list[tuple[int, int]]] = {} for span in spans: if parent_span_id := span.get("parent_span_id"): - interval = (_us(span["start_timestamp_precise"]), _us(span["end_timestamp_precise"])) + interval = _span_interval(span) span_map.setdefault(parent_span_id, []).append(interval) for span in spans: @@ -136,7 +138,7 @@ def set_exclusive_time(spans: list[Span]) -> None: intervals.sort(key=lambda x: (x[0], -x[1])) exclusive_time_us: int = 0 # microseconds to prevent rounding issues - start, end = _us(span["start_timestamp_precise"]), _us(span["end_timestamp_precise"]) + start, end = _span_interval(span) # Progressively add time gaps before the next span and then skip to its end. for child_start, child_end in intervals: @@ -155,7 +157,75 @@ def set_exclusive_time(spans: list[Span]) -> None: span["exclusive_time_ms"] = exclusive_time_us / 1_000 +def _span_interval(span: Span) -> tuple[int, int]: + """Get the start and end timestamps of a span in microseconds.""" + return _us(span["start_timestamp_precise"]), _us(span["end_timestamp_precise"]) + + def _us(timestamp: float) -> int: """Convert the floating point duration or timestamp to integer microsecond precision.""" return int(timestamp * 1_000_000) + + +def compute_breakdowns(segment: Span, spans: list[Span], project: Project) -> None: + """ + Computes breakdowns from all spans and writes them to the segment span. + + Breakdowns are measurements that are derived from the spans in the segment. + By convention, their unit is in milliseconds. In the end, these measurements + are converted into attributes on the span trace item. + """ + + config = project.get_option("sentry:breakdowns") + + for breakdown_name, breakdown_config in config.items(): + ty = breakdown_config.get("type") + + if ty == "spanOperations": + breakdowns = _compute_span_ops(spans, breakdown_config) + else: + continue + + measurements = segment.setdefault("measurements", {}) + for key, value in breakdowns.items(): + measurements[f"{breakdown_name}.{key}"] = value + + +def _compute_span_ops(spans: list[Span], config: Any) -> dict[str, MeasurementValue]: + matches = config.get("matches") + if not matches: + return {} + + intervals_by_op = defaultdict(list) + for span in spans: + op = span.get("sentry_tags", {}).get("op", "") + if operation_name := next(filter(lambda m: op.startswith(m), matches), None): + intervals_by_op[operation_name].append(_span_interval(span)) + + measurements: dict[str, MeasurementValue] = {} + for operation_name, intervals in intervals_by_op.items(): + duration = _get_duration_us(intervals) + measurements[f"ops.{operation_name}"] = {"value": duration / 1000, "unit": "millisecond"} + return measurements + + +def _get_duration_us(intervals: list[tuple[int, int]]) -> int: + """ + Get the wall clock time duration covered by the intervals in microseconds. + + Overlapping intervals are merged so that they are not counted twice. For + example, the intervals [(1, 3), (2, 4)] would yield a duration of 3, not 4. + """ + + duration = 0 + last_end = 0 + + intervals.sort(key=lambda x: (x[0], -x[1])) + for start, end in intervals: + # Ensure the current interval doesn't overlap with the last one + start = max(start, last_end) + duration += max(end - start, 0) + last_end = end + + return duration diff --git a/src/sentry/spans/consumers/process_segments/message.py b/src/sentry/spans/consumers/process_segments/message.py index 0457f36af38112..2db83cf341862d 100644 --- a/src/sentry/spans/consumers/process_segments/message.py +++ b/src/sentry/spans/consumers/process_segments/message.py @@ -24,6 +24,7 @@ record_release_received, ) from sentry.spans.consumers.process_segments.enrichment import ( + compute_breakdowns, match_schemas, set_exclusive_time, set_shared_tags, @@ -50,6 +51,7 @@ def process_segment(unprocessed_spans: list[UnprocessedSpan]) -> list[Span]: # If the project does not exist then it might have been deleted during ingestion. return [] + compute_breakdowns(segment_span, spans, project) _create_models(segment_span, project) _detect_performance_problems(segment_span, spans, project) _record_signals(segment_span, spans, project) diff --git a/src/sentry/spans/consumers/process_segments/types.py b/src/sentry/spans/consumers/process_segments/types.py index b11e62d8a5c087..45e5b5494396aa 100644 --- a/src/sentry/spans/consumers/process_segments/types.py +++ b/src/sentry/spans/consumers/process_segments/types.py @@ -1,8 +1,10 @@ -from typing import Any, NotRequired +from typing import NotRequired +from sentry_kafka_schemas.schema_types.buffered_segments_v1 import MeasurementValue from sentry_kafka_schemas.schema_types.buffered_segments_v1 import SegmentSpan as UnprocessedSpan __all__ = ( + "MeasurementValue", "Span", "UnprocessedSpan", ) @@ -14,11 +16,6 @@ class Span(UnprocessedSpan, total=True): extracted. """ - # Missing in schema - start_timestamp_precise: float - end_timestamp_precise: float - data: NotRequired[dict[str, Any]] # currently unused - # Added in enrichment exclusive_time: float exclusive_time_ms: float diff --git a/tests/sentry/spans/consumers/process_segments/test_enrichment.py b/tests/sentry/spans/consumers/process_segments/test_enrichment.py index e49e8ad934e3a6..55c1ade28f0a50 100644 --- a/tests/sentry/spans/consumers/process_segments/test_enrichment.py +++ b/tests/sentry/spans/consumers/process_segments/test_enrichment.py @@ -1,4 +1,8 @@ -from sentry.spans.consumers.process_segments.enrichment import set_exclusive_time +from sentry.spans.consumers.process_segments.enrichment import ( + compute_breakdowns, + set_exclusive_time, +) +from sentry.testutils.pytest.fixtures import django_db_all from tests.sentry.spans.consumers.process import build_mock_span # Tests ported from Relay @@ -303,3 +307,78 @@ def test_only_immediate_child_spans_affect_calculation(): "cccccccccccccccc": 400.0, "dddddddddddddddd": 400.0, } + + +@django_db_all +def test_emit_ops_breakdown(default_project): + segment_span = build_mock_span( + project_id=1, + is_segment=True, + start_timestamp_precise=1577836800.0, + end_timestamp_precise=1577858400.01, + span_id="ffffffffffffffff", + ) + + spans = [ + build_mock_span( + project_id=1, + start_timestamp_precise=1577836800.0, # 2020-01-01 00:00:00 + end_timestamp_precise=1577840400.0, # 2020-01-01 01:00:00 + span_id="fa90fdead5f74052", + parent_span_id=segment_span["span_id"], + span_op="http", + ), + build_mock_span( + project_id=1, + start_timestamp_precise=1577844000.0, # 2020-01-01 02:00:00 + end_timestamp_precise=1577847600.0, # 2020-01-01 03:00:00 + span_id="bbbbbbbbbbbbbbbb", + parent_span_id=segment_span["span_id"], + span_op="db", + ), + build_mock_span( + project_id=1, + start_timestamp_precise=1577845800.0, # 2020-01-01 02:30:00 + end_timestamp_precise=1577849400.0, # 2020-01-01 03:30:00 + span_id="cccccccccccccccc", + parent_span_id=segment_span["span_id"], + span_op="db.postgres", + ), + build_mock_span( + project_id=1, + start_timestamp_precise=1577851200.0, # 2020-01-01 04:00:00 + end_timestamp_precise=1577853000.0, # 2020-01-01 04:30:00 + span_id="dddddddddddddddd", + parent_span_id=segment_span["span_id"], + span_op="db.mongo", + ), + build_mock_span( + project_id=1, + start_timestamp_precise=1577854800.0, # 2020-01-01 05:00:00 + end_timestamp_precise=1577858400.01, # 2020-01-01 06:00:00.01 + span_id="eeeeeeeeeeeeeeee", + parent_span_id=segment_span["span_id"], + span_op="browser", + ), + segment_span, + ] + + breakdowns_config = { + "span_ops": {"type": "spanOperations", "matches": ["http", "db"]}, + "span_ops_2": {"type": "spanOperations", "matches": ["http", "db"]}, + } + default_project.update_option("sentry:breakdowns", breakdowns_config) + + # Compute breakdowns for the segment span + compute_breakdowns(segment_span, spans, default_project) + measurements = segment_span["measurements"] + + assert measurements["span_ops.ops.http"]["value"] == 3600000.0 + assert measurements["span_ops.ops.db"]["value"] == 7200000.0 + assert measurements["span_ops_2.ops.http"]["value"] == 3600000.0 + assert measurements["span_ops_2.ops.db"]["value"] == 7200000.0 + + # NOTE: Relay used to extract a total.time breakdown, which is no longer + # included in span breakdowns. + # assert measurements["span_ops.total.time"]["value"] == 14400000.01 + # assert measurements["span_ops_2.total.time"]["value"] == 14400000.01