Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: track workspace runnables used in flows #5369

Open
wants to merge 9 commits into
base: main
Choose a base branch
from

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE flow_workspace_runnables;
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
CREATE TABLE flow_workspace_runnables (
flow_path VARCHAR(255) NOT NULL,
runnable_path VARCHAR(255) NOT NULL,
script_hash BIGINT NULL,
runnable_is_flow BOOLEAN NOT NULL,
workspace_id VARCHAR(50) NOT NULL,
FOREIGN KEY (workspace_id, flow_path) REFERENCES flow (workspace_id, path) ON DELETE CASCADE
);

CREATE UNIQUE INDEX flow_workspace_without_hash_unique_idx ON flow_workspace_runnables (flow_path, runnable_path, runnable_is_flow, workspace_id) WHERE script_hash IS NULL;
CREATE UNIQUE INDEX flow_workspace_with_hash_unique_idx ON flow_workspace_runnables (flow_path, runnable_path, script_hash, runnable_is_flow, workspace_id) WHERE script_hash IS NOT NULL;
CREATE INDEX flow_workspace_runnable_path_is_flow_idx ON flow_workspace_runnables (runnable_path, runnable_is_flow, workspace_id);
13 changes: 3 additions & 10 deletions backend/windmill-api/src/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use crate::{
args::WebhookArgs,
db::{ApiAuthed, DB},
users::fetch_api_authed,
utils::RunnableKind,
};

const KEEP_LAST: i64 = 20;
Expand Down Expand Up @@ -139,7 +140,7 @@ pub struct KafkaTriggerConfig {
pub struct SqsTriggerConfig {
pub queue_url: String,
pub aws_resource_path: String,
pub message_attributes: Option<Vec<String>>
pub message_attributes: Option<Vec<String>>,
}

#[cfg(all(feature = "enterprise", feature = "nats"))]
Expand Down Expand Up @@ -300,8 +301,7 @@ async fn set_config(
#[cfg(feature = "postgres_trigger")]
let nc = if let TriggerKind::Postgres = nc.trigger_kind {
set_postgres_trigger_config(&w_id, authed.clone(), &db, user_db.clone(), nc).await?
}
else {
} else {
nc
};

Expand Down Expand Up @@ -362,13 +362,6 @@ struct Capture {
trigger_extra: Option<SqlxJson<Box<serde_json::value::RawValue>>>,
}

#[derive(Deserialize)]
#[serde(rename_all = "lowercase")]
enum RunnableKind {
Script,
Flow,
}

#[derive(Deserialize)]
struct ListCapturesQuery {
trigger_kind: Option<TriggerKind>,
Expand Down
36 changes: 35 additions & 1 deletion backend/windmill-api/src/flows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::db::ApiAuthed;
use crate::triggers::{
get_triggers_count_internal, list_tokens_internal, TriggersCount, TruncatedTokenWithEmail,
};
use crate::utils::WithStarredInfoQuery;
use crate::utils::{RunnableKind, WithStarredInfoQuery};
use crate::{
db::DB,
schedule::clear_schedule,
Expand Down Expand Up @@ -65,6 +65,10 @@ pub fn workspaced_service() -> Router {
.route("/list_paths", get(list_paths))
.route("/history/p/*path", get(get_flow_history))
.route("/get_latest_version/*path", get(get_latest_version))
.route(
"/list_paths_from_workspace_runnable/:runnable_kind/*path",
get(list_paths_from_workspace_runnable),
)
.route(
"/history_update/v/:version/p/*path",
post(update_flow_history),
Expand Down Expand Up @@ -324,6 +328,28 @@ async fn check_path_conflict<'c>(
return Ok(());
}

async fn list_paths_from_workspace_runnable(
authed: ApiAuthed,
Extension(user_db): Extension<UserDB>,
Path((w_id, runnable_kind, path)): Path<(String, RunnableKind, StripPath)>,
) -> JsonResult<Vec<String>> {
let mut tx = user_db.begin(&authed).await?;
let runnables = sqlx::query_scalar!(
r#"SELECT f.path
FROM flow_workspace_runnables fwr
JOIN flow f
ON fwr.flow_path = f.path AND fwr.workspace_id = f.workspace_id
WHERE fwr.runnable_path = $1 AND fwr.runnable_is_flow = $2 AND fwr.workspace_id = $3"#,
path.to_path(),
matches!(runnable_kind, RunnableKind::Flow),
w_id
)
.fetch_all(&mut *tx)
.await?;
tx.commit().await?;
Ok(Json(runnables))
}

async fn create_flow(
authed: ApiAuthed,
Extension(db): Extension<DB>,
Expand Down Expand Up @@ -769,6 +795,14 @@ async fn update_flow(
)
.execute(&mut *tx)
.await?;
} else {
sqlx::query!(
"DELETE FROM flow_workspace_runnables WHERE flow_path = $1 AND workspace_id = $2",
flow_path,
w_id
)
.execute(&mut *tx)
.await?;
}

let version = sqlx::query_scalar!(
Expand Down
16 changes: 16 additions & 0 deletions backend/windmill-api/src/users.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2478,6 +2478,22 @@ async fn update_username_in_workpsace<'c>(
.execute(&mut **tx)
.await?;

sqlx::query!(
r#"UPDATE flow_workspace_runnables SET flow_path = REGEXP_REPLACE(flow_path,'u/' || $2 || '/(.*)','u/' || $1 || '/\1') WHERE flow_path LIKE ('u/' || $2 || '/%') AND workspace_id = $3"#,
new_username,
old_username,
w_id
).execute(&mut **tx)
.await?;

sqlx::query!(
r#"UPDATE flow_workspace_runnables SET runnable_path = REGEXP_REPLACE(runnable_path,'u/' || $2 || '/(.*)','u/' || $1 || '/\1') WHERE runnable_path LIKE ('u/' || $2 || '/%') AND workspace_id = $3"#,
new_username,
old_username,
w_id
).execute(&mut **tx)
.await?;

sqlx::query!(
r#"UPDATE flow_node SET path = REGEXP_REPLACE(path,'u/' || $2 || '/(.*)','u/' || $1 || '/\1') WHERE path LIKE ('u/' || $2 || '/%') AND workspace_id = $3"#,
new_username,
Expand Down
7 changes: 7 additions & 0 deletions backend/windmill-api/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ pub struct WithStarredInfoQuery {
pub with_starred_info: Option<bool>,
}

#[derive(Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum RunnableKind {
Script,
Flow,
}

pub async fn require_super_admin(db: &DB, email: &str) -> error::Result<()> {
let is_admin = is_super_admin_email(db, email).await?;

Expand Down
8 changes: 8 additions & 0 deletions backend/windmill-api/src/workspaces_extra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,14 @@ pub(crate) async fn change_workspace_id(
.execute(&mut *tx)
.await?;

sqlx::query!(
"UPDATE flow_workspace_runnables SET workspace_id = $1 WHERE workspace_id = $2",
&rw.new_id,
&old_id
)
.execute(&mut *tx)
.await?;

sqlx::query!(
"UPDATE flow_node SET workspace_id = $1 WHERE workspace_id = $2",
&rw.new_id,
Expand Down
21 changes: 21 additions & 0 deletions backend/windmill-worker/src/worker_lockfiles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -981,6 +981,27 @@ async fn lock_modules<'c>(
}
.into();
}
FlowModuleValue::Script { path, hash, .. } => {
sqlx::query!(
"INSERT INTO flow_workspace_runnables (flow_path, runnable_path, script_hash, runnable_is_flow, workspace_id) VALUES ($1, $2, $3, FALSE, $4) ON CONFLICT DO NOTHING",
job_path,
path,
hash.map(|h| h.0),
job.workspace_id
)
.execute(&mut *tx)
.await?;
}
FlowModuleValue::Flow { path, .. } => {
sqlx::query!(
"INSERT INTO flow_workspace_runnables (flow_path, runnable_path, runnable_is_flow, workspace_id) VALUES ($1, $2, TRUE, $3) ON CONFLICT DO NOTHING",
job_path,
path,
job.workspace_id
)
.execute(&mut *tx)
.await?;
}
_ => (),
};
modified_ids.extend(nmodified_ids);
Expand Down
Loading