Skip to content

Commit

Permalink
fix: improve que job indices for faster performances
Browse files Browse the repository at this point in the history
  • Loading branch information
rubenfiszel committed Feb 17, 2025
1 parent fe33729 commit 5cdc1b8
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 14 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions backend/windmill-api/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,23 @@ async fn fix_job_completed_index(db: &DB) -> Result<(), Error> {
.await?;
});

run_windmill_migration!("v2_improve_v2_queued_jobs_indices", &db, |tx| {
sqlx::query!("CREATE INDEX CONCURRENTLY queue_sort_v2 ON v2_job_queue (priority DESC NULLS LAST, scheduled_for, tag) WHERE running = false")
.execute(db)
.await?;

sqlx::query!("CREATE INDEX CONCURRENTLY queue_sort_2_v2 ON v2_job_queue (priority DESC NULLS LAST, scheduled_for) WHERE running = false")
.execute(db)
.await?;

sqlx::query!("DROP INDEX CONCURRENTLY IF EXISTS queue_sort")
.execute(db)
.await?;

sqlx::query!("DROP INDEX CONCURRENTLY IF EXISTS queue_sort_2")
.execute(db)
.await?;
});
Ok(())
}

Expand Down
26 changes: 13 additions & 13 deletions backend/windmill-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,26 +281,26 @@ pub async fn connect_db(
pub async fn connect(
database_url: &str,
max_connections: u32,
worker_mode: bool,
_worker_mode: bool,
) -> Result<sqlx::Pool<sqlx::Postgres>, error::Error> {
use std::time::Duration;

sqlx::postgres::PgPoolOptions::new()
.min_connections((max_connections / 5).clamp(3, max_connections))
.max_connections(max_connections)
.max_lifetime(Duration::from_secs(30 * 60)) // 30 mins
.after_connect(move |conn, _| {
if worker_mode {
Box::pin(async move {
sqlx::query("SET enable_seqscan = OFF;")
.execute(conn)
.await?;
Ok(())
})
} else {
Box::pin(async move { Ok(()) })
}
})
// .after_connect(move |conn, _| {
// if worker_mode {
// Box::pin(async move {
// sqlx::query("SET enable_seqscan = OFF;")
// .execute(conn)
// .await?;
// Ok(())
// })
// } else {
// Box::pin(async move { Ok(()) })
// }
// })
.connect_with(
sqlx::postgres::PgConnectOptions::from_str(database_url)?.statement_cache_capacity(400),
)
Expand Down

0 comments on commit 5cdc1b8

Please sign in to comment.