Skip to content

Commit f109cd8

Browse files
committed
fix: Cancel prepare_task_definition fail job
1 parent 1dc68c5 commit f109cd8

File tree

3 files changed

+24
-64
lines changed

3 files changed

+24
-64
lines changed

ballista/scheduler/src/cluster/mod.rs

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,7 @@ use crate::cluster::memory::{InMemoryClusterState, InMemoryJobState};
4444

4545
use crate::config::{ClusterStorageConfig, SchedulerConfig, TaskDistributionPolicy};
4646
use crate::scheduler_server::SessionBuilder;
47-
use crate::state::execution_graph::{
48-
create_task_info, ExecutionGraph, ExecutionStage, TaskDescription,
49-
};
47+
use crate::state::execution_graph::{create_task_info, ExecutionGraph, TaskDescription};
5048
use crate::state::task_manager::JobInfoCache;
5149

5250
pub mod event;
@@ -649,29 +647,6 @@ pub(crate) fn get_scan_files(
649647
Ok(collector)
650648
}
651649

652-
pub(crate) async fn unbind_prepare_failed_tasks(
653-
active_jobs: Arc<HashMap<String, JobInfoCache>>,
654-
failed_jobs: &HashMap<String, Vec<TaskDescription>>,
655-
) {
656-
for (job_id, failed_tasks) in failed_jobs.iter() {
657-
if let Some(job_info) = active_jobs.get(job_id) {
658-
let mut graph = job_info.execution_graph.write().await;
659-
failed_tasks.iter().for_each(|task| {
660-
if let Some(ExecutionStage::Running(running_stage)) =
661-
graph.stages_mut().get_mut(&task.partition.stage_id)
662-
{
663-
if let Some(task_info) = running_stage
664-
.task_infos
665-
.get_mut(task.partition.partition_id)
666-
{
667-
*task_info = None;
668-
}
669-
}
670-
});
671-
}
672-
}
673-
}
674-
675650
#[derive(Clone)]
676651
pub struct TopologyNode {
677652
pub id: String,

ballista/scheduler/src/scheduler_server/grpc.rs

Lines changed: 23 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -35,17 +35,13 @@ use ballista_core::serde::scheduler::ExecutorMetadata;
3535
use datafusion_proto::logical_plan::AsLogicalPlan;
3636
use datafusion_proto::physical_plan::AsExecutionPlan;
3737
use log::{debug, error, info, trace, warn};
38-
use std::collections::HashMap;
3938
use std::net::SocketAddr;
4039

4140
use std::ops::Deref;
4241

43-
use crate::cluster::{
44-
bind_task_bias, bind_task_round_robin, unbind_prepare_failed_tasks,
45-
};
42+
use crate::cluster::{bind_task_bias, bind_task_round_robin};
4643
use crate::config::TaskDistributionPolicy;
4744
use crate::scheduler_server::event::QueryStageSchedulerEvent;
48-
use crate::state::execution_graph::TaskDescription;
4945
use std::time::{SystemTime, UNIX_EPOCH};
5046
use tonic::{Request, Response, Status};
5147

@@ -116,10 +112,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
116112
let active_jobs = self.state.task_manager.get_running_job_cache();
117113
let schedulable_tasks = match self.state.config.task_distribution {
118114
TaskDistributionPolicy::Bias => {
119-
bind_task_bias(available_slots, active_jobs.clone(), |_| false).await
115+
bind_task_bias(available_slots, active_jobs, |_| false).await
120116
}
121117
TaskDistributionPolicy::RoundRobin => {
122-
bind_task_round_robin(available_slots, active_jobs.clone(), |_| false).await
118+
bind_task_round_robin(available_slots, active_jobs, |_| false).await
123119
}
124120
TaskDistributionPolicy::ConsistentHash{..} => {
125121
return Err(Status::unimplemented(
@@ -128,36 +124,19 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
128124
};
129125

130126
let mut tasks = vec![];
131-
let mut prepare_failed_jobs = HashMap::<String, Vec<TaskDescription>>::new();
132127
for (_, task) in schedulable_tasks {
133128
let job_id = task.partition.job_id.clone();
134-
if prepare_failed_jobs.contains_key(&job_id) {
135-
prepare_failed_jobs.entry(job_id).or_default().push(task);
136-
continue;
137-
}
138-
match self
139-
.state
140-
.task_manager
141-
.prepare_task_definition(task.clone())
142-
{
129+
match self.state.task_manager.prepare_task_definition(task) {
143130
Ok(task_definition) => tasks.push(task_definition),
144131
Err(e) => {
145132
error!("Error preparing task definition: {:?}", e);
146-
prepare_failed_jobs.entry(job_id).or_default().push(task);
133+
info!("Cancel prepare task definition failed job: {}", job_id);
134+
if let Err(err) = self.cancel_job(job_id).await {
135+
error!("Failed to cancel job {err:?}");
136+
}
147137
}
148138
}
149139
}
150-
151-
unbind_prepare_failed_tasks(active_jobs, &prepare_failed_jobs).await;
152-
for job_id in prepare_failed_jobs.into_keys() {
153-
info!("Cancel prepare task definition failed job: {}", job_id);
154-
self.cancel_job(job_id).await.map_err(|e| {
155-
let msg = format!("Cancel job error due to {e:?}");
156-
error!("{}", msg);
157-
Status::internal(msg)
158-
})?;
159-
}
160-
161140
Ok(Response::new(PollWorkResult { tasks }))
162141
} else {
163142
warn!("Received invalid executor poll_work request");
@@ -553,11 +532,21 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
553532
) -> Result<Response<CancelJobResult>, Status> {
554533
let job_id = request.into_inner().job_id;
555534
info!("Received cancellation request for job {}", job_id);
556-
self.cancel_job(job_id).await.map_err(|e| {
557-
let msg = format!("Cancel job error due to {e:?}");
558-
error!("{}", msg);
559-
Status::internal(msg)
560-
})?;
535+
536+
self.query_stage_event_loop
537+
.get_sender()
538+
.map_err(|e| {
539+
let msg = format!("Get query stage event loop error due to {e:?}");
540+
error!("{}", msg);
541+
Status::internal(msg)
542+
})?
543+
.post_event(QueryStageSchedulerEvent::JobCancel(job_id))
544+
.await
545+
.map_err(|e| {
546+
let msg = format!("Post to query stage event loop error due to {e:?}");
547+
error!("{}", msg);
548+
Status::internal(msg)
549+
})?;
561550
Ok(Response::new(CancelJobResult { cancelled: true }))
562551
}
563552

ballista/scheduler/src/state/execution_graph.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -225,10 +225,6 @@ impl ExecutionGraph {
225225
&self.stages
226226
}
227227

228-
pub(crate) fn stages_mut(&mut self) -> &mut HashMap<usize, ExecutionStage> {
229-
&mut self.stages
230-
}
231-
232228
/// An ExecutionGraph is successful if all its stages are successful
233229
pub fn is_successful(&self) -> bool {
234230
self.stages

0 commit comments

Comments
 (0)