From 6524dd1f8427c5fafe1926dec1ab75817d5abd43 Mon Sep 17 00:00:00 2001
From: Jan Michael Auer
Date: Fri, 23 May 2025 09:42:16 +0200
Subject: [PATCH 1/5] feat(spans): Compute breakdowns in segment enrichment

---
 .../consumers/process_segments/enrichment.py | 97 ++++++++++++++++++-
 .../consumers/process_segments/message.py    |  2 +
 2 files changed, 97 insertions(+), 2 deletions(-)

diff --git a/src/sentry/spans/consumers/process_segments/enrichment.py b/src/sentry/spans/consumers/process_segments/enrichment.py
index 962111986a710a..26e43a2a081ea3 100644
--- a/src/sentry/spans/consumers/process_segments/enrichment.py
+++ b/src/sentry/spans/consumers/process_segments/enrichment.py
@@ -1,5 +1,10 @@
+from collections import defaultdict
 from typing import Any, cast
 
+# TODO(ja): Fix and update the schema
+from sentry_kafka_schemas.schema_types.buffered_segments_v1 import _MeasurementValue
+
+from sentry.models.project import Project
 from sentry.spans.consumers.process_segments.types import Span
 
 # Keys in `sentry_tags` that are shared across all spans in a segment. This list
@@ -127,7 +132,7 @@ def set_exclusive_time(spans: list[Span]) -> None:
     span_map: dict[str, list[tuple[int, int]]] = {}
     for span in spans:
         if parent_span_id := span.get("parent_span_id"):
-            interval = (_us(span["start_timestamp_precise"]), _us(span["end_timestamp_precise"]))
+            interval = _span_interval(span)
             span_map.setdefault(parent_span_id, []).append(interval)
 
     for span in spans:
@@ -136,7 +141,7 @@ def set_exclusive_time(spans: list[Span]) -> None:
         intervals.sort(key=lambda x: (x[0], -x[1]))
 
         exclusive_time_us: int = 0  # microseconds to prevent rounding issues
-        start, end = _us(span["start_timestamp_precise"]), _us(span["end_timestamp_precise"])
+        start, end = _span_interval(span)
 
         # Progressively add time gaps before the next span and then skip to its end.
         for child_start, child_end in intervals:
@@ -155,7 +160,95 @@ def set_exclusive_time(spans: list[Span]) -> None:
         span["exclusive_time"] = exclusive_time_us / 1_000
         span["exclusive_time_ms"] = exclusive_time_us / 1_000
 
 
+def _span_interval(span: Span) -> tuple[int, int]:
+    """Get the start and end timestamps of a span in microseconds."""
+    return _us(span["start_timestamp_precise"]), _us(span["end_timestamp_precise"])
+
+
 def _us(timestamp: float) -> int:
     """Convert the floating point duration or timestamp to integer microsecond precision."""
     return int(timestamp * 1_000_000)
+
+
+def compute_breakdowns(segment: Span, spans: list[Span], project: Project) -> None:
+    """
+    Computes breakdowns from all spans and writes them to the segment span.
+
+    Breakdowns are measurements that are derived from the spans in the segment.
+    By convention, their unit is in milliseconds. In the end, these measurements
+    are converted into attributes on the span trace item.
+    """
+
+    config = project.get_option("sentry:breakdowns")
+
+    for breakdown_name, breakdown_config in config.items():
+        ty = breakdown_config.get("type")
+
+        if ty == "spanOperations":
+            measurements = _compute_span_ops(spans, breakdown_config)
+        else:
+            continue
+
+        measurements = segment.setdefault("measurements", {})
+        for key, value in measurements.items():
+            measurements[f"{breakdown_name}.{key}"] = value
+
+
+def _compute_span_ops(spans: list[Span], config: Any) -> dict[str, _MeasurementValue]:
+    matches = config.get("matches")
+    if not matches:
+        return {}
+
+    emit_intervals = defaultdict(list)
+    other_intervals = defaultdict(list)
+    for span in spans:
+        # If the span operation matches one of the prefixes, emit a breakdown
+        # for this interval. Otherwise, only count it for total_time.
+        op = span["op"]
+        if operation_name := next(filter(lambda m: op.startswith(m), matches), None):
+            emit_intervals[operation_name].append(_span_interval(span))
+        else:
+            other_intervals[op].append(_span_interval(span))
+
+    total_time = 0
+    measurements = {}
+
+    for _, intervals in other_intervals.items():
+        total_time += _get_duration_us(intervals)
+
+    for operation_name, intervals in emit_intervals.items():
+        total_time += _get_duration_us(intervals)
+        measurements[f"ops.{operation_name}"] = _measurement_us(_get_duration_us(intervals))
+
+    measurements["total.time"] = _measurement_us(total_time)
+    return measurements
+
+
+def _get_duration_us(intervals: list[tuple[int, int]]) -> int:
+    """
+    Get the wall clock time duration covered by the intervals in microseconds.
+
+    Overlapping intervals are merged so that they are not counted twice. For
+    example, the intervals [(1, 3), (2, 4)] would yield a duration of 3, not 4.
+    """
+
+    duration = 0
+    last_end = 0
+
+    intervals.sort(key=lambda x: (x[0], -x[1]))
+    for start, end in intervals:
+        # Ensure the current interval doesn't overlap with the last one
+        start = max(start, last_end)
+        duration += max(end - start, 0)
+        last_end = end
+
+    return duration
+
+
+def _measurement_us(value_us: int) -> _MeasurementValue:
+    """
+    Create a measurement value from the provided value in microseconds
+    """
+
+    return {"value": value_us / 1000, "unit": "millisecond"}
diff --git a/src/sentry/spans/consumers/process_segments/message.py b/src/sentry/spans/consumers/process_segments/message.py
index 0457f36af38112..2db83cf341862d 100644
--- a/src/sentry/spans/consumers/process_segments/message.py
+++ b/src/sentry/spans/consumers/process_segments/message.py
@@ -24,6 +24,7 @@
     record_release_received,
 )
 from sentry.spans.consumers.process_segments.enrichment import (
+    compute_breakdowns,
     match_schemas,
     set_exclusive_time,
     set_shared_tags,
@@ -50,6 +51,7 @@ def process_segment(unprocessed_spans: list[UnprocessedSpan]) -> list[Span]:
         # If the project does not exist then it might have been deleted during ingestion.
         return []
 
+    compute_breakdowns(segment_span, spans, project)
     _create_models(segment_span, project)
     _detect_performance_problems(segment_span, spans, project)
     _record_signals(segment_span, spans, project)

From d79f10bbbb0c5399932e789698ea3d92decbc639 Mon Sep 17 00:00:00 2001
From: Jan Michael Auer
Date: Fri, 23 May 2025 09:47:49 +0200
Subject: [PATCH 2/5] ref(spans): Simplify by removing total.time measurement

---
 .../consumers/process_segments/enrichment.py | 32 ++++----------------
 1 file changed, 6 insertions(+), 26 deletions(-)

diff --git a/src/sentry/spans/consumers/process_segments/enrichment.py b/src/sentry/spans/consumers/process_segments/enrichment.py
index 26e43a2a081ea3..6f9172979ed5cd 100644
--- a/src/sentry/spans/consumers/process_segments/enrichment.py
+++ b/src/sentry/spans/consumers/process_segments/enrichment.py
@@ -200,28 +200,16 @@ def _compute_span_ops(spans: list[Span], config: Any) -> dict[str, _MeasurementV
     if not matches:
         return {}
 
-    emit_intervals = defaultdict(list)
-    other_intervals = defaultdict(list)
+    intervals_by_op = defaultdict(list)
     for span in spans:
-        # If the span operation matches one of the prefixes, emit a breakdown
-        # for this interval. Otherwise, only count it for total_time.
         op = span["op"]
         if operation_name := next(filter(lambda m: op.startswith(m), matches), None):
-            emit_intervals[operation_name].append(_span_interval(span))
-        else:
-            other_intervals[op].append(_span_interval(span))
-
-    total_time = 0
-    measurements = {}
-
-    for _, intervals in other_intervals.items():
-        total_time += _get_duration_us(intervals)
+            intervals_by_op[operation_name].append(_span_interval(span))
 
-    for operation_name, intervals in emit_intervals.items():
-        total_time += _get_duration_us(intervals)
-        measurements[f"ops.{operation_name}"] = _measurement_us(_get_duration_us(intervals))
-
-    measurements["total.time"] = _measurement_us(total_time)
+    measurements: dict[str, _MeasurementValue] = {}
+    for operation_name, intervals in intervals_by_op.items():
+        duration = _get_duration_us(intervals)
+        measurements[f"ops.{operation_name}"] = {"value": duration / 1000, "unit": "millisecond"}
     return measurements
 
 
@@ -244,11 +232,3 @@ def _get_duration_us(intervals: list[tuple[int, int]]) -> int:
         last_end = end
 
     return duration
-
-
-def _measurement_us(value_us: int) -> _MeasurementValue:
-    """
-    Create a measurement value from the provided value in microseconds
-    """
-
-    return {"value": value_us / 1000, "unit": "millisecond"}

From 30f1b9c6675636f4a71ba0fb8f31dca1499b7505 Mon Sep 17 00:00:00 2001
From: Jan Michael Auer
Date: Fri, 23 May 2025 10:23:11 +0200
Subject: [PATCH 3/5] test: Tests and fixes

---
 .../consumers/process_segments/enrichment.py |  6 +-
 .../process_segments/test_enrichment.py      | 81 ++++++++++++++++++-
 2 files changed, 83 insertions(+), 4 deletions(-)

diff --git a/src/sentry/spans/consumers/process_segments/enrichment.py b/src/sentry/spans/consumers/process_segments/enrichment.py
index 6f9172979ed5cd..5579c49e2c2578 100644
--- a/src/sentry/spans/consumers/process_segments/enrichment.py
+++ b/src/sentry/spans/consumers/process_segments/enrichment.py
@@ -186,12 +186,12 @@ def compute_breakdowns(segment: Span, spans: list[Span], project: Project) -> No
         ty = breakdown_config.get("type")
 
         if ty == "spanOperations":
-            measurements = _compute_span_ops(spans, breakdown_config)
+            breakdowns = _compute_span_ops(spans, breakdown_config)
         else:
             continue
 
         measurements = segment.setdefault("measurements", {})
-        for key, value in measurements.items():
+        for key, value in breakdowns.items():
             measurements[f"{breakdown_name}.{key}"] = value
 
 
@@ -202,7 +202,7 @@ def _compute_span_ops(spans: list[Span], config: Any) -> dict[str, _MeasurementV
 
     intervals_by_op = defaultdict(list)
     for span in spans:
-        op = span["op"]
+        op = span.get("sentry_tags", {}).get("op", "")
         if operation_name := next(filter(lambda m: op.startswith(m), matches), None):
             intervals_by_op[operation_name].append(_span_interval(span))
 
diff --git a/tests/sentry/spans/consumers/process_segments/test_enrichment.py b/tests/sentry/spans/consumers/process_segments/test_enrichment.py
index e49e8ad934e3a6..55c1ade28f0a50 100644
--- a/tests/sentry/spans/consumers/process_segments/test_enrichment.py
+++ b/tests/sentry/spans/consumers/process_segments/test_enrichment.py
@@ -1,4 +1,8 @@
-from sentry.spans.consumers.process_segments.enrichment import set_exclusive_time
+from sentry.spans.consumers.process_segments.enrichment import (
+    compute_breakdowns,
+    set_exclusive_time,
+)
+from sentry.testutils.pytest.fixtures import django_db_all
 from tests.sentry.spans.consumers.process import build_mock_span
 
 # Tests ported from Relay
@@ -303,3 +307,78 @@ def test_only_immediate_child_spans_affect_calculation():
         "cccccccccccccccc": 400.0,
         "dddddddddddddddd": 400.0,
     }
+
+
+@django_db_all
+def test_emit_ops_breakdown(default_project):
+    segment_span = build_mock_span(
+        project_id=1,
+        is_segment=True,
+        start_timestamp_precise=1577836800.0,
+        end_timestamp_precise=1577858400.01,
+        span_id="ffffffffffffffff",
+    )
+
+    spans = [
+        build_mock_span(
+            project_id=1,
+            start_timestamp_precise=1577836800.0,  # 2020-01-01 00:00:00
+            end_timestamp_precise=1577840400.0,  # 2020-01-01 01:00:00
+            span_id="fa90fdead5f74052",
+            parent_span_id=segment_span["span_id"],
+            span_op="http",
+        ),
+        build_mock_span(
+            project_id=1,
+            start_timestamp_precise=1577844000.0,  # 2020-01-01 02:00:00
+            end_timestamp_precise=1577847600.0,  # 2020-01-01 03:00:00
+            span_id="bbbbbbbbbbbbbbbb",
+            parent_span_id=segment_span["span_id"],
+            span_op="db",
+        ),
+        build_mock_span(
+            project_id=1,
+            start_timestamp_precise=1577845800.0,  # 2020-01-01 02:30:00
+            end_timestamp_precise=1577849400.0,  # 2020-01-01 03:30:00
+            span_id="cccccccccccccccc",
+            parent_span_id=segment_span["span_id"],
+            span_op="db.postgres",
+        ),
+        build_mock_span(
+            project_id=1,
+            start_timestamp_precise=1577851200.0,  # 2020-01-01 04:00:00
+            end_timestamp_precise=1577853000.0,  # 2020-01-01 04:30:00
+            span_id="dddddddddddddddd",
+            parent_span_id=segment_span["span_id"],
+            span_op="db.mongo",
+        ),
+        build_mock_span(
+            project_id=1,
+            start_timestamp_precise=1577854800.0,  # 2020-01-01 05:00:00
+            end_timestamp_precise=1577858400.01,  # 2020-01-01 06:00:00.01
+            span_id="eeeeeeeeeeeeeeee",
+            parent_span_id=segment_span["span_id"],
+            span_op="browser",
+        ),
+        segment_span,
+    ]
+
+    breakdowns_config = {
+        "span_ops": {"type": "spanOperations", "matches": ["http", "db"]},
+        "span_ops_2": {"type": "spanOperations", "matches": ["http", "db"]},
+    }
+    default_project.update_option("sentry:breakdowns", breakdowns_config)
+
+    # Compute breakdowns for the segment span
+    compute_breakdowns(segment_span, spans, default_project)
+    measurements = segment_span["measurements"]
+
+    assert measurements["span_ops.ops.http"]["value"] == 3600000.0
+    assert measurements["span_ops.ops.db"]["value"] == 7200000.0
+    assert measurements["span_ops_2.ops.http"]["value"] == 3600000.0
+    assert measurements["span_ops_2.ops.db"]["value"] == 7200000.0
+
+    # NOTE: Relay used to extract a total.time breakdown, which is no longer
+    # included in span breakdowns.
+    # assert measurements["span_ops.total.time"]["value"] == 14400000.01
+    # assert measurements["span_ops_2.total.time"]["value"] == 14400000.01

From 0901942803b3e6392cdd9f4bf283eba194cd42b0 Mon Sep 17 00:00:00 2001
From: Jan Michael Auer
Date: Fri, 23 May 2025 10:47:11 +0200
Subject: [PATCH 4/5] ref: Change imports for upcoming sentry-kafka-schemas release

---
 .../spans/consumers/process_segments/enrichment.py   | 9 +++------
 src/sentry/spans/consumers/process_segments/types.py | 9 +++------
 2 files changed, 6 insertions(+), 12 deletions(-)

diff --git a/src/sentry/spans/consumers/process_segments/enrichment.py b/src/sentry/spans/consumers/process_segments/enrichment.py
index 5579c49e2c2578..5f82e52de7ec7b 100644
--- a/src/sentry/spans/consumers/process_segments/enrichment.py
+++ b/src/sentry/spans/consumers/process_segments/enrichment.py
@@ -1,11 +1,8 @@
 from collections import defaultdict
 from typing import Any, cast
 
-# TODO(ja): Fix and update the schema
-from sentry_kafka_schemas.schema_types.buffered_segments_v1 import _MeasurementValue
-
 from sentry.models.project import Project
-from sentry.spans.consumers.process_segments.types import Span
+from sentry.spans.consumers.process_segments.types import MeasurementValue, Span
 
 # Keys in `sentry_tags` that are shared across all spans in a segment. This list
 # is taken from `extract_shared_tags` in Relay.
@@ -195,7 +192,7 @@ def compute_breakdowns(segment: Span, spans: list[Span], project: Project) -> No
         measurements[f"{breakdown_name}.{key}"] = value
 
 
-def _compute_span_ops(spans: list[Span], config: Any) -> dict[str, _MeasurementValue]:
+def _compute_span_ops(spans: list[Span], config: Any) -> dict[str, MeasurementValue]:
     matches = config.get("matches")
     if not matches:
         return {}
@@ -206,7 +203,7 @@ def _compute_span_ops(spans: list[Span], config: Any) -> dict[str, _MeasurementV
         if operation_name := next(filter(lambda m: op.startswith(m), matches), None):
             intervals_by_op[operation_name].append(_span_interval(span))
 
-    measurements: dict[str, _MeasurementValue] = {}
+    measurements: dict[str, MeasurementValue] = {}
     for operation_name, intervals in intervals_by_op.items():
         duration = _get_duration_us(intervals)
         measurements[f"ops.{operation_name}"] = {"value": duration / 1000, "unit": "millisecond"}
diff --git a/src/sentry/spans/consumers/process_segments/types.py b/src/sentry/spans/consumers/process_segments/types.py
index b11e62d8a5c087..45e5b5494396aa 100644
--- a/src/sentry/spans/consumers/process_segments/types.py
+++ b/src/sentry/spans/consumers/process_segments/types.py
@@ -1,8 +1,10 @@
-from typing import Any, NotRequired
+from typing import NotRequired
 
+from sentry_kafka_schemas.schema_types.buffered_segments_v1 import MeasurementValue
 from sentry_kafka_schemas.schema_types.buffered_segments_v1 import SegmentSpan as UnprocessedSpan
 
 __all__ = (
+    "MeasurementValue",
     "Span",
     "UnprocessedSpan",
 )
@@ -14,11 +16,6 @@ class Span(UnprocessedSpan, total=True):
     extracted.
""" - # Missing in schema - start_timestamp_precise: float - end_timestamp_precise: float - data: NotRequired[dict[str, Any]] # currently unused - # Added in enrichment exclusive_time: float exclusive_time_ms: float From 60c69b090d34ad2863072c2e54321f3216e2c8da Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Fri, 23 May 2025 11:49:08 +0200 Subject: [PATCH 5/5] build: Bump sentry-kafka-schemas --- requirements-base.txt | 2 +- requirements-dev-frozen.txt | 2 +- requirements-frozen.txt | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/requirements-base.txt b/requirements-base.txt index 6bb4d126c14716..c65394497517ca 100644 --- a/requirements-base.txt +++ b/requirements-base.txt @@ -65,7 +65,7 @@ rfc3339-validator>=0.1.2 rfc3986-validator>=0.1.1 # [end] jsonschema format validators sentry-arroyo>=2.21.0 -sentry-kafka-schemas>=1.3.2 +sentry-kafka-schemas>=1.3.6 sentry-ophio>=1.1.3 sentry-protos==0.2.0 sentry-redis-tools>=0.5.0 diff --git a/requirements-dev-frozen.txt b/requirements-dev-frozen.txt index 9fa3f791811105..50aa1b84b2b29e 100644 --- a/requirements-dev-frozen.txt +++ b/requirements-dev-frozen.txt @@ -185,7 +185,7 @@ sentry-devenv==1.21.0 sentry-forked-django-stubs==5.2.0.post3 sentry-forked-djangorestframework-stubs==3.16.0.post1 sentry-forked-email-reply-parser==0.5.12.post1 -sentry-kafka-schemas==1.3.2 +sentry-kafka-schemas==1.3.6 sentry-ophio==1.1.3 sentry-protos==0.2.0 sentry-redis-tools==0.5.0 diff --git a/requirements-frozen.txt b/requirements-frozen.txt index 7bb8144d55aa12..0e28dbe7cb77c1 100644 --- a/requirements-frozen.txt +++ b/requirements-frozen.txt @@ -123,7 +123,7 @@ rsa==4.8 s3transfer==0.10.0 sentry-arroyo==2.21.0 sentry-forked-email-reply-parser==0.5.12.post1 -sentry-kafka-schemas==1.3.2 +sentry-kafka-schemas==1.3.6 sentry-ophio==1.1.3 sentry-protos==0.2.0 sentry-redis-tools==0.5.0