From 1971bc9caeeccfb50a39faa8670deea76986154e Mon Sep 17 00:00:00 2001 From: "westhide.dev" Date: Thu, 20 Mar 2025 08:03:03 +0800 Subject: [PATCH 1/3] feat: add test to check for `ctx.read_json()` --- ballista/client/testdata/simple.json | 1 + ballista/client/tests/context_unsupported.rs | 31 ++++++++++++++++++++ 2 files changed, 32 insertions(+) create mode 100644 ballista/client/testdata/simple.json diff --git a/ballista/client/testdata/simple.json b/ballista/client/testdata/simple.json new file mode 100644 index 000000000..0187f3b09 --- /dev/null +++ b/ballista/client/testdata/simple.json @@ -0,0 +1 @@ +{"a":1} diff --git a/ballista/client/tests/context_unsupported.rs b/ballista/client/tests/context_unsupported.rs index 805e81325..6d70ff517 100644 --- a/ballista/client/tests/context_unsupported.rs +++ b/ballista/client/tests/context_unsupported.rs @@ -209,6 +209,37 @@ mod unsupported { "+----+------------+---------------------+", ]; + assert_batches_eq!(expected, &result); + } + #[rstest] + #[case::standalone(standalone_context())] + #[case::remote(remote_context())] + #[tokio::test] + #[should_panic] + // "Error preparing task definition: Unsupported plan and extension codec failed with unsupported plan type: NdJsonExec" + async fn should_support_json_source( + #[future(awt)] + #[case] + ctx: SessionContext, + test_data: String, + ) { + let result = ctx + .read_json(&format!("{test_data}/simple.json"), Default::default()) + .await + .unwrap() + .collect() + .await + .unwrap(); + + #[rustfmt::skip] + let expected = [ + "+---+", + "| a |", + "+---+", + "| 1 |", + "+---+" + ]; + assert_batches_eq!(expected, &result); } } From 1dc68c53ae32ccfa6d137c28d938e7f4a094d775 Mon Sep 17 00:00:00 2001 From: "westhide.dev" Date: Sat, 22 Mar 2025 15:57:35 +0800 Subject: [PATCH 2/3] fix: Cancel prepare_task_definition fail job --- ballista/scheduler/src/cluster/mod.rs | 27 +++- .../scheduler/src/scheduler_server/grpc.rs | 54 +++++--- .../scheduler/src/scheduler_server/mod.rs | 7 + .../scheduler/src/state/execution_graph.rs | 4 + ballista/scheduler/src/state/mod.rs | 120 ++++++++++++------ ballista/scheduler/src/state/task_manager.rs | 27 ++-- 6 files changed, 171 insertions(+), 68 deletions(-) diff --git a/ballista/scheduler/src/cluster/mod.rs b/ballista/scheduler/src/cluster/mod.rs index fd546f59b..b14f62543 100644 --- a/ballista/scheduler/src/cluster/mod.rs +++ b/ballista/scheduler/src/cluster/mod.rs @@ -44,7 +44,9 @@ use crate::cluster::memory::{InMemoryClusterState, InMemoryJobState}; use crate::config::{ClusterStorageConfig, SchedulerConfig, TaskDistributionPolicy}; use crate::scheduler_server::SessionBuilder; -use crate::state::execution_graph::{create_task_info, ExecutionGraph, TaskDescription}; +use crate::state::execution_graph::{ + create_task_info, ExecutionGraph, ExecutionStage, TaskDescription, +}; use crate::state::task_manager::JobInfoCache; pub mod event; @@ -647,6 +649,29 @@ pub(crate) fn get_scan_files( Ok(collector) } +pub(crate) async fn unbind_prepare_failed_tasks( + active_jobs: Arc>, + failed_jobs: &HashMap>, +) { + for (job_id, failed_tasks) in failed_jobs.iter() { + if let Some(job_info) = active_jobs.get(job_id) { + let mut graph = job_info.execution_graph.write().await; + failed_tasks.iter().for_each(|task| { + if let Some(ExecutionStage::Running(running_stage)) = + graph.stages_mut().get_mut(&task.partition.stage_id) + { + if let Some(task_info) = running_stage + .task_infos + .get_mut(task.partition.partition_id) + { + *task_info = None; + } + } + }); + } + } +} + 
#[derive(Clone)] pub struct TopologyNode { pub id: String, diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs index 02c21a884..894e43dbc 100644 --- a/ballista/scheduler/src/scheduler_server/grpc.rs +++ b/ballista/scheduler/src/scheduler_server/grpc.rs @@ -35,13 +35,17 @@ use ballista_core::serde::scheduler::ExecutorMetadata; use datafusion_proto::logical_plan::AsLogicalPlan; use datafusion_proto::physical_plan::AsExecutionPlan; use log::{debug, error, info, trace, warn}; +use std::collections::HashMap; use std::net::SocketAddr; use std::ops::Deref; -use crate::cluster::{bind_task_bias, bind_task_round_robin}; +use crate::cluster::{ + bind_task_bias, bind_task_round_robin, unbind_prepare_failed_tasks, +}; use crate::config::TaskDistributionPolicy; use crate::scheduler_server::event::QueryStageSchedulerEvent; +use crate::state::execution_graph::TaskDescription; use std::time::{SystemTime, UNIX_EPOCH}; use tonic::{Request, Response, Status}; @@ -112,10 +116,10 @@ impl SchedulerGrpc let active_jobs = self.state.task_manager.get_running_job_cache(); let schedulable_tasks = match self.state.config.task_distribution { TaskDistributionPolicy::Bias => { - bind_task_bias(available_slots, active_jobs, |_| false).await + bind_task_bias(available_slots, active_jobs.clone(), |_| false).await } TaskDistributionPolicy::RoundRobin => { - bind_task_round_robin(available_slots, active_jobs, |_| false).await + bind_task_round_robin(available_slots, active_jobs.clone(), |_| false).await } TaskDistributionPolicy::ConsistentHash{..} => { return Err(Status::unimplemented( @@ -124,14 +128,36 @@ impl SchedulerGrpc }; let mut tasks = vec![]; + let mut prepare_failed_jobs = HashMap::>::new(); for (_, task) in schedulable_tasks { - match self.state.task_manager.prepare_task_definition(task) { + let job_id = task.partition.job_id.clone(); + if prepare_failed_jobs.contains_key(&job_id) { + prepare_failed_jobs.entry(job_id).or_default().push(task); + continue; + } + match self + .state + .task_manager + .prepare_task_definition(task.clone()) + { Ok(task_definition) => tasks.push(task_definition), Err(e) => { error!("Error preparing task definition: {:?}", e); + prepare_failed_jobs.entry(job_id).or_default().push(task); } } } + + unbind_prepare_failed_tasks(active_jobs, &prepare_failed_jobs).await; + for job_id in prepare_failed_jobs.into_keys() { + info!("Cancel prepare task definition failed job: {}", job_id); + self.cancel_job(job_id).await.map_err(|e| { + let msg = format!("Cancel job error due to {e:?}"); + error!("{}", msg); + Status::internal(msg) + })?; + } + Ok(Response::new(PollWorkResult { tasks })) } else { warn!("Received invalid executor poll_work request"); @@ -527,21 +553,11 @@ impl SchedulerGrpc ) -> Result, Status> { let job_id = request.into_inner().job_id; info!("Received cancellation request for job {}", job_id); - - self.query_stage_event_loop - .get_sender() - .map_err(|e| { - let msg = format!("Get query stage event loop error due to {e:?}"); - error!("{}", msg); - Status::internal(msg) - })? 
- .post_event(QueryStageSchedulerEvent::JobCancel(job_id)) - .await - .map_err(|e| { - let msg = format!("Post to query stage event loop error due to {e:?}"); - error!("{}", msg); - Status::internal(msg) - })?; + self.cancel_job(job_id).await.map_err(|e| { + let msg = format!("Cancel job error due to {e:?}"); + error!("{}", msg); + Status::internal(msg) + })?; Ok(Response::new(CancelJobResult { cancelled: true })) } diff --git a/ballista/scheduler/src/scheduler_server/mod.rs b/ballista/scheduler/src/scheduler_server/mod.rs index 653e2d410..9f3ac2e34 100644 --- a/ballista/scheduler/src/scheduler_server/mod.rs +++ b/ballista/scheduler/src/scheduler_server/mod.rs @@ -183,6 +183,13 @@ impl SchedulerServer Result<()> { + self.query_stage_event_loop + .get_sender()? + .post_event(QueryStageSchedulerEvent::JobCancel(job_id)) + .await + } + /// It just send task status update event to the channel, /// and will not guarantee the event processing completed after return pub(crate) async fn update_task_status( diff --git a/ballista/scheduler/src/state/execution_graph.rs b/ballista/scheduler/src/state/execution_graph.rs index f3e6bf768..7a43560a2 100644 --- a/ballista/scheduler/src/state/execution_graph.rs +++ b/ballista/scheduler/src/state/execution_graph.rs @@ -225,6 +225,10 @@ impl ExecutionGraph { &self.stages } + pub(crate) fn stages_mut(&mut self) -> &mut HashMap { + &mut self.stages + } + /// An ExecutionGraph is successful if all its stages are successful pub fn is_successful(&self) -> bool { self.stages diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs index 4394dc009..4e39613e5 100644 --- a/ballista/scheduler/src/state/mod.rs +++ b/ballista/scheduler/src/state/mod.rs @@ -21,7 +21,7 @@ use datafusion::datasource::source_as_provider; use datafusion::error::DataFusionError; use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties}; use std::any::type_name; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::time::Instant; @@ -174,17 +174,31 @@ impl SchedulerState { - if !unassigned_executor_slots.is_empty() { - if let Err(e) = state - .executor_manager - .unbind_tasks(unassigned_executor_slots) - .await + Ok(launch_tasks_futs) => { + let unassigned_slots = launch_tasks_futs + .iter() + .flat_map(LaunchMultiTaskFut::unassigned_slot) + .collect::>(); + if !unassigned_slots.is_empty() { + if let Err(e) = + state.executor_manager.unbind_tasks(unassigned_slots).await { error!("Fail to unbind tasks: {}", e); } if_revive = true; } + let failed_jobs = launch_tasks_futs + .into_iter() + .flat_map(|fut| fut.prepare_failed_jobs.into_keys()) + .collect::>(); + for job_id in failed_jobs { + if let Err(e) = sender + .post_event(QueryStageSchedulerEvent::JobCancel(job_id)) + .await + { + error!("Cancel job error due to {e}"); + } + } } Err(e) => { error!("Fail to launch tasks: {}", e); @@ -248,7 +262,7 @@ impl SchedulerState, - ) -> Result> { + ) -> Result> { // Put tasks to the same executor together // And put tasks belonging to the same stage together for creating MultiTaskDefinition let mut executor_stage_assignments: HashMap< @@ -275,62 +289,58 @@ impl SchedulerState> = tasks.into_values().collect(); // Total number of tasks to be launched for one executor - let n_tasks: usize = tasks.iter().map(|stage_tasks| stage_tasks.len()).sum(); + let n_tasks: usize = tasks.values().map(Vec::len).sum(); let state = self.clone(); let join_handle = tokio::spawn(async move { - let success = match state + match state 
.executor_manager .get_executor_metadata(&executor_id) .await { Ok(executor) => { - if let Err(e) = state + match state .task_manager .launch_multi_task(&executor, tasks, &state.executor_manager) .await { - let err_msg = format!("Failed to launch new task: {e}"); - error!("{}", err_msg.clone()); - - // It's OK to remove executor aggressively, - // since if the executor is in healthy state, it will be registered again. - state.remove_executor(&executor_id, Some(err_msg)).await; - - false - } else { - true + Ok(prepare_failed_jobs) => LaunchMultiTaskFut::new( + executor_id, + 0, + prepare_failed_jobs, + ), + Err(e) => { + let err_msg = format!("Failed to launch new task: {e}"); + error!("{}", err_msg.clone()); + + // It's OK to remove executor aggressively, + // since if the executor is in healthy state, it will be registered again. + state.remove_executor(&executor_id, Some(err_msg)).await; + LaunchMultiTaskFut::new( + executor_id, + n_tasks, + HashMap::new(), + ) + } } } Err(e) => { error!("Failed to launch new task, could not get executor metadata: {}", e); - false + LaunchMultiTaskFut::new(executor_id, n_tasks, HashMap::new()) } - }; - if success { - vec![] - } else { - vec![(executor_id.clone(), n_tasks as u32)] } }); join_handles.push(join_handle); } - let unassigned_executor_slots = - futures::future::join_all(join_handles) - .await - .into_iter() - .collect::>, - tokio::task::JoinError, - >>()?; - - Ok(unassigned_executor_slots + let launch_futs = futures::future::join_all(join_handles) + .await .into_iter() - .flatten() - .collect::>()) + .collect::, tokio::task::JoinError>>( + )?; + + Ok(launch_futs) } pub(crate) async fn update_task_statuses( @@ -463,3 +473,33 @@ impl SchedulerState>, +} + +impl LaunchMultiTaskFut { + pub fn new( + executor_id: String, + unassigned_num: usize, + prepare_failed_jobs: HashMap>, + ) -> Self { + Self { + executor_id, + unassigned_num, + prepare_failed_jobs, + } + } + + pub fn unassigned_slot(&self) -> Option { + let fail_num: usize = self.prepare_failed_jobs.values().map(Vec::len).sum(); + let slots = (self.unassigned_num + fail_num) as u32; + if slots > 0 { + Some((self.executor_id.clone(), slots)) + } else { + None + } + } +} diff --git a/ballista/scheduler/src/state/task_manager.rs b/ballista/scheduler/src/state/task_manager.rs index 53a352bd1..75afbc8d8 100644 --- a/ballista/scheduler/src/state/task_manager.rs +++ b/ballista/scheduler/src/state/task_manager.rs @@ -524,24 +524,35 @@ impl TaskManager pub(crate) async fn launch_multi_task( &self, executor: &ExecutorMetadata, - tasks: Vec>, + tasks: HashMap<(String, usize), Vec>, executor_manager: &ExecutorManager, - ) -> Result<()> { + ) -> Result>> { let mut multi_tasks = vec![]; - for stage_tasks in tasks { - match self.prepare_multi_task_definition(stage_tasks) { + let mut prepare_failed_jobs = HashMap::>::new(); + for ((job_id, _stage_id), stage_tasks) in tasks { + if prepare_failed_jobs.contains_key(&job_id) { + prepare_failed_jobs + .entry(job_id) + .or_default() + .extend(stage_tasks); + continue; + } + match self.prepare_multi_task_definition(stage_tasks.clone()) { Ok(stage_tasks) => multi_tasks.extend(stage_tasks), - Err(e) => error!("Fail to prepare task definition: {:?}", e), + Err(e) => { + error!("Fail to prepare task definition: {:?}", e); + prepare_failed_jobs.insert(job_id, stage_tasks); + } } } if !multi_tasks.is_empty() { self.launcher .launch_tasks(executor, multi_tasks, executor_manager) - .await - } else { - Ok(()) + .await?; } + + Ok(prepare_failed_jobs) } #[allow(dead_code)] 
From f109cd8532043350ba9d8b395d1da1e296aef852 Mon Sep 17 00:00:00 2001 From: "westhide.dev" Date: Sun, 30 Mar 2025 22:06:26 +0800 Subject: [PATCH 3/3] fix: Cancel prepare_task_definition fail job --- ballista/scheduler/src/cluster/mod.rs | 27 +-------- .../scheduler/src/scheduler_server/grpc.rs | 57 ++++++++----------- .../scheduler/src/state/execution_graph.rs | 4 -- 3 files changed, 24 insertions(+), 64 deletions(-) diff --git a/ballista/scheduler/src/cluster/mod.rs b/ballista/scheduler/src/cluster/mod.rs index b14f62543..fd546f59b 100644 --- a/ballista/scheduler/src/cluster/mod.rs +++ b/ballista/scheduler/src/cluster/mod.rs @@ -44,9 +44,7 @@ use crate::cluster::memory::{InMemoryClusterState, InMemoryJobState}; use crate::config::{ClusterStorageConfig, SchedulerConfig, TaskDistributionPolicy}; use crate::scheduler_server::SessionBuilder; -use crate::state::execution_graph::{ - create_task_info, ExecutionGraph, ExecutionStage, TaskDescription, -}; +use crate::state::execution_graph::{create_task_info, ExecutionGraph, TaskDescription}; use crate::state::task_manager::JobInfoCache; pub mod event; @@ -649,29 +647,6 @@ pub(crate) fn get_scan_files( Ok(collector) } -pub(crate) async fn unbind_prepare_failed_tasks( - active_jobs: Arc>, - failed_jobs: &HashMap>, -) { - for (job_id, failed_tasks) in failed_jobs.iter() { - if let Some(job_info) = active_jobs.get(job_id) { - let mut graph = job_info.execution_graph.write().await; - failed_tasks.iter().for_each(|task| { - if let Some(ExecutionStage::Running(running_stage)) = - graph.stages_mut().get_mut(&task.partition.stage_id) - { - if let Some(task_info) = running_stage - .task_infos - .get_mut(task.partition.partition_id) - { - *task_info = None; - } - } - }); - } - } -} - #[derive(Clone)] pub struct TopologyNode { pub id: String, diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs index 894e43dbc..e99224271 100644 --- a/ballista/scheduler/src/scheduler_server/grpc.rs +++ b/ballista/scheduler/src/scheduler_server/grpc.rs @@ -35,17 +35,13 @@ use ballista_core::serde::scheduler::ExecutorMetadata; use datafusion_proto::logical_plan::AsLogicalPlan; use datafusion_proto::physical_plan::AsExecutionPlan; use log::{debug, error, info, trace, warn}; -use std::collections::HashMap; use std::net::SocketAddr; use std::ops::Deref; -use crate::cluster::{ - bind_task_bias, bind_task_round_robin, unbind_prepare_failed_tasks, -}; +use crate::cluster::{bind_task_bias, bind_task_round_robin}; use crate::config::TaskDistributionPolicy; use crate::scheduler_server::event::QueryStageSchedulerEvent; -use crate::state::execution_graph::TaskDescription; use std::time::{SystemTime, UNIX_EPOCH}; use tonic::{Request, Response, Status}; @@ -116,10 +112,10 @@ impl SchedulerGrpc let active_jobs = self.state.task_manager.get_running_job_cache(); let schedulable_tasks = match self.state.config.task_distribution { TaskDistributionPolicy::Bias => { - bind_task_bias(available_slots, active_jobs.clone(), |_| false).await + bind_task_bias(available_slots, active_jobs, |_| false).await } TaskDistributionPolicy::RoundRobin => { - bind_task_round_robin(available_slots, active_jobs.clone(), |_| false).await + bind_task_round_robin(available_slots, active_jobs, |_| false).await } TaskDistributionPolicy::ConsistentHash{..} => { return Err(Status::unimplemented( @@ -128,36 +124,19 @@ impl SchedulerGrpc }; let mut tasks = vec![]; - let mut prepare_failed_jobs = HashMap::>::new(); for (_, task) in schedulable_tasks { let job_id = 
task.partition.job_id.clone(); - if prepare_failed_jobs.contains_key(&job_id) { - prepare_failed_jobs.entry(job_id).or_default().push(task); - continue; - } - match self - .state - .task_manager - .prepare_task_definition(task.clone()) - { + match self.state.task_manager.prepare_task_definition(task) { Ok(task_definition) => tasks.push(task_definition), Err(e) => { error!("Error preparing task definition: {:?}", e); - prepare_failed_jobs.entry(job_id).or_default().push(task); + info!("Cancel prepare task definition failed job: {}", job_id); + if let Err(err) = self.cancel_job(job_id).await { + error!("Failed to cancel job {err:?}"); + } } } } - - unbind_prepare_failed_tasks(active_jobs, &prepare_failed_jobs).await; - for job_id in prepare_failed_jobs.into_keys() { - info!("Cancel prepare task definition failed job: {}", job_id); - self.cancel_job(job_id).await.map_err(|e| { - let msg = format!("Cancel job error due to {e:?}"); - error!("{}", msg); - Status::internal(msg) - })?; - } - Ok(Response::new(PollWorkResult { tasks })) } else { warn!("Received invalid executor poll_work request"); @@ -553,11 +532,21 @@ impl SchedulerGrpc ) -> Result, Status> { let job_id = request.into_inner().job_id; info!("Received cancellation request for job {}", job_id); - self.cancel_job(job_id).await.map_err(|e| { - let msg = format!("Cancel job error due to {e:?}"); - error!("{}", msg); - Status::internal(msg) - })?; + + self.query_stage_event_loop + .get_sender() + .map_err(|e| { + let msg = format!("Get query stage event loop error due to {e:?}"); + error!("{}", msg); + Status::internal(msg) + })? + .post_event(QueryStageSchedulerEvent::JobCancel(job_id)) + .await + .map_err(|e| { + let msg = format!("Post to query stage event loop error due to {e:?}"); + error!("{}", msg); + Status::internal(msg) + })?; Ok(Response::new(CancelJobResult { cancelled: true })) } diff --git a/ballista/scheduler/src/state/execution_graph.rs b/ballista/scheduler/src/state/execution_graph.rs index 7a43560a2..f3e6bf768 100644 --- a/ballista/scheduler/src/state/execution_graph.rs +++ b/ballista/scheduler/src/state/execution_graph.rs @@ -225,10 +225,6 @@ impl ExecutionGraph { &self.stages } - pub(crate) fn stages_mut(&mut self) -> &mut HashMap { - &mut self.stages - } - /// An ExecutionGraph is successful if all its stages are successful pub fn is_successful(&self) -> bool { self.stages