Skip to content

Commit 64ed787

Browse files
authored
feat: disable task stage plan binary cache (#1266)
- In some cases the stage plan should not be cached, because the task plan may change. Since there is no easy way to invalidate the cache, caching can now be optionally disabled.
1 parent 1badd1b commit 64ed787

File tree

2 files changed

+49
-35
lines changed

2 files changed

+49
-35
lines changed

ballista/scheduler/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ required-features = ["build-binary"]
3737
[features]
3838
build-binary = ["configure_me", "clap", "tracing-subscriber", "tracing-appender", "tracing", "ballista-core/build-binary"]
3939
default = ["build-binary"]
40+
# Job info can cache encoded stage plans. In some cases where
41+
# task plans can be re-computed, this caching may need to be disabled.
42+
disable-stage-plan-cache = []
4043
graphviz-support = ["dep:graphviz-rust"]
4144
keda-scaler = []
4245
prometheus-metrics = ["prometheus", "once_cell"]

ballista/scheduler/src/state/task_manager.rs

Lines changed: 46 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ use dashmap::DashMap;
3939

4040
use datafusion::physical_plan::ExecutionPlan;
4141
use datafusion_proto::logical_plan::AsLogicalPlan;
42-
use datafusion_proto::physical_plan::AsExecutionPlan;
42+
use datafusion_proto::physical_plan::{AsExecutionPlan, PhysicalExtensionCodec};
43+
use datafusion_proto::protobuf::PhysicalPlanNode;
4344
use log::{debug, error, info, trace, warn};
4445
use rand::{rng, Rng};
4546
use std::collections::{HashMap, HashSet};
@@ -125,6 +126,7 @@ pub struct JobInfoCache {
125126
pub execution_graph: Arc<RwLock<ExecutionGraph>>,
126127
// Cache for job status
127128
pub status: Option<job_status::Status>,
129+
#[cfg(not(feature = "disable-stage-plan-cache"))]
128130
// Cache for encoded execution stage plan to avoid duplicated encoding for multiple tasks
129131
encoded_stage_plans: HashMap<usize, Vec<u8>>,
130132
}
@@ -135,9 +137,42 @@ impl JobInfoCache {
135137
Self {
136138
execution_graph: Arc::new(RwLock::new(graph)),
137139
status,
140+
#[cfg(not(feature = "disable-stage-plan-cache"))]
138141
encoded_stage_plans: HashMap::new(),
139142
}
140143
}
144+
#[cfg(not(feature = "disable-stage-plan-cache"))]
145+
fn encode_stage_plan<U: AsExecutionPlan>(
146+
&mut self,
147+
stage_id: usize,
148+
plan: &Arc<dyn ExecutionPlan>,
149+
codec: &dyn PhysicalExtensionCodec,
150+
) -> Result<Vec<u8>> {
151+
if let Some(plan) = self.encoded_stage_plans.get(&stage_id) {
152+
Ok(plan.clone())
153+
} else {
154+
let mut plan_buf: Vec<u8> = vec![];
155+
let plan_proto = U::try_from_physical_plan(plan.clone(), codec)?;
156+
plan_proto.try_encode(&mut plan_buf)?;
157+
self.encoded_stage_plans.insert(stage_id, plan_buf.clone());
158+
159+
Ok(plan_buf)
160+
}
161+
}
162+
163+
/// Returns the encoded bytes of `plan`, encoding it on every call.
///
/// This variant is compiled when the `disable-stage-plan-cache` feature is
/// enabled. Nothing is cached, so `_stage_id` is accepted only to keep the
/// same signature as the caching variant and is otherwise unused.
///
/// # Errors
///
/// Propagates any error from `U::try_from_physical_plan` or `try_encode`.
#[cfg(feature = "disable-stage-plan-cache")]
fn encode_stage_plan<U: AsExecutionPlan>(
    &mut self,
    _stage_id: usize,
    plan: &Arc<dyn ExecutionPlan>,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Vec<u8>> {
    let proto = U::try_from_physical_plan(Arc::clone(plan), codec)?;

    let mut encoded: Vec<u8> = Vec::new();
    proto.try_encode(&mut encoded)?;

    Ok(encoded)
}
141176
}
142177

143178
#[derive(Clone)]
@@ -222,7 +257,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
222257
info!("Submitting execution graph: {:?}", graph);
223258

224259
self.state.submit_job(job_id.to_string(), &graph).await?;
225-
226260
graph.revive();
227261
self.active_job_cache
228262
.insert(job_id.to_owned(), JobInfoCache::new(graph));
@@ -483,22 +517,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
483517
let stage_id = task.partition.stage_id;
484518

485519
if let Some(mut job_info) = self.active_job_cache.get_mut(&job_id) {
486-
let plan = if let Some(plan) = job_info.encoded_stage_plans.get(&stage_id) {
487-
plan.clone()
488-
} else {
489-
let mut plan_buf: Vec<u8> = vec![];
490-
let plan_proto = U::try_from_physical_plan(
491-
task.plan,
492-
self.codec.physical_extension_codec(),
493-
)?;
494-
plan_proto.try_encode(&mut plan_buf)?;
495-
496-
job_info
497-
.encoded_stage_plans
498-
.insert(stage_id, plan_buf.clone());
499-
500-
plan_buf
501-
};
520+
let plan = job_info.encode_stage_plan::<PhysicalPlanNode>(
521+
stage_id,
522+
&task.plan,
523+
self.codec.physical_extension_codec(),
524+
)?;
502525

503526
let task_definition = TaskDefinition {
504527
task_id: task.task_id as u32,
@@ -569,23 +592,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
569592
}
570593

571594
if let Some(mut job_info) = self.active_job_cache.get_mut(&job_id) {
572-
let plan = if let Some(plan) = job_info.encoded_stage_plans.get(&stage_id)
573-
{
574-
plan.clone()
575-
} else {
576-
let mut plan_buf: Vec<u8> = vec![];
577-
let plan_proto = U::try_from_physical_plan(
578-
task.plan.clone(),
579-
self.codec.physical_extension_codec(),
580-
)?;
581-
plan_proto.try_encode(&mut plan_buf)?;
582-
583-
job_info
584-
.encoded_stage_plans
585-
.insert(stage_id, plan_buf.clone());
586-
587-
plan_buf
588-
};
595+
let plan = job_info.encode_stage_plan::<PhysicalPlanNode>(
596+
stage_id,
597+
&task.plan,
598+
self.codec.physical_extension_codec(),
599+
)?;
589600

590601
let launch_time = SystemTime::now()
591602
.duration_since(UNIX_EPOCH)

0 commit comments

Comments
 (0)