Skip to content

Commit dd9db21

Browse files
committed
Make explicit a scenario where the scheduling yields unbalanced solutions
1 parent dc54c8e commit dd9db21

File tree

3 files changed

+34
-15
lines changed

3 files changed

+34
-15
lines changed

quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -576,7 +576,7 @@ fn add_shard_to_indexer(
576576
}
577577
}
578578

579-
// If the total node capacities is lower than 110% of the problem load, this
579+
// If the total node capacity is lower than 120% of the problem load, this
580580
// function scales the load of the indexer to reach this limit.
581581
fn inflate_node_capacities_if_necessary(problem: &mut SchedulingProblem) {
582582
// First we scale the problem to the point where any indexer can fit the largest shard.

quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ use std::collections::btree_map::Entry;
1818

1919
use itertools::Itertools;
2020
use quickwit_proto::indexing::CpuCapacity;
21-
use tracing::warn;
2221

2322
use super::scheduling_logic_model::*;
2423
use crate::indexing_scheduler::scheduling::inflate_node_capacities_if_necessary;
@@ -41,7 +40,7 @@ pub fn solve(
4140
previous_solution: SchedulingSolution,
4241
) -> SchedulingSolution {
4342
// We first inflate the indexer capacities to make sure they globally
44-
// have at least 110% of the total problem load. This is done proportionally
43+
// have at least 120% of the total problem load. This is done proportionally
4544
// to their original capacity.
4645
inflate_node_capacities_if_necessary(&mut problem);
4746
// As a heuristic, to offer stability, we work iteratively
@@ -294,21 +293,23 @@ fn place_unassigned_shards_ignoring_affinity(
294293
Reverse(load)
295294
});
296295

297-
// Thanks to the call to `inflate_node_capacities_if_necessary`,
298-
// we are certain that even on our first attempt, the total capacity of the indexer exceeds 120%
299-
// of the partial solution.
296+
// Thanks to the call to `inflate_node_capacities_if_necessary`, we are
297+
// certain that even on our first attempt, the total capacity of the indexer
298+
// exceeds 120% of the partial solution. If a large shard needs to be placed
299+
// in an already well balanced solution, it may not fit on any node. In that
300+
// case, we iteratively grow the virtual capacity until it can be placed.
300301
//
301-
// 1.2^30 is about 240.
302-
// If we reach 30 attempts we are certain to have a logical bug.
302+
// 1.2^30 is about 240. If we reach 30 attempts we are certain to have a
303+
// logical bug.
303304
for attempt_number in 0..30 {
304305
match attempt_place_unassigned_shards(&unassigned_shards[..], &problem, partial_solution) {
305-
Ok(solution) => {
306-
if attempt_number != 0 {
307-
warn!(
308-
attempt_number = attempt_number,
309-
"required to scale node capacity"
310-
);
311-
}
306+
Ok(mut solution) => {
307+
// the higher the attempt number, the more unbalanced the solution
308+
tracing::warn!(
309+
attempt_number = attempt_number,
310+
"capacity re-scaled, scheduling solution likely unbalanced"
311+
);
312+
solution.capacity_scaling_iterations = attempt_number;
312313
return solution;
313314
}
314315
Err(NotEnoughCapacity) => {
@@ -783,4 +784,19 @@ mod tests {
783784
solve(problem, solution);
784785
}
785786
}
787+
788+
#[test]
789+
fn test_capacity_scaling_iteration_required() {
790+
// Create a problem where the shard loads do not pack evenly onto the nodes,
791+
// requiring iterative scaling despite initial capacity scaling.
792+
let mut problem =
793+
SchedulingProblem::with_indexer_cpu_capacities(vec![mcpu(3000), mcpu(3000)]);
794+
problem.add_source(1, NonZeroU32::new(2500).unwrap()); // Source 0
795+
problem.add_source(1, NonZeroU32::new(2500).unwrap()); // Source 1
796+
problem.add_source(1, NonZeroU32::new(1500).unwrap()); // Source 2
797+
let previous_solution = problem.new_solution();
798+
let solution = solve(problem, previous_solution);
799+
800+
assert_eq!(solution.capacity_scaling_iterations, 1);
801+
}
786802
}

quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,12 +232,15 @@ impl IndexerAssignment {
232232
#[derive(Clone, Debug, Eq, PartialEq)]
233233
pub struct SchedulingSolution {
234234
pub indexer_assignments: Vec<IndexerAssignment>,
235+
// used for tests
236+
pub capacity_scaling_iterations: usize,
235237
}
236238

237239
impl SchedulingSolution {
238240
pub fn with_num_indexers(num_indexers: usize) -> SchedulingSolution {
239241
SchedulingSolution {
240242
indexer_assignments: (0..num_indexers).map(IndexerAssignment::new).collect(),
243+
capacity_scaling_iterations: 0,
241244
}
242245
}
243246
pub fn num_indexers(&self) -> usize {

0 commit comments

Comments
 (0)