
Commit ca3d3af

fix: Cancel prepare_task_definition fail job
1 parent db98126 commit ca3d3af

File tree

6 files changed: +171 −68 lines changed

ballista/scheduler/src/cluster/mod.rs

Lines changed: 26 additions & 1 deletion

@@ -44,7 +44,9 @@ use crate::cluster::memory::{InMemoryClusterState, InMemoryJobState};
 use crate::config::{ClusterStorageConfig, SchedulerConfig, TaskDistributionPolicy};
 use crate::scheduler_server::SessionBuilder;
-use crate::state::execution_graph::{create_task_info, ExecutionGraph, TaskDescription};
+use crate::state::execution_graph::{
+    create_task_info, ExecutionGraph, ExecutionStage, TaskDescription,
+};
 use crate::state::task_manager::JobInfoCache;

 pub mod event;

@@ -647,6 +649,29 @@ pub(crate) fn get_scan_files(
     Ok(collector)
 }

+pub(crate) async fn unbind_prepare_failed_tasks(
+    active_jobs: Arc<HashMap<String, JobInfoCache>>,
+    failed_jobs: &HashMap<String, Vec<TaskDescription>>,
+) {
+    for (job_id, failed_tasks) in failed_jobs.iter() {
+        if let Some(job_info) = active_jobs.get(job_id) {
+            let mut graph = job_info.execution_graph.write().await;
+            failed_tasks.iter().for_each(|task| {
+                if let Some(ExecutionStage::Running(running_stage)) =
+                    graph.stages_mut().get_mut(&task.partition.stage_id)
+                {
+                    if let Some(task_info) = running_stage
+                        .task_infos
+                        .get_mut(task.partition.partition_id)
+                    {
+                        *task_info = None;
+                    }
+                }
+            });
+        }
+    }
+}
+
 #[derive(Clone)]
 pub struct TopologyNode {
     pub id: String,
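
Below is a minimal, self-contained sketch of the unbinding idea above. The types here (RunningStage, TaskInfo, FailedTask) are simplified stand-ins for illustration, not Ballista's real definitions, and the async lock around the execution graph is omitted: the point is only that a task whose definition failed to prepare gets its slot in the stage's task_infos cleared, so the partition counts as unassigned and can be offered to an executor again.

use std::collections::HashMap;

// Simplified stand-ins for the scheduler's types (hypothetical, for illustration only).
#[derive(Debug, Clone)]
struct TaskInfo {
    executor_id: String,
}

#[derive(Debug)]
struct RunningStage {
    // One slot per partition; Some(..) means the partition is bound to an executor.
    task_infos: Vec<Option<TaskInfo>>,
}

#[derive(Debug, Clone)]
struct FailedTask {
    stage_id: usize,
    partition_id: usize,
}

// Clear the bound task info for every task whose definition failed to prepare,
// mirroring what unbind_prepare_failed_tasks does per running stage.
fn unbind_failed_tasks(stages: &mut HashMap<usize, RunningStage>, failed_tasks: &[FailedTask]) {
    for task in failed_tasks {
        if let Some(stage) = stages.get_mut(&task.stage_id) {
            if let Some(slot) = stage.task_infos.get_mut(task.partition_id) {
                *slot = None;
            }
        }
    }
}

fn main() {
    let mut stages = HashMap::new();
    stages.insert(
        1,
        RunningStage {
            task_infos: vec![
                Some(TaskInfo { executor_id: "exec-1".into() }),
                Some(TaskInfo { executor_id: "exec-1".into() }),
            ],
        },
    );

    let failed = vec![FailedTask { stage_id: 1, partition_id: 0 }];
    unbind_failed_tasks(&mut stages, &failed);

    // Partition 0 is unbound again, partition 1 stays bound to exec-1.
    println!("{:?}", stages[&1].task_infos);
}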

ballista/scheduler/src/scheduler_server/grpc.rs

Lines changed: 35 additions & 19 deletions

@@ -35,13 +35,17 @@ use ballista_core::serde::scheduler::ExecutorMetadata;
 use datafusion_proto::logical_plan::AsLogicalPlan;
 use datafusion_proto::physical_plan::AsExecutionPlan;
 use log::{debug, error, info, trace, warn};
+use std::collections::HashMap;
 use std::net::SocketAddr;

 use std::ops::Deref;

-use crate::cluster::{bind_task_bias, bind_task_round_robin};
+use crate::cluster::{
+    bind_task_bias, bind_task_round_robin, unbind_prepare_failed_tasks,
+};
 use crate::config::TaskDistributionPolicy;
 use crate::scheduler_server::event::QueryStageSchedulerEvent;
+use crate::state::execution_graph::TaskDescription;
 use std::time::{SystemTime, UNIX_EPOCH};
 use tonic::{Request, Response, Status};

@@ -112,10 +116,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
             let active_jobs = self.state.task_manager.get_running_job_cache();
             let schedulable_tasks = match self.state.config.task_distribution {
                 TaskDistributionPolicy::Bias => {
-                    bind_task_bias(available_slots, active_jobs, |_| false).await
+                    bind_task_bias(available_slots, active_jobs.clone(), |_| false).await
                 }
                 TaskDistributionPolicy::RoundRobin => {
-                    bind_task_round_robin(available_slots, active_jobs, |_| false).await
+                    bind_task_round_robin(available_slots, active_jobs.clone(), |_| false).await
                 }
                 TaskDistributionPolicy::ConsistentHash{..} => {
                     return Err(Status::unimplemented(

@@ -124,14 +128,36 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
             };

             let mut tasks = vec![];
+            let mut prepare_failed_jobs = HashMap::<String, Vec<TaskDescription>>::new();
             for (_, task) in schedulable_tasks {
-                match self.state.task_manager.prepare_task_definition(task) {
+                let job_id = task.partition.job_id.clone();
+                if prepare_failed_jobs.contains_key(&job_id) {
+                    prepare_failed_jobs.entry(job_id).or_default().push(task);
+                    continue;
+                }
+                match self
+                    .state
+                    .task_manager
+                    .prepare_task_definition(task.clone())
+                {
                     Ok(task_definition) => tasks.push(task_definition),
                     Err(e) => {
                         error!("Error preparing task definition: {:?}", e);
+                        prepare_failed_jobs.entry(job_id).or_default().push(task);
                     }
                 }
             }
+
+            unbind_prepare_failed_tasks(active_jobs, &prepare_failed_jobs).await;
+            for job_id in prepare_failed_jobs.into_keys() {
+                info!("Cancel prepare task definition failed job: {}", job_id);
+                self.cancel_job(job_id).await.map_err(|e| {
+                    let msg = format!("Cancel job error due to {e:?}");
+                    error!("{}", msg);
+                    Status::internal(msg)
+                })?;
+            }
+
             Ok(Response::new(PollWorkResult { tasks }))
         } else {
             warn!("Received invalid executor poll_work request");

@@ -527,21 +553,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
     ) -> Result<Response<CancelJobResult>, Status> {
         let job_id = request.into_inner().job_id;
         info!("Received cancellation request for job {}", job_id);
-
-        self.query_stage_event_loop
-            .get_sender()
-            .map_err(|e| {
-                let msg = format!("Get query stage event loop error due to {e:?}");
-                error!("{}", msg);
-                Status::internal(msg)
-            })?
-            .post_event(QueryStageSchedulerEvent::JobCancel(job_id))
-            .await
-            .map_err(|e| {
-                let msg = format!("Post to query stage event loop error due to {e:?}");
-                error!("{}", msg);
-                Status::internal(msg)
-            })?;
+        self.cancel_job(job_id).await.map_err(|e| {
+            let msg = format!("Cancel job error due to {e:?}");
+            error!("{}", msg);
+            Status::internal(msg)
+        })?;
         Ok(Response::new(CancelJobResult { cancelled: true }))
     }
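
The poll_work change above boils down to one rule: the first prepare failure poisons the whole job, so every remaining task of that job is diverted into prepare_failed_jobs instead of being attempted, and the job is then unbound and cancelled. A runnable sketch of that grouping logic, with hypothetical Task/TaskDefinition stand-ins and a fake prepare step in place of prepare_task_definition:

use std::collections::HashMap;

// Hypothetical stand-ins for illustration; not Ballista's real types.
#[derive(Debug, Clone)]
struct Task {
    job_id: String,
    partition_id: usize,
}

#[derive(Debug)]
struct TaskDefinition {
    job_id: String,
    partition_id: usize,
}

// A prepare step that can fail (e.g. a plan serialization error).
fn prepare(task: &Task) -> Result<TaskDefinition, String> {
    if task.job_id == "job-b" {
        Err("failed to encode plan".to_string())
    } else {
        Ok(TaskDefinition {
            job_id: task.job_id.clone(),
            partition_id: task.partition_id,
        })
    }
}

fn main() {
    let schedulable_tasks = vec![
        Task { job_id: "job-a".into(), partition_id: 0 },
        Task { job_id: "job-b".into(), partition_id: 0 },
        Task { job_id: "job-b".into(), partition_id: 1 },
    ];

    let mut tasks = vec![];
    let mut prepare_failed_jobs: HashMap<String, Vec<Task>> = HashMap::new();

    for task in schedulable_tasks {
        let job_id = task.job_id.clone();
        // A job that already failed to prepare gets no further attempts; its
        // remaining tasks are only collected for unbinding and cancellation.
        if prepare_failed_jobs.contains_key(&job_id) {
            prepare_failed_jobs.entry(job_id).or_default().push(task);
            continue;
        }
        match prepare(&task) {
            Ok(def) => tasks.push(def),
            Err(e) => {
                eprintln!("Error preparing task definition: {e}");
                prepare_failed_jobs.entry(job_id).or_default().push(task);
            }
        }
    }

    // In the scheduler, each failed job would now be unbound and cancelled.
    println!(
        "prepared {} tasks, failed jobs: {:?}",
        tasks.len(),
        prepare_failed_jobs.keys().collect::<Vec<_>>()
    );
}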

ballista/scheduler/src/scheduler_server/mod.rs

Lines changed: 7 additions & 0 deletions

@@ -183,6 +183,13 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
             .await
     }

+    pub(crate) async fn cancel_job(&self, job_id: String) -> Result<()> {
+        self.query_stage_event_loop
+            .get_sender()?
+            .post_event(QueryStageSchedulerEvent::JobCancel(job_id))
+            .await
+    }
+
     /// It just send task status update event to the channel,
     /// and will not guarantee the event processing completed after return
     pub(crate) async fn update_task_status(
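
cancel_job is fire-and-forget: it posts a JobCancel event to the query stage event loop and returns without waiting for the cancellation itself to complete. A stand-alone sketch of that contract, using a plain std channel and thread in place of the scheduler's event loop (SchedulerEvent is a hypothetical stand-in for QueryStageSchedulerEvent):

use std::sync::mpsc;
use std::thread;

// Hypothetical stand-in for the scheduler's event type, for illustration only.
#[derive(Debug)]
enum SchedulerEvent {
    JobCancel(String),
}

fn main() {
    let (sender, receiver) = mpsc::channel::<SchedulerEvent>();

    // Stand-in for the query stage event loop: consumes events on its own thread.
    let event_loop = thread::spawn(move || {
        while let Ok(event) = receiver.recv() {
            match event {
                SchedulerEvent::JobCancel(job_id) => {
                    println!("cancelling job {job_id}");
                }
            }
        }
    });

    // Like cancel_job above, posting the event is all the caller does; it does
    // not wait for the cancellation work to finish.
    sender
        .send(SchedulerEvent::JobCancel("job-a".to_string()))
        .expect("event loop stopped");

    drop(sender); // close the channel so the event loop thread exits
    event_loop.join().unwrap();
}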

ballista/scheduler/src/state/execution_graph.rs

Lines changed: 4 additions & 0 deletions

@@ -225,6 +225,10 @@ impl ExecutionGraph {
         &self.stages
     }

+    pub(crate) fn stages_mut(&mut self) -> &mut HashMap<usize, ExecutionStage> {
+        &mut self.stages
+    }
+
     /// An ExecutionGraph is successful if all its stages are successful
     pub fn is_successful(&self) -> bool {
         self.stages

ballista/scheduler/src/state/mod.rs

Lines changed: 80 additions & 40 deletions

@@ -21,7 +21,7 @@ use datafusion::datasource::source_as_provider;
 use datafusion::error::DataFusionError;
 use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
 use std::any::type_name;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 use std::time::Instant;

@@ -174,17 +174,31 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
         tokio::spawn(async move {
             let mut if_revive = false;
             match state.launch_tasks(schedulable_tasks).await {
-                Ok(unassigned_executor_slots) => {
-                    if !unassigned_executor_slots.is_empty() {
-                        if let Err(e) = state
-                            .executor_manager
-                            .unbind_tasks(unassigned_executor_slots)
-                            .await
+                Ok(launch_tasks_futs) => {
+                    let unassigned_slots = launch_tasks_futs
+                        .iter()
+                        .flat_map(LaunchMultiTaskFut::unassigned_slot)
+                        .collect::<Vec<_>>();
+                    if !unassigned_slots.is_empty() {
+                        if let Err(e) =
+                            state.executor_manager.unbind_tasks(unassigned_slots).await
                         {
                             error!("Fail to unbind tasks: {}", e);
                         }
                         if_revive = true;
                     }
+                    let failed_jobs = launch_tasks_futs
+                        .into_iter()
+                        .flat_map(|fut| fut.prepare_failed_jobs.into_keys())
+                        .collect::<HashSet<String>>();
+                    for job_id in failed_jobs {
+                        if let Err(e) = sender
+                            .post_event(QueryStageSchedulerEvent::JobCancel(job_id))
+                            .await
+                        {
+                            error!("Cancel job error due to {e}");
+                        }
+                    }
                 }
                 Err(e) => {
                     error!("Fail to launch tasks: {}", e);

@@ -248,7 +262,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
     async fn launch_tasks(
         &self,
         bound_tasks: Vec<BoundTask>,
-    ) -> Result<Vec<ExecutorSlot>> {
+    ) -> Result<Vec<LaunchMultiTaskFut>> {
         // Put tasks to the same executor together
         // And put tasks belonging to the same stage together for creating MultiTaskDefinition
         let mut executor_stage_assignments: HashMap<

@@ -275,62 +289,58 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,

         let mut join_handles = vec![];
         for (executor_id, tasks) in executor_stage_assignments.into_iter() {
-            let tasks: Vec<Vec<TaskDescription>> = tasks.into_values().collect();
             // Total number of tasks to be launched for one executor
-            let n_tasks: usize = tasks.iter().map(|stage_tasks| stage_tasks.len()).sum();
+            let n_tasks: usize = tasks.values().map(Vec::len).sum();

             let state = self.clone();
             let join_handle = tokio::spawn(async move {
-                let success = match state
+                match state
                     .executor_manager
                     .get_executor_metadata(&executor_id)
                     .await
                 {
                     Ok(executor) => {
-                        if let Err(e) = state
+                        match state
                             .task_manager
                             .launch_multi_task(&executor, tasks, &state.executor_manager)
                             .await
                         {
-                            let err_msg = format!("Failed to launch new task: {e}");
-                            error!("{}", err_msg.clone());
-
-                            // It's OK to remove executor aggressively,
-                            // since if the executor is in healthy state, it will be registered again.
-                            state.remove_executor(&executor_id, Some(err_msg)).await;
-
-                            false
-                        } else {
-                            true
+                            Ok(prepare_failed_jobs) => LaunchMultiTaskFut::new(
+                                executor_id,
+                                0,
+                                prepare_failed_jobs,
+                            ),
+                            Err(e) => {
+                                let err_msg = format!("Failed to launch new task: {e}");
+                                error!("{}", err_msg.clone());
+
+                                // It's OK to remove executor aggressively,
+                                // since if the executor is in healthy state, it will be registered again.
+                                state.remove_executor(&executor_id, Some(err_msg)).await;
+                                LaunchMultiTaskFut::new(
+                                    executor_id,
+                                    n_tasks,
+                                    HashMap::new(),
+                                )
+                            }
                         }
                     }
                     Err(e) => {
                         error!("Failed to launch new task, could not get executor metadata: {}", e);
-                        false
+                        LaunchMultiTaskFut::new(executor_id, n_tasks, HashMap::new())
                     }
-                };
-                if success {
-                    vec![]
-                } else {
-                    vec![(executor_id.clone(), n_tasks as u32)]
                 }
             });
             join_handles.push(join_handle);
         }

-        let unassigned_executor_slots =
-            futures::future::join_all(join_handles)
-                .await
-                .into_iter()
-                .collect::<std::result::Result<
-                    Vec<Vec<ExecutorSlot>>,
-                    tokio::task::JoinError,
-                >>()?;
-
-        Ok(unassigned_executor_slots
+        let launch_futs = futures::future::join_all(join_handles)
+            .await
             .into_iter()
-            .flatten()
-            .collect::<Vec<ExecutorSlot>>())
+            .collect::<std::result::Result<Vec<_>, tokio::task::JoinError>>(
+            )?;
+
+        Ok(launch_futs)
     }

     pub(crate) async fn update_task_statuses(

@@ -463,3 +473,33 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
         );
     }
 }
+
+pub(crate) struct LaunchMultiTaskFut {
+    pub executor_id: String,
+    pub unassigned_num: usize,
+    pub prepare_failed_jobs: HashMap<String, Vec<TaskDescription>>,
+}
+
+impl LaunchMultiTaskFut {
+    pub fn new(
+        executor_id: String,
+        unassigned_num: usize,
+        prepare_failed_jobs: HashMap<String, Vec<TaskDescription>>,
+    ) -> Self {
+        Self {
+            executor_id,
+            unassigned_num,
+            prepare_failed_jobs,
+        }
+    }
+
+    pub fn unassigned_slot(&self) -> Option<ExecutorSlot> {
+        let fail_num: usize = self.prepare_failed_jobs.values().map(Vec::len).sum();
+        let slots = (self.unassigned_num + fail_num) as u32;
+        if slots > 0 {
+            Some((self.executor_id.clone(), slots))
+        } else {
+            None
+        }
+    }
+}
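
LaunchMultiTaskFut::unassigned_slot reports how many slots should be handed back to the executor: tasks that were never launched plus tasks whose job failed preparation. A self-contained sketch of the same arithmetic with a simplified LaunchResult stand-in (failed tasks are kept as per-job counts rather than full TaskDescriptions):

use std::collections::HashMap;

// Simplified copy of the bookkeeping above; the slot is a plain (executor_id, count) pair.
struct LaunchResult {
    executor_id: String,
    unassigned_num: usize,
    prepare_failed_tasks: HashMap<String, usize>, // job_id -> failed task count
}

impl LaunchResult {
    // Slots to return to the executor: tasks that were never launched, whether
    // because the whole launch call failed or because preparation failed.
    fn unassigned_slot(&self) -> Option<(String, u32)> {
        let fail_num: usize = self.prepare_failed_tasks.values().sum();
        let slots = (self.unassigned_num + fail_num) as u32;
        (slots > 0).then(|| (self.executor_id.clone(), slots))
    }
}

fn main() {
    let result = LaunchResult {
        executor_id: "exec-1".to_string(),
        unassigned_num: 0,
        prepare_failed_tasks: HashMap::from([("job-b".to_string(), 2)]),
    };
    // Two slots go back to exec-1 even though the launch call itself succeeded.
    println!("{:?}", result.unassigned_slot());
}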

ballista/scheduler/src/state/task_manager.rs

Lines changed: 19 additions & 8 deletions

@@ -524,24 +524,35 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
     pub(crate) async fn launch_multi_task(
         &self,
         executor: &ExecutorMetadata,
-        tasks: Vec<Vec<TaskDescription>>,
+        tasks: HashMap<(String, usize), Vec<TaskDescription>>,
         executor_manager: &ExecutorManager,
-    ) -> Result<()> {
+    ) -> Result<HashMap<String, Vec<TaskDescription>>> {
         let mut multi_tasks = vec![];
-        for stage_tasks in tasks {
-            match self.prepare_multi_task_definition(stage_tasks) {
+        let mut prepare_failed_jobs = HashMap::<String, Vec<TaskDescription>>::new();
+        for ((job_id, _stage_id), stage_tasks) in tasks {
+            if prepare_failed_jobs.contains_key(&job_id) {
+                prepare_failed_jobs
+                    .entry(job_id)
+                    .or_default()
+                    .extend(stage_tasks);
+                continue;
+            }
+            match self.prepare_multi_task_definition(stage_tasks.clone()) {
                 Ok(stage_tasks) => multi_tasks.extend(stage_tasks),
-                Err(e) => error!("Fail to prepare task definition: {:?}", e),
+                Err(e) => {
+                    error!("Fail to prepare task definition: {:?}", e);
+                    prepare_failed_jobs.insert(job_id, stage_tasks);
+                }
             }
         }

         if !multi_tasks.is_empty() {
             self.launcher
                 .launch_tasks(executor, multi_tasks, executor_manager)
-                .await
-        } else {
-            Ok(())
+                .await?;
         }
+
+        Ok(prepare_failed_jobs)
     }

     #[allow(dead_code)]