Skip to content

Commit

Permalink
fix: improve que job indices for faster performances
Browse files Browse the repository at this point in the history
  • Loading branch information
rubenfiszel committed Feb 17, 2025
1 parent e0d7a54 commit 9530826
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 32 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion backend/windmill-api/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1647,19 +1647,24 @@ struct QueueStats {
#[derive(Deserialize)]
pub struct CountQueueJobsQuery {
all_workspaces: Option<bool>,
tags: Option<String>,
}

async fn count_queue_jobs(
Extension(db): Extension<DB>,
Path(w_id): Path<String>,
Query(cq): Query<CountQueueJobsQuery>,
) -> error::JsonResult<QueueStats> {
let tags = cq
.tags
.map(|t| t.split(',').map(|s| s.to_string()).collect::<Vec<_>>());
Ok(Json(
sqlx::query_as!(
QueueStats,
"SELECT coalesce(COUNT(*) FILTER(WHERE suspend = 0 AND running = false), 0) as \"database_length!\", coalesce(COUNT(*) FILTER(WHERE suspend > 0), 0) as \"suspended!\" FROM v2_as_queue WHERE (workspace_id = $1 OR $2) AND scheduled_for <= now()",
"SELECT coalesce(COUNT(*) FILTER(WHERE suspend = 0 AND running = false), 0) as \"database_length!\", coalesce(COUNT(*) FILTER(WHERE suspend > 0), 0) as \"suspended!\" FROM v2_as_queue WHERE (workspace_id = $1 OR $2) AND scheduled_for <= now() AND ($3::text[] IS NULL OR tag = ANY($3))",
w_id,
w_id == "admins" && cq.all_workspaces.unwrap_or(false),
tags.as_ref().map(|v| v.as_slice())
)
.fetch_one(&db)
.await?,
Expand Down
6 changes: 3 additions & 3 deletions backend/windmill-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,9 +292,9 @@ pub async fn connect(
// .after_connect(move |conn, _| {
// if worker_mode {
// Box::pin(async move {
// sqlx::query("SET enable_seqscan = OFF;")
// .execute(conn)
// .await?;
// // sqlx::query("SET enable_seqscan = OFF;")
// // .execute(conn)
// // .await?;
// Ok(())
// })
// } else {
Expand Down
3 changes: 3 additions & 0 deletions backend/windmill-queue/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2108,12 +2108,15 @@ async fn pull_single_job_and_mark_as_running_no_concurrency_limit<'c>(

for query in queries.iter() {
// tracing::info!("Pulling job with query: {}", query);
// let instant = std::time::Instant::now();
let r = sqlx::query_as::<_, PulledJob>(query)
.bind(worker_name)
.fetch_optional(db)
.await?;

if let Some(pulled_job) = r {
// tracing::info!("pulled job: {:?}", instant.elapsed().as_micros());

highest_priority_job = Some(pulled_job);
break;
}
Expand Down
2 changes: 1 addition & 1 deletion backend/windmill-worker/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1277,7 +1277,7 @@ pub async fn run_worker(
tokio::task::spawn(
(async move {
tracing::info!(worker = %worker_name, hostname = %hostname, "vacuuming queue");
if let Err(e) = sqlx::query!("VACUUM (skip_locked) v2_job_queue, v2_job_runtime, v2_job_status")
if let Err(e) = sqlx::query!("VACUUM v2_job_queue, v2_job_runtime, v2_job_status")
.execute(&db2)
.await
{
Expand Down
22 changes: 12 additions & 10 deletions benchmarks/benchmark_oneoff.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ async function verifyOutputs(uuids: string[], workspace: string) {
console.log(`Incorrect results: ${incorrectResults}`);
}

export const NON_TEST_TAGS = ["deno", "python", "go", "bash", "dedicated", "bun", "nativets", "flow"]
export async function main({
host,
email,
Expand Down Expand Up @@ -96,11 +97,11 @@ export async function main({
windmill.setClient(final_token, host);
const enc = (s: string) => new TextEncoder().encode(s);

async function getQueueCount() {
async function getQueueCount(tags?: string[]) {
return (
await (
await fetch(
config.server + "/api/w/" + config.workspace_id + "/jobs/queue/count",
config.server + "/api/w/" + config.workspace_id + "/jobs/queue/count" + (tags && tags.length > 0 ? "?tags=" + tags.join(",") : ""),
{ headers: { ["Authorization"]: "Bearer " + config.token } }
)
).json()
Expand Down Expand Up @@ -132,11 +133,11 @@ export async function main({
}

let pastJobs = 0;
async function getCompletedJobsCount(): Promise<number> {
async function getCompletedJobsCount(tags?: string[]): Promise<number> {
const completedJobs = (
await (
await fetch(
host + "/api/w/" + config.workspace_id + "/jobs/completed/count",
host + "/api/w/" + config.workspace_id + "/jobs/completed/count" + (tags && tags.length > 0 ? "?tags=" + tags.join(",") : ""),
{ headers: { ["Authorization"]: "Bearer " + config.token } }
)
).json()
Expand Down Expand Up @@ -208,8 +209,9 @@ export async function main({
}

let testOtherTag = false;
const otherTagTodo = 500000;
if (testOtherTag) {
const otherTagTodo = 2000000;

let parsed = JSON.parse(body);
parsed.tag = "test";
let nbody = JSON.stringify(parsed);
Expand Down Expand Up @@ -237,7 +239,7 @@ export async function main({
}
}

pastJobs = await getCompletedJobsCount();
pastJobs = await getCompletedJobsCount(NON_TEST_TAGS);

const response = await fetch(
config.server +
Expand Down Expand Up @@ -283,14 +285,14 @@ export async function main({
while (completedJobs < jobsSent) {
const loopStart = Date.now();
if (!didStart) {
const actual_queue = await getQueueCount();
if (actual_queue < jobsSent + (testOtherTag ? otherTagTodo : 0)) {
const actual_queue = await getQueueCount(NON_TEST_TAGS);
if (actual_queue < jobsSent) {
start = Date.now();
didStart = true;
}
} else {
const elapsed = start ? Date.now() - start : 0;
completedJobs = await getCompletedJobsCount();
completedJobs = await getCompletedJobsCount(NON_TEST_TAGS);
if (nStepsFlow > 0) {
completedJobs = Math.floor(completedJobs / (nStepsFlow + 1));
}
Expand Down Expand Up @@ -328,7 +330,7 @@ export async function main({
console.log(`avg. throughput (jobs/time): ${jobsSent / total_duration_sec}`);

console.log("completed jobs", completedJobs);
console.log("queue length:", await getQueueCount());
console.log("queue length:", await getQueueCount(NON_TEST_TAGS));

if (
!noVerify &&
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/benchmark_suite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ async function warmUp(
token,
workspace,
kind: "noop",
jobs: 100000,
jobs: 50000,
});
}

Expand Down

0 comments on commit 9530826

Please sign in to comment.