feat: add test to check for ctx.read_json()
#1212
base: main
Conversation
take

The test in version
apparently you found another bug:
maybe if it is changed to:

    for (_, task) in schedulable_tasks {
        match self
            .state
            .task_manager
            .prepare_task_definition(task.clone())
        {
            Ok(task_definition) => tasks.push(task_definition),
            Err(e) => {
                let job_id = task.partition.job_id;
                error!(
                    "Error preparing task for job_id: {} error: {:?} ",
                    job_id,
                    e.to_string(),
                );
                let _ = self
                    .state
                    .task_manager
                    .abort_job(&job_id, e.to_string())
                    .await;
            }
        }
    }

the whole job gets cancelled in case of error, wdyt?
Yes, I will try to fix this bug by sending the scheduler_server error to the client side.
I believe the whole job should be cancelled.
Yes, working on it.
Ah no, sorry for the misunderstanding, please do not cancel this PR. What I meant is that in case of this type of error, the Ballista job should be cancelled.
Got it, I'm working on fixing this bug by resetting the task_info and cancelling the Ballista job.
Force-pushed from 514aa22 to 1e760ca
Hello @milenkovicm, cancelling the job after prepare_task_definition fails is ready for review, thx~
I won't be able to review this PR for a few days. Will follow up ASAP.
    unbind_prepare_failed_tasks(active_jobs, &prepare_failed_jobs).await;
I wonder what's the reason for this method? When we detect that preparation of a task failed, we cannot recover from it, so the job should be cancelled. Would `self.cancel_job(job_id)` trigger cancellation of all running tasks for the given job and do cleanup of the execution graph?
When the task_manager executes `prepare_task_definition`, it sets `task_info` for the running stage. Without this `unbind_prepare_failed_tasks` function to reset the `task_info` to `None`, when the Scheduler tries to cancel the job it will send a stop-task event to the Executor, and that causes a task-stop failure error log on the Executor side.
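For context, a minimal sketch of what that unbind step could look like, using simplified stand-in types rather than the scheduler's real stage structs; only the idea of resetting the slots back to `None` comes from the discussion, the rest is an assumption:

```rust
use std::collections::HashMap;

// Simplified stand-ins for the scheduler's internal types (assumption, for illustration only).
#[allow(dead_code)]
#[derive(Debug, Clone)]
struct TaskInfo {
    task_id: usize,
}

struct RunningStage {
    // One slot per partition; `Some` means the scheduler believes a task was launched.
    task_infos: Vec<Option<TaskInfo>>,
}

/// Reset the task slots of partitions whose task definitions failed to prepare,
/// so a later JobCancel does not send stop-task events for tasks that never ran.
fn unbind_prepare_failed_tasks(
    stages: &mut HashMap<String, RunningStage>,
    prepare_failed: &HashMap<String, Vec<usize>>, // job_id -> failed partition indexes
) {
    for (job_id, partitions) in prepare_failed {
        if let Some(stage) = stages.get_mut(job_id) {
            for &partition in partitions {
                if let Some(slot) = stage.task_infos.get_mut(partition) {
                    *slot = None; // back to "never launched"
                }
            }
        }
    }
}
```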
So will it just show an error in the log, or crash the executor?
Just an error log~
FYI:

The Scheduler gets `running_tasks` by `filter_map`-ing over the `Some(task_info)` entries:

datafusion-ballista/ballista/scheduler/src/state/execution_graph/execution_stage.rs
Lines 587 to 599 in 9f8e4fc
    pub(super) fn running_tasks(&self) -> Vec<(usize, usize, usize, String)> {
        self.task_infos
            .iter()
            .enumerate()
            .filter_map(|(partition, info)| match info {
                Some(TaskInfo {
                    task_id,
                    task_status: task_status::Status::Running(RunningTask { executor_id }),
                    ..
                }) => Some((*task_id, self.stage_id, partition, executor_id.clone())),
                _ => None,
            })
            .collect()
    }
The Scheduler sends a `CancelTasks` event to the Executor:

datafusion-ballista/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
Lines 232 to 251 in 9f8e4fc
    QueryStageSchedulerEvent::JobCancel(job_id) => {
        self.metrics_collector.record_cancelled(&job_id);
        info!("Job {} Cancelled", job_id);
        match self.state.task_manager.cancel_job(&job_id).await {
            Ok((running_tasks, _pending_tasks)) => {
                event_sender
                    .post_event(QueryStageSchedulerEvent::CancelTasks(running_tasks))
                    .await?;
            }
            Err(e) => {
                error!(
                    "Fail to invoke cancel_job for job {} due to {:?}",
                    job_id, e
                );
            }
        }
        self.state.clean_up_failed_job(job_id);
The Executor logs `error!("Error cancelling task: {:?}", e);` if cancelling a task fails inside `cancel_tasks`:

datafusion-ballista/ballista/executor/src/executor_server.rs
Lines 706 to 732 in 9f8e4fc
    async fn cancel_tasks(
        &self,
        request: Request<CancelTasksParams>,
    ) -> Result<Response<CancelTasksResult>, Status> {
        let task_infos = request.into_inner().task_infos;
        info!("Cancelling tasks for {:?}", task_infos);
        let mut cancelled = true;
        for task in task_infos {
            if let Err(e) = self
                .executor
                .cancel_task(
                    task.task_id as usize,
                    task.job_id,
                    task.stage_id as usize,
                    task.partition_id as usize,
                )
                .await
            {
                error!("Error cancelling task: {:?}", e);
                cancelled = false;
            }
        }
        Ok(Response::new(CancelTasksResult { cancelled }))
    }
Would it be better to encode the physical_plan to proto before `create_task_info`, what do you think?
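Roughly, the suggested ordering would be: serialize the plan first, and only create the task info once encoding has succeeded. A self-contained sketch with stand-in types (`encode_plan`, `create_task_info` and the structs below are placeholders, not the real Ballista API):

```rust
// Stand-in types for illustration only (assumptions).
struct PhysicalPlanStub(String);
struct TaskDescription {
    plan: PhysicalPlanStub,
    partition: usize,
}
struct TaskInfo {
    partition: usize,
}
struct TaskDefinition {
    task_info: TaskInfo,
    encoded_plan: Vec<u8>,
}

fn encode_plan(plan: &PhysicalPlanStub) -> Result<Vec<u8>, String> {
    // The real code would protobuf-encode an Arc<dyn ExecutionPlan> here.
    Ok(plan.0.as_bytes().to_vec())
}

fn create_task_info(task: &TaskDescription) -> TaskInfo {
    TaskInfo {
        partition: task.partition,
    }
}

// Encode first: if serialization fails, no task_info is ever recorded,
// so there is nothing to unbind when the job is later cancelled or failed.
fn prepare_task_definition(task: &TaskDescription) -> Result<TaskDefinition, String> {
    let encoded_plan = encode_plan(&task.plan)?;
    let task_info = create_task_info(task);
    Ok(TaskDefinition {
        task_info,
        encoded_plan,
    })
}
```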
This issue captures a very rare corner case, which should not happen in a properly configured cluster. For the sake of simplicity and understanding, can we just cancel the job (provided the cluster state is consistent at the end)? If the consequence of cancelling a failed task is an error log, it may not be too big of a problem. What do you think?
Sure, we should keep the code simple. `unbind_prepare_failed_tasks` reverted.
    @@ -248,7 +262,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
        async fn launch_tasks(
I wonder if it makes sense to propagate `sender: EventSender<QueryStageSchedulerEvent>` into `task_manager.launch_multi_task` and cancel the jobs there? The `launch_multi_task` semantics are not clean: does it yield an error if only one task fails?
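For concreteness, a minimal sketch of the "propagate the sender" option, with placeholder types rather than the real scheduler API; it sidesteps the unclear all-or-nothing semantics by handling each launch failure inline:

```rust
// Placeholder types for illustration (assumptions, not the scheduler's real API).
struct TaskDescription {
    job_id: String,
}

#[derive(Debug)]
enum QueryStageSchedulerEvent {
    JobCancel(String),
}

struct EventSender {
    // The real sender posts onto an async channel; a Vec keeps the sketch runnable.
    posted: Vec<QueryStageSchedulerEvent>,
}

impl EventSender {
    fn post_event(&mut self, event: QueryStageSchedulerEvent) {
        self.posted.push(event);
    }
}

fn launch_one(_task: &TaskDescription) -> Result<(), String> {
    // Stand-in for the per-executor launch RPC.
    Ok(())
}

// Sketch: launch_multi_task takes the sender and cancels affected jobs itself,
// continuing with the remaining tasks instead of returning on the first error.
fn launch_multi_task(sender: &mut EventSender, tasks: &[TaskDescription]) {
    for task in tasks {
        if launch_one(task).is_err() {
            sender.post_event(QueryStageSchedulerEvent::JobCancel(task.job_id.clone()));
        }
    }
}
```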
    @@ -524,24 +524,35 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
        pub(crate) async fn launch_multi_task(
            &self,
            executor: &ExecutorMetadata,
    -       tasks: Vec<Vec<TaskDescription>>,
    +       tasks: HashMap<(String, usize), Vec<TaskDescription>>,
Do we need to change this parameter? If it is just the job_id that we need, we can get that from `task.partition`, if I'm not mistaken.
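For reference, `TaskDescription` already carries the job id through its partition (the earlier abort_job suggestion reads `task.partition.job_id`), so a hypothetical sketch of grouping by that instead of changing the parameter type could look like this:

```rust
use std::collections::HashMap;

// Minimal stand-ins for the scheduler types (assumption).
struct PartitionId {
    job_id: String,
    stage_id: usize,
}
struct TaskDescription {
    partition: PartitionId,
}

// Group tasks by (job_id, stage_id) taken from the tasks themselves,
// rather than threading the key through the `tasks` parameter type.
fn group_tasks(tasks: Vec<TaskDescription>) -> HashMap<(String, usize), Vec<TaskDescription>> {
    let mut grouped: HashMap<(String, usize), Vec<TaskDescription>> = HashMap::new();
    for task in tasks {
        let key = (task.partition.job_id.clone(), task.partition.stage_id);
        grouped.entry(key).or_default().push(task);
    }
    grouped
}
```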
        executor_manager: &ExecutorManager,
    -   ) -> Result<()> {
    +   ) -> Result<HashMap<String, Vec<TaskDescription>>> {
Not sure what would be the better approach: propagate the sender as a parameter and cancel the job there, or return the job_ids whose tasks failed. Anyway, it does not look like we need to return anything but a hash set of failed job_ids.
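A sketch of the "return the failed job ids" alternative, again with stand-in types; the caller would then decide whether to cancel or fail each returned job:

```rust
use std::collections::HashSet;

// Stand-ins for illustration (assumptions).
struct PartitionId {
    job_id: String,
}
struct TaskDescription {
    partition: PartitionId,
}

fn launch_one(_task: &TaskDescription) -> Result<(), String> {
    // Stand-in for the per-executor LaunchMultiTask gRPC call.
    Ok(())
}

// Instead of failing the whole call on the first error, collect the job ids
// whose tasks could not be launched and hand them back to the caller.
fn launch_tasks(tasks: &[TaskDescription]) -> HashSet<String> {
    let mut failed_jobs = HashSet::new();
    for task in tasks {
        if launch_one(task).is_err() {
            failed_jobs.insert(task.partition.job_id.clone());
        }
    }
    failed_jobs
}
```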
    match self.state.task_manager.prepare_task_definition(task) {
        Ok(task_definition) => tasks.push(task_definition),
        Err(e) => {
            error!("Error preparing task definition: {:?}", e);
            info!("Cancel prepare task definition failed job: {}", job_id);
            if let Err(err) = self.cancel_job(job_id).await {
Would it be better if we fail the job instead of cancelling it, via `QueryStageSchedulerEvent::JobRunningFailed {...}`? Not sure about the `queued_at` property.
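A sketch of what failing rather than cancelling could look like; the variant's exact fields (`fail_message`, `queued_at`, `failed_at`) are assumptions here, which is presumably where the `queued_at` question comes from:

```rust
use std::time::{SystemTime, UNIX_EPOCH};

// Simplified event enum for illustration; the real QueryStageSchedulerEvent has more
// variants and possibly different fields on JobRunningFailed (assumption).
#[derive(Debug)]
enum QueryStageSchedulerEvent {
    JobRunningFailed {
        job_id: String,
        fail_message: String,
        queued_at: u64,
        failed_at: u64,
    },
}

fn now_millis() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_millis() as u64)
        .unwrap_or(0)
}

// On a prepare_task_definition error, mark the job as failed rather than cancelled,
// carrying the original queued_at timestamp through to the event.
fn job_running_failed_event(
    job_id: &str,
    error: &str,
    queued_at: u64,
) -> QueryStageSchedulerEvent {
    QueryStageSchedulerEvent::JobRunningFailed {
        job_id: job_id.to_string(),
        fail_message: format!("Error preparing task definition: {error}"),
        queued_at,
        failed_at: now_millis(),
    }
}
```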
Sure, it actually failed.
Hey @westhide, are you still interested in getting this PR merged?
Moving to draft as it's waiting for changes.
Which issue does this PR close?
Closes #1209.
Rationale for this change
Add test for #1209.
Also fixes #1214.
What changes are included in this PR?
Are there any user-facing changes?