Skip to content

Commit 514aa22

Browse files
committed
fix: Cancel prepare_task_definition fail job
1 parent db98126 commit 514aa22

File tree

4 files changed

+72
-20
lines changed

4 files changed

+72
-20
lines changed

ballista/scheduler/src/cluster/mod.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,9 @@ use crate::cluster::memory::{InMemoryClusterState, InMemoryJobState};
4444

4545
use crate::config::{ClusterStorageConfig, SchedulerConfig, TaskDistributionPolicy};
4646
use crate::scheduler_server::SessionBuilder;
47-
use crate::state::execution_graph::{create_task_info, ExecutionGraph, TaskDescription};
47+
use crate::state::execution_graph::{
48+
create_task_info, ExecutionGraph, ExecutionStage, TaskDescription,
49+
};
4850
use crate::state::task_manager::JobInfoCache;
4951

5052
pub mod event;
@@ -647,6 +649,29 @@ pub(crate) fn get_scan_files(
647649
Ok(collector)
648650
}
649651

652+
pub(crate) async fn unbind_prepare_failed_tasks(
653+
active_jobs: Arc<HashMap<String, JobInfoCache>>,
654+
failed_jobs: &HashMap<String, Vec<TaskDescription>>,
655+
) {
656+
for (job_id, failed_tasks) in failed_jobs.iter() {
657+
if let Some(job_info) = active_jobs.get(job_id) {
658+
let mut graph = job_info.execution_graph.write().await;
659+
failed_tasks.iter().for_each(|task| {
660+
if let Some(ExecutionStage::Running(running_stage)) =
661+
graph.stages_mut().get_mut(&task.partition.stage_id)
662+
{
663+
if let Some(task_info) = running_stage
664+
.task_infos
665+
.get_mut(task.partition.partition_id)
666+
{
667+
*task_info = None;
668+
}
669+
}
670+
});
671+
}
672+
}
673+
}
674+
650675
#[derive(Clone)]
651676
pub struct TopologyNode {
652677
pub id: String,

ballista/scheduler/src/scheduler_server/grpc.rs

Lines changed: 35 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,17 @@ use ballista_core::serde::scheduler::ExecutorMetadata;
3535
use datafusion_proto::logical_plan::AsLogicalPlan;
3636
use datafusion_proto::physical_plan::AsExecutionPlan;
3737
use log::{debug, error, info, trace, warn};
38+
use std::collections::HashMap;
3839
use std::net::SocketAddr;
3940

4041
use std::ops::Deref;
4142

42-
use crate::cluster::{bind_task_bias, bind_task_round_robin};
43+
use crate::cluster::{
44+
bind_task_bias, bind_task_round_robin, unbind_prepare_failed_tasks,
45+
};
4346
use crate::config::TaskDistributionPolicy;
4447
use crate::scheduler_server::event::QueryStageSchedulerEvent;
48+
use crate::state::execution_graph::TaskDescription;
4549
use std::time::{SystemTime, UNIX_EPOCH};
4650
use tonic::{Request, Response, Status};
4751

@@ -112,10 +116,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
112116
let active_jobs = self.state.task_manager.get_running_job_cache();
113117
let schedulable_tasks = match self.state.config.task_distribution {
114118
TaskDistributionPolicy::Bias => {
115-
bind_task_bias(available_slots, active_jobs, |_| false).await
119+
bind_task_bias(available_slots, active_jobs.clone(), |_| false).await
116120
}
117121
TaskDistributionPolicy::RoundRobin => {
118-
bind_task_round_robin(available_slots, active_jobs, |_| false).await
122+
bind_task_round_robin(available_slots, active_jobs.clone(), |_| false).await
119123
}
120124
TaskDistributionPolicy::ConsistentHash{..} => {
121125
return Err(Status::unimplemented(
@@ -124,14 +128,36 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
124128
};
125129

126130
let mut tasks = vec![];
131+
let mut prepare_failed_jobs = HashMap::<String, Vec<TaskDescription>>::new();
127132
for (_, task) in schedulable_tasks {
128-
match self.state.task_manager.prepare_task_definition(task) {
133+
let job_id = task.partition.job_id.clone();
134+
if prepare_failed_jobs.contains_key(&job_id) {
135+
prepare_failed_jobs.entry(job_id).or_default().push(task);
136+
continue;
137+
}
138+
match self
139+
.state
140+
.task_manager
141+
.prepare_task_definition(task.clone())
142+
{
129143
Ok(task_definition) => tasks.push(task_definition),
130144
Err(e) => {
131145
error!("Error preparing task definition: {:?}", e);
146+
prepare_failed_jobs.entry(job_id).or_default().push(task);
132147
}
133148
}
134149
}
150+
151+
unbind_prepare_failed_tasks(active_jobs, &prepare_failed_jobs).await;
152+
for job_id in prepare_failed_jobs.into_keys() {
153+
info!("Cancel prepare task definition failed job: {}", job_id);
154+
self.cancel_job(job_id).await.map_err(|e| {
155+
let msg = format!("Cancel job error due to {e:?}");
156+
error!("{}", msg);
157+
Status::internal(msg)
158+
})?;
159+
}
160+
135161
Ok(Response::new(PollWorkResult { tasks }))
136162
} else {
137163
warn!("Received invalid executor poll_work request");
@@ -527,21 +553,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
527553
) -> Result<Response<CancelJobResult>, Status> {
528554
let job_id = request.into_inner().job_id;
529555
info!("Received cancellation request for job {}", job_id);
530-
531-
self.query_stage_event_loop
532-
.get_sender()
533-
.map_err(|e| {
534-
let msg = format!("Get query stage event loop error due to {e:?}");
535-
error!("{}", msg);
536-
Status::internal(msg)
537-
})?
538-
.post_event(QueryStageSchedulerEvent::JobCancel(job_id))
539-
.await
540-
.map_err(|e| {
541-
let msg = format!("Post to query stage event loop error due to {e:?}");
542-
error!("{}", msg);
543-
Status::internal(msg)
544-
})?;
556+
self.cancel_job(job_id).await.map_err(|e| {
557+
let msg = format!("Cancel job error due to {e:?}");
558+
error!("{}", msg);
559+
Status::internal(msg)
560+
})?;
545561
Ok(Response::new(CancelJobResult { cancelled: true }))
546562
}
547563

ballista/scheduler/src/scheduler_server/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,13 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
183183
.await
184184
}
185185

186+
pub(crate) async fn cancel_job(&self, job_id: String) -> Result<()> {
187+
self.query_stage_event_loop
188+
.get_sender()?
189+
.post_event(QueryStageSchedulerEvent::JobCancel(job_id))
190+
.await
191+
}
192+
186193
/// It just sends a task status update event to the channel,
187194
/// and does not guarantee that the event has been processed by the time it returns
188195
pub(crate) async fn update_task_status(

ballista/scheduler/src/state/execution_graph.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,10 @@ impl ExecutionGraph {
225225
&self.stages
226226
}
227227

228+
/// Returns a mutable reference to this graph's stage map, keyed by stage id.
///
/// Crate-internal counterpart of the read-only stage accessor; callers can
/// use it to modify individual `ExecutionStage` entries in place.
pub(crate) fn stages_mut(&mut self) -> &mut HashMap<usize, ExecutionStage> {
    &mut self.stages
}
231+
228232
/// An ExecutionGraph is successful if all its stages are successful
229233
pub fn is_successful(&self) -> bool {
230234
self.stages

0 commit comments

Comments
 (0)