diff --git a/backend/.sqlx/query-8263fe28097e094cbdbdcd16668d9347206d2deaa99f9a4541101e610f84a50a.json b/backend/.sqlx/query-8263fe28097e094cbdbdcd16668d9347206d2deaa99f9a4541101e610f84a50a.json deleted file mode 100644 index aa744f6f08cd4..0000000000000 --- a/backend/.sqlx/query-8263fe28097e094cbdbdcd16668d9347206d2deaa99f9a4541101e610f84a50a.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "CREATE INDEX CONCURRENTLY queue_sort_2_v2 ON v2_job_queue (priority DESC NULLS LAST, scheduled_for) WHERE running = false", - "describe": { - "columns": [], - "parameters": { - "Left": [] - }, - "nullable": [] - }, - "hash": "8263fe28097e094cbdbdcd16668d9347206d2deaa99f9a4541101e610f84a50a" -} diff --git a/backend/windmill-api/src/db.rs b/backend/windmill-api/src/db.rs index 106b29a81055b..a1e260b7d2167 100644 --- a/backend/windmill-api/src/db.rs +++ b/backend/windmill-api/src/db.rs @@ -660,9 +660,9 @@ async fn fix_job_completed_index(db: &DB) -> Result<(), Error> { .execute(db) .await?; - sqlx::query!("CREATE INDEX CONCURRENTLY queue_sort_2_v2 ON v2_job_queue (tag, priority DESC NULLS LAST, scheduled_for) WHERE running = false") - .execute(db) - .await?; + // sqlx::query!("CREATE INDEX CONCURRENTLY queue_sort_2_v2 ON v2_job_queue (tag, priority DESC NULLS LAST, scheduled_for) WHERE running = false") + // .execute(db) + // .await?; sqlx::query!("DROP INDEX CONCURRENTLY IF EXISTS queue_sort") .execute(db) diff --git a/backend/windmill-api/src/jobs.rs b/backend/windmill-api/src/jobs.rs index 6b0efa485d6cf..c678af5465997 100644 --- a/backend/windmill-api/src/jobs.rs +++ b/backend/windmill-api/src/jobs.rs @@ -1800,13 +1800,13 @@ async fn list_jobs( } sqlc.unwrap().limit(per_page).offset(offset).query()? }; - let mut tx = user_db.begin(&authed).await?; + let mut tx: Transaction<'_, Postgres> = user_db.begin(&authed).await?; #[cfg(feature = "prometheus")] let start = Instant::now(); #[cfg(feature = "prometheus")] - if _api_list_jobs_query_duration.is_some() { + if _api_list_jobs_query_duration.is_some() || true { tracing::info!("list_jobs query: {}", sql); } @@ -4695,6 +4695,7 @@ struct BatchInfo { flow_value: Option, path: Option, rawscript: Option, + tag: Option, } #[tracing::instrument(level = "trace", skip_all)] @@ -4863,6 +4864,8 @@ async fn add_batch_jobs( } else { format!("{}", language.as_str()) } + } else if let Some(tag) = batch_info.tag { + tag } else { format!("{}", language.as_str()) }; diff --git a/benchmarks/benchmark_oneoff.ts b/benchmarks/benchmark_oneoff.ts index 41321b3ec66b7..ccfbfc09d9917 100644 --- a/benchmarks/benchmark_oneoff.ts +++ b/benchmarks/benchmark_oneoff.ts @@ -152,7 +152,6 @@ export async function main({ await createBenchScript(kind, workspace); } - pastJobs = await getCompletedJobsCount(); const jobsSent = jobs; console.log(`Bulk creating ${jobsSent} jobs`); @@ -208,11 +207,43 @@ export async function main({ throw new Error("Unknown script pattern " + kind); } - const response = await fetch( - config.server + + let testOtherTag = false; + const otherTagTodo = 500000; + if (testOtherTag) { + let parsed = JSON.parse(body); + parsed.tag = "test"; + let nbody = JSON.stringify(parsed); + let response2 = await fetch( + config.server + "/api/w/" + config.workspace_id + - `/jobs/add_batch_jobs/${jobsSent}`, + `/jobs/add_batch_jobs/${otherTagTodo}`, + { + method: "POST", + headers: { + ["Authorization"]: "Bearer " + config.token, + "Content-Type": "application/json", + }, + body: nbody, + } + ); + if (!response2.ok) { + throw new Error( + "Failed to create jobs: " + + response2.statusText + + " " + + (await response2.text()) + ); + } + } + + pastJobs = await getCompletedJobsCount(); + + const response = await fetch( + config.server + + "/api/w/" + + config.workspace_id + + `/jobs/add_batch_jobs/${jobsSent}`, { method: "POST", headers: { @@ -222,20 +253,24 @@ export async function main({ body, } ); + + + + + if (!response.ok) { throw new Error( "Failed to create jobs: " + - response.statusText + - " " + - (await response.text()) + response.statusText + + " " + + (await response.text()) ); } const uuids = await response.json(); const end_create = Date.now(); const create_duration = end_create - start_create; console.log( - `Jobs successfully added to the queue in ${ - create_duration / 1000 + `Jobs successfully added to the queue in ${create_duration / 1000 }s. Windmill will start pulling them\n` ); let start = Date.now(); @@ -249,7 +284,7 @@ export async function main({ const loopStart = Date.now(); if (!didStart) { const actual_queue = await getQueueCount(); - if (actual_queue < jobsSent) { + if (actual_queue < jobsSent + otherTagTodo) { start = Date.now(); didStart = true; } @@ -263,9 +298,9 @@ export async function main({ const instThr = lastElapsed > 0 ? ( - ((completedJobs - lastCompletedJobs) / (elapsed - lastElapsed)) * - 1000 - ).toFixed(2) + ((completedJobs - lastCompletedJobs) / (elapsed - lastElapsed)) * + 1000 + ).toFixed(2) : 0; lastElapsed = elapsed; @@ -275,8 +310,7 @@ export async function main({ enc( `elapsed: ${(elapsed / 1000).toFixed( 2 - )} | jobs executed: ${completedJobs}/${jobsSent} (thr: inst ${instThr} - avg ${avgThr}) | remaining: ${ - jobsSent - completedJobs + )} | jobs executed: ${completedJobs}/${jobsSent} (thr: inst ${instThr} - avg ${avgThr}) | remaining: ${jobsSent - completedJobs } \r` ) ); diff --git a/benchmarks/benchmark_suite.ts b/benchmarks/benchmark_suite.ts index e03148ce99934..7828c62d9b73d 100644 --- a/benchmarks/benchmark_suite.ts +++ b/benchmarks/benchmark_suite.ts @@ -27,7 +27,7 @@ async function warmUp( token, workspace, kind: "noop", - jobs: 50000, + jobs: 100000, }); }