Skip to content

Commit

Permalink
fix: improve que job indices for faster performances
Browse files Browse the repository at this point in the history
  • Loading branch information
rubenfiszel committed Feb 17, 2025
1 parent 5e22690 commit 85c56e9
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 33 deletions.

This file was deleted.

6 changes: 3 additions & 3 deletions backend/windmill-api/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -660,9 +660,9 @@ async fn fix_job_completed_index(db: &DB) -> Result<(), Error> {
.execute(db)
.await?;

sqlx::query!("CREATE INDEX CONCURRENTLY queue_sort_2_v2 ON v2_job_queue (tag, priority DESC NULLS LAST, scheduled_for) WHERE running = false")
.execute(db)
.await?;
// sqlx::query!("CREATE INDEX CONCURRENTLY queue_sort_2_v2 ON v2_job_queue (tag, priority DESC NULLS LAST, scheduled_for) WHERE running = false")
// .execute(db)
// .await?;

sqlx::query!("DROP INDEX CONCURRENTLY IF EXISTS queue_sort")
.execute(db)
Expand Down
7 changes: 5 additions & 2 deletions backend/windmill-api/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1800,13 +1800,13 @@ async fn list_jobs(
}
sqlc.unwrap().limit(per_page).offset(offset).query()?
};
let mut tx = user_db.begin(&authed).await?;
let mut tx: Transaction<'_, Postgres> = user_db.begin(&authed).await?;

#[cfg(feature = "prometheus")]
let start = Instant::now();

#[cfg(feature = "prometheus")]
if _api_list_jobs_query_duration.is_some() {
if _api_list_jobs_query_duration.is_some() || true {
tracing::info!("list_jobs query: {}", sql);
}

Expand Down Expand Up @@ -4695,6 +4695,7 @@ struct BatchInfo {
flow_value: Option<FlowValue>,
path: Option<String>,
rawscript: Option<BatchRawScript>,
tag: Option<String>,
}

#[tracing::instrument(level = "trace", skip_all)]
Expand Down Expand Up @@ -4863,6 +4864,8 @@ async fn add_batch_jobs(
} else {
format!("{}", language.as_str())
}
} else if let Some(tag) = batch_info.tag {
tag
} else {
format!("{}", language.as_str())
};
Expand Down
64 changes: 49 additions & 15 deletions benchmarks/benchmark_oneoff.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ export async function main({
await createBenchScript(kind, workspace);
}

pastJobs = await getCompletedJobsCount();

const jobsSent = jobs;
console.log(`Bulk creating ${jobsSent} jobs`);
Expand Down Expand Up @@ -208,11 +207,43 @@ export async function main({
throw new Error("Unknown script pattern " + kind);
}

const response = await fetch(
config.server +
let testOtherTag = false;
const otherTagTodo = 500000;
if (testOtherTag) {
let parsed = JSON.parse(body);
parsed.tag = "test";
let nbody = JSON.stringify(parsed);
let response2 = await fetch(
config.server +
"/api/w/" +
config.workspace_id +
`/jobs/add_batch_jobs/${jobsSent}`,
`/jobs/add_batch_jobs/${otherTagTodo}`,
{
method: "POST",
headers: {
["Authorization"]: "Bearer " + config.token,
"Content-Type": "application/json",
},
body: nbody,
}
);
if (!response2.ok) {
throw new Error(
"Failed to create jobs: " +
response2.statusText +
" " +
(await response2.text())
);
}
}

pastJobs = await getCompletedJobsCount();

const response = await fetch(
config.server +
"/api/w/" +
config.workspace_id +
`/jobs/add_batch_jobs/${jobsSent}`,
{
method: "POST",
headers: {
Expand All @@ -222,20 +253,24 @@ export async function main({
body,
}
);





if (!response.ok) {
throw new Error(
"Failed to create jobs: " +
response.statusText +
" " +
(await response.text())
response.statusText +
" " +
(await response.text())
);
}
const uuids = await response.json();
const end_create = Date.now();
const create_duration = end_create - start_create;
console.log(
`Jobs successfully added to the queue in ${
create_duration / 1000
`Jobs successfully added to the queue in ${create_duration / 1000
}s. Windmill will start pulling them\n`
);
let start = Date.now();
Expand All @@ -249,7 +284,7 @@ export async function main({
const loopStart = Date.now();
if (!didStart) {
const actual_queue = await getQueueCount();
if (actual_queue < jobsSent) {
if (actual_queue < jobsSent + otherTagTodo) {
start = Date.now();
didStart = true;
}
Expand All @@ -263,9 +298,9 @@ export async function main({
const instThr =
lastElapsed > 0
? (
((completedJobs - lastCompletedJobs) / (elapsed - lastElapsed)) *
1000
).toFixed(2)
((completedJobs - lastCompletedJobs) / (elapsed - lastElapsed)) *
1000
).toFixed(2)
: 0;

lastElapsed = elapsed;
Expand All @@ -275,8 +310,7 @@ export async function main({
enc(
`elapsed: ${(elapsed / 1000).toFixed(
2
)} | jobs executed: ${completedJobs}/${jobsSent} (thr: inst ${instThr} - avg ${avgThr}) | remaining: ${
jobsSent - completedJobs
)} | jobs executed: ${completedJobs}/${jobsSent} (thr: inst ${instThr} - avg ${avgThr}) | remaining: ${jobsSent - completedJobs
} \r`
)
);
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/benchmark_suite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ async function warmUp(
token,
workspace,
kind: "noop",
jobs: 50000,
jobs: 100000,
});
}

Expand Down

0 comments on commit 85c56e9

Please sign in to comment.