diff --git a/ballista/client/testdata/simple.json b/ballista/client/testdata/simple.json
new file mode 100644
index 000000000..0187f3b09
--- /dev/null
+++ b/ballista/client/testdata/simple.json
@@ -0,0 +1 @@
+{"a":1}
diff --git a/ballista/client/tests/context_unsupported.rs b/ballista/client/tests/context_unsupported.rs
index 805e81325..6d70ff517 100644
--- a/ballista/client/tests/context_unsupported.rs
+++ b/ballista/client/tests/context_unsupported.rs
@@ -209,6 +209,37 @@ mod unsupported {
             "+----+------------+---------------------+",
         ];
 
+        assert_batches_eq!(expected, &result);
+    }
+    #[rstest]
+    #[case::standalone(standalone_context())]
+    #[case::remote(remote_context())]
+    #[tokio::test]
+    #[should_panic]
+    // "Error preparing task definition: Unsupported plan and extension codec failed with unsupported plan type: NdJsonExec"
+    async fn should_support_json_source(
+        #[future(awt)]
+        #[case]
+        ctx: SessionContext,
+        test_data: String,
+    ) {
+        let result = ctx
+            .read_json(&format!("{test_data}/simple.json"), Default::default())
+            .await
+            .unwrap()
+            .collect()
+            .await
+            .unwrap();
+
+        #[rustfmt::skip]
+        let expected = [
+            "+---+",
+            "| a |",
+            "+---+",
+            "| 1 |",
+            "+---+"
+        ];
+
         assert_batches_eq!(expected, &result);
     }
 }
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs
index 02c21a884..e99224271 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -125,10 +125,15 @@ impl SchedulerGrpc
         let mut tasks = vec![];
         for (_, task) in schedulable_tasks {
+            let job_id = task.partition.job_id.clone();
             match self.state.task_manager.prepare_task_definition(task) {
                 Ok(task_definition) => tasks.push(task_definition),
                 Err(e) => {
                     error!("Error preparing task definition: {:?}", e);
+                    info!("Cancel prepare task definition failed job: {}", job_id);
+                    if let Err(err) = self.cancel_job(job_id).await {
+                        error!("Failed to cancel job {err:?}");
+                    }
                 }
             }
         }
diff --git a/ballista/scheduler/src/scheduler_server/mod.rs b/ballista/scheduler/src/scheduler_server/mod.rs
index 653e2d410..9f3ac2e34 100644
--- a/ballista/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/scheduler/src/scheduler_server/mod.rs
@@ -183,6 +183,13 @@ impl SchedulerServer
+    pub(crate) async fn cancel_job(&self, job_id: String) -> Result<()> {
+        self.query_stage_event_loop
+            .get_sender()?
+            .post_event(QueryStageSchedulerEvent::JobCancel(job_id))
+            .await
+    }
+
     /// It just send task status update event to the channel,
     /// and will not guarantee the event processing completed after return
     pub(crate) async fn update_task_status(
diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs
index 4394dc009..4e39613e5 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -21,7 +21,7 @@
 use datafusion::datasource::source_as_provider;
 use datafusion::error::DataFusionError;
 use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
 use std::any::type_name;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 use std::time::Instant;
@@ -174,17 +174,31 @@ impl SchedulerState
-                Ok(unassigned_executor_slots) => {
-                    if !unassigned_executor_slots.is_empty() {
-                        if let Err(e) = state
-                            .executor_manager
-                            .unbind_tasks(unassigned_executor_slots)
-                            .await
+                Ok(launch_tasks_futs) => {
+                    let unassigned_slots = launch_tasks_futs
+                        .iter()
+                        .flat_map(LaunchMultiTaskFut::unassigned_slot)
+                        .collect::<Vec<_>>();
+                    if !unassigned_slots.is_empty() {
+                        if let Err(e) =
+                            state.executor_manager.unbind_tasks(unassigned_slots).await
                         {
                             error!("Fail to unbind tasks: {}", e);
                         }
                         if_revive = true;
                     }
+                    let failed_jobs = launch_tasks_futs
+                        .into_iter()
+                        .flat_map(|fut| fut.prepare_failed_jobs.into_keys())
+                        .collect::<HashSet<String>>();
+                    for job_id in failed_jobs {
+                        if let Err(e) = sender
+                            .post_event(QueryStageSchedulerEvent::JobCancel(job_id))
+                            .await
+                        {
+                            error!("Cancel job error due to {e}");
+                        }
+                    }
                 }
                 Err(e) => {
                     error!("Fail to launch tasks: {}", e);
@@ -248,7 +262,7 @@ impl SchedulerState
         tasks: Vec<TaskDescription>,
-    ) -> Result<Vec<(String, u32)>> {
+    ) -> Result<Vec<LaunchMultiTaskFut>> {
         // Put tasks to the same executor together
         // And put tasks belonging to the same stage together for creating MultiTaskDefinition
         let mut executor_stage_assignments: HashMap<
@@ -275,62 +289,58 @@ impl SchedulerState
         for (executor_id, tasks) in executor_stage_assignments.into_iter() {
-            let tasks: Vec<Vec<TaskDescription>> = tasks.into_values().collect();
             // Total number of tasks to be launched for one executor
-            let n_tasks: usize = tasks.iter().map(|stage_tasks| stage_tasks.len()).sum();
+            let n_tasks: usize = tasks.values().map(Vec::len).sum();
 
             let state = self.clone();
             let join_handle = tokio::spawn(async move {
-                let success = match state
+                match state
                     .executor_manager
                     .get_executor_metadata(&executor_id)
                     .await
                 {
                     Ok(executor) => {
-                        if let Err(e) = state
+                        match state
                             .task_manager
                             .launch_multi_task(&executor, tasks, &state.executor_manager)
                             .await
                         {
-                            let err_msg = format!("Failed to launch new task: {e}");
-                            error!("{}", err_msg.clone());
-
-                            // It's OK to remove executor aggressively,
-                            // since if the executor is in healthy state, it will be registered again.
-                            state.remove_executor(&executor_id, Some(err_msg)).await;
-
-                            false
-                        } else {
-                            true
+                            Ok(prepare_failed_jobs) => LaunchMultiTaskFut::new(
+                                executor_id,
+                                0,
+                                prepare_failed_jobs,
+                            ),
+                            Err(e) => {
+                                let err_msg = format!("Failed to launch new task: {e}");
+                                error!("{}", err_msg.clone());
+
+                                // It's OK to remove executor aggressively,
+                                // since if the executor is in healthy state, it will be registered again.
+                                state.remove_executor(&executor_id, Some(err_msg)).await;
+                                LaunchMultiTaskFut::new(
+                                    executor_id,
+                                    n_tasks,
+                                    HashMap::new(),
+                                )
+                            }
+                        }
                     }
                     Err(e) => {
                         error!("Failed to launch new task, could not get executor metadata: {}", e);
-                        false
+                        LaunchMultiTaskFut::new(executor_id, n_tasks, HashMap::new())
                     }
-                };
-                if success {
-                    vec![]
-                } else {
-                    vec![(executor_id.clone(), n_tasks as u32)]
                 }
             });
             join_handles.push(join_handle);
         }
 
-        let unassigned_executor_slots =
-            futures::future::join_all(join_handles)
-                .await
-                .into_iter()
-                .collect::<Result<
-                    Vec<Vec<(String, u32)>>,
-                    tokio::task::JoinError,
-                >>()?;
-
-        Ok(unassigned_executor_slots
+        let launch_futs = futures::future::join_all(join_handles)
+            .await
             .into_iter()
-            .flatten()
-            .collect::<Vec<(String, u32)>>())
+            .collect::<Result<Vec<LaunchMultiTaskFut>, tokio::task::JoinError>>(
+            )?;
+
+        Ok(launch_futs)
     }
 
     pub(crate) async fn update_task_statuses(
@@ -463,3 +473,33 @@ impl SchedulerState
+
+pub(crate) struct LaunchMultiTaskFut {
+    pub executor_id: String,
+    pub unassigned_num: usize,
+    pub prepare_failed_jobs: HashMap<String, Vec<TaskDescription>>,
+}
+
+impl LaunchMultiTaskFut {
+    pub fn new(
+        executor_id: String,
+        unassigned_num: usize,
+        prepare_failed_jobs: HashMap<String, Vec<TaskDescription>>,
+    ) -> Self {
+        Self {
+            executor_id,
+            unassigned_num,
+            prepare_failed_jobs,
+        }
+    }
+
+    pub fn unassigned_slot(&self) -> Option<(String, u32)> {
+        let fail_num: usize = self.prepare_failed_jobs.values().map(Vec::len).sum();
+        let slots = (self.unassigned_num + fail_num) as u32;
+        if slots > 0 {
+            Some((self.executor_id.clone(), slots))
+        } else {
+            None
+        }
+    }
+}
diff --git a/ballista/scheduler/src/state/task_manager.rs b/ballista/scheduler/src/state/task_manager.rs
index 53a352bd1..75afbc8d8 100644
--- a/ballista/scheduler/src/state/task_manager.rs
+++ b/ballista/scheduler/src/state/task_manager.rs
@@ -524,24 +524,35 @@ impl TaskManager
     pub(crate) async fn launch_multi_task(
         &self,
         executor: &ExecutorMetadata,
-        tasks: Vec<Vec<TaskDescription>>,
+        tasks: HashMap<(String, usize), Vec<TaskDescription>>,
         executor_manager: &ExecutorManager,
-    ) -> Result<()> {
+    ) -> Result<HashMap<String, Vec<TaskDescription>>> {
         let mut multi_tasks = vec![];
-        for stage_tasks in tasks {
-            match self.prepare_multi_task_definition(stage_tasks) {
+        let mut prepare_failed_jobs = HashMap::<String, Vec<TaskDescription>>::new();
+        for ((job_id, _stage_id), stage_tasks) in tasks {
+            if prepare_failed_jobs.contains_key(&job_id) {
+                prepare_failed_jobs
+                    .entry(job_id)
+                    .or_default()
+                    .extend(stage_tasks);
+                continue;
+            }
+            match self.prepare_multi_task_definition(stage_tasks.clone()) {
                 Ok(stage_tasks) => multi_tasks.extend(stage_tasks),
-                Err(e) => error!("Fail to prepare task definition: {:?}", e),
+                Err(e) => {
+                    error!("Fail to prepare task definition: {:?}", e);
+                    prepare_failed_jobs.insert(job_id, stage_tasks);
+                }
             }
         }
 
         if !multi_tasks.is_empty() {
             self.launcher
                 .launch_tasks(executor, multi_tasks, executor_manager)
-                .await
-        } else {
-            Ok(())
+                .await?;
         }
+
+        Ok(prepare_failed_jobs)
     }
 
     #[allow(dead_code)]
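The scheduler-side behaviour change is small but worth seeing in isolation: when `prepare_task_definition` fails, the scheduler no longer just logs the error and leaves the job hanging; it posts a `JobCancel` event through the query stage event loop and moves on. Below is a minimal sketch of that control flow. It is illustrative only: the `Scheduler` struct, the `prepare_or_cancel` helper, and the `std::sync::mpsc` channel are stand-ins for Ballista's `SchedulerServer`, its gRPC poll-work path, and the real event loop.

```rust
use std::sync::mpsc;

// Stand-in for QueryStageSchedulerEvent; only the variant used here is modelled.
#[derive(Debug)]
enum QueryStageSchedulerEvent {
    JobCancel(String),
}

// Stand-in for the scheduler server: all it can do here is post events.
struct Scheduler {
    event_tx: mpsc::Sender<QueryStageSchedulerEvent>,
}

impl Scheduler {
    // Mirrors the new `cancel_job`: it only posts a JobCancel event and returns;
    // the event loop performs the actual cancellation asynchronously.
    fn cancel_job(
        &self,
        job_id: String,
    ) -> Result<(), mpsc::SendError<QueryStageSchedulerEvent>> {
        self.event_tx.send(QueryStageSchedulerEvent::JobCancel(job_id))
    }

    // Mirrors the poll-work error path: a task whose definition cannot be
    // prepared now triggers cancellation of its job instead of being dropped.
    fn prepare_or_cancel(&self, job_id: &str, prepare: impl Fn() -> Result<(), String>) {
        if let Err(e) = prepare() {
            eprintln!("Error preparing task definition: {e:?}");
            if let Err(err) = self.cancel_job(job_id.to_string()) {
                eprintln!("Failed to cancel job {err:?}");
            }
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let scheduler = Scheduler { event_tx: tx };
    scheduler.prepare_or_cancel("job-1", || {
        Err("unsupported plan type: NdJsonExec".to_string())
    });
    // The event-loop side would receive the cancellation request.
    println!("{:?}", rx.recv().unwrap()); // JobCancel("job-1")
}
```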
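`LaunchMultiTaskFut` is the piece that carries both outcomes back from each spawned launch future: how many slots were never used, and which jobs had tasks that could not be prepared. The sketch below reproduces its slot arithmetic and the aggregation done in the new `Ok(launch_tasks_futs)` arm, with `TaskDescription` replaced by a plain `String` so it compiles on its own; it is a simplified model, not the Ballista types themselves.

```rust
use std::collections::{HashMap, HashSet};

// Simplified version of the struct added to state/mod.rs.
struct LaunchMultiTaskFut {
    executor_id: String,
    unassigned_num: usize,
    prepare_failed_jobs: HashMap<String, Vec<String>>,
}

impl LaunchMultiTaskFut {
    // Slots to hand back = tasks never assigned + tasks whose job failed to prepare.
    fn unassigned_slot(&self) -> Option<(String, u32)> {
        let fail_num: usize = self.prepare_failed_jobs.values().map(Vec::len).sum();
        let slots = (self.unassigned_num + fail_num) as u32;
        if slots > 0 {
            Some((self.executor_id.clone(), slots))
        } else {
            None
        }
    }
}

fn main() {
    let futs = vec![
        LaunchMultiTaskFut {
            executor_id: "executor-1".to_string(),
            unassigned_num: 0,
            prepare_failed_jobs: HashMap::from([(
                "job-a".to_string(),
                vec!["task-1".to_string(), "task-2".to_string()],
            )]),
        },
        LaunchMultiTaskFut {
            executor_id: "executor-2".to_string(),
            unassigned_num: 3,
            prepare_failed_jobs: HashMap::new(),
        },
    ];

    // Slots returned to the executor manager (the unbind_tasks call).
    let unassigned_slots: Vec<(String, u32)> = futs
        .iter()
        .flat_map(LaunchMultiTaskFut::unassigned_slot)
        .collect();
    assert_eq!(
        unassigned_slots,
        vec![("executor-1".to_string(), 2), ("executor-2".to_string(), 3)]
    );

    // Jobs that get a JobCancel event, deduplicated across executors.
    let failed_jobs: HashSet<String> = futs
        .into_iter()
        .flat_map(|fut| fut.prepare_failed_jobs.into_keys())
        .collect();
    assert_eq!(failed_jobs, HashSet::from(["job-a".to_string()]));
}
```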
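On the `TaskManager` side, `launch_multi_task` now receives its tasks keyed by `(job_id, stage_id)` and returns the tasks it could not prepare, grouped by job. Once one stage of a job fails, the remaining stages of that job are skipped and collected as well, so the caller can cancel the job once and reclaim all of its slots. Here is a reduced version of that skip-and-collect pattern, with plain strings standing in for `TaskDescription` and a fake `prepare` function in place of `prepare_multi_task_definition`:

```rust
use std::collections::HashMap;

// Fake stand-in for prepare_multi_task_definition: fails for one job.
fn prepare(job_id: &str) -> Result<String, String> {
    if job_id == "job-bad" {
        Err("unsupported plan type".to_string())
    } else {
        Ok(format!("definition for {job_id}"))
    }
}

fn main() {
    // Tasks grouped per (job_id, stage_id), like the new launch_multi_task input.
    let tasks: HashMap<(String, usize), Vec<String>> = HashMap::from([
        (("job-ok".to_string(), 1), vec!["t1".to_string()]),
        (("job-bad".to_string(), 1), vec!["t2".to_string()]),
        (("job-bad".to_string(), 2), vec!["t3".to_string(), "t4".to_string()]),
    ]);

    let mut multi_tasks = Vec::new();
    let mut prepare_failed_jobs: HashMap<String, Vec<String>> = HashMap::new();

    for ((job_id, _stage_id), stage_tasks) in tasks {
        // Once a job has one failed stage, skip preparing its other stages and
        // just collect the tasks so the caller can cancel the whole job.
        if prepare_failed_jobs.contains_key(&job_id) {
            prepare_failed_jobs.entry(job_id).or_default().extend(stage_tasks);
            continue;
        }
        match prepare(&job_id) {
            Ok(definition) => multi_tasks.push(definition),
            Err(e) => {
                eprintln!("Fail to prepare task definition: {e:?}");
                prepare_failed_jobs.insert(job_id, stage_tasks);
            }
        }
    }

    // Only the healthy job is launched; every task of "job-bad" is reported
    // back so the scheduler can post a single JobCancel for it.
    // (HashMap iteration order varies, but the end state is the same.)
    assert_eq!(multi_tasks, vec!["definition for job-ok".to_string()]);
    assert_eq!(prepare_failed_jobs["job-bad"].len(), 3);
}
```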