
feat: add test to check for ctx.read_json() #1212

Draft · wants to merge 3 commits into main
1 change: 1 addition & 0 deletions ballista/client/testdata/simple.json
@@ -0,0 +1 @@
{"a":1}
31 changes: 31 additions & 0 deletions ballista/client/tests/context_unsupported.rs
@@ -209,6 +209,37 @@ mod unsupported {
"+----+------------+---------------------+",
];

assert_batches_eq!(expected, &result);
}
#[rstest]
#[case::standalone(standalone_context())]
#[case::remote(remote_context())]
#[tokio::test]
#[should_panic]
// "Error preparing task definition: Unsupported plan and extension codec failed with unsupported plan type: NdJsonExec"
async fn should_support_json_source(
#[future(awt)]
#[case]
ctx: SessionContext,
test_data: String,
) {
let result = ctx
.read_json(&format!("{test_data}/simple.json"), Default::default())
.await
.unwrap()
.collect()
.await
.unwrap();

#[rustfmt::skip]
let expected = [
"+---+",
"| a |",
"+---+",
"| 1 |",
"+---+"
];

assert_batches_eq!(expected, &result);
}
}
5 changes: 5 additions & 0 deletions ballista/scheduler/src/scheduler_server/grpc.rs
@@ -125,10 +125,15 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc

let mut tasks = vec![];
for (_, task) in schedulable_tasks {
let job_id = task.partition.job_id.clone();
match self.state.task_manager.prepare_task_definition(task) {
Ok(task_definition) => tasks.push(task_definition),
Err(e) => {
error!("Error preparing task definition: {:?}", e);
info!("Cancel prepare task definition failed job: {}", job_id);
if let Err(err) = self.cancel_job(job_id).await {
Review comment (milenkovicm, Contributor, Mar 30, 2025): Would it be better if we failed the job instead of canceling it, e.g. via QueryStageSchedulerEvent::JobRunningFailed {...}? Not sure about the queued_at property. (A sketch of this alternative follows this file's diff.)

Reply (author): Sure, it actually failed.

error!("Failed to cancel job {err:?}");
}
}
}
}
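A rough sketch of the alternative raised in the review thread above: post a JobRunningFailed event instead of JobCancel when task preparation fails. This is not what the PR does; the exact fields of QueryStageSchedulerEvent::JobRunningFailed (fail_message, queued_at, failed_at) and the timestamp_millis() helper are assumptions here, not verified against the codebase.

// Hypothetical counterpart to the cancel_job() helper added in
// scheduler_server/mod.rs below; the event variant's field names are assumed.
pub(crate) async fn fail_running_job(
    &self,
    job_id: String,
    fail_message: String,
    queued_at: u64,
) -> Result<()> {
    self.query_stage_event_loop
        .get_sender()?
        .post_event(QueryStageSchedulerEvent::JobRunningFailed {
            job_id,
            fail_message,
            queued_at,
            failed_at: timestamp_millis(), // assumed "current time in ms" helper
        })
        .await
}

The open question from the thread is where a caller such as this gRPC handler would obtain queued_at, since the task being prepared does not obviously carry it.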
7 changes: 7 additions & 0 deletions ballista/scheduler/src/scheduler_server/mod.rs
@@ -183,6 +183,13 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
.await
}

pub(crate) async fn cancel_job(&self, job_id: String) -> Result<()> {
self.query_stage_event_loop
.get_sender()?
.post_event(QueryStageSchedulerEvent::JobCancel(job_id))
.await
}

/// It just sends a task status update event to the channel,
/// and does not guarantee that the event has been processed when it returns
pub(crate) async fn update_task_status(
120 changes: 80 additions & 40 deletions ballista/scheduler/src/state/mod.rs
@@ -21,7 +21,7 @@ use datafusion::datasource::source_as_provider;
use datafusion::error::DataFusionError;
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
use std::any::type_name;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Instant;

@@ -174,17 +174,31 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
tokio::spawn(async move {
let mut if_revive = false;
match state.launch_tasks(schedulable_tasks).await {
Ok(unassigned_executor_slots) => {
if !unassigned_executor_slots.is_empty() {
if let Err(e) = state
.executor_manager
.unbind_tasks(unassigned_executor_slots)
.await
Ok(launch_tasks_futs) => {
let unassigned_slots = launch_tasks_futs
.iter()
.flat_map(LaunchMultiTaskFut::unassigned_slot)
.collect::<Vec<_>>();
if !unassigned_slots.is_empty() {
if let Err(e) =
state.executor_manager.unbind_tasks(unassigned_slots).await
{
error!("Fail to unbind tasks: {}", e);
}
if_revive = true;
}
let failed_jobs = launch_tasks_futs
.into_iter()
.flat_map(|fut| fut.prepare_failed_jobs.into_keys())
.collect::<HashSet<String>>();
for job_id in failed_jobs {
if let Err(e) = sender
.post_event(QueryStageSchedulerEvent::JobCancel(job_id))
.await
{
error!("Cancel job error due to {e}");
}
}
}
Err(e) => {
error!("Fail to launch tasks: {}", e);
@@ -248,7 +262,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
async fn launch_tasks(
Review comment (Contributor): I wonder if it makes sense to propagate sender: EventSender<QueryStageSchedulerEvent> into task_manager.launch_multi_task and cancel the jobs there? (A sketch of this follows this file's diff.)

The semantics of task_manager.launch_multi_task are also not clean: does it yield an error if only one task fails?

&self,
bound_tasks: Vec<BoundTask>,
) -> Result<Vec<ExecutorSlot>> {
) -> Result<Vec<LaunchMultiTaskFut>> {
// Put tasks to the same executor together
// And put tasks belonging to the same stage together for creating MultiTaskDefinition
let mut executor_stage_assignments: HashMap<
@@ -275,62 +289,58 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,

let mut join_handles = vec![];
for (executor_id, tasks) in executor_stage_assignments.into_iter() {
let tasks: Vec<Vec<TaskDescription>> = tasks.into_values().collect();
// Total number of tasks to be launched for one executor
let n_tasks: usize = tasks.iter().map(|stage_tasks| stage_tasks.len()).sum();
let n_tasks: usize = tasks.values().map(Vec::len).sum();

let state = self.clone();
let join_handle = tokio::spawn(async move {
let success = match state
match state
.executor_manager
.get_executor_metadata(&executor_id)
.await
{
Ok(executor) => {
if let Err(e) = state
match state
.task_manager
.launch_multi_task(&executor, tasks, &state.executor_manager)
.await
{
let err_msg = format!("Failed to launch new task: {e}");
error!("{}", err_msg.clone());

// It's OK to remove executor aggressively,
// since if the executor is in healthy state, it will be registered again.
state.remove_executor(&executor_id, Some(err_msg)).await;

false
} else {
true
Ok(prepare_failed_jobs) => LaunchMultiTaskFut::new(
executor_id,
0,
prepare_failed_jobs,
),
Err(e) => {
let err_msg = format!("Failed to launch new task: {e}");
error!("{}", err_msg.clone());

// It's OK to remove executor aggressively,
// since if the executor is in healthy state, it will be registered again.
state.remove_executor(&executor_id, Some(err_msg)).await;
LaunchMultiTaskFut::new(
executor_id,
n_tasks,
HashMap::new(),
)
}
}
}
Err(e) => {
error!("Failed to launch new task, could not get executor metadata: {}", e);
false
LaunchMultiTaskFut::new(executor_id, n_tasks, HashMap::new())
}
};
if success {
vec![]
} else {
vec![(executor_id.clone(), n_tasks as u32)]
}
});
join_handles.push(join_handle);
}

let unassigned_executor_slots =
futures::future::join_all(join_handles)
.await
.into_iter()
.collect::<std::result::Result<
Vec<Vec<ExecutorSlot>>,
tokio::task::JoinError,
>>()?;

Ok(unassigned_executor_slots
let launch_futs = futures::future::join_all(join_handles)
.await
.into_iter()
.flatten()
.collect::<Vec<ExecutorSlot>>())
.collect::<std::result::Result<Vec<_>, tokio::task::JoinError>>(
)?;

Ok(launch_futs)
}

pub(crate) async fn update_task_statuses(
@@ -463,3 +473,33 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
);
}
}

pub(crate) struct LaunchMultiTaskFut {
pub executor_id: String,
pub unassigned_num: usize,
pub prepare_failed_jobs: HashMap<String, Vec<TaskDescription>>,
}

impl LaunchMultiTaskFut {
pub fn new(
executor_id: String,
unassigned_num: usize,
prepare_failed_jobs: HashMap<String, Vec<TaskDescription>>,
) -> Self {
Self {
executor_id,
unassigned_num,
prepare_failed_jobs,
}
}

pub fn unassigned_slot(&self) -> Option<ExecutorSlot> {
let fail_num: usize = self.prepare_failed_jobs.values().map(Vec::len).sum();
let slots = (self.unassigned_num + fail_num) as u32;
if slots > 0 {
Some((self.executor_id.clone(), slots))
} else {
None
}
}
}
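As a sketch of the review suggestion on launch_tasks above: propagate the event sender into TaskManager::launch_multi_task and cancel failed jobs there, instead of returning LaunchMultiTaskFut values for the caller to inspect. The extra sender parameter is hypothetical; post_event and QueryStageSchedulerEvent::JobCancel are used the same way as elsewhere in this diff.

// Sketch only: an alternative signature for TaskManager::launch_multi_task
// that takes the scheduler's event sender and cancels a job as soon as one of
// its stages fails to prepare.
pub(crate) async fn launch_multi_task(
    &self,
    executor: &ExecutorMetadata,
    tasks: HashMap<(String, usize), Vec<TaskDescription>>,
    executor_manager: &ExecutorManager,
    sender: EventSender<QueryStageSchedulerEvent>,
) -> Result<()> {
    let mut multi_tasks = vec![];
    for ((job_id, _stage_id), stage_tasks) in tasks {
        match self.prepare_multi_task_definition(stage_tasks) {
            Ok(stage_tasks) => multi_tasks.extend(stage_tasks),
            Err(e) => {
                error!("Fail to prepare task definition: {:?}", e);
                // Cancel the whole job right here instead of bubbling its id up.
                if let Err(e) = sender
                    .post_event(QueryStageSchedulerEvent::JobCancel(job_id))
                    .await
                {
                    error!("Cancel job error due to {e}");
                }
            }
        }
    }
    if !multi_tasks.is_empty() {
        self.launcher
            .launch_tasks(executor, multi_tasks, executor_manager)
            .await?;
    }
    Ok(())
}

One trade-off of this shape: launch_tasks would no longer learn how many tasks failed to prepare, so the slot accounting done by LaunchMultiTaskFut::unassigned_slot would need another way to include them.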
27 changes: 19 additions & 8 deletions ballista/scheduler/src/state/task_manager.rs
@@ -524,24 +524,35 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
pub(crate) async fn launch_multi_task(
&self,
executor: &ExecutorMetadata,
tasks: Vec<Vec<TaskDescription>>,
tasks: HashMap<(String, usize), Vec<TaskDescription>>,
Review comment (Contributor): Do we need to change this parameter? If it is just the job_id that we need, we can get that from task.partition, if I'm not mistaken. (See the sketch at the end of this file's diff.)

executor_manager: &ExecutorManager,
) -> Result<()> {
) -> Result<HashMap<String, Vec<TaskDescription>>> {
Review comment (Contributor): Not sure which approach would be better: propagate the sender as a parameter and cancel the job here, or return the job_ids whose tasks failed. In any case it does not look like we need to return anything more than a HashSet of the failed job_ids. (Also covered in the sketch at the end of this file's diff.)

let mut multi_tasks = vec![];
for stage_tasks in tasks {
match self.prepare_multi_task_definition(stage_tasks) {
let mut prepare_failed_jobs = HashMap::<String, Vec<TaskDescription>>::new();
for ((job_id, _stage_id), stage_tasks) in tasks {
if prepare_failed_jobs.contains_key(&job_id) {
prepare_failed_jobs
.entry(job_id)
.or_default()
.extend(stage_tasks);
continue;
}
match self.prepare_multi_task_definition(stage_tasks.clone()) {
Ok(stage_tasks) => multi_tasks.extend(stage_tasks),
Err(e) => error!("Fail to prepare task definition: {:?}", e),
Err(e) => {
error!("Fail to prepare task definition: {:?}", e);
prepare_failed_jobs.insert(job_id, stage_tasks);
}
}
}

if !multi_tasks.is_empty() {
self.launcher
.launch_tasks(executor, multi_tasks, executor_manager)
.await
} else {
Ok(())
.await?;
}

Ok(prepare_failed_jobs)
}

#[allow(dead_code)]
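A minimal sketch combining the two review comments above: keep the original Vec<Vec<TaskDescription>> parameter, recover the job id from task.partition (the same field the grpc.rs change in this PR reads), and return only the set of failed job ids. It assumes use std::collections::HashSet is in scope and that all tasks in one stage batch belong to the same job.

// Sketch only: parameter and return types follow the review suggestion,
// not the PR as written.
pub(crate) async fn launch_multi_task(
    &self,
    executor: &ExecutorMetadata,
    tasks: Vec<Vec<TaskDescription>>,
    executor_manager: &ExecutorManager,
) -> Result<HashSet<String>> {
    let mut multi_tasks = vec![];
    let mut prepare_failed_jobs = HashSet::new();
    for stage_tasks in tasks {
        // Recover the job id from the first task instead of threading it
        // through the parameter type.
        let job_id = match stage_tasks.first() {
            Some(task) => task.partition.job_id.clone(),
            None => continue,
        };
        // Skip further stages of a job that has already failed to prepare.
        if prepare_failed_jobs.contains(&job_id) {
            continue;
        }
        match self.prepare_multi_task_definition(stage_tasks) {
            Ok(stage_tasks) => multi_tasks.extend(stage_tasks),
            Err(e) => {
                error!("Fail to prepare task definition: {:?}", e);
                prepare_failed_jobs.insert(job_id);
            }
        }
    }
    if !multi_tasks.is_empty() {
        self.launcher
            .launch_tasks(executor, multi_tasks, executor_manager)
            .await?;
    }
    Ok(prepare_failed_jobs)
}

Note that the PR instead returns HashMap<String, Vec<TaskDescription>> so that LaunchMultiTaskFut::unassigned_slot can count the failed tasks when unbinding executor slots; a HashSet-only return would lose that count.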