
Commit bcb51c6

colin-sentry authored and andrewshie-sentry committed
feat(ourlogs): Allow graphing ourlogs with the timeseries APIs (#89306)
We want to graph logs in the logs page (among other places), so this adds a count() aggregate for logs and a shim that makes the ourlogs dataset work with the events-stats and events-timeseries endpoints.
1 parent da2b91e commit bcb51c6
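
As a quick illustration of what this change enables, a minimal sketch of a request against the events-stats endpoint; the host, org slug, token, and project id are placeholder assumptions, while dataset=ourlogs and yAxis=count() come from this commit:

# Hedged sketch: querying events-stats with the new ourlogs dataset.
# Host, org slug, token, and project id are made-up placeholders.
import requests

resp = requests.get(
    "https://sentry.example.com/api/0/organizations/my-org/events-stats/",
    headers={"Authorization": "Bearer <token>"},
    params={
        "dataset": "ourlogs",  # routed through the RPC timeseries path by this commit
        "yAxis": "count()",    # the aggregate this commit adds for logs
        "interval": "1h",
        "project": "1",
    },
)
buckets = resp.json()["data"]  # zerofilled time buckets of log counts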

File tree

7 files changed: +247 −49 lines changed

src/sentry/api/endpoints/organization_events_stats.py

Lines changed: 9 additions & 3 deletions
@@ -280,10 +280,12 @@ def get(self, request: Request, organization: Organization) -> Response:
             return Response({"detail": f"Metric type must be one of: {metric_types}"}, status=400)
 
         force_metrics_layer = request.GET.get("forceMetricsLayer") == "true"
-        use_rpc = request.GET.get("useRpc", "0") == "1" and dataset == spans_eap
+        use_rpc = (
+            request.GET.get("useRpc", "0") == "1" and dataset == spans_eap
+        ) or dataset == ourlogs
         sampling_mode = request.GET.get("sampling")
         transform_alias_to_input_format = (
-            request.GET.get("transformAliasToInputFormat") == "1" or use_rpc or dataset == ourlogs
+            request.GET.get("transformAliasToInputFormat") == "1" or use_rpc
         )
         sentry_sdk.set_tag("performance.use_rpc", use_rpc)
 
@@ -298,6 +300,8 @@ def _get_event_stats(
         ) -> SnubaTSResult | dict[str, SnubaTSResult]:
             if top_events > 0:
                 if use_rpc:
+                    if scoped_dataset == ourlogs:
+                        raise NotImplementedError("You can not use top_events with logs for now.")
                     return spans_rpc.run_top_events_timeseries_query(
                         params=snuba_params,
                         query_string=query,
@@ -338,7 +342,9 @@ def _get_event_stats(
             )
 
             if use_rpc:
-                return spans_rpc.run_timeseries_query(
+                if scoped_dataset == spans_eap:
+                    scoped_dataset = spans_rpc
+                return scoped_dataset.run_timeseries_query(
                     params=snuba_params,
                     query_string=query,
                     y_axes=query_columns,
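
The branch above works because the dataset handles in this endpoint are Python modules that share a run_timeseries_query signature; a standalone sketch of that duck-typed dispatch (the helper name run_timeseries is illustrative, not from the diff):

# Illustrative sketch: the endpoint treats dataset modules as interchangeable
# strategies, so swapping spans_eap for spans_rpc lets one call site serve
# both spans and logs.
from types import ModuleType

def run_timeseries(scoped_dataset: ModuleType, **query_kwargs):
    # ourlogs.run_timeseries_query and spans_rpc.run_timeseries_query take the
    # same keyword arguments, so either module can be passed in here.
    return scoped_dataset.run_timeseries_query(**query_kwargs)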

src/sentry/api/endpoints/organization_events_timeseries.py

Lines changed: 4 additions & 2 deletions
@@ -272,8 +272,10 @@ def get_event_stats(
             ),
         )
 
-        if dataset == spans_eap:
-            return spans_rpc.run_timeseries_query(
+        if dataset == spans_eap or dataset == ourlogs:
+            if dataset == spans_eap:
+                dataset = spans_rpc
+            return dataset.run_timeseries_query(
                 params=snuba_params,
                 query_string=query,
                 y_axes=query_columns,
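
For reference, a hedged sketch of the timeseries response shape this branch produces for ourlogs, pieced together from the test assertions later in this commit (the millisecond timestamps are made-up examples):

# Shape grounded in the test assertions below; timestamp values are illustrative.
expected_shape = {
    "meta": {"dataset": "ourlogs", "start": 1700000000000.0, "end": 1700021600000.0},
    "timeseries": [
        {
            "yaxis": "count()",
            "meta": {"valueType": "integer", "interval": 3_600_000},
            "values": [],  # one zerofilled bucket per interval
        }
    ],
}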

src/sentry/search/eap/ourlogs/definitions.py

Lines changed: 2 additions & 1 deletion
@@ -5,9 +5,10 @@
     OURLOG_ATTRIBUTE_DEFINITIONS,
     OURLOG_VIRTUAL_CONTEXTS,
 )
+from sentry.search.eap.spans.aggregates import LOG_AGGREGATE_DEFINITIONS
 
 OURLOG_DEFINITIONS = ColumnDefinitions(
-    aggregates={},
+    aggregates=LOG_AGGREGATE_DEFINITIONS,
     conditional_aggregates={},
     formulas={},
     columns=OURLOG_ATTRIBUTE_DEFINITIONS,
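
A minimal sketch of the effect of this wiring, assuming the sentry package is importable; the assertion restates the diff rather than a test from this commit:

# Before this change OURLOG_DEFINITIONS.aggregates was {}, so no aggregate
# resolved for logs; now count() resolves through LOG_AGGREGATE_DEFINITIONS.
from sentry.search.eap.ourlogs.definitions import OURLOG_DEFINITIONS

assert "count" in OURLOG_DEFINITIONS.aggregates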

src/sentry/search/eap/spans/aggregates.py

Lines changed: 17 additions & 0 deletions
@@ -476,3 +476,20 @@ def resolve_bounded_sample(args: ResolvedArguments) -> tuple[AttributeKey, Trace
         attribute_resolver=transform_vital_score_to_ratio,
     ),
 }
+
+LOG_AGGREGATE_DEFINITIONS = {
+    "count": AggregateDefinition(
+        internal_function=Function.FUNCTION_COUNT,
+        infer_search_type_from_arguments=False,
+        processor=count_processor,
+        default_search_type="integer",
+        arguments=[
+            AttributeArgumentDefinition(
+                attribute_types={
+                    "string",
+                },
+                default_arg="log.body",
+            )
+        ],
+    ),
+}
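
A hedged reading of what this definition accepts (an interpretation of the fields above, not code from the commit):

# "count()" falls back to default_arg, so it behaves like "count(log.body)";
# only string attributes are valid arguments, and the output search_type is
# fixed to "integer" because infer_search_type_from_arguments=False disables
# inference from the argument's type.
equivalent_fields = {"count()", "count(log.body)"}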

src/sentry/snuba/ourlogs.py

Lines changed: 61 additions & 1 deletion
@@ -1,15 +1,22 @@
 import logging
+from datetime import timedelta
 
+from sentry_sdk import trace
 from snuba_sdk import Column, Condition
 
 from sentry.search.eap.ourlogs.definitions import OURLOG_DEFINITIONS
 from sentry.search.eap.resolver import SearchResolver
 from sentry.search.eap.types import SearchResolverConfig
-from sentry.search.events.types import EventsResponse, SnubaParams
+from sentry.search.eap.utils import handle_downsample_meta
+from sentry.search.events.types import EventsMeta, EventsResponse, SnubaParams
+from sentry.snuba import rpc_dataset_common
 from sentry.snuba.dataset import Dataset
+from sentry.snuba.discover import zerofill
 from sentry.snuba.metrics.extraction import MetricSpecType
 from sentry.snuba.query_sources import QuerySource
 from sentry.snuba.rpc_dataset_common import run_table_query
+from sentry.utils import snuba_rpc
+from sentry.utils.snuba import SnubaTSResult
 
 logger = logging.getLogger("sentry.snuba.ourlogs")
 
@@ -77,3 +84,56 @@ def query(
         ),
         debug=debug,
     )
+
+
+@trace
+def run_timeseries_query(
+    params: SnubaParams,
+    query_string: str,
+    y_axes: list[str],
+    referrer: str,
+    config: SearchResolverConfig,
+    sampling_mode: str | None,
+    comparison_delta: timedelta | None = None,
+) -> SnubaTSResult:
+    rpc_dataset_common.validate_granularity(params)
+    search_resolver = get_resolver(params, config)
+    rpc_request, aggregates, groupbys = rpc_dataset_common.get_timeseries_query(
+        search_resolver, params, query_string, y_axes, [], referrer, sampling_mode=None
+    )
+
+    """Run the query"""
+    rpc_response = snuba_rpc.timeseries_rpc([rpc_request])[0]
+
+    """Process the results"""
+    result = rpc_dataset_common.ProcessedTimeseries()
+    final_meta: EventsMeta = EventsMeta(
+        fields={}, full_scan=handle_downsample_meta(rpc_response.meta.downsampled_storage_meta)
+    )
+    for resolved_field in aggregates + groupbys:
+        final_meta["fields"][resolved_field.public_alias] = resolved_field.search_type
+
+    for timeseries in rpc_response.result_timeseries:
+        processed = rpc_dataset_common.process_timeseries_list([timeseries])
+        if len(result.timeseries) == 0:
+            result = processed
+        else:
+            for attr in ["timeseries", "confidence", "sample_count", "sampling_rate"]:
+                for existing, new in zip(getattr(result, attr), getattr(processed, attr)):
+                    existing.update(new)
+    if len(result.timeseries) == 0:
+        # The rpc only zerofills for us when there are results, if there aren't any we have to do it ourselves
+        result.timeseries = zerofill(
+            [],
+            params.start_date,
+            params.end_date,
+            params.timeseries_granularity_secs,
+            ["time"],
+        )
+
+    return SnubaTSResult(
+        {"data": result.timeseries, "processed_timeseries": result, "meta": final_meta},
+        params.start,
+        params.end,
+        params.granularity_secs,
+    )
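
The zip/update loop above folds per-group series together bucket by bucket; a self-contained sketch of that pattern with fabricated buckets:

# Standalone illustration of the zip/update merge used in run_timeseries_query;
# the bucket contents are made up for the example.
existing_series = [{"time": 1, "count()": 3}, {"time": 2, "count()": 0}]
new_series = [{"time": 1, "other": 1.5}, {"time": 2, "other": 0.0}]
for existing, new in zip(existing_series, new_series):
    existing.update(new)  # buckets share timestamps, so update merges the values
# existing_series now holds both columns per bucket
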
Lines changed: 112 additions & 0 deletions
@@ -0,0 +1,112 @@
+from datetime import timedelta
+
+from django.urls import reverse
+
+from sentry.testutils.helpers.datetime import before_now
+from tests.snuba.api.endpoints.test_organization_events import OrganizationEventsEndpointTestBase
+from tests.snuba.api.endpoints.test_organization_events_timeseries_spans import (
+    AnyConfidence,
+    build_expected_timeseries,
+)
+
+any_confidence = AnyConfidence()
+
+
+class OrganizationEventsStatsOurlogsMetricsEndpointTest(OrganizationEventsEndpointTestBase):
+    endpoint = "sentry-api-0-organization-events-timeseries"
+
+    def setUp(self):
+        super().setUp()
+        self.login_as(user=self.user)
+        self.start = self.day_ago = before_now(days=1).replace(
+            hour=10, minute=0, second=0, microsecond=0
+        )
+        self.end = self.start + timedelta(hours=6)
+        self.two_days_ago = self.day_ago - timedelta(days=1)
+
+        self.url = reverse(
+            self.endpoint,
+            kwargs={"organization_id_or_slug": self.project.organization.slug},
+        )
+
+    def _do_request(self, data, url=None, features=None):
+        if features is None:
+            features = {"organizations:ourlogs": True}
+        features.update(self.features)
+        with self.feature(features):
+            return self.client.get(self.url if url is None else url, data=data, format="json")
+
+    def test_count(self):
+        event_counts = [6, 0, 6, 3, 0, 3]
+        logs = []
+        for hour, count in enumerate(event_counts):
+            logs.extend(
+                [
+                    self.create_ourlog(
+                        {"body": "foo"},
+                        timestamp=self.start + timedelta(hours=hour, minutes=minute),
+                        attributes={"status": {"string_value": "success"}},
+                    )
+                    for minute in range(count)
+                ],
+            )
+        self.store_ourlogs(logs)
+
+        response = self._do_request(
+            data={
+                "start": self.start,
+                "end": self.end,
+                "interval": "1h",
+                "yAxis": "count()",
+                "project": self.project.id,
+                "dataset": "ourlogs",
+            },
+        )
+        assert response.status_code == 200, response.content
+        assert response.data["meta"] == {
+            "dataset": "ourlogs",
+            "start": self.start.timestamp() * 1000,
+            "end": self.end.timestamp() * 1000,
+        }
+        assert len(response.data["timeseries"]) == 1
+        timeseries = response.data["timeseries"][0]
+        assert len(timeseries["values"]) == 6
+        assert timeseries["yaxis"] == "count()"
+        assert timeseries["values"] == build_expected_timeseries(
+            self.start,
+            3_600_000,
+            event_counts,
+            sample_count=event_counts,
+            sample_rate=[1 if val else 0 for val in event_counts],
+            confidence=[any_confidence if val else None for val in event_counts],
+        )
+        assert timeseries["meta"] == {
+            "valueType": "integer",
+            "interval": 3_600_000,
+        }
+
+    def test_zerofill(self):
+        response = self._do_request(
+            data={
+                "start": self.start,
+                "end": self.end,
+                "interval": "1h",
+                "yAxis": "count()",
+                "project": self.project.id,
+                "dataset": "ourlogs",
+            },
+        )
+        assert response.status_code == 200, response.content
+        assert response.data["meta"] == {
+            "dataset": "ourlogs",
+            "start": self.start.timestamp() * 1000,
+            "end": self.end.timestamp() * 1000,
+        }
+        assert len(response.data["timeseries"]) == 1
+        timeseries = response.data["timeseries"][0]
+        assert len(timeseries["values"]) == 7
+        assert timeseries["values"] == build_expected_timeseries(
+            self.start,
+            3_600_000,
+            [0] * 7,
+        )
