Test scheduling solution stability #5793

Status: Open. Wants to merge 3 commits into base branch main.
@@ -536,7 +536,10 @@ fn assert_post_condition_physical_plan_match_solution(
assert_eq!(num_indexers, id_to_ord_map.indexer_ids.len());
let mut reconstructed_solution = SchedulingSolution::with_num_indexers(num_indexers);
convert_physical_plan_to_solution(physical_plan, id_to_ord_map, &mut reconstructed_solution);
assert_eq!(solution, &reconstructed_solution);
assert_eq!(
solution.indexer_assignments,
reconstructed_solution.indexer_assignments
);
}

fn add_shard_to_indexer(
@@ -576,7 +579,7 @@ fn add_shard_to_indexer(
}
}

// If the total node capacities is lower than 110% of the problem load, this
// If the total node capacities is lower than 120% of the problem load, this
// function scales the load of the indexer to reach this limit.
fn inflate_node_capacities_if_necessary(problem: &mut SchedulingProblem) {
// First we scale the problem to the point where any indexer can fit the largest shard.
@@ -18,7 +18,6 @@ use std::collections::btree_map::Entry;

use itertools::Itertools;
use quickwit_proto::indexing::CpuCapacity;
use tracing::warn;

use super::scheduling_logic_model::*;
use crate::indexing_scheduler::scheduling::inflate_node_capacities_if_necessary;
@@ -41,7 +40,7 @@ pub fn solve(
previous_solution: SchedulingSolution,
) -> SchedulingSolution {
// We first inflate the indexer capacities to make sure they globally
// have at least 110% of the total problem load. This is done proportionally
// have at least 120% of the total problem load. This is done proportionally
// to their original capacity.
inflate_node_capacities_if_necessary(&mut problem);
// As a heuristic, to offer stability, we work iteratively
@@ -209,6 +208,27 @@ fn assert_enforce_nodes_cpu_capacity_post_condition(
);
}

// ----------------------------------------------------
[Review comment, collaborator/author]: not related to the PR, just moved this up because it was wrongly placed.

// Phase 3
// Place unassigned sources.
//
// We use a greedy algorithm as a simple heuristic here.
//
// We go through the sources in decreasing order of their load,
// in two passes.
//
// In the first pass, we have a look at
// the nodes with which there is an affinity.
//
// If one of them has room for all of the shards, then we assign all
// of the shards to it.
//
// In the second pass, we just put as many shards as possible on the node
// with the highest available capacity.
//
// If this algorithm fails to place all remaining shards, we inflate
// the node capacities by 20% in the scheduling problem and start from the beginning.
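The two-pass greedy heuristic described in the comment above can be sketched as a simplified standalone model. The types and names below (`Source`, `greedy_place`, the plain capacity vector) are illustrative stand-ins, not the actual scheduler API:

```rust
use std::cmp::Reverse;

// Illustrative stand-in for a source: `load_per_shard` is assumed nonzero,
// mirroring the real code's use of NonZeroU32.
struct Source {
    load_per_shard: u32,
    num_shards: u32,
    affinity_node: Option<usize>,
}

// Returns the remaining per-node capacities, or None when some shard cannot
// be placed -- the caller would then inflate capacities by 20% and retry.
fn greedy_place(mut capacities: Vec<u32>, mut sources: Vec<Source>) -> Option<Vec<u32>> {
    // Go through the sources in decreasing order of their total load.
    sources.sort_by_key(|s| Reverse(s.load_per_shard * s.num_shards));
    for source in &sources {
        // Pass 1: if an affinity node has room for *all* shards, it takes them.
        if let Some(node) = source.affinity_node {
            let needed = source.load_per_shard * source.num_shards;
            if capacities[node] >= needed {
                capacities[node] -= needed;
                continue;
            }
        }
        // Pass 2: put as many shards as possible on the node with the
        // highest available capacity, repeating until none are left.
        let mut remaining = source.num_shards;
        while remaining > 0 {
            let (best, cap) = capacities
                .iter()
                .copied()
                .enumerate()
                .max_by_key(|&(_, cap)| cap)?;
            let placable = (cap / source.load_per_shard).min(remaining);
            if placable == 0 {
                return None; // NotEnoughCapacity
            }
            capacities[best] -= placable * source.load_per_shard;
            remaining -= placable;
        }
    }
    Some(capacities)
}

fn main() {
    // Two 3900 mCPU nodes and three single-shard sources of 2500/2500/1500
    // mCPU: after one 2500 shard lands on each node, only 1400 mCPU remains
    // per node, so the 1500 shard cannot be placed and the attempt fails.
    let sources = vec![
        Source { load_per_shard: 2500, num_shards: 1, affinity_node: None },
        Source { load_per_shard: 2500, num_shards: 1, affinity_node: None },
        Source { load_per_shard: 1500, num_shards: 1, affinity_node: None },
    ];
    assert!(greedy_place(vec![3900, 3900], sources).is_none());
}
```

A failed attempt like the one in `main` is exactly what triggers the 20% capacity inflation and retry described above.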

fn attempt_place_unassigned_shards(
unassigned_shards: &[Source],
problem: &SchedulingProblem,
@@ -263,26 +283,6 @@ fn place_unassigned_shards_with_affinity(
}
}

// ----------------------------------------------------
// Phase 3
// Place unassigned sources.
//
// We use a greedy algorithm as a simple heuristic here.
//
// We go through the sources in decreasing order of their load,
// in two passes.
//
// In the first pass, we have a look at
// the nodes with which there is an affinity.
//
// If one of them has room for all of the shards, then we assign all
// of the shards to it.
//
// In the second pass, we just put as many shards as possible on the node
// with the highest available capacity.
//
// If this algorithm fails to place all remaining shards, we inflate
// the node capacities by 20% in the scheduling problem and start from the beginning.
#[must_use]
fn place_unassigned_shards_ignoring_affinity(
mut problem: SchedulingProblem,
@@ -294,21 +294,23 @@ fn place_unassigned_shards_ignoring_affinity(
Reverse(load)
});

// Thanks to the call to `inflate_node_capacities_if_necessary`,
// we are certain that even on our first attempt, the total capacity of the indexer exceeds 120%
// of the partial solution.
// Thanks to the call to `inflate_node_capacities_if_necessary`, we are
// certain that even on our first attempt, the total capacity of the indexer
// exceeds 120% of the partial solution. If a large shard needs to be placed
// in an already well balanced solution, it may not fit on any node. In that
// case, we iteratively grow the virtual capacity until it can be placed.
//
// 1.2^30 is about 240.
// If we reach 30 attempts we are certain to have a logical bug.
// 1.2^30 is about 240. If we reach 30 attempts we are certain to have a
// logical bug.
for attempt_number in 0..30 {
match attempt_place_unassigned_shards(&unassigned_shards[..], &problem, partial_solution) {
Ok(solution) => {
if attempt_number != 0 {
warn!(
attempt_number = attempt_number,
"required to scale node capacity"
);
}
Ok(mut solution) => {
// the higher the attempt number, the more unbalanced the solution
tracing::warn!(
attempt_number = attempt_number,
"capacity re-scaled, scheduling solution likely unbalanced"
);
solution.capacity_scaling_iterations = attempt_number;
return solution;
}
Err(NotEnoughCapacity) => {
@@ -357,10 +359,6 @@ fn place_unassigned_shards_single_source(
let num_placable_shards = available_capacity.cpu_millis() / source.load_per_shard;
let num_shards_to_place = num_placable_shards.min(num_shards);
// Update the solution, the shard load, and the number of shards to place.
if num_shards_to_place == 0u32 {
// No need to fill indexer_assignments with empty assignments.
continue;
}
solution.indexer_assignments[indexer_ord]
.add_shards(source.source_ord, num_shards_to_place);
num_shards -= num_shards_to_place;
@@ -780,7 +778,31 @@ mod tests {
proptest! {
#[test]
fn test_proptest_post_conditions((problem, solution) in problem_solution_strategy()) {
solve(problem, solution);
let solution_1 = solve(problem.clone(), solution);
let solution_2 = solve(problem.clone(), solution_1.clone());
// TODO: This assert actually fails for some scenarios. We say it is fine
// for now as long as the solution does not change again during the
// next resolution:
// let has_solution_changed_once = solution_1.indexer_assignments != solution_2.indexer_assignments;
// assert!(!has_solution_changed_once, "Solution changed for same problem\nSolution 1:{solution_1:?}\nSolution 2: {solution_2:?}");
let solution_3 = solve(problem, solution_2.clone());
let has_solution_changed_again = solution_2.indexer_assignments != solution_3.indexer_assignments;
assert!(!has_solution_changed_again, "solution unstable!!!\nSolution 1: {solution_1:?}\nSolution 2: {solution_2:?}\nSolution 3: {solution_3:?}");
}
}

#[test]
fn test_capacity_scaling_iteration_required() {
// Create a problem where affinity constraints cause suboptimal placement
// requiring iterative scaling despite initial capacity scaling.
let mut problem =
SchedulingProblem::with_indexer_cpu_capacities(vec![mcpu(3000), mcpu(3000)]);
problem.add_source(1, NonZeroU32::new(2500).unwrap()); // Source 0
problem.add_source(1, NonZeroU32::new(2500).unwrap()); // Source 1
problem.add_source(1, NonZeroU32::new(1500).unwrap()); // Source 2
let previous_solution = problem.new_solution();
let solution = solve(problem, previous_solution);

assert_eq!(solution.capacity_scaling_iterations, 1);
}
}
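The single scaling iteration expected by `test_capacity_scaling_iteration_required` can be checked by hand. This is a sketch of the arithmetic under the assumptions stated in the comments (120% inflation target, 20% rescaling per failed attempt); the numbers mirror the test, not the crate's API:

```rust
fn main() {
    let total_load = 2500 + 2500 + 1500; // 6500 mCPU across three sources
    let raw_capacity = 3000 + 3000; // 6000 mCPU across two indexers

    // Inflation targets 120% of the total load, applied proportionally.
    let inflated_total = total_load * 120 / 100; // 7800 mCPU
    assert!(raw_capacity < inflated_total);
    let per_node = inflated_total / 2; // 3900 mCPU each

    // First attempt: one 2500 mCPU shard per node leaves 1400 mCPU, which is
    // too little for the 1500 mCPU shard => NotEnoughCapacity.
    assert!(per_node - 2500 < 1500);

    // One 20% rescale: 3900 -> 4680 mCPU, leaving 2180 after the 2500 shard,
    // so the 1500 shard now fits. Hence capacity_scaling_iterations == 1.
    let rescaled = per_node * 120 / 100; // 4680
    assert!(rescaled - 2500 >= 1500);
}
```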
@@ -209,7 +209,12 @@ impl IndexerAssignment {
.unwrap_or(0u32)
}

/// Add shards to a source (noop if `num_shards` is 0).
pub fn add_shards(&mut self, source_ord: u32, num_shards: u32) {
// No need to fill indexer_assignments with empty assignments.
if num_shards == 0 {
return;
}
[Review comment on lines +214 to +217, collaborator/author]: this doesn't change the logic, just ensures a more consistent representation of the state so that we can compare it better in tests.

*self.num_shards_per_source.entry(source_ord).or_default() += num_shards;
}

@@ -229,15 +234,18 @@ impl IndexerAssignment {
}
}

#[derive(Clone, Debug, Eq, PartialEq)]
#[derive(Clone, Debug)]
pub struct SchedulingSolution {
pub indexer_assignments: Vec<IndexerAssignment>,
// used for tests
pub capacity_scaling_iterations: usize,
}

impl SchedulingSolution {
pub fn with_num_indexers(num_indexers: usize) -> SchedulingSolution {
SchedulingSolution {
indexer_assignments: (0..num_indexers).map(IndexerAssignment::new).collect(),
capacity_scaling_iterations: 0,
}
}
pub fn num_indexers(&self) -> usize {