feat: improve early stop (#4257)
HugoCasa authored Aug 20, 2024
1 parent 776e978 commit bcde2e6
Showing 13 changed files with 311 additions and 75 deletions.
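
This commit adds a second early-stop setting, stop_after_all_iters_if, evaluated once after a for-loop or branch-all module has run all of its iterations or branches, with the aggregated results bound to the expression's result variable; the existing per-iteration stop_after_if expressions can additionally reference all_iters, the results of the iterations completed so far. A minimal sketch of how both settings might be attached to a loop module, assuming the structs live under windmill_common::flows as in this diff and using illustrative JS predicate strings:

use windmill_common::flows::{FlowModule, StopAfterIf}; // path assumed from this repo layout

fn with_early_stop(loop_module: FlowModule) -> FlowModule {
    FlowModule {
        // existing setting: checked after each iteration, sees `result`
        // (the iteration's result) and now `all_iters` (results so far)
        stop_after_if: Some(StopAfterIf {
            expr: "all_iters.some(r => r.error)".to_string(),
            skip_if_stopped: false,
        }),
        // new setting: checked once after the last iteration/branch,
        // with the aggregated results bound to `result`
        stop_after_all_iters_if: Some(StopAfterIf {
            expr: "result.every(r => r.skipped)".to_string(),
            skip_if_stopped: true,
        }),
        ..loop_module
    }
}

As the worker_flow.rs hunks below show, the per-module stop_after_if path now skips branch-all modules (the !is_branch_all guard); stopping a branch-all after all of its branches goes through the new setting instead.
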
8 changes: 8 additions & 0 deletions backend/tests/worker.rs
@@ -1113,6 +1113,7 @@ async fn test_deno_flow(db: Pool<Postgres>) {
}
.into(),
stop_after_if: Default::default(),
stop_after_all_iters_if: Default::default(),
summary: Default::default(),
suspend: Default::default(),
retry: None,
@@ -1152,6 +1153,7 @@ async fn test_deno_flow(db: Pool<Postgres>) {
}
.into(),
stop_after_if: Default::default(),
stop_after_all_iters_if: Default::default(),
summary: Default::default(),
suspend: Default::default(),
retry: None,
@@ -1166,6 +1168,7 @@ async fn test_deno_flow(db: Pool<Postgres>) {
}
.into(),
stop_after_if: Default::default(),
stop_after_all_iters_if: Default::default(),
summary: Default::default(),
suspend: Default::default(),
retry: None,
@@ -1270,6 +1273,7 @@ async fn test_deno_flow_same_worker(db: Pool<Postgres>) {
concurrency_time_window_s: None,
}.into(),
stop_after_if: Default::default(),
stop_after_all_iters_if: Default::default(),
summary: Default::default(),
suspend: Default::default(),
retry: None,
@@ -1319,6 +1323,7 @@ async fn test_deno_flow_same_worker(db: Pool<Postgres>) {
concurrency_time_window_s: None,
}.into(),
stop_after_if: Default::default(),
stop_after_all_iters_if: Default::default(),
summary: Default::default(),
suspend: Default::default(),
retry: None,
@@ -1354,6 +1359,7 @@ async fn test_deno_flow_same_worker(db: Pool<Postgres>) {
concurrency_time_window_s: None,
}.into(),
stop_after_if: Default::default(),
stop_after_all_iters_if: Default::default(),
summary: Default::default(),
suspend: Default::default(),
retry: None,
@@ -1369,6 +1375,7 @@ async fn test_deno_flow_same_worker(db: Pool<Postgres>) {
],
}.into(),
stop_after_if: Default::default(),
stop_after_all_iters_if: Default::default(),
summary: Default::default(),
suspend: Default::default(),
retry: None,
@@ -1411,6 +1418,7 @@ async fn test_deno_flow_same_worker(db: Pool<Postgres>) {
concurrency_time_window_s: None,
}.into(),
stop_after_if: Default::default(),
stop_after_all_iters_if: Default::default(),
summary: Default::default(),
suspend: Default::default(),
retry: None,
4 changes: 4 additions & 0 deletions backend/windmill-api/src/flows.rs
@@ -1144,6 +1144,7 @@ mod tests {
tag_override: None,
}),
stop_after_if: None,
stop_after_all_iters_if: None,
summary: None,
suspend: Default::default(),
retry: None,
@@ -1172,6 +1173,7 @@ mod tests {
expr: "foo = 'bar'".to_string(),
skip_if_stopped: false,
}),
stop_after_all_iters_if: None,
summary: None,
suspend: Default::default(),
retry: None,
@@ -1198,6 +1200,7 @@ mod tests {
expr: "previous.isEmpty()".to_string(),
skip_if_stopped: false,
}),
stop_after_all_iters_if: None,
summary: None,
suspend: Default::default(),
retry: None,
@@ -1223,6 +1226,7 @@ mod tests {
expr: "previous.isEmpty()".to_string(),
skip_if_stopped: false,
}),
stop_after_all_iters_if: None,
summary: None,
suspend: Default::default(),
retry: None,
3 changes: 3 additions & 0 deletions backend/windmill-common/src/flows.rs
@@ -236,6 +236,8 @@ pub struct FlowModule {
#[serde(skip_serializing_if = "Option::is_none")]
pub stop_after_if: Option<StopAfterIf>,
#[serde(skip_serializing_if = "Option::is_none")]
pub stop_after_all_iters_if: Option<StopAfterIf>,
#[serde(skip_serializing_if = "Option::is_none")]
pub summary: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub suspend: Option<Suspend>,
@@ -580,6 +582,7 @@ pub fn add_virtual_items_if_necessary(modules: &mut Vec<FlowModule>) {
id: format!("{}-v", modules[modules.len() - 1].id),
value: crate::worker::to_raw_value(&FlowModuleValue::Identity),
stop_after_if: None,
stop_after_all_iters_if: None,
summary: Some("Virtual module needed for suspend/sleep when last module".to_string()),
mock: None,
retry: None,
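
Since the new field mirrors stop_after_if and carries skip_serializing_if = "Option::is_none", modules that never set it keep their previous JSON shape, so already-stored flows are unaffected. A small sketch of that property, assuming FlowModule derives Serialize and is reachable at the path below:

use windmill_common::flows::FlowModule; // path assumed

fn omits_unused_field(module: &FlowModule) -> bool {
    // a module that leaves stop_after_all_iters_if as None serializes
    // without the key at all, exactly as before this change
    let json = serde_json::to_string(module).expect("FlowModule should serialize");
    !json.contains("stop_after_all_iters_if")
}
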
1 change: 1 addition & 0 deletions backend/windmill-queue/src/jobs.rs
@@ -3349,6 +3349,7 @@ pub async fn push<'c, 'd, R: rsmq_async::RsmqConnection + Send + 'c>(
},
),
stop_after_if: None,
stop_after_all_iters_if: None,
summary: None,
suspend: None,
mock: None,
76 changes: 72 additions & 4 deletions backend/windmill-worker/src/worker_flow.rs
@@ -232,22 +232,27 @@ pub async fn update_flow_status_after_job_completion_internal<
// "UPDATE FLOW STATUS 2: {module_index:#?} {module_status:#?} {old_status:#?} "
// );

let (skip_loop_failures, parallelism) = if matches!(
let (is_loop, skip_loop_failures, parallelism) = if matches!(
module_status,
FlowStatusModule::InProgress { iterator: Some(_), .. }
) {
let (loop_failures, parallelism) =
compute_skip_loop_failures_and_parallelism(flow, old_status.step, db).await?;
(loop_failures.unwrap_or(false), parallelism)
(true, loop_failures.unwrap_or(false), parallelism)
} else {
(false, None)
(false, false, None)
};

let is_branch_all = matches!(
module_status,
FlowStatusModule::InProgress { branchall: Some(_), .. }
);

// 0 length flows are not failure steps
let is_failure_step =
old_status.step >= old_status.modules.len() as i32 && old_status.modules.len() > 0;

let (mut stop_early, skip_if_stop_early, continue_on_error) = if let Some(se) =
let (mut stop_early, mut skip_if_stop_early, continue_on_error) = if let Some(se) =
stop_early_override
{
//do not stop early if module is a flow step
@@ -280,7 +285,18 @@
.map_err(|e| Error::InternalErr(format!("retrieval of stop_early_expr from state: {e:#}")))?;

let stop_early = success
&& !is_branch_all
&& if let Some(expr) = r.stop_early_expr.clone() {
let all_iters = match &module_status {
FlowStatusModule::InProgress { flow_jobs: Some(flow_jobs), .. }
if expr.contains("all_iters") =>
{
Some(Arc::new(
retrieve_flow_jobs_results(db, w_id, flow_jobs).await?,
))
}
_ => None,
};
compute_bool_from_expr(
expr,
Marc::new(
@@ -290,6 +306,7 @@ pub async fn update_flow_status_after_job_completion_internal<
.to_owned(),
),
result.clone(),
all_iters,
None,
Some(client),
None,
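
Note the expr.contains("all_iters") guard above: the per-iteration results are only fetched from the database when the stop_after_if expression actually mentions all_iters, so existing expressions pay no extra query. A generic sketch of that lazy-fetch pattern, with a caller-supplied future standing in for retrieve_flow_jobs_results:

use serde_json::Value;

// Load the aggregated iteration results only if the expression needs them.
async fn maybe_all_iters<F>(expr: &str, load_results: F) -> Option<Value>
where
    F: std::future::Future<Output = Value>,
{
    if expr.contains("all_iters") {
        Some(load_results.await)
    } else {
        None
    }
}
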
@@ -735,6 +752,50 @@ pub async fn update_flow_status_after_job_completion_internal<
_ => result.clone(),
};

match &new_status {
Some(FlowStatusModule::Success { .. }) if is_loop || is_branch_all => {
let r_after_all_iters = sqlx::query_as::<_, SkipIfStopped>(
"SELECT
raw_flow->'modules'->$1::int->'stop_after_all_iters_if'->>'expr' as stop_early_expr,
(raw_flow->'modules'->$1::int->'stop_after_all_iters_if'->>'skip_if_stopped')::bool as skip_if_stopped,
NULL as continue_on_error,
args
FROM queue
WHERE id = $2"
)
.bind(old_status.step)
.bind(flow)
.fetch_one(db)
.await
.map_err(|e| Error::InternalErr(format!("retrieval of stop_early_expr from state: {e:#}")))?;
if let Some(expr) = r_after_all_iters.stop_early_expr {
let should_stop = compute_bool_from_expr(
expr,
Marc::new(
r_after_all_iters
.args
.map(|x| x.0)
.unwrap_or_else(|| serde_json::from_str("{}").unwrap())
.to_owned(),
),
nresult.clone(),
None,
None,
Some(client),
None,
None,
)
.await?;

if should_stop {
stop_early = should_stop;
skip_if_stop_early = r_after_all_iters.skip_if_stopped.unwrap_or(false);
}
}
}
_ => {}
}

if old_status.retry.fail_count > 0
&& matches!(&new_status, Some(FlowStatusModule::Success { .. }))
{
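
A simplified, synchronous sketch of the control flow this hunk adds: once a loop or branch-all module reaches Success, an optional predicate over the aggregated results can force an early stop and decide whether it counts as a skip. The real code pulls expr and skip_if_stopped out of the queued job's raw_flow with the SQL above and evaluates the expression through compute_bool_from_expr; a plain closure stands in for that evaluator here.

use serde_json::Value;

struct StopAfterAllIters<'a> {
    // stands in for the stored `expr` evaluated by compute_bool_from_expr
    predicate: &'a dyn Fn(&Value) -> bool,
    skip_if_stopped: bool,
}

fn apply_stop_after_all_iters(
    aggregated_result: &Value,
    cfg: Option<StopAfterAllIters<'_>>,
    stop_early: &mut bool,
    skip_if_stop_early: &mut bool,
) {
    if let Some(cfg) = cfg {
        if (cfg.predicate)(aggregated_result) {
            *stop_early = true;
            *skip_if_stop_early = cfg.skip_if_stopped;
        }
    }
}
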
@@ -1186,13 +1247,17 @@ async fn compute_bool_from_expr(
expr: String,
flow_args: Marc<HashMap<String, Box<RawValue>>>,
result: Arc<Box<RawValue>>,
all_iters: Option<Arc<Box<RawValue>>>,
by_id: Option<IdContext>,
client: Option<&AuthedClient>,
resumes: Option<(Arc<Box<RawValue>>, Arc<Box<RawValue>>, Arc<Box<RawValue>>)>,
ctx: Option<Vec<(String, String)>>,
) -> error::Result<bool> {
let mut context = HashMap::with_capacity(if resumes.is_some() { 7 } else { 3 });
context.insert("result".to_string(), result.clone());
if let Some(all_iters) = all_iters {
context.insert("all_iters".to_string(), all_iters);
}
context.insert("previous_result".to_string(), result.clone());

if let Some(resumes) = resumes {
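
With the extra all_iters parameter, the context handed to a stop expression now contains result, previous_result and, when supplied, all_iters (plus the resume entries added below). A condensed sketch of that context assembly using the same types:

use std::{collections::HashMap, sync::Arc};
use serde_json::value::RawValue;

fn stop_expr_context(
    result: Arc<Box<RawValue>>,
    all_iters: Option<Arc<Box<RawValue>>>,
) -> HashMap<String, Arc<Box<RawValue>>> {
    let mut ctx = HashMap::new();
    ctx.insert("result".to_string(), result.clone());
    ctx.insert("previous_result".to_string(), result);
    if let Some(all) = all_iters {
        // iteration results, exposed to the expression as `all_iters`
        ctx.insert("all_iters".to_string(), all);
    }
    ctx
}
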
@@ -1568,6 +1633,7 @@ async fn push_next_flow_job<R: rsmq_async::RsmqConnection + Send + Sync + Clone>
arc_flow_job_args.clone(),
Arc::new(to_raw_value(&json!("{}"))),
None,
None,
Some(client),
None,
Some(vec![(
@@ -2982,6 +3048,7 @@ async fn compute_next_flow_transform(
b.expr.to_string(),
arc_flow_job_args.clone(),
arc_last_job_result.clone(),
None,
Some(idcontext.clone()),
Some(client),
Some((resumes.clone(), resume.clone(), approvers.clone())),
@@ -3258,6 +3325,7 @@ fn is_simple_modules(modules: &Vec<FlowModule>, flow: &FlowValue) -> bool {
&& modules[0].cache_ttl.is_none()
&& modules[0].retry.is_none()
&& modules[0].stop_after_if.is_none()
&& modules[0].stop_after_all_iters_if.is_none()
&& (modules[0].mock.is_none() || modules[0].mock.as_ref().is_some_and(|m| !m.enabled))
&& flow.failure_module.is_none();
is_simple
(frontend Svelte component: flow module settings tabs; file path not captured in this view)
@@ -402,7 +402,12 @@
{#if !$selectedId.includes('failure')}
<Tab value="runtime">Runtime</Tab>
<Tab value="cache" active={Boolean(flowModule.cache_ttl)}>Cache</Tab>
<Tab value="early-stop" active={Boolean(flowModule.stop_after_if)}>
<Tab
value="early-stop"
active={Boolean(
flowModule.stop_after_if || flowModule.stop_after_all_iters_if
)}
>
Early Stop
</Tab>
<Tab value="suspend" active={Boolean(flowModule.suspend)}>Suspend</Tab>