feat: add test to check for ctx.read_json() #1212

Draft · wants to merge 3 commits into main

Conversation

westhide
Contributor

@westhide westhide commented Mar 20, 2025

Which issue does this PR close?

Closes #1209.

Rationale for this change

Add a test for #1209.
Also fixes #1214.
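
For reference, a read_json test on a Ballista cluster might look roughly like the sketch below (the scheduler URL, file path, and assertion are illustrative assumptions, not the PR's exact code).

use ballista::prelude::*; // brings SessionContextExt into scope for SessionContext::remote
use datafusion::prelude::{NdJsonReadOptions, SessionContext};

// Hypothetical shape of such a test; a scheduler is assumed to listen on localhost:50050.
#[tokio::test]
async fn read_json_on_ballista_cluster() -> datafusion::error::Result<()> {
    let ctx: SessionContext = SessionContext::remote("df://localhost:50050").await?;
    let df = ctx
        .read_json("examples/testdata/simple.json", NdJsonReadOptions::default())
        .await?;
    let batches = df.collect().await?;
    assert!(!batches.is_empty());
    Ok(())
}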

What changes are included in this PR?

Are there any user-facing changes?

@westhide
Contributor Author

take

@westhide
Contributor Author

The test seems to block on version 45.0.0; I'll try to fix it.

@milenkovicm
Contributor

apparently you found another bug:

for (_, task) in schedulable_tasks {

maybe if it is changed to:

            for (_, task) in schedulable_tasks {
                match self
                    .state
                    .task_manager
                    .prepare_task_definition(task.clone())
                {
                    Ok(task_definition) => tasks.push(task_definition),
                    Err(e) => {
                        let job_id = task.partition.job_id;
                        error!(
                            "Error preparing task for job_id: {} error: {:?} ",
                            job_id,
                            e.to_string(),
                        );
                        let _ = self
                            .state
                            .task_manager
                            .abort_job(&job_id, e.to_string())
                            .await;
                    }
                }
            }

The whole job gets cancelled in case of an error, wdyt?

@westhide
Contributor Author

westhide commented Mar 21, 2025

Yes, I will try to fix this bug by sending the scheduler_server error to the client side.

ballista_scheduler::scheduler_server::grpc: Error preparing task definition: DataFusionError(Internal("Unsupported plan and extension codec failed with [Internal error: unsupported plan type: NdJsonExec { base_config: object_store_url=ObjectStoreUrl { url: Url { scheme: \"file\", cannot_be_a_base: false, username: \"\", password: None, host: None, port: None, path: \"/\", query: None, fragment: None } }, statistics=Statistics { num_rows: Absent, total_byte_size: Absent, column_statistics: [ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, sum_value: Absent, distinct_count: Absent }] }, file_groups={1 group: [[home/westhide/Code/apache/datafusion-ballista/examples/testdata/simple.json]]}, projection=[a], projected_statistics: Statistics { num_rows: Absent, total_byte_size: Absent, column_statistics: [ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, sum_value: Absent, distinct_count: Absent }] }, metrics: ExecutionPlanMetricsSet { inner: Mutex { data: MetricsSet { metrics: [] } } }, file_compression_type: FileCompressionType { variant: UNCOMPRESSED }, cache: PlanProperties { eq_properties: EquivalenceProperties { eq_group: EquivalenceGroup { classes: [] }, oeq_class: OrderingEquivalenceClass { orderings: [] }, constants: [], constraints: Constraints { inner: [] }, schema: Schema { fields: [Field { name: \"a\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} } }, partitioning: UnknownPartitioning(1), emission_type: Incremental, boundedness: Bounded, output_ordering: None } }.\nThis was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker]. Plan: NdJsonExec { base_config: object_store_url=ObjectStoreUrl { url: Url { scheme: \"file\", cannot_be_a_base: false, username: \"\", password: None, host: None, port: None, path: \"/\", query: None, fragment: None } }, statistics=Statistics { num_rows: Absent, total_byte_size: Absent, column_statistics: [ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, sum_value: Absent, distinct_count: Absent }] }, file_groups={1 group: [[home/westhide/Code/apache/datafusion-ballista/examples/testdata/simple.json]]}, projection=[a], projected_statistics: Statistics { num_rows: Absent, total_byte_size: Absent, column_statistics: [ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, sum_value: Absent, distinct_count: Absent }] }, metrics: ExecutionPlanMetricsSet { inner: Mutex { data: MetricsSet { metrics: [] } } }, file_compression_type: FileCompressionType { variant: UNCOMPRESSED }, cache: PlanProperties { eq_properties: EquivalenceProperties { eq_group: EquivalenceGroup { classes: [] }, oeq_class: OrderingEquivalenceClass { orderings: [] }, constants: [], constraints: Constraints { inner: [] }, schema: Schema { fields: [Field { name: \"a\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} } }, partitioning: UnknownPartitioning(1), emission_type: Incremental, boundedness: Bounded, output_ordering: None } }"))

@milenkovicm
Contributor

I believe the whole job should be cancelled.

@westhide
Contributor Author

> I believe the whole job should be cancelled.

Yes, working on it.

@westhide westhide closed this Mar 21, 2025
@milenkovicm
Contributor

Ah no, sorry for the misunderstanding, please do not cancel this PR.
What I meant is that, in case of this type of error, the Ballista job should be cancelled.

@westhide
Contributor Author

westhide commented Mar 21, 2025 via email

@westhide
Contributor Author

> Ah no, sorry for the misunderstanding, please do not cancel this PR. What I meant is that, in case of this type of error, the Ballista job should be cancelled.

Hello @milenkovicm, the change to cancel the job after prepare_task_definition fails is ready for review, thx~

@milenkovicm
Contributor

I won't be able to review this PR for a few days. Will follow up ASAP.

}
}
}

unbind_prepare_failed_tasks(active_jobs, &prepare_failed_jobs).await;
Contributor

I wonder what's the reason for this method?

When we detect that preparation of a task failed, we cannot recover from it, so the job should be cancelled.

Would self.cancel_job(job_id) trigger cancellation of all running tasks for the given job and clean up the execution graph?

Contributor Author

When the task_manager executes prepare_task_definition, it sets task_info for the running stage. Without this unbind_prepare_failed_tasks function resetting the task_info back to None, the Scheduler will, when it tries to cancel the job, send a stop-task event to the Executor, which causes a task-stop-failure error log on the Executor side.
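
To illustrate what that reset amounts to, here is a minimal sketch with stand-in types (the struct and field names below are assumptions, not the real ballista types):

use std::collections::HashMap;

// Stand-in for a running stage: one task_info slot per partition; Some(..)
// means the scheduler believes the task was handed to an executor.
struct RunningStage {
    task_infos: Vec<Option<String /* executor_id */>>,
}

// Reset slots that were bound during prepare_task_definition but whose tasks
// never launched, so a later cancel does not send stop-task events for them.
fn unbind_prepare_failed_tasks(
    running_stages: &mut HashMap<usize, RunningStage>,
    prepare_failed: &HashMap<usize, Vec<usize>>, // stage_id -> failed partitions
) {
    for (stage_id, partitions) in prepare_failed {
        if let Some(stage) = running_stages.get_mut(stage_id) {
            for p in partitions {
                if let Some(slot) = stage.task_infos.get_mut(*p) {
                    *slot = None; // back to "unscheduled"
                }
            }
        }
    }
}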

Contributor
@milenkovicm milenkovicm Mar 30, 2025

So will it just show an error in the log, or crash the executor?

Contributor Author

Just an error log~

Contributor Author

FYI

The Scheduler gets running_tasks by filter_mapping over Some(task_info):

pub(super) fn running_tasks(&self) -> Vec<(usize, usize, usize, String)> {
    self.task_infos
        .iter()
        .enumerate()
        .filter_map(|(partition, info)| match info {
            Some(TaskInfo {
                task_id,
                task_status: task_status::Status::Running(RunningTask { executor_id }),
                ..
            }) => Some((*task_id, self.stage_id, partition, executor_id.clone())),
            _ => None,
        })
        .collect()
}

The Scheduler sends a CancelTasks event to the Executor:

QueryStageSchedulerEvent::JobCancel(job_id) => {
    self.metrics_collector.record_cancelled(&job_id);
    info!("Job {} Cancelled", job_id);
    match self.state.task_manager.cancel_job(&job_id).await {
        Ok((running_tasks, _pending_tasks)) => {
            event_sender
                .post_event(QueryStageSchedulerEvent::CancelTasks(
                    running_tasks,
                ))
                .await?;
        }
        Err(e) => {
            error!(
                "Fail to invoke cancel_job for job {} due to {:?}",
                job_id, e
            );
        }
    }
    self.state.clean_up_failed_job(job_id);

The Executor logs error!("Error cancelling task: {:?}", e); when cancel_task fails inside cancel_tasks:

async fn cancel_tasks(
    &self,
    request: Request<CancelTasksParams>,
) -> Result<Response<CancelTasksResult>, Status> {
    let task_infos = request.into_inner().task_infos;
    info!("Cancelling tasks for {:?}", task_infos);
    let mut cancelled = true;
    for task in task_infos {
        if let Err(e) = self
            .executor
            .cancel_task(
                task.task_id as usize,
                task.job_id,
                task.stage_id as usize,
                task.partition_id as usize,
            )
            .await
        {
            error!("Error cancelling task: {:?}", e);
            cancelled = false;
        }
    }
    Ok(Response::new(CancelTasksResult { cancelled }))
}

Contributor Author

Would it be better to encode the physical_plan to proto before create_task_info? What do you think?
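
A minimal sketch of that ordering, assuming datafusion-proto's AsExecutionPlan::try_from_physical_plan is the encoding entry point (it is what produces the codec error above); the wrapper function itself is hypothetical:

use std::sync::Arc;
use datafusion::error::Result;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_proto::physical_plan::{AsExecutionPlan, PhysicalExtensionCodec};
use datafusion_proto::protobuf::PhysicalPlanNode;

// Hypothetical ordering: prove the plan is encodable first, and only bind task
// info once encoding has succeeded, so a codec failure never leaves a bound task behind.
fn encode_plan_before_binding(
    plan: Arc<dyn ExecutionPlan>,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<PhysicalPlanNode> {
    // Fails here (e.g. the unsupported NdJsonExec above) without touching task_infos.
    PhysicalPlanNode::try_from_physical_plan(plan, codec)
}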

Contributor

This issue captures a very rare corner case, which should not happen in a properly configured cluster.

For the sake of simplicity and understanding, can we just cancel the job (provided the cluster state is consistent at the end)? If the consequence of cancelling a failed task is an error log, it may not be too big of a problem.

What do you think?

Contributor Author

Sure, we should keep the code simple. unbind_prepare_failed_tasks has been reverted.

@@ -248,7 +262,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
async fn launch_tasks(
Contributor

I wonder if it makes sense to propagate sender: EventSender<QueryStageSchedulerEvent> into task_manager.launch_multi_task and cancel the jobs there?

The task_manager.launch_multi_task semantics are not clean: does it yield an error if only one task fails?

@@ -524,24 +524,35 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
 pub(crate) async fn launch_multi_task(
     &self,
     executor: &ExecutorMetadata,
-    tasks: Vec<Vec<TaskDescription>>,
+    tasks: HashMap<(String, usize), Vec<TaskDescription>>,
Contributor

Do we need to change this parameter? If it is just the job_id that we need, we can get that from task.partition, if I'm not mistaken.
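
A small sketch of that point with stand-in types; the only assumption is that TaskDescription exposes partition.job_id, as used earlier in this thread:

use std::collections::HashMap;

// Stand-ins for the relevant slice of the real types.
struct PartitionId { job_id: String }
struct TaskDescription { partition: PartitionId }

// Group tasks per job without changing the parameter type: the job_id can be
// read from each task directly.
fn group_by_job(tasks: Vec<Vec<TaskDescription>>) -> HashMap<String, Vec<TaskDescription>> {
    let mut by_job: HashMap<String, Vec<TaskDescription>> = HashMap::new();
    for task in tasks.into_iter().flatten() {
        by_job
            .entry(task.partition.job_id.clone())
            .or_default()
            .push(task);
    }
    by_job
}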

     executor_manager: &ExecutorManager,
-) -> Result<()> {
+) -> Result<HashMap<String, Vec<TaskDescription>>> {
Contributor

Not sure which would be the better approach: propagate the sender as a parameter and cancel the job there, or return the job_ids whose tasks failed. Either way, it does not look like we need to return anything but a hash set of failed job_ids.
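
A sketch of the "return the failed job ids" shape, again with stand-in types and a closure standing in for the real launch call (all names here are assumptions):

use std::collections::HashSet;

// Stand-ins, not the real scheduler types (job_id is flattened here for brevity).
struct TaskDescription { job_id: String }
struct LaunchError;

// Attempt each task group; remember which jobs had a failed launch and hand
// that set back so the caller can fail or cancel those jobs afterwards.
fn launch_multi_task(
    groups: Vec<Vec<TaskDescription>>,
    mut try_launch: impl FnMut(&[TaskDescription]) -> Result<(), LaunchError>,
) -> HashSet<String> {
    let mut failed_jobs = HashSet::new();
    for group in groups {
        if try_launch(&group).is_err() {
            failed_jobs.extend(group.iter().map(|t| t.job_id.clone()));
        }
    }
    failed_jobs
}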

match self.state.task_manager.prepare_task_definition(task) {
    Ok(task_definition) => tasks.push(task_definition),
    Err(e) => {
        error!("Error preparing task definition: {:?}", e);
        info!("Cancel prepare task definition failed job: {}", job_id);
        if let Err(err) = self.cancel_job(job_id).await {
Contributor
@milenkovicm milenkovicm Mar 30, 2025

Would it be better if we fail the job instead of cancelling it, i.e. QueryStageSchedulerEvent::JobRunningFailed {...}? Not sure about the queued_at property.
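
A rough sketch of that alternative with a stand-in event enum; the variant's fields below are assumptions (hence the note about queued_at):

// Stand-in for the scheduler event; field names are assumed, not verified.
enum QueryStageSchedulerEvent {
    JobRunningFailed {
        job_id: String,
        fail_message: String,
        queued_at: u64,
        failed_at: u64,
    },
}

// Instead of cancel_job(job_id), emit a "running failed" event so the job's
// terminal state reflects what actually happened.
fn fail_instead_of_cancel(
    job_id: &str,
    err: &str,
    queued_at: u64,
    now_ms: u64,
) -> QueryStageSchedulerEvent {
    QueryStageSchedulerEvent::JobRunningFailed {
        job_id: job_id.to_string(),
        fail_message: format!("Could not prepare task definition: {err}"),
        queued_at,  // unclear which timestamp to carry over, per the comment above
        failed_at: now_ms,
    }
}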

Contributor Author

Sure, it actually failed.

@milenkovicm
Contributor

Hey @westhide, are you still interested in getting this PR merged?

@milenkovicm milenkovicm marked this pull request as draft April 21, 2025 21:36
@milenkovicm
Contributor

Moving to draft as it's waiting for changes.

Labels: None yet
Projects: None yet
2 participants