feat: add test to check for ctx.read_json()
#1212
base: main
@@ -0,0 +1 @@
+{"a":1}
@@ -21,7 +21,7 @@ use datafusion::datasource::source_as_provider;
 use datafusion::error::DataFusionError;
 use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
 use std::any::type_name;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 use std::time::Instant;
@@ -174,17 +174,31 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
         tokio::spawn(async move {
             let mut if_revive = false;
             match state.launch_tasks(schedulable_tasks).await {
-                Ok(unassigned_executor_slots) => {
-                    if !unassigned_executor_slots.is_empty() {
-                        if let Err(e) = state
-                            .executor_manager
-                            .unbind_tasks(unassigned_executor_slots)
-                            .await
+                Ok(launch_tasks_futs) => {
+                    let unassigned_slots = launch_tasks_futs
+                        .iter()
+                        .flat_map(LaunchMultiTaskFut::unassigned_slot)
+                        .collect::<Vec<_>>();
+                    if !unassigned_slots.is_empty() {
+                        if let Err(e) =
+                            state.executor_manager.unbind_tasks(unassigned_slots).await
                         {
                             error!("Fail to unbind tasks: {}", e);
                         }
                         if_revive = true;
                     }
+                    let failed_jobs = launch_tasks_futs
+                        .into_iter()
+                        .flat_map(|fut| fut.prepare_failed_jobs.into_keys())
+                        .collect::<HashSet<String>>();
+                    for job_id in failed_jobs {
+                        if let Err(e) = sender
+                            .post_event(QueryStageSchedulerEvent::JobCancel(job_id))
+                            .await
+                        {
+                            error!("Cancel job error due to {e}");
+                        }
+                    }
                 }
                 Err(e) => {
                     error!("Fail to launch tasks: {}", e);
@@ -248,7 +262,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
     async fn launch_tasks(
         &self,
         bound_tasks: Vec<BoundTask>,
-    ) -> Result<Vec<ExecutorSlot>> {
+    ) -> Result<Vec<LaunchMultiTaskFut>> {
         // Put tasks to the same executor together
         // And put tasks belonging to the same stage together for creating MultiTaskDefinition
         let mut executor_stage_assignments: HashMap<

Review comment (on async fn launch_tasks): I wonder if it makes sense to propagate …
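For orientation, the executor_stage_assignments map that the hunk above starts to declare groups tasks per executor and then per (job_id, stage_id), which is the HashMap<(String, usize), Vec<TaskDescription>> shape that launch_multi_task now receives. A standalone sketch of that grouping, with a heavily simplified stand-in for TaskDescription (the real type carries plan, partition, and session data):

use std::collections::HashMap;

// Simplified stand-in for ballista's TaskDescription, just enough to group by job and stage.
#[derive(Debug)]
struct TaskDescription {
    job_id: String,
    stage_id: usize,
}

// Group (executor_id, task) pairs per executor, then per (job_id, stage_id),
// mirroring the nested map fed into launch_multi_task.
fn group_tasks(
    bound: Vec<(String, TaskDescription)>,
) -> HashMap<String, HashMap<(String, usize), Vec<TaskDescription>>> {
    let mut assignments: HashMap<String, HashMap<(String, usize), Vec<TaskDescription>>> =
        HashMap::new();
    for (executor_id, task) in bound {
        let key = (task.job_id.clone(), task.stage_id);
        assignments
            .entry(executor_id)
            .or_default()
            .entry(key)
            .or_default()
            .push(task);
    }
    assignments
}

fn main() {
    let bound = vec![
        ("exec-1".to_string(), TaskDescription { job_id: "job-a".to_string(), stage_id: 1 }),
        ("exec-1".to_string(), TaskDescription { job_id: "job-a".to_string(), stage_id: 1 }),
        ("exec-2".to_string(), TaskDescription { job_id: "job-b".to_string(), stage_id: 3 }),
    ];
    let assignments = group_tasks(bound);
    // exec-1 gets two tasks for (job-a, stage 1); exec-2 gets one for (job-b, stage 3).
    assert_eq!(assignments["exec-1"][&("job-a".to_string(), 1)].len(), 2);
    assert_eq!(assignments["exec-2"][&("job-b".to_string(), 3)].len(), 1);
}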
@@ -275,62 +289,58 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,

         let mut join_handles = vec![];
         for (executor_id, tasks) in executor_stage_assignments.into_iter() {
-            let tasks: Vec<Vec<TaskDescription>> = tasks.into_values().collect();
             // Total number of tasks to be launched for one executor
-            let n_tasks: usize = tasks.iter().map(|stage_tasks| stage_tasks.len()).sum();
+            let n_tasks: usize = tasks.values().map(Vec::len).sum();

             let state = self.clone();
             let join_handle = tokio::spawn(async move {
-                let success = match state
+                match state
                     .executor_manager
                     .get_executor_metadata(&executor_id)
                     .await
                 {
                     Ok(executor) => {
-                        if let Err(e) = state
+                        match state
                             .task_manager
                             .launch_multi_task(&executor, tasks, &state.executor_manager)
                             .await
                         {
-                            let err_msg = format!("Failed to launch new task: {e}");
-                            error!("{}", err_msg.clone());
-
-                            // It's OK to remove executor aggressively,
-                            // since if the executor is in healthy state, it will be registered again.
-                            state.remove_executor(&executor_id, Some(err_msg)).await;
-
-                            false
-                        } else {
-                            true
+                            Ok(prepare_failed_jobs) => LaunchMultiTaskFut::new(
+                                executor_id,
+                                0,
+                                prepare_failed_jobs,
+                            ),
+                            Err(e) => {
+                                let err_msg = format!("Failed to launch new task: {e}");
+                                error!("{}", err_msg.clone());
+
+                                // It's OK to remove executor aggressively,
+                                // since if the executor is in healthy state, it will be registered again.
+                                state.remove_executor(&executor_id, Some(err_msg)).await;
+                                LaunchMultiTaskFut::new(
+                                    executor_id,
+                                    n_tasks,
+                                    HashMap::new(),
+                                )
+                            }
                         }
                     }
                     Err(e) => {
                         error!("Failed to launch new task, could not get executor metadata: {}", e);
-                        false
+                        LaunchMultiTaskFut::new(executor_id, n_tasks, HashMap::new())
                     }
-                };
-                if success {
-                    vec![]
-                } else {
-                    vec![(executor_id.clone(), n_tasks as u32)]
                 }
             });
             join_handles.push(join_handle);
         }

-        let unassigned_executor_slots =
-            futures::future::join_all(join_handles)
-                .await
-                .into_iter()
-                .collect::<std::result::Result<
-                    Vec<Vec<ExecutorSlot>>,
-                    tokio::task::JoinError,
-                >>()?;
-
-        Ok(unassigned_executor_slots
-            .into_iter()
-            .flatten()
-            .collect::<Vec<ExecutorSlot>>())
+        let launch_futs = futures::future::join_all(join_handles)
+            .await
+            .into_iter()
+            .collect::<std::result::Result<Vec<_>, tokio::task::JoinError>>(
+            )?;
+
+        Ok(launch_futs)
     }

     pub(crate) async fn update_task_statuses(
@@ -463,3 +473,33 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
         );
     }
 }
+
+pub(crate) struct LaunchMultiTaskFut {
+    pub executor_id: String,
+    pub unassigned_num: usize,
+    pub prepare_failed_jobs: HashMap<String, Vec<TaskDescription>>,
+}
+
+impl LaunchMultiTaskFut {
+    pub fn new(
+        executor_id: String,
+        unassigned_num: usize,
+        prepare_failed_jobs: HashMap<String, Vec<TaskDescription>>,
+    ) -> Self {
+        Self {
+            executor_id,
+            unassigned_num,
+            prepare_failed_jobs,
+        }
+    }
+
+    pub fn unassigned_slot(&self) -> Option<ExecutorSlot> {
+        let fail_num: usize = self.prepare_failed_jobs.values().map(Vec::len).sum();
+        let slots = (self.unassigned_num + fail_num) as u32;
+        if slots > 0 {
+            Some((self.executor_id.clone(), slots))
+        } else {
+            None
+        }
+    }
+}
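A quick usage sketch of the slot accounting above, using a trimmed-down copy of the struct (with () standing in for TaskDescription, and ExecutorSlot assumed to be the (executor_id, slot_count) pair seen elsewhere in the diff), to show that both never-assigned tasks and prepare-failed tasks count toward the slots handed back:

use std::collections::HashMap;

// Assumed shape of ExecutorSlot, matching the (executor_id, slot_count) pairs in the diff.
type ExecutorSlot = (String, u32);

// Trimmed-down copy of LaunchMultiTaskFut; () stands in for TaskDescription,
// since unassigned_slot only needs to count the failed tasks.
struct LaunchMultiTaskFut {
    executor_id: String,
    unassigned_num: usize,
    prepare_failed_jobs: HashMap<String, Vec<()>>,
}

impl LaunchMultiTaskFut {
    fn unassigned_slot(&self) -> Option<ExecutorSlot> {
        let fail_num: usize = self.prepare_failed_jobs.values().map(Vec::len).sum();
        let slots = (self.unassigned_num + fail_num) as u32;
        (slots > 0).then(|| (self.executor_id.clone(), slots))
    }
}

fn main() {
    // 2 tasks never assigned + 3 tasks whose definitions failed to prepare
    // => 5 slots go back to executor "exec-1".
    let fut = LaunchMultiTaskFut {
        executor_id: "exec-1".to_string(),
        unassigned_num: 2,
        prepare_failed_jobs: HashMap::from([("job-a".to_string(), vec![(), (), ()])]),
    };
    assert_eq!(fut.unassigned_slot(), Some(("exec-1".to_string(), 5)));

    // Nothing unassigned and nothing failed => no slot to release.
    let idle = LaunchMultiTaskFut {
        executor_id: "exec-2".to_string(),
        unassigned_num: 0,
        prepare_failed_jobs: HashMap::new(),
    };
    assert_eq!(idle.unassigned_slot(), None);
}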
@@ -524,24 +524,35 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
     pub(crate) async fn launch_multi_task(
         &self,
         executor: &ExecutorMetadata,
-        tasks: Vec<Vec<TaskDescription>>,
+        tasks: HashMap<(String, usize), Vec<TaskDescription>>,
         executor_manager: &ExecutorManager,
-    ) -> Result<()> {
+    ) -> Result<HashMap<String, Vec<TaskDescription>>> {
         let mut multi_tasks = vec![];
-        for stage_tasks in tasks {
-            match self.prepare_multi_task_definition(stage_tasks) {
+        let mut prepare_failed_jobs = HashMap::<String, Vec<TaskDescription>>::new();
+        for ((job_id, _stage_id), stage_tasks) in tasks {
+            if prepare_failed_jobs.contains_key(&job_id) {
+                prepare_failed_jobs
+                    .entry(job_id)
+                    .or_default()
+                    .extend(stage_tasks);
+                continue;
+            }
+            match self.prepare_multi_task_definition(stage_tasks.clone()) {
                 Ok(stage_tasks) => multi_tasks.extend(stage_tasks),
-                Err(e) => error!("Fail to prepare task definition: {:?}", e),
+                Err(e) => {
+                    error!("Fail to prepare task definition: {:?}", e);
+                    prepare_failed_jobs.insert(job_id, stage_tasks);
+                }
             }
         }

         if !multi_tasks.is_empty() {
             self.launcher
                 .launch_tasks(executor, multi_tasks, executor_manager)
-                .await
-        } else {
-            Ok(())
+                .await?;
         }
+
+        Ok(prepare_failed_jobs)
     }

     #[allow(dead_code)]

Review comment (on the tasks: HashMap<(String, usize), Vec<TaskDescription>> parameter): Do we need to change this parameter? If it is just the job_id that we need, we can get that from …

Review comment (on the Result<HashMap<String, Vec<TaskDescription>>> return type): Not sure which approach would be better: propagate the sender as a parameter and cancel the job there, or return the job_ids whose tasks failed. Either way, it does not look like we need to return anything more than a HashSet of failed job_ids.
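As a sketch of the alternative raised in the comment above (returning only the ids of jobs that failed to prepare, rather than the failed TaskDescriptions themselves), with placeholder types and a stubbed prepare step instead of the real ballista APIs:

use std::collections::{HashMap, HashSet};

// Placeholder stand-ins for the real ballista types.
struct TaskDescription;
struct MultiTaskDefinition;

// Stub: the real prepare_multi_task_definition builds task definitions and can fail.
fn prepare_multi_task_definition(
    _stage_tasks: Vec<TaskDescription>,
) -> Result<Vec<MultiTaskDefinition>, String> {
    Err("plan serialization failed".to_string())
}

// Variant of launch_multi_task that reports only which jobs failed to prepare.
// Because the failed tasks themselves are not returned, no stage_tasks.clone() is needed.
fn launch_multi_task(tasks: HashMap<(String, usize), Vec<TaskDescription>>) -> HashSet<String> {
    let mut multi_tasks = Vec::new();
    let mut failed_jobs = HashSet::new();
    for ((job_id, _stage_id), stage_tasks) in tasks {
        if failed_jobs.contains(&job_id) {
            // Another stage of this job already failed to prepare; skip the rest.
            continue;
        }
        match prepare_multi_task_definition(stage_tasks) {
            Ok(defs) => multi_tasks.extend(defs),
            Err(_e) => {
                failed_jobs.insert(job_id);
            }
        }
    }
    // ... launch `multi_tasks` on the executor here ...
    failed_jobs
}

fn main() {
    let mut tasks = HashMap::new();
    tasks.insert(("job-a".to_string(), 1), vec![TaskDescription]);
    let failed = launch_multi_task(tasks);
    assert!(failed.contains("job-a"));
}

Compared with the HashMap<String, Vec<TaskDescription>> return in the diff, this drops the per-task detail, which matters only if the caller wants to retry or account for the failed tasks rather than just cancel the jobs.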
Review comment: Would it be better if we fail the job instead of canceling it, i.e. QueryStageSchedulerEvent::JobRunningFailed {...}? Not sure about the queued_at property.

Reply: Sure, it actually failed.
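To make the trade-off in this thread concrete, here is a minimal sketch using a local stand-in enum rather than the real QueryStageSchedulerEvent (the JobRunningFailed field names, including queued_at, are assumptions based on the comment above):

use std::collections::HashSet;

// Local stand-in mirroring the two variants being discussed; not the real
// ballista QueryStageSchedulerEvent definition.
#[derive(Debug)]
enum SchedulerEvent {
    JobCancel(String),
    JobRunningFailed {
        job_id: String,
        fail_message: String,
        // Assumed field, echoing the "not sure about queued_at" remark above.
        queued_at: u64,
    },
}

// Map prepare-failed job ids to events. Cancelling records the job as cancelled;
// failing records the underlying reason on the job itself.
fn events_for_failed_jobs(
    failed_jobs: HashSet<String>,
    fail_instead_of_cancel: bool,
) -> Vec<SchedulerEvent> {
    failed_jobs
        .into_iter()
        .map(|job_id| {
            if fail_instead_of_cancel {
                SchedulerEvent::JobRunningFailed {
                    job_id,
                    fail_message: "failed to prepare task definition".to_string(),
                    queued_at: 0, // the real value would come from the job's metadata
                }
            } else {
                SchedulerEvent::JobCancel(job_id)
            }
        })
        .collect()
}

fn main() {
    let failed: HashSet<String> = ["job-a".to_string()].into_iter().collect();
    for event in events_for_failed_jobs(failed, true) {
        println!("{event:?}");
    }
}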