From 1069ad39992940e32e5d8566ef2283970525be1a Mon Sep 17 00:00:00 2001 From: Ruben Fiszel Date: Tue, 18 Feb 2025 13:17:10 +0100 Subject: [PATCH] fix: improve v2 migration finalizer to avoid deadlocks --- backend/src/main.rs | 13 ++++- backend/windmill-api/src/db.rs | 84 +++++++++++++++++++++++++++++---- backend/windmill-api/src/lib.rs | 8 ++-- 3 files changed, 91 insertions(+), 14 deletions(-) diff --git a/backend/src/main.rs b/backend/src/main.rs index 432095580d931..58d22b6eee5e0 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -21,7 +21,7 @@ use std::{ net::{IpAddr, Ipv4Addr, SocketAddr}, time::Duration, }; -use tokio::{fs::File, io::AsyncReadExt}; +use tokio::{fs::File, io::AsyncReadExt, task::JoinHandle}; use uuid::Uuid; use windmill_api::HTTP_CLIENT; @@ -372,6 +372,7 @@ async fn windmill_main() -> anyhow::Result<()> { let is_agent = mode == Mode::Agent; + let mut migration_handle: Option> = None; #[cfg(feature = "parquet")] let disable_s3_store = std::env::var("DISABLE_S3_STORE") .ok() @@ -384,7 +385,7 @@ async fn windmill_main() -> anyhow::Result<()> { if !skip_migration { // migration code to avoid break - windmill_api::migrate_db(&db).await?; + migration_handle = windmill_api::migrate_db(&db).await?; } else { tracing::info!("SKIP_MIGRATION set, skipping db migration...") } @@ -682,6 +683,14 @@ Windmill Community Edition {GIT_VERSION} loop { tokio::select! { biased; + Some(_) = async { if let Some(jh) = migration_handle.take() { + tracing::info!("migration job finished"); + Some(jh.await) + } else { + None + }} => { + continue; + }, _ = monitor_killpill_rx.recv() => { tracing::info!("received killpill for monitor job"); break; diff --git a/backend/windmill-api/src/db.rs b/backend/windmill-api/src/db.rs index b8d15160c7dab..9b694ad3bfcc9 100644 --- a/backend/windmill-api/src/db.rs +++ b/backend/windmill-api/src/db.rs @@ -15,6 +15,7 @@ use sqlx::{ Executor, PgConnection, Pool, Postgres, }; +use tokio::task::JoinHandle; use windmill_audit::audit_ee::{AuditAuthor, AuditAuthorable}; use windmill_common::{ db::{Authable, Authed}, @@ -170,7 +171,7 @@ impl Migrate for CustomMigrator { } } -pub async fn migrate(db: &DB) -> Result<(), Error> { +pub async fn migrate(db: &DB) -> Result>, Error> { let migrator = db.acquire().await?; let mut custom_migrator = CustomMigrator { inner: migrator }; @@ -225,9 +226,10 @@ pub async fn migrate(db: &DB) -> Result<(), Error> { } }); - if !has_done_migration(db, "v2_finalize_disable_sync_III").await { + let mut jh = None; + if !has_done_migration(db, "v2_finalize_job_completed").await { let db2 = db.clone(); - let _ = tokio::task::spawn(async move { + let v2jh = tokio::task::spawn(async move { loop { if !*MIN_VERSION_IS_AT_LEAST_1_461.read().await { tracing::info!("Waiting for all workers to be at least version 1.461 before applying v2 finalize migration, sleeping for 5s..."); @@ -245,9 +247,10 @@ pub async fn migrate(db: &DB) -> Result<(), Error> { break; } }); + jh = Some(v2jh) } - Ok(()) + Ok(jh) } async fn fix_flow_versioning_migration( @@ -373,29 +376,91 @@ async fn v2_finalize(db: &DB) -> Result<(), Error> { run_windmill_migration!("v2_finalize_disable_sync_III", db, |tx| { tx.execute( r#" + LOCK TABLE v2_job_queue IN ACCESS EXCLUSIVE MODE; ALTER TABLE v2_job_queue DISABLE ROW LEVEL SECURITY; + "#, + ) + .await?; + }); + + run_windmill_migration!("v2_finalize_disable_sync_III_2", db, |tx| { + tx.execute( + r#" + LOCK TABLE v2_job_completed IN ACCESS EXCLUSIVE MODE; ALTER TABLE v2_job_completed DISABLE ROW LEVEL SECURITY; + "#, + ) + .await?; + }); + run_windmill_migration!("v2_finalize_disable_sync_III_3", db, |tx| { + tx.execute( + r#" + LOCK TABLE v2_job IN ACCESS EXCLUSIVE MODE; DROP FUNCTION IF EXISTS v2_job_after_update CASCADE; + "#, + ) + .await?; + }); + + run_windmill_migration!("v2_finalize_disable_sync_III_4", db, |tx| { + tx.execute( + r#" + LOCK TABLE v2_job_completed IN ACCESS EXCLUSIVE MODE; DROP FUNCTION IF EXISTS v2_job_completed_before_insert CASCADE; - DROP FUNCTION IF EXISTS v2_job_completed_before_update CASCADE; + DROP FUNCTION IF EXISTS v2_job_completed_before_update CASCADE; + "#, + ) + .await?; + }); + + run_windmill_migration!("v2_finalize_disable_sync_III_5", db, |tx| { + tx.execute( + r#" + LOCK TABLE v2_job_queue IN ACCESS EXCLUSIVE MODE; DROP FUNCTION IF EXISTS v2_job_queue_after_insert CASCADE; DROP FUNCTION IF EXISTS v2_job_queue_before_insert CASCADE; - DROP FUNCTION IF EXISTS v2_job_queue_before_update CASCADE; + DROP FUNCTION IF EXISTS v2_job_queue_before_update CASCADE; + "#, + ) + .await?; + }); + + run_windmill_migration!("v2_finalize_disable_sync_III_6", db, |tx| { + tx.execute( + r#" + LOCK TABLE v2_job_runtime IN ACCESS EXCLUSIVE MODE; DROP FUNCTION IF EXISTS v2_job_runtime_before_insert CASCADE; - DROP FUNCTION IF EXISTS v2_job_runtime_before_update CASCADE; + DROP FUNCTION IF EXISTS v2_job_runtime_before_update CASCADE; + "#, + ) + .await?; + }); + + run_windmill_migration!("v2_finalize_disable_sync_III_7", db, |tx| { + tx.execute( + r#" + LOCK TABLE v2_job_status IN ACCESS EXCLUSIVE MODE; DROP FUNCTION IF EXISTS v2_job_status_before_insert CASCADE; - DROP FUNCTION IF EXISTS v2_job_status_before_update CASCADE; + DROP FUNCTION IF EXISTS v2_job_status_before_update CASCADE; + "#, + ) + .await?; + }); + run_windmill_migration!("v2_finalize_disable_sync_III_8", db, |tx| { + tx.execute( + r#" DROP VIEW IF EXISTS completed_job, completed_job_view, job, queue, queue_view CASCADE; - "#, ) .await?; }); + run_windmill_migration!("v2_finalize_job_queue", db, |tx| { tx.execute( r#" + LOCK TABLE v2_job_queue IN ACCESS EXCLUSIVE MODE; ALTER TABLE v2_job_queue DROP COLUMN IF EXISTS __parent_job CASCADE, DROP COLUMN IF EXISTS __created_by CASCADE, @@ -434,6 +499,7 @@ async fn v2_finalize(db: &DB) -> Result<(), Error> { run_windmill_migration!("v2_finalize_job_completed", db, |tx| { tx.execute( r#" + LOCK TABLE v2_job_completed IN ACCESS EXCLUSIVE MODE; ALTER TABLE v2_job_completed DROP COLUMN IF EXISTS __parent_job CASCADE, DROP COLUMN IF EXISTS __created_by CASCADE, diff --git a/backend/windmill-api/src/lib.rs b/backend/windmill-api/src/lib.rs index b4c262b9eb87c..4199494ed4f32 100644 --- a/backend/windmill-api/src/lib.rs +++ b/backend/windmill-api/src/lib.rs @@ -34,6 +34,7 @@ use http::HeaderValue; use reqwest::Client; #[cfg(feature = "oauth2")] use std::collections::HashMap; +use tokio::task::JoinHandle; use windmill_common::global_settings::load_value_from_global_settings; use windmill_common::global_settings::EMAIL_DOMAIN_SETTING; use windmill_common::worker::HUB_CACHE_DIR; @@ -641,7 +642,8 @@ async fn openapi_json() -> &'static str { include_str!("../openapi-deref.json") } -pub async fn migrate_db(db: &DB) -> anyhow::Result<()> { - db::migrate(db).await?; - Ok(()) +pub async fn migrate_db(db: &DB) -> anyhow::Result>> { + db::migrate(db) + .await + .map_err(|e| anyhow::anyhow!("Error migrating db: {e:#}")) }