Skip to content

Commit e8bf824

Browse files
authored
Merge branch 'dev' into tninesling/schema-upgrader-tests
2 parents b0a91c9 + 69cd568 commit e8bf824

File tree

10 files changed

+349
-51
lines changed

10 files changed

+349
-51
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
### Add compute pool metrics ([PR #7184](https://github.com/apollographql/router/pull/7184))
2+
3+
The compute job pool is used within the router for compute-intensive jobs that should not block the Tokio worker threads.
4+
When this pool becomes saturated, it is difficult for users to see why, which makes it hard for them to take action.
5+
This change adds new metrics to help users understand how long jobs are waiting to be processed.
6+
7+
New metrics:
8+
- `apollo.router.compute_jobs.queue_is_full` - A counter of requests rejected because the queue was full.
9+
- `apollo.router.compute_jobs.duration` - A histogram of time spent in the compute pipeline by the job, including the queue and query planning.
10+
- `job.type`: (`QueryPlanning`, `QueryParsing`, `Introspection`)
11+
- `job.outcome`: (`ExecutedOk`, `ExecutedError`, `ChannelError`, `RejectedQueueFull`, `Abandoned`)
12+
- `apollo.router.compute_jobs.queue.wait.duration` - A histogram of time spent in the compute queue by the job.
13+
- `job.type`: (`QueryPlanning`, `QueryParsing`, `Introspection`)
14+
- `apollo.router.compute_jobs.execution.duration` - A histogram of time spent executing the job (excludes time spent in the queue).
15+
- `job.type`: (`QueryPlanning`, `QueryParsing`, `Introspection`)
16+
- `apollo.router.compute_jobs.active_jobs` - A gauge of the number of compute jobs being processed in parallel.
17+
- `job.type`: (`QueryPlanning`, `QueryParsing`, `Introspection`)
18+
19+
By [@carodewig](https://github.com/carodewig) in https://github.com/apollographql/router/pull/7184

Cargo.lock

+13-13
Original file line numberDiff line numberDiff line change
@@ -1351,7 +1351,7 @@ dependencies = [
13511351
"bitflags 2.6.0",
13521352
"cexpr",
13531353
"clang-sys",
1354-
"itertools 0.10.5",
1354+
"itertools 0.11.0",
13551355
"lazy_static",
13561356
"lazycell",
13571357
"log",
@@ -1946,9 +1946,9 @@ dependencies = [
19461946

19471947
[[package]]
19481948
name = "crossbeam-channel"
1949-
version = "0.5.14"
1949+
version = "0.5.15"
19501950
source = "registry+https://github.com/rust-lang/crates.io-index"
1951-
checksum = "06ba6d68e24814cb8de6bb986db8222d3a027d15872cabc0d18817bc3c0e4471"
1951+
checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2"
19521952
dependencies = [
19531953
"crossbeam-utils",
19541954
]
@@ -2382,7 +2382,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
23822382
checksum = "33d852cb9b869c2a9b3df2f71a3074817f01e1844f839a144f5fcef059a4eb5d"
23832383
dependencies = [
23842384
"libc",
2385-
"windows-sys 0.52.0",
2385+
"windows-sys 0.59.0",
23862386
]
23872387

23882388
[[package]]
@@ -3993,7 +3993,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
39933993
checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34"
39943994
dependencies = [
39953995
"cfg-if",
3996-
"windows-targets 0.48.5",
3996+
"windows-targets 0.52.6",
39973997
]
39983998

39993999
[[package]]
@@ -5149,7 +5149,7 @@ version = "0.13.4"
51495149
source = "registry+https://github.com/rust-lang/crates.io-index"
51505150
checksum = "d0f3e5beed80eb580c68e2c600937ac2c4eedabdfd5ef1e5b7ea4f3fba84497b"
51515151
dependencies = [
5152-
"heck 0.4.1",
5152+
"heck 0.5.0",
51535153
"itertools 0.13.0",
51545154
"log",
51555155
"multimap 0.10.0",
@@ -5170,7 +5170,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
51705170
checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1"
51715171
dependencies = [
51725172
"anyhow",
5173-
"itertools 0.10.5",
5173+
"itertools 0.11.0",
51745174
"proc-macro2",
51755175
"quote",
51765176
"syn 2.0.90",
@@ -5291,7 +5291,7 @@ dependencies = [
52915291
"once_cell",
52925292
"socket2",
52935293
"tracing",
5294-
"windows-sys 0.52.0",
5294+
"windows-sys 0.59.0",
52955295
]
52965296

52975297
[[package]]
@@ -5912,7 +5912,7 @@ dependencies = [
59125912
"errno",
59135913
"libc",
59145914
"linux-raw-sys 0.4.14",
5915-
"windows-sys 0.52.0",
5915+
"windows-sys 0.59.0",
59165916
]
59175917

59185918
[[package]]
@@ -5925,7 +5925,7 @@ dependencies = [
59255925
"errno",
59265926
"libc",
59275927
"linux-raw-sys 0.9.3",
5928-
"windows-sys 0.52.0",
5928+
"windows-sys 0.59.0",
59295929
]
59305930

59315931
[[package]]
@@ -6474,7 +6474,7 @@ dependencies = [
64746474
"cfg-if",
64756475
"libc",
64766476
"psm",
6477-
"windows-sys 0.52.0",
6477+
"windows-sys 0.59.0",
64786478
]
64796479

64806480
[[package]]
@@ -6666,7 +6666,7 @@ dependencies = [
66666666
"getrandom 0.3.1",
66676667
"once_cell",
66686668
"rustix 1.0.3",
6669-
"windows-sys 0.52.0",
6669+
"windows-sys 0.59.0",
66706670
]
66716671

66726672
[[package]]
@@ -7739,7 +7739,7 @@ version = "0.1.9"
77397739
source = "registry+https://github.com/rust-lang/crates.io-index"
77407740
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
77417741
dependencies = [
7742-
"windows-sys 0.48.0",
7742+
"windows-sys 0.59.0",
77437743
]
77447744

77457745
[[package]]

apollo-router/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ clap = { version = "4.5.8", default-features = false, features = [
8181
"help",
8282
] }
8383
cookie = { version = "0.18.0", default-features = false }
84-
crossbeam-channel = "0.5.14"
84+
crossbeam-channel = "0.5.15"
8585
ci_info = { version = "0.14.14", features = ["serde-1"] }
8686
dashmap = { version = "5.5.3", features = ["serde"] }
8787
derivative = "2.2.0"
+172
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
use std::time::Duration;
2+
use std::time::Instant;
3+
4+
use crate::compute_job::ComputeJobType;
5+
6+
/// Outcome of a compute job, reported as the `job.outcome` attribute on the
/// `apollo.router.compute_jobs.duration` histogram.
///
/// The `IntoStaticStr` derive supplies the conversion to `&'static str` that
/// is used when an outcome is turned into an `opentelemetry::Value`.
#[derive(Copy, Clone, strum_macros::IntoStaticStr)]
7+
pub(super) enum Outcome {
8+
ExecutedOk,
9+
ExecutedError,
10+
ChannelError,
11+
RejectedQueueFull,
12+
// Initial outcome assigned by `JobWatcher::new`; it is what gets reported
// if nothing overwrites the watcher's `outcome` before it is dropped.
Abandoned,
13+
}
14+
15+
/// Lets an `Outcome` be used directly as a metric attribute value.
impl From<Outcome> for opentelemetry::Value {
16+
/// Converts the outcome to its static string form, then into a `Value`.
fn from(outcome: Outcome) -> Self {
17+
// The explicit `&'static str` annotation selects the string conversion.
let s: &'static str = outcome.into();
18+
s.into()
19+
}
20+
}
21+
22+
/// Tracks a single compute job and records its total duration when dropped.
///
/// The `Drop` impl emits one `apollo.router.compute_jobs.duration` sample
/// labelled with the job type and the `outcome` held at drop time.
pub(super) struct JobWatcher {
23+
// Instant captured when the watcher is constructed.
queue_start: Instant,
24+
// Reported as the `job.type` attribute.
compute_job_type: ComputeJobType,
25+
// Reported as the `job.outcome` attribute; callers in the parent module
// overwrite this before the watcher is dropped.
pub(super) outcome: Outcome,
26+
}
27+
28+
impl JobWatcher {
29+
/// Creates a watcher for a job of the given type.
///
/// The clock starts now (`Instant::now()`), and the outcome starts as
/// `Outcome::Abandoned` until the caller assigns a different one.
pub(super) fn new(compute_job_type: ComputeJobType) -> Self {
30+
Self {
31+
queue_start: Instant::now(),
32+
outcome: Outcome::Abandoned,
33+
compute_job_type,
34+
}
35+
}
36+
}
37+
38+
/// Emits the total-duration metric when the watcher goes out of scope.
impl Drop for JobWatcher {
39+
/// Records the time elapsed since construction, in seconds, into the
/// `apollo.router.compute_jobs.duration` histogram, tagged with the job
/// type and the current `outcome`.
fn drop(&mut self) {
40+
// Measured from `queue_start`, i.e. from when the watcher was created.
let full_duration = self.queue_start.elapsed();
41+
f64_histogram_with_unit!(
42+
"apollo.router.compute_jobs.duration",
43+
"Total job processing time",
44+
"s",
45+
full_duration.as_secs_f64(),
46+
"job.type" = self.compute_job_type,
47+
"job.outcome" = self.outcome
48+
);
49+
}
50+
}
51+
52+
/// Guard that keeps the `apollo.router.compute_jobs.active_jobs` counter
/// incremented for as long as it is alive.
///
/// `register` adds 1 for the job type and the `Drop` impl subtracts 1.
pub(super) struct ActiveComputeMetric {
53+
// Reported as the `job.type` attribute on both the increment and decrement.
compute_job_type: ComputeJobType,
54+
}
55+
56+
impl ActiveComputeMetric {
57+
/// Creates the guard and immediately adds 1 to the active-jobs counter for
/// `compute_job_type`. Dropping the returned value subtracts 1 again.
// create metric (auto-increments and decrements)
58+
pub(super) fn register(compute_job_type: ComputeJobType) -> Self {
59+
let s = Self { compute_job_type };
60+
s.incr(1);
61+
s
62+
}
63+
64+
/// Adds `value` (which may be negative) to the
/// `apollo.router.compute_jobs.active_jobs` up/down counter for this
/// guard's job type.
fn incr(&self, value: i64) {
65+
// NOTE(review): the attribute key is written unquoted (`job.type`) here,
// while the histograms in this file use the quoted form `"job.type"`.
// Presumably the macro accepts both spellings — confirm, and consider
// making them consistent.
i64_up_down_counter_with_unit!(
66+
"apollo.router.compute_jobs.active_jobs",
67+
"Number of computation jobs in progress",
68+
"{job}",
69+
value,
70+
job.type = self.compute_job_type
71+
);
72+
}
73+
}
74+
75+
/// Undoes the increment made by `register` when the guard is dropped.
impl Drop for ActiveComputeMetric {
76+
fn drop(&mut self) {
77+
// Balances the `incr(1)` performed in `register`.
self.incr(-1);
78+
}
79+
}
80+
81+
/// Records `queue_duration`, in seconds, into the
/// `apollo.router.compute_jobs.queue.wait.duration` histogram, tagged with
/// the job type.
pub(super) fn observe_queue_wait_duration(
82+
compute_job_type: ComputeJobType,
83+
queue_duration: Duration,
84+
) {
85+
f64_histogram_with_unit!(
86+
"apollo.router.compute_jobs.queue.wait.duration",
87+
"Time spent by the job in the compute queue",
88+
"s",
89+
queue_duration.as_secs_f64(),
90+
"job.type" = compute_job_type
91+
);
92+
}
93+
94+
/// Records `job_duration`, in seconds, into the
/// `apollo.router.compute_jobs.execution.duration` histogram, tagged with
/// the job type.
pub(super) fn observe_compute_duration(compute_job_type: ComputeJobType, job_duration: Duration) {
95+
f64_histogram_with_unit!(
96+
"apollo.router.compute_jobs.execution.duration",
97+
"Time to execute the job, after it has been pulled from the queue",
98+
"s",
99+
job_duration.as_secs_f64(),
100+
"job.type" = compute_job_type
101+
);
102+
}
103+
104+
// Unit tests for the drop-driven metrics: `JobWatcher` (duration histogram)
// and `ActiveComputeMetric` (active-jobs up/down counter).
#[cfg(test)]
105+
mod tests {
106+
use crate::compute_job::ComputeJobType;
107+
use crate::compute_job::metrics::ActiveComputeMetric;
108+
use crate::compute_job::metrics::JobWatcher;
109+
use crate::compute_job::metrics::Outcome;
110+
111+
// Dropping a `JobWatcher` should add one sample to the duration histogram
// under the watcher's job type and its outcome at drop time.
#[test]
112+
fn test_job_watcher() {
113+
// Asserts the sample count for one (job.type, job.outcome) pair.
let check_histogram_count =
114+
|count: u64, job_type: &'static str, job_outcome: &'static str| {
115+
assert_histogram_count!(
116+
"apollo.router.compute_jobs.duration",
117+
count,
118+
"job.type" = job_type,
119+
"job.outcome" = job_outcome
120+
);
121+
};
122+
123+
// Dropped without assigning an outcome, so the default `Abandoned` is reported.
{
124+
let _job_watcher = JobWatcher::new(ComputeJobType::Introspection);
125+
}
126+
check_histogram_count(1, "Introspection", Outcome::Abandoned.into());
127+
128+
// An explicitly assigned outcome is the one reported on drop.
{
129+
let mut job_watcher = JobWatcher::new(ComputeJobType::QueryParsing);
130+
job_watcher.outcome = Outcome::ExecutedOk;
131+
}
132+
check_histogram_count(1, "QueryParsing", Outcome::ExecutedOk.into());
133+
134+
// Each iteration drops one more watcher with the same labels, so the
// sample count is expected to equal the iteration number (1 through 4).
for count in 1..5 {
135+
{
136+
let mut job_watcher = JobWatcher::new(ComputeJobType::QueryPlanning);
137+
job_watcher.outcome = Outcome::RejectedQueueFull;
138+
}
139+
check_histogram_count(count, "QueryPlanning", Outcome::RejectedQueueFull.into());
140+
}
141+
}
142+
143+
// The active-jobs counter should go up by one per live guard, per job type,
// and back down as guards are dropped.
#[test]
144+
fn test_active_compute_metric() {
145+
// Asserts the current counter value for one job type.
let check_count = |count: i64, job_type: &'static str| {
146+
assert_up_down_counter!(
147+
"apollo.router.compute_jobs.active_jobs",
148+
count,
149+
"job.type" = job_type
150+
);
151+
};
152+
153+
{
154+
let _introspection_1 = ActiveComputeMetric::register(ComputeJobType::Introspection);
155+
let _introspection_2 = ActiveComputeMetric::register(ComputeJobType::Introspection);
156+
let introspection_3 = ActiveComputeMetric::register(ComputeJobType::Introspection);
157+
check_count(3, "Introspection");
158+
159+
// Registering a different job type must not affect the Introspection count.
let _planning_1 = ActiveComputeMetric::register(ComputeJobType::QueryPlanning);
160+
check_count(3, "Introspection");
161+
check_count(1, "QueryPlanning");
162+
163+
// An explicit drop decrements only that guard's job type.
drop(introspection_3);
164+
check_count(2, "Introspection");
165+
check_count(1, "QueryPlanning");
166+
}
167+
168+
// block ended, so should have no ongoing computation
169+
check_count(0, "Introspection");
170+
check_count(0, "QueryPlanning");
171+
}
172+
}

0 commit comments

Comments
 (0)