Skip to content

Commit

Permalink
fix: improve v2 migration finalizer to avoid deadlocks
Browse files Browse the repository at this point in the history
  • Loading branch information
rubenfiszel committed Feb 18, 2025
1 parent b4088fa commit 1069ad3
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 14 deletions.
13 changes: 11 additions & 2 deletions backend/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
time::Duration,
};
use tokio::{fs::File, io::AsyncReadExt};
use tokio::{fs::File, io::AsyncReadExt, task::JoinHandle};
use uuid::Uuid;
use windmill_api::HTTP_CLIENT;

Expand Down Expand Up @@ -372,6 +372,7 @@ async fn windmill_main() -> anyhow::Result<()> {

let is_agent = mode == Mode::Agent;

let mut migration_handle: Option<JoinHandle<()>> = None;
#[cfg(feature = "parquet")]
let disable_s3_store = std::env::var("DISABLE_S3_STORE")
.ok()
Expand All @@ -384,7 +385,7 @@ async fn windmill_main() -> anyhow::Result<()> {

if !skip_migration {
// migration code to avoid break
windmill_api::migrate_db(&db).await?;
migration_handle = windmill_api::migrate_db(&db).await?;
} else {
tracing::info!("SKIP_MIGRATION set, skipping db migration...")
}
Expand Down Expand Up @@ -682,6 +683,14 @@ Windmill Community Edition {GIT_VERSION}
loop {
tokio::select! {
biased;
Some(_) = async { if let Some(jh) = migration_handle.take() {
tracing::info!("migration job finished");
Some(jh.await)
} else {
None
}} => {
continue;
},
_ = monitor_killpill_rx.recv() => {
tracing::info!("received killpill for monitor job");
break;
Expand Down
84 changes: 75 additions & 9 deletions backend/windmill-api/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use sqlx::{
Executor, PgConnection, Pool, Postgres,
};

use tokio::task::JoinHandle;
use windmill_audit::audit_ee::{AuditAuthor, AuditAuthorable};
use windmill_common::{
db::{Authable, Authed},
Expand Down Expand Up @@ -170,7 +171,7 @@ impl Migrate for CustomMigrator {
}
}

pub async fn migrate(db: &DB) -> Result<(), Error> {
pub async fn migrate(db: &DB) -> Result<Option<JoinHandle<()>>, Error> {
let migrator = db.acquire().await?;
let mut custom_migrator = CustomMigrator { inner: migrator };

Expand Down Expand Up @@ -225,9 +226,10 @@ pub async fn migrate(db: &DB) -> Result<(), Error> {
}
});

if !has_done_migration(db, "v2_finalize_disable_sync_III").await {
let mut jh = None;
if !has_done_migration(db, "v2_finalize_job_completed").await {
let db2 = db.clone();
let _ = tokio::task::spawn(async move {
let v2jh = tokio::task::spawn(async move {
loop {
if !*MIN_VERSION_IS_AT_LEAST_1_461.read().await {
tracing::info!("Waiting for all workers to be at least version 1.461 before applying v2 finalize migration, sleeping for 5s...");
Expand All @@ -245,9 +247,10 @@ pub async fn migrate(db: &DB) -> Result<(), Error> {
break;
}
});
jh = Some(v2jh)
}

Ok(())
Ok(jh)
}

async fn fix_flow_versioning_migration(
Expand Down Expand Up @@ -373,29 +376,91 @@ async fn v2_finalize(db: &DB) -> Result<(), Error> {
run_windmill_migration!("v2_finalize_disable_sync_III", db, |tx| {
tx.execute(
r#"
LOCK TABLE v2_job_queue IN ACCESS EXCLUSIVE MODE;
ALTER TABLE v2_job_queue DISABLE ROW LEVEL SECURITY;
"#,
)
.await?;
});

run_windmill_migration!("v2_finalize_disable_sync_III_2", db, |tx| {
tx.execute(
r#"
LOCK TABLE v2_job_completed IN ACCESS EXCLUSIVE MODE;
ALTER TABLE v2_job_completed DISABLE ROW LEVEL SECURITY;
"#,
)
.await?;
});

run_windmill_migration!("v2_finalize_disable_sync_III_3", db, |tx| {
tx.execute(
r#"
LOCK TABLE v2_job IN ACCESS EXCLUSIVE MODE;
DROP FUNCTION IF EXISTS v2_job_after_update CASCADE;
"#,
)
.await?;
});

run_windmill_migration!("v2_finalize_disable_sync_III_4", db, |tx| {
tx.execute(
r#"
LOCK TABLE v2_job_completed IN ACCESS EXCLUSIVE MODE;
DROP FUNCTION IF EXISTS v2_job_completed_before_insert CASCADE;
DROP FUNCTION IF EXISTS v2_job_completed_before_update CASCADE;
DROP FUNCTION IF EXISTS v2_job_completed_before_update CASCADE;
"#,
)
.await?;
});

run_windmill_migration!("v2_finalize_disable_sync_III_5", db, |tx| {
tx.execute(
r#"
LOCK TABLE v2_job_queue IN ACCESS EXCLUSIVE MODE;
DROP FUNCTION IF EXISTS v2_job_queue_after_insert CASCADE;
DROP FUNCTION IF EXISTS v2_job_queue_before_insert CASCADE;
DROP FUNCTION IF EXISTS v2_job_queue_before_update CASCADE;
DROP FUNCTION IF EXISTS v2_job_queue_before_update CASCADE;
"#,
)
.await?;
});

run_windmill_migration!("v2_finalize_disable_sync_III_6", db, |tx| {
tx.execute(
r#"
LOCK TABLE v2_job_runtime IN ACCESS EXCLUSIVE MODE;
DROP FUNCTION IF EXISTS v2_job_runtime_before_insert CASCADE;
DROP FUNCTION IF EXISTS v2_job_runtime_before_update CASCADE;
DROP FUNCTION IF EXISTS v2_job_runtime_before_update CASCADE;
"#,
)
.await?;
});

run_windmill_migration!("v2_finalize_disable_sync_III_7", db, |tx| {
tx.execute(
r#"
LOCK TABLE v2_job_status IN ACCESS EXCLUSIVE MODE;
DROP FUNCTION IF EXISTS v2_job_status_before_insert CASCADE;
DROP FUNCTION IF EXISTS v2_job_status_before_update CASCADE;
DROP FUNCTION IF EXISTS v2_job_status_before_update CASCADE;
"#,
)
.await?;
});

run_windmill_migration!("v2_finalize_disable_sync_III_8", db, |tx| {
tx.execute(
r#"
DROP VIEW IF EXISTS completed_job, completed_job_view, job, queue, queue_view CASCADE;
"#,
)
.await?;
});

run_windmill_migration!("v2_finalize_job_queue", db, |tx| {
tx.execute(
r#"
LOCK TABLE v2_job_queue IN ACCESS EXCLUSIVE MODE;
ALTER TABLE v2_job_queue
DROP COLUMN IF EXISTS __parent_job CASCADE,
DROP COLUMN IF EXISTS __created_by CASCADE,
Expand Down Expand Up @@ -434,6 +499,7 @@ async fn v2_finalize(db: &DB) -> Result<(), Error> {
run_windmill_migration!("v2_finalize_job_completed", db, |tx| {
tx.execute(
r#"
LOCK TABLE v2_job_completed IN ACCESS EXCLUSIVE MODE;
ALTER TABLE v2_job_completed
DROP COLUMN IF EXISTS __parent_job CASCADE,
DROP COLUMN IF EXISTS __created_by CASCADE,
Expand Down
8 changes: 5 additions & 3 deletions backend/windmill-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use http::HeaderValue;
use reqwest::Client;
#[cfg(feature = "oauth2")]
use std::collections::HashMap;
use tokio::task::JoinHandle;
use windmill_common::global_settings::load_value_from_global_settings;
use windmill_common::global_settings::EMAIL_DOMAIN_SETTING;
use windmill_common::worker::HUB_CACHE_DIR;
Expand Down Expand Up @@ -641,7 +642,8 @@ async fn openapi_json() -> &'static str {
include_str!("../openapi-deref.json")
}

pub async fn migrate_db(db: &DB) -> anyhow::Result<()> {
db::migrate(db).await?;
Ok(())
pub async fn migrate_db(db: &DB) -> anyhow::Result<Option<JoinHandle<()>>> {
db::migrate(db)
.await
.map_err(|e| anyhow::anyhow!("Error migrating db: {e:#}"))
}

0 comments on commit 1069ad3

Please sign in to comment.