Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add tag filtering to external JWT authentication #4425

Merged
merged 4 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backend/ee-repo-ref.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
e093d51a219ce4ff0562a02db24ec402554a1f05
3d37b6c31155265d8d026ae9d6ced0b433078f87
2 changes: 1 addition & 1 deletion backend/windmill-api/src/concurrency_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ async fn get_concurrent_intervals(
Query(iq): Query<ExtendedJobsParams>,
Query(lq): Query<ListCompletedQuery>,
) -> JsonResult<ExtendedJobs> {
check_scopes(&authed, || format!("listjobs"))?;
check_scopes(&authed, || format!("jobs:listjobs"))?;

if lq.success.is_some() && lq.running.is_some_and(|x| x) {
return Err(error::Error::BadRequest(
Expand Down
37 changes: 28 additions & 9 deletions backend/windmill-api/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use crate::add_webhook_allowed_origin;
use crate::concurrency_groups::join_concurrency_key;
use crate::db::ApiAuthed;

use crate::users::get_scope_tags;
use crate::utils::content_plain;
use crate::{
db::DB,
Expand Down Expand Up @@ -1248,13 +1249,18 @@ pub fn list_queue_jobs_query(
lq: &ListQueueQuery,
fields: &[&str],
join_outstanding_wait_times: bool,
tags: Option<Vec<&str>>,
) -> SqlBuilder {
let sqlb = SqlBuilder::select_from("queue")
let mut sqlb = SqlBuilder::select_from("queue")
.fields(fields)
.order_by("created_at", lq.order_desc.unwrap_or(true))
.limit(1000)
.clone();

if let Some(tags) = tags {
sqlb.and_where_in("tag", &tags.iter().map(|x| quote(x)).collect::<Vec<_>>());
}

filter_list_queue_query(sqlb, lq, w_id, join_outstanding_wait_times)
}

Expand Down Expand Up @@ -1310,6 +1316,7 @@ async fn list_queue_jobs(
"workspace_id",
],
false,
get_scope_tags(&authed),
)
.sql()?;
let mut tx = user_db.begin(&authed).await?;
Expand Down Expand Up @@ -1498,6 +1505,10 @@ async fn list_filtered_uuids(

sqlb.and_where_is_null("schedule_path");

if let Some(tags) = get_scope_tags(&authed) {
sqlb.and_where_in("tag", &tags.iter().map(|x| quote(x)).collect::<Vec<_>>());
}

sqlb = filter_list_queue_query(sqlb, &lq, w_id.as_str(), false);

let sql = sqlb.query()?;
Expand Down Expand Up @@ -1557,7 +1568,7 @@ async fn list_jobs(
Query(lq): Query<ListCompletedQuery>,
Extension(_api_list_jobs_query_duration): Extension<Option<Histo>>,
) -> error::JsonResult<Vec<Job>> {
check_scopes(&authed, || format!("listjobs"))?;
check_scopes(&authed, || format!("jobs:listjobs"))?;

let (per_page, offset) = paginate(pagination);
let lqc = lq.clone();
Expand All @@ -1575,6 +1586,7 @@ async fn list_jobs(
&ListCompletedQuery { order_desc: Some(true), ..lqc },
UnifiedJob::completed_job_fields(),
true,
get_scope_tags(&authed),
))
} else {
None
Expand All @@ -1590,6 +1602,7 @@ async fn list_jobs(
&ListQueueQuery { order_desc: Some(true), ..lq.into() },
UnifiedJob::queued_job_fields(),
true,
get_scope_tags(&authed),
);

if let Some(sqlc) = sqlc {
Expand Down Expand Up @@ -1640,7 +1653,7 @@ pub async fn resume_suspended_flow_as_owner(
Path((_w_id, flow_id)): Path<(String, Uuid)>,
QueryOrBody(value): QueryOrBody<serde_json::Value>,
) -> error::Result<StatusCode> {
check_scopes(&authed, || format!("resumeflow"))?;
check_scopes(&authed, || format!("jobs:resumeflow"))?;
let value = value.unwrap_or(serde_json::Value::Null);
let mut tx = db.begin().await?;

Expand Down Expand Up @@ -3847,7 +3860,7 @@ async fn run_preview_script(
#[cfg(feature = "enterprise")]
check_license_key_valid().await?;

check_scopes(&authed, || format!("runscript"))?;
check_scopes(&authed, || format!("jobs:runscript"))?;
if authed.is_operator {
return Err(error::Error::NotAuthorized(
"Operators cannot run preview jobs for security reasons".to_string(),
Expand Down Expand Up @@ -3917,7 +3930,7 @@ async fn run_bundle_preview_script(

check_license_key_valid().await?;

check_scopes(&authed, || format!("runscript"))?;
check_scopes(&authed, || format!("jobs:runscript"))?;
if authed.is_operator {
return Err(error::Error::NotAuthorized(
"Operators cannot run preview jobs for security reasons".to_string(),
Expand Down Expand Up @@ -4403,7 +4416,7 @@ async fn run_preview_flow_job(
Query(run_query): Query<RunJobQuery>,
Json(raw_flow): Json<PreviewFlow>,
) -> error::Result<(StatusCode, String)> {
check_scopes(&authed, || format!("runflow"))?;
check_scopes(&authed, || format!("jobs:runflow"))?;
if authed.is_operator {
return Err(error::Error::NotAuthorized(
"Operators cannot run preview jobs for security reasons".to_string(),
Expand Down Expand Up @@ -4843,14 +4856,19 @@ pub fn list_completed_jobs_query(
lq: &ListCompletedQuery,
fields: &[&str],
join_outstanding_wait_times: bool,
tags: Option<Vec<&str>>,
) -> SqlBuilder {
let sqlb = SqlBuilder::select_from("completed_job")
let mut sqlb = SqlBuilder::select_from("completed_job")
.fields(fields)
.order_by("created_at", lq.order_desc.unwrap_or(true))
.offset(offset)
.limit(per_page)
.clone();

if let Some(tags) = tags {
sqlb.and_where_in("tag", &tags.iter().map(|x| quote(x)).collect::<Vec<_>>());
}

filter_list_completed_query(sqlb, lq, w_id, join_outstanding_wait_times)
}
#[derive(Deserialize, Clone)]
Expand Down Expand Up @@ -4895,7 +4913,7 @@ async fn list_completed_jobs(
Query(pagination): Query<Pagination>,
Query(lq): Query<ListCompletedQuery>,
) -> error::JsonResult<Vec<ListableCompletedJob>> {
check_scopes(&authed, || format!("listjobs"))?;
check_scopes(&authed, || format!("jobs:listjobs"))?;

let (per_page, offset) = paginate(pagination);

Expand Down Expand Up @@ -4937,6 +4955,7 @@ async fn list_completed_jobs(
"'CompletedJob' as type",
],
false,
get_scope_tags(&authed),
)
.sql()?;
let mut tx = user_db.begin(&authed).await?;
Expand Down Expand Up @@ -5161,7 +5180,7 @@ async fn delete_completed_job<'a>(
Extension(user_db): Extension<UserDB>,
Path((w_id, id)): Path<(String, Uuid)>,
) -> error::Result<Response> {
check_scopes(&authed, || format!("deletejob"))?;
check_scopes(&authed, || format!("jobs:deletejob"))?;

let mut tx = user_db.begin(&authed).await?;

Expand Down
28 changes: 23 additions & 5 deletions backend/windmill-api/src/users.rs
Original file line number Diff line number Diff line change
Expand Up @@ -651,9 +651,12 @@ where
{
if let Some(authed) = cache.get_authed(workspace_id.clone(), &token).await {
parts.extensions.insert(authed.clone());
if authed.scopes.is_some()
&& (path_vec.len() < 3
|| (path_vec[4] != "jobs" && path_vec[4] != "jobs_u"))
if authed.scopes.as_ref().is_some_and(|scopes| {
scopes
.iter()
.any(|s| s.starts_with("jobs:") || s.starts_with("run:"))
}) && (path_vec.len() < 3
|| (path_vec[4] != "jobs" && path_vec[4] != "jobs_u"))
{
BRUTE_FORCE_COUNTER.increment().await;
return Err((
Expand Down Expand Up @@ -681,15 +684,30 @@ pub fn check_scopes<F>(authed: &ApiAuthed, required: F) -> error::Result<()>
where
F: FnOnce() -> String,
{
if let Some(scopes) = &authed.scopes {
if authed.scopes.as_ref().is_some_and(|scopes| {
scopes
.iter()
.any(|s| s.starts_with("jobs:") || s.starts_with("run:"))
}) {
let req = &required();
if !scopes.contains(req) {
if !authed.scopes.as_ref().unwrap().contains(req) {
return Err(Error::BadRequest(format!("missing required scope: {req}")));
}
}
Ok(())
}

pub fn get_scope_tags(authed: &ApiAuthed) -> Option<Vec<&str>> {
authed
.scopes
.as_ref()?
.iter()
.find_map(|s| match s.split(":").collect::<Vec<_>>().as_slice() {
["if_jobs", "filter_tags", tags] => Some(tags.split(",").collect::<Vec<_>>()),
_ => None,
})
}

#[derive(Clone, Debug)]
pub struct OptAuthed(pub Option<ApiAuthed>);

Expand Down
1 change: 1 addition & 0 deletions backend/windmill-common/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub struct JWTAuthClaims {
pub workspace_id: String,
pub exp: usize,
pub job_id: Option<String>,
pub scopes: Option<Vec<String>>,
}

#[derive(Deserialize)]
Expand Down
1 change: 1 addition & 0 deletions backend/windmill-worker/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ pub async fn create_token_for_owner(
exp: (chrono::Utc::now() + chrono::Duration::seconds(expires_in as i64)).timestamp()
as usize,
job_id: Some(job_id.to_string()),
scopes: None,
};

let token = jsonwebtoken::encode(
Expand Down
Loading