Skip to content

Commit

Permalink
migration to remove unecessary triggers and fields after job v2
Browse files Browse the repository at this point in the history
  • Loading branch information
rubenfiszel committed Feb 13, 2025
1 parent 4fa897f commit 029e2c7
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 89 deletions.
2 changes: 1 addition & 1 deletion backend/ee-repo-ref.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
841642097f07cc1f765ef74059d66aae2eba2c1d
703a03ac430b6f603a5a189e5b4ff9a42bb2bd7f
178 changes: 90 additions & 88 deletions backend/windmill-api/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,27 +225,27 @@ pub async fn migrate(db: &DB) -> Result<(), Error> {
}
});

// if !has_done_migration(db, "v2_finalize_disable_sync").await {
// let db2 = db.clone();
// let _ = tokio::task::spawn(async move {
// loop {
// if !*MIN_VERSION_IS_AT_LEAST_1_461.read().await {
// tracing::info!("Waiting for all workers to be at least version 1.461 before applying v2 finalize migration, sleeping for 5s...");
// tokio::time::sleep(Duration::from_secs(5)).await;
// continue;
// }
// if let Err(err) = v2_finalize(&db2).await {
// tracing::error!(
// "{err:#}: Could not apply v2 finalize migration, retry in 30s.."
// );
// tokio::time::sleep(Duration::from_secs(30)).await;
// continue;
// }
// tracing::info!("v2 finalization step successfully applied.");
// break;
// }
// });
// }
if !has_done_migration(db, "v2_finalize_disable_sync_II").await {
let db2 = db.clone();
let _ = tokio::task::spawn(async move {
loop {
if !*MIN_VERSION_IS_AT_LEAST_1_461.read().await {
tracing::info!("Waiting for all workers to be at least version 1.461 before applying v2 finalize migration, sleeping for 5s...");
tokio::time::sleep(Duration::from_secs(5)).await;
continue;
}
if let Err(err) = v2_finalize(&db2).await {
tracing::error!(
"{err:#}: Could not apply v2 finalize migration, retry in 30s.."
);
tokio::time::sleep(Duration::from_secs(30)).await;
continue;
}
tracing::info!("v2 finalization step successfully applied.");
break;
}
});
}

Ok(())
}
Expand Down Expand Up @@ -329,9 +329,12 @@ macro_rules! run_windmill_migration {
sqlx::migrate::MigrateError::Execute(e)
})?
.unwrap_or(false);

if !r {
tracing::info!("PG {migration_job_name} lock already acquired by another server or worker, retrying in 5s. (look for the advisory lock in pg_lock with granted = true)");
drop($tx);
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
$tx = db.begin().await?;
}
}
tracing::info!("acquired lock for {migration_job_name}");
Expand Down Expand Up @@ -367,21 +370,21 @@ macro_rules! run_windmill_migration {
}

async fn v2_finalize(db: &DB) -> Result<(), Error> {
run_windmill_migration!("v2_finalize_disable_sync", db, |tx| {
run_windmill_migration!("v2_finalize_disable_sync_II", db, |tx| {
tx.execute(
r#"
DROP FUNCTION v2_job_after_update CASCADE;
DROP FUNCTION v2_job_completed_before_insert CASCADE;
DROP FUNCTION v2_job_completed_before_update CASCADE;
DROP FUNCTION v2_job_queue_after_insert CASCADE;
DROP FUNCTION v2_job_queue_before_insert CASCADE;
DROP FUNCTION v2_job_queue_before_update CASCADE;
DROP FUNCTION v2_job_runtime_before_insert CASCADE;
DROP FUNCTION v2_job_runtime_before_update CASCADE;
DROP FUNCTION v2_job_status_before_insert CASCADE;
DROP FUNCTION v2_job_status_before_update CASCADE;
DROP VIEW completed_job, completed_job_view, job, queue, queue_view CASCADE;
DROP FUNCTION IF EXISTS v2_job_after_update CASCADE;
DROP FUNCTION IF EXISTS v2_job_completed_before_insert CASCADE;
DROP FUNCTION IF EXISTS v2_job_completed_before_update CASCADE;
DROP FUNCTION IF EXISTS v2_job_queue_after_insert CASCADE;
DROP FUNCTION IF EXISTS v2_job_queue_before_insert CASCADE;
DROP FUNCTION IF EXISTS v2_job_queue_before_update CASCADE;
DROP FUNCTION IF EXISTS v2_job_runtime_before_insert CASCADE;
DROP FUNCTION IF EXISTS v2_job_runtime_before_update CASCADE;
DROP FUNCTION IF EXISTS v2_job_status_before_insert CASCADE;
DROP FUNCTION IF EXISTS v2_job_status_before_update CASCADE;
DROP VIEW IF EXISTS completed_job, completed_job_view, job, queue, queue_view CASCADE;
"#,
)
.await?;
Expand All @@ -390,68 +393,67 @@ async fn v2_finalize(db: &DB) -> Result<(), Error> {
tx.execute(
r#"
ALTER TABLE v2_job_queue
DROP COLUMN __parent_job CASCADE,
DROP COLUMN __created_by CASCADE,
DROP COLUMN __script_hash CASCADE,
DROP COLUMN __script_path CASCADE,
DROP COLUMN __args CASCADE,
DROP COLUMN __logs CASCADE,
DROP COLUMN __raw_code CASCADE,
DROP COLUMN __canceled CASCADE,
DROP COLUMN __last_ping CASCADE,
DROP COLUMN __job_kind CASCADE,
DROP COLUMN __env_id CASCADE,
DROP COLUMN __schedule_path CASCADE,
DROP COLUMN __permissioned_as CASCADE,
DROP COLUMN __flow_status CASCADE,
DROP COLUMN __raw_flow CASCADE,
DROP COLUMN __is_flow_step CASCADE,
DROP COLUMN __language CASCADE,
DROP COLUMN __same_worker CASCADE,
DROP COLUMN __raw_lock CASCADE,
DROP COLUMN __pre_run_error CASCADE,
DROP COLUMN __email CASCADE,
DROP COLUMN __visible_to_owner CASCADE,
DROP COLUMN __mem_peak CASCADE,
DROP COLUMN __root_job CASCADE,
DROP COLUMN __leaf_jobs CASCADE,
DROP COLUMN __concurrent_limit CASCADE,
DROP COLUMN __concurrency_time_window_s CASCADE,
DROP COLUMN __timeout CASCADE,
DROP COLUMN __flow_step_id CASCADE,
DROP COLUMN __cache_ttl CASCADE;
DROP COLUMN IF EXISTS __parent_job CASCADE,
DROP COLUMN IF EXISTS __created_by CASCADE,
DROP COLUMN IF EXISTS __script_hash CASCADE,
DROP COLUMN IF EXISTS __script_path CASCADE,
DROP COLUMN IF EXISTS __args CASCADE,
DROP COLUMN IF EXISTS __logs CASCADE,
DROP COLUMN IF EXISTS __raw_code CASCADE,
DROP COLUMN IF EXISTS __canceled CASCADE,
DROP COLUMN IF EXISTS __last_ping CASCADE,
DROP COLUMN IF EXISTS __job_kind CASCADE,
DROP COLUMN IF EXISTS __env_id CASCADE,
DROP COLUMN IF EXISTS __schedule_path CASCADE,
DROP COLUMN IF EXISTS __permissioned_as CASCADE,
DROP COLUMN IF EXISTS __flow_status CASCADE,
DROP COLUMN IF EXISTS __raw_flow CASCADE,
DROP COLUMN IF EXISTS __is_flow_step CASCADE,
DROP COLUMN IF EXISTS __language CASCADE,
DROP COLUMN IF EXISTS __same_worker CASCADE,
DROP COLUMN IF EXISTS __raw_lock CASCADE,
DROP COLUMN IF EXISTS __pre_run_error CASCADE,
DROP COLUMN IF EXISTS __email CASCADE,
DROP COLUMN IF EXISTS __visible_to_owner CASCADE,
DROP COLUMN IF EXISTS __mem_peak CASCADE,
DROP COLUMN IF EXISTS __root_job CASCADE,
DROP COLUMN IF EXISTS __leaf_jobs CASCADE,
DROP COLUMN IF EXISTS __concurrent_limit CASCADE,
DROP COLUMN IF EXISTS __concurrency_time_window_s CASCADE,
DROP COLUMN IF EXISTS __timeout CASCADE,
DROP COLUMN IF EXISTS __flow_step_id CASCADE,
DROP COLUMN IF EXISTS __cache_ttl CASCADE;
"#,
)
.await?;
});
run_windmill_migration!("v2_finalize_job_completed", db, |tx| {
tx.execute(
r#"
LOCK TABLE v2_job_queue IN ACCESS EXCLUSIVE MODE;
ALTER TABLE v2_job_completed
DROP COLUMN __parent_job CASCADE,
DROP COLUMN __created_by CASCADE,
DROP COLUMN __created_at CASCADE,
DROP COLUMN __success CASCADE,
DROP COLUMN __script_hash CASCADE,
DROP COLUMN __script_path CASCADE,
DROP COLUMN __args CASCADE,
DROP COLUMN __logs CASCADE,
DROP COLUMN __raw_code CASCADE,
DROP COLUMN __canceled CASCADE,
DROP COLUMN __job_kind CASCADE,
DROP COLUMN __env_id CASCADE,
DROP COLUMN __schedule_path CASCADE,
DROP COLUMN __permissioned_as CASCADE,
DROP COLUMN __raw_flow CASCADE,
DROP COLUMN __is_flow_step CASCADE,
DROP COLUMN __language CASCADE,
DROP COLUMN __is_skipped CASCADE,
DROP COLUMN __raw_lock CASCADE,
DROP COLUMN __email CASCADE,
DROP COLUMN __visible_to_owner CASCADE,
DROP COLUMN __tag CASCADE,
DROP COLUMN __priority CASCADE;
DROP COLUMN IF EXISTS __parent_job CASCADE,
DROP COLUMN IF EXISTS __created_by CASCADE,
DROP COLUMN IF EXISTS __created_at CASCADE,
DROP COLUMN IF EXISTS __success CASCADE,
DROP COLUMN IF EXISTS __script_hash CASCADE,
DROP COLUMN IF EXISTS __script_path CASCADE,
DROP COLUMN IF EXISTS __args CASCADE,
DROP COLUMN IF EXISTS __logs CASCADE,
DROP COLUMN IF EXISTS __raw_code CASCADE,
DROP COLUMN IF EXISTS __canceled CASCADE,
DROP COLUMN IF EXISTS __job_kind CASCADE,
DROP COLUMN IF EXISTS __env_id CASCADE,
DROP COLUMN IF EXISTS __schedule_path CASCADE,
DROP COLUMN IF EXISTS __permissioned_as CASCADE,
DROP COLUMN IF EXISTS __raw_flow CASCADE,
DROP COLUMN IF EXISTS __is_flow_step CASCADE,
DROP COLUMN IF EXISTS __language CASCADE,
DROP COLUMN IF EXISTS __is_skipped CASCADE,
DROP COLUMN IF EXISTS __raw_lock CASCADE,
DROP COLUMN IF EXISTS __email CASCADE,
DROP COLUMN IF EXISTS __visible_to_owner CASCADE,
DROP COLUMN IF EXISTS __tag CASCADE,
DROP COLUMN IF EXISTS __priority CASCADE;
"#,
)
.await?;
Expand Down

0 comments on commit 029e2c7

Please sign in to comment.