From 9e211740aacadb3b2f1e61b5922ec53b22fcba74 Mon Sep 17 00:00:00 2001 From: Zettat123 Date: Sat, 7 Dec 2024 20:34:56 +0800 Subject: [PATCH] support concurrency --- go.mod | 2 +- go.sum | 4 +- models/actions/run.go | 150 +-- models/actions/run_job.go | 119 ++- models/actions/run_job_list.go | 16 +- models/actions/run_list.go | 20 +- models/actions/task.go | 4 + models/migrations/migrations.go | 1 + models/migrations/v1_24/v314.go | 29 + routers/web/repo/actions/view.go | 38 +- services/actions/concurrency.go | 85 ++ services/actions/job_emitter.go | 197 +++- services/actions/job_emitter_test.go | 5 +- services/actions/notifier_helper.go | 33 +- services/actions/run.go | 138 +++ services/actions/schedule_tasks.go | 30 +- services/actions/workflow.go | 33 +- tests/integration/actions_concurrency_test.go | 855 ++++++++++++++++++ tests/integration/actions_runner_test.go | 19 +- 19 files changed, 1596 insertions(+), 182 deletions(-) create mode 100644 models/migrations/v1_24/v314.go create mode 100644 services/actions/concurrency.go create mode 100644 services/actions/run.go create mode 100644 tests/integration/actions_concurrency_test.go diff --git a/go.mod b/go.mod index 593e69acd3fcb..e47cb72733d50 100644 --- a/go.mod +++ b/go.mod @@ -315,7 +315,7 @@ replace github.com/hashicorp/go-version => github.com/6543/go-version v1.3.1 replace github.com/shurcooL/vfsgen => github.com/lunny/vfsgen v0.0.0-20220105142115-2c99e1ffdfa0 -replace github.com/nektos/act => gitea.com/gitea/act v0.261.3 +replace github.com/nektos/act => gitea.com/gitea/act v0.0.0-20250211025148-ec091ad26903 // TODO: the only difference is in `PutObject`: the fork doesn't use `NewVerifyingReader(r, sha256.New(), oid, expectedSize)`, need to figure out why replace github.com/charmbracelet/git-lfs-transfer => gitea.com/gitea/git-lfs-transfer v0.2.0 diff --git a/go.sum b/go.sum index bc0265c51fb26..dc6dc52bc4541 100644 --- a/go.sum +++ b/go.sum @@ -16,8 +16,8 @@ filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= git.sr.ht/~mariusor/go-xsd-duration v0.0.0-20220703122237-02e73435a078 h1:cliQ4HHsCo6xi2oWZYKWW4bly/Ory9FuTpFPRxj/mAg= git.sr.ht/~mariusor/go-xsd-duration v0.0.0-20220703122237-02e73435a078/go.mod h1:g/V2Hjas6Z1UHUp4yIx6bATpNzJ7DYtD0FG3+xARWxs= -gitea.com/gitea/act v0.261.3 h1:BhiYpGJQKGq0XMYYICCYAN4KnsEWHyLbA6dxhZwFcV4= -gitea.com/gitea/act v0.261.3/go.mod h1:Pg5C9kQY1CEA3QjthjhlrqOC/QOT5NyWNjOjRHw23Ok= +gitea.com/gitea/act v0.0.0-20250211025148-ec091ad26903 h1:nVRg4Ws0ZdK9rm0kSv1IvovncKvoaOsZyEsdOLQJSBk= +gitea.com/gitea/act v0.0.0-20250211025148-ec091ad26903/go.mod h1:Pg5C9kQY1CEA3QjthjhlrqOC/QOT5NyWNjOjRHw23Ok= gitea.com/gitea/git-lfs-transfer v0.2.0 h1:baHaNoBSRaeq/xKayEXwiDQtlIjps4Ac/Ll4KqLMB40= gitea.com/gitea/git-lfs-transfer v0.2.0/go.mod h1:UrXUCm3xLQkq15fu7qlXHUMlrhdlXHoi13KH2Dfiits= gitea.com/go-chi/binding v0.0.0-20240430071103-39a851e106ed h1:EZZBtilMLSZNWtHHcgq2mt6NSGhJSZBuduAlinMEmso= diff --git a/models/actions/run.go b/models/actions/run.go index 60fbbcd3233ef..f945b97209f4b 100644 --- a/models/actions/run.go +++ b/models/actions/run.go @@ -20,7 +20,6 @@ import ( "code.gitea.io/gitea/modules/util" webhook_module "code.gitea.io/gitea/modules/webhook" - "github.com/nektos/act/pkg/jobparser" "xorm.io/builder" ) @@ -47,6 +46,8 @@ type ActionRun struct { TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow Status Status `xorm:"index"` Version int 
`xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed + ConcurrencyGroup string + ConcurrencyCancel bool // Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0 Started timeutil.TimeStamp Stopped timeutil.TimeStamp @@ -168,7 +169,7 @@ func (run *ActionRun) IsSchedule() bool { return run.ScheduleID > 0 } -func updateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) error { +func UpdateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) error { _, err := db.GetEngine(ctx).ID(repo.ID). SetExpr("num_action_runs", builder.Select("count(*)").From("action_run"). @@ -222,38 +223,8 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin return err } - // Iterate over each job and attempt to cancel it. - for _, job := range jobs { - // Skip jobs that are already in a terminal state (completed, cancelled, etc.). - status := job.Status - if status.IsDone() { - continue - } - - // If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it. - if job.TaskID == 0 { - job.Status = StatusCancelled - job.Stopped = timeutil.TimeStampNow() - - // Update the job's status and stopped time in the database. - n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped") - if err != nil { - return err - } - - // If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again. - if n == 0 { - return fmt.Errorf("job has changed, try again") - } - - // Continue with the next job. - continue - } - - // If the job has an associated task, try to stop the task, effectively cancelling the job. - if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil { - return err - } + if err := CancelJobs(ctx, jobs); err != nil { + return err } } @@ -261,80 +232,41 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin return nil } -// InsertRun inserts a run -// The title will be cut off at 255 characters if it's longer than 255 characters. -func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWorkflow) error { - ctx, committer, err := db.TxContext(ctx) - if err != nil { - return err - } - defer committer.Close() - - index, err := db.GetNextResourceIndex(ctx, "action_run_index", run.RepoID) - if err != nil { - return err - } - run.Index = index - run.Title = util.EllipsisDisplayString(run.Title, 255) +func CancelJobs(ctx context.Context, jobs []*ActionRunJob) error { + // Iterate over each job and attempt to cancel it. + for _, job := range jobs { + // Skip jobs that are already in a terminal state (completed, cancelled, etc.). + status := job.Status + if status.IsDone() { + continue + } - if err := db.Insert(ctx, run); err != nil { - return err - } + // If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it. + if job.TaskID == 0 { + job.Status = StatusCancelled + job.Stopped = timeutil.TimeStampNow() - if run.Repo == nil { - repo, err := repo_model.GetRepositoryByID(ctx, run.RepoID) - if err != nil { - return err - } - run.Repo = repo - } + // Update the job's status and stopped time in the database. 
+ n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped") + if err != nil { + return err + } - if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil { - return err - } + // If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again. + if n == 0 { + return fmt.Errorf("job has changed, try again") + } - runJobs := make([]*ActionRunJob, 0, len(jobs)) - var hasWaiting bool - for _, v := range jobs { - id, job := v.Job() - needs := job.Needs() - if err := v.SetJob(id, job.EraseNeeds()); err != nil { - return err + // Continue with the next job. + continue } - payload, _ := v.Marshal() - status := StatusWaiting - if len(needs) > 0 || run.NeedApproval { - status = StatusBlocked - } else { - hasWaiting = true - } - job.Name = util.EllipsisDisplayString(job.Name, 255) - runJobs = append(runJobs, &ActionRunJob{ - RunID: run.ID, - RepoID: run.RepoID, - OwnerID: run.OwnerID, - CommitSHA: run.CommitSHA, - IsForkPullRequest: run.IsForkPullRequest, - Name: job.Name, - WorkflowPayload: payload, - JobID: id, - Needs: needs, - RunsOn: job.RunsOn(), - Status: status, - }) - } - if err := db.Insert(ctx, runJobs); err != nil { - return err - } - // if there is a job in the waiting status, increase tasks version. - if hasWaiting { - if err := IncreaseTaskVersion(ctx, run.OwnerID, run.RepoID); err != nil { + // If the job has an associated task, try to stop the task, effectively cancelling the job. + if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil { return err } } - - return committer.Commit() + return nil } func GetRunByID(ctx context.Context, id int64) (*ActionRun, error) { @@ -426,7 +358,7 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error { } run.Repo = repo } - if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil { + if err := UpdateRepoRunsNumbers(ctx, run.Repo); err != nil { return err } } @@ -435,3 +367,21 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error { } type ActionRunIndex db.ResourceIndex + +func ShouldBlockRunByConcurrency(ctx context.Context, actionRun *ActionRun) (bool, error) { + if actionRun.ConcurrencyGroup == "" || actionRun.ConcurrencyCancel { + return false, nil + } + + concurrentRuns, err := db.Find[ActionRun](ctx, &FindRunOptions{ + RepoID: actionRun.RepoID, + ConcurrencyGroup: actionRun.ConcurrencyGroup, + Status: []Status{StatusWaiting, StatusRunning}, + }) + if err != nil { + return false, fmt.Errorf("find running and waiting runs: %w", err) + } + previousRuns := slices.DeleteFunc(concurrentRuns, func(r *ActionRun) bool { return r.ID == actionRun.ID }) + + return len(previousRuns) > 0, nil +} diff --git a/models/actions/run_job.go b/models/actions/run_job.go index de4b6aab66701..023c0dc77ed10 100644 --- a/models/actions/run_job.go +++ b/models/actions/run_job.go @@ -33,10 +33,17 @@ type ActionRunJob struct { RunsOn []string `xorm:"JSON TEXT"` TaskID int64 // the latest task of the job Status Status `xorm:"index"` - Started timeutil.TimeStamp - Stopped timeutil.TimeStamp - Created timeutil.TimeStamp `xorm:"created"` - Updated timeutil.TimeStamp `xorm:"updated index"` + + RawConcurrencyGroup string // raw concurrency.group + RawConcurrencyCancel string // raw concurrency.cancel-in-progress + IsConcurrencyEvaluated bool // whether RawConcurrencyGroup have been evaluated, only valid when RawConcurrencyGroup is not empty + ConcurrencyGroup string // evaluated concurrency.group + ConcurrencyCancel bool // evaluated concurrency.cancel-in-progress 
+ + Started timeutil.TimeStamp + Stopped timeutil.TimeStamp + Created timeutil.TimeStamp `xorm:"created"` + Updated timeutil.TimeStamp `xorm:"updated index"` } func init() { @@ -184,3 +191,107 @@ func AggregateJobStatus(jobs []*ActionRunJob) Status { return StatusUnknown // it shouldn't happen } } + +func ShouldBlockJobByConcurrency(ctx context.Context, job *ActionRunJob) (bool, error) { + if job.RawConcurrencyGroup == "" { + return false, nil + } + if !job.IsConcurrencyEvaluated { + return false, ErrUnevaluatedConcurrency{ + Group: job.RawConcurrencyGroup, + CancelInProgress: job.RawConcurrencyCancel, + } + } + if job.ConcurrencyGroup == "" || job.ConcurrencyCancel { + return false, nil + } + + concurrentJobsNum, err := db.Count[ActionRunJob](ctx, FindRunJobOptions{ + RepoID: job.RepoID, + ConcurrencyGroup: job.ConcurrencyGroup, + Statuses: []Status{StatusRunning, StatusWaiting}, + }) + if err != nil { + return false, fmt.Errorf("count running and waiting jobs: %w", err) + } + if concurrentJobsNum > 0 { + return true, nil + } + + if err := job.LoadRun(ctx); err != nil { + return false, fmt.Errorf("load run: %w", err) + } + + return ShouldBlockRunByConcurrency(ctx, job.Run) +} + +func CancelPreviousJobsByConcurrency(ctx context.Context, job *ActionRunJob) error { + if job.RawConcurrencyGroup != "" { + if !job.IsConcurrencyEvaluated { + return ErrUnevaluatedConcurrency{ + Group: job.RawConcurrencyGroup, + CancelInProgress: job.RawConcurrencyCancel, + } + } + if job.ConcurrencyGroup != "" && job.ConcurrencyCancel { + // cancel previous jobs in the same concurrency group + previousJobs, err := db.Find[ActionRunJob](ctx, &FindRunJobOptions{ + RepoID: job.RepoID, + ConcurrencyGroup: job.ConcurrencyGroup, + Statuses: []Status{StatusRunning, StatusWaiting, StatusBlocked}, + }) + if err != nil { + return fmt.Errorf("find previous jobs: %w", err) + } + previousJobs = slices.DeleteFunc(previousJobs, func(j *ActionRunJob) bool { return j.ID == job.ID }) + if err := CancelJobs(ctx, previousJobs); err != nil { + return fmt.Errorf("cancel previous jobs: %w", err) + } + } + } + + if err := job.LoadRun(ctx); err != nil { + return fmt.Errorf("load run: %w", err) + } + if job.Run.ConcurrencyGroup != "" && job.Run.ConcurrencyCancel { + // cancel previous runs in the same concurrency group + runs, err := db.Find[ActionRun](ctx, &FindRunOptions{ + RepoID: job.RepoID, + ConcurrencyGroup: job.Run.ConcurrencyGroup, + Status: []Status{StatusRunning, StatusWaiting, StatusBlocked}, + }) + if err != nil { + return fmt.Errorf("find runs: %w", err) + } + for _, run := range runs { + if run.ID == job.Run.ID { + continue + } + jobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{ + RunID: run.ID, + }) + if err != nil { + return fmt.Errorf("find run %d jobs: %w", run.ID, err) + } + if err := CancelJobs(ctx, jobs); err != nil { + return fmt.Errorf("cancel run %d jobs: %w", run.ID, err) + } + } + } + + return nil +} + +type ErrUnevaluatedConcurrency struct { + Group string + CancelInProgress string +} + +func IsErrUnevaluatedConcurrency(err error) bool { + _, ok := err.(ErrUnevaluatedConcurrency) + return ok +} + +func (err ErrUnevaluatedConcurrency) Error() string { + return fmt.Sprintf("the raw concurrency [group=%s, cancel-in-progress=%s] is not evaluated", err.Group, err.CancelInProgress) +} diff --git a/models/actions/run_job_list.go b/models/actions/run_job_list.go index 6c5d3b3252ebf..6b808b5abd888 100644 --- a/models/actions/run_job_list.go +++ b/models/actions/run_job_list.go @@ -48,12 +48,13 @@ func (jobs 
ActionJobList) LoadAttributes(ctx context.Context, withRepo bool) err type FindRunJobOptions struct { db.ListOptions - RunID int64 - RepoID int64 - OwnerID int64 - CommitSHA string - Statuses []Status - UpdatedBefore timeutil.TimeStamp + RunID int64 + RepoID int64 + OwnerID int64 + CommitSHA string + Statuses []Status + UpdatedBefore timeutil.TimeStamp + ConcurrencyGroup string } func (opts FindRunJobOptions) ToConds() builder.Cond { @@ -76,5 +77,8 @@ func (opts FindRunJobOptions) ToConds() builder.Cond { if opts.UpdatedBefore > 0 { cond = cond.And(builder.Lt{"updated": opts.UpdatedBefore}) } + if opts.ConcurrencyGroup != "" { + cond = cond.And(builder.Eq{"concurrency_group": opts.ConcurrencyGroup}) + } return cond } diff --git a/models/actions/run_list.go b/models/actions/run_list.go index 4046c7d369436..2339e139147ca 100644 --- a/models/actions/run_list.go +++ b/models/actions/run_list.go @@ -63,14 +63,15 @@ func (runs RunList) LoadRepos(ctx context.Context) error { type FindRunOptions struct { db.ListOptions - RepoID int64 - OwnerID int64 - WorkflowID string - Ref string // the commit/tag/… that caused this workflow - TriggerUserID int64 - TriggerEvent webhook_module.HookEventType - Approved bool // not util.OptionalBool, it works only when it's true - Status []Status + RepoID int64 + OwnerID int64 + WorkflowID string + Ref string // the commit/tag/… that caused this workflow + TriggerUserID int64 + TriggerEvent webhook_module.HookEventType + Approved bool // not util.OptionalBool, it works only when it's true + Status []Status + ConcurrencyGroup string } func (opts FindRunOptions) ToConds() builder.Cond { @@ -99,6 +100,9 @@ func (opts FindRunOptions) ToConds() builder.Cond { if opts.TriggerEvent != "" { cond = cond.And(builder.Eq{"trigger_event": opts.TriggerEvent}) } + if len(opts.ConcurrencyGroup) > 0 { + cond = cond.And(builder.Eq{"concurrency_group": opts.ConcurrencyGroup}) + } return cond } diff --git a/models/actions/task.go b/models/actions/task.go index 9f13ff94c9e4a..daf78809d05d5 100644 --- a/models/actions/task.go +++ b/models/actions/task.go @@ -320,6 +320,10 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask return nil, false, nil } + if err := CancelPreviousJobsByConcurrency(ctx, job); err != nil { + return nil, false, err + } + task.Job = job if err := committer.Commit(); err != nil { diff --git a/models/migrations/migrations.go b/models/migrations/migrations.go index 87d674a440999..ca789a1825125 100644 --- a/models/migrations/migrations.go +++ b/models/migrations/migrations.go @@ -374,6 +374,7 @@ func prepareMigrationTasks() []*migration { // Gitea 1.23.0-rc0 ends at migration ID number 311 (database version 312) newMigration(312, "Add DeleteBranchAfterMerge to AutoMerge", v1_24.AddDeleteBranchAfterMergeForAutoMerge), newMigration(313, "Move PinOrder from issue table to a new table issue_pin", v1_24.MovePinOrderToTableIssuePin), + newMigration(314, "Add support for actions concurrency", v1_24.AddActionsConcurrency), } return preparedMigrations } diff --git a/models/migrations/v1_24/v314.go b/models/migrations/v1_24/v314.go new file mode 100644 index 0000000000000..d4ac6ea77fd19 --- /dev/null +++ b/models/migrations/v1_24/v314.go @@ -0,0 +1,29 @@ +// Copyright 2025 The Gitea Authors. All rights reserved. 
+// SPDX-License-Identifier: MIT + +package v1_24 //nolint + +import ( + "xorm.io/xorm" +) + +func AddActionsConcurrency(x *xorm.Engine) error { + type ActionRun struct { + ConcurrencyGroup string + ConcurrencyCancel bool + } + + if err := x.Sync(new(ActionRun)); err != nil { + return err + } + + type ActionRunJob struct { + RawConcurrencyGroup string + RawConcurrencyCancel string + IsConcurrencyEvaluated bool + ConcurrencyGroup string + ConcurrencyCancel bool + } + + return x.Sync(new(ActionRunJob)) +} diff --git a/routers/web/repo/actions/view.go b/routers/web/repo/actions/view.go index b27f8e0e7a602..d45ee8e0a4f8c 100644 --- a/routers/web/repo/actions/view.go +++ b/routers/web/repo/actions/view.go @@ -450,8 +450,35 @@ func rerunJob(ctx *context_module.Context, job *actions_model.ActionRunJob, shou job.Started = 0 job.Stopped = 0 + job.ConcurrencyGroup = "" + job.ConcurrencyCancel = false + job.IsConcurrencyEvaluated = false + if err := job.LoadRun(ctx); err != nil { + return err + } + vars, err := actions_model.GetVariablesOfRun(ctx, job.Run) + if err != nil { + return fmt.Errorf("get run %d variables: %w", job.Run.ID, err) + } + if job.RawConcurrencyGroup != "" && job.Status != actions_model.StatusBlocked { + var err error + job.ConcurrencyGroup, job.ConcurrencyCancel, err = actions_service.EvaluateJobConcurrency(ctx, job.Run, job, vars, nil) + if err != nil { + return fmt.Errorf("evaluate job concurrency: %w", err) + } + job.IsConcurrencyEvaluated = true + blockByConcurrency, err := actions_model.ShouldBlockJobByConcurrency(ctx, job) + if err != nil { + return err + } + if blockByConcurrency { + job.Status = actions_model.StatusBlocked + } + } + if err := db.WithTx(ctx, func(ctx context.Context) error { - _, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"status": status}, "task_id", "status", "started", "stopped") + updateCols := []string{"task_id", "status", "started", "stopped", "concurrency_group", "concurrency_cancel", "is_concurrency_evaluated"} + _, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"status": status}, updateCols...) return err }); err != nil { return err @@ -568,7 +595,14 @@ func Approve(ctx *context_module.Context) { return err } for _, job := range jobs { - if len(job.Needs) == 0 && job.Status.IsBlocked() { + blockJobByConcurrency, err := actions_model.ShouldBlockJobByConcurrency(ctx, job) + if err != nil { + if actions_model.IsErrUnevaluatedConcurrency(err) { + continue + } + return err + } + if len(job.Needs) == 0 && job.Status.IsBlocked() && !blockJobByConcurrency { job.Status = actions_model.StatusWaiting _, err := actions_model.UpdateRunJob(ctx, job, nil, "status") if err != nil { diff --git a/services/actions/concurrency.go b/services/actions/concurrency.go new file mode 100644 index 0000000000000..59b9dd570334e --- /dev/null +++ b/services/actions/concurrency.go @@ -0,0 +1,85 @@ +// Copyright 2025 The Gitea Authors. All rights reserved. 
+// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + "fmt" + + actions_model "code.gitea.io/gitea/models/actions" + "code.gitea.io/gitea/modules/json" + api "code.gitea.io/gitea/modules/structs" + + "github.com/nektos/act/pkg/jobparser" + act_model "github.com/nektos/act/pkg/model" +) + +func EvaluateWorkflowConcurrency(ctx context.Context, run *actions_model.ActionRun, rc *act_model.RawConcurrency, vars map[string]string) (string, bool, error) { + if err := run.LoadAttributes(ctx); err != nil { + return "", false, fmt.Errorf("run LoadAttributes: %w", err) + } + + gitCtx := GenerateGiteaContext(run, nil) + jobResults := map[string]*jobparser.JobResult{"": {}} + inputs, err := getInputsFromRun(run) + if err != nil { + return "", false, fmt.Errorf("get inputs: %w", err) + } + + concurrencyGroup, concurrencyCancel, err := jobparser.EvaluateConcurrency(rc, "", nil, gitCtx, jobResults, vars, inputs) + if err != nil { + return "", false, fmt.Errorf("evaluate concurrency: %w", err) + } + + return concurrencyGroup, concurrencyCancel, nil +} + +func EvaluateJobConcurrency(ctx context.Context, run *actions_model.ActionRun, actionRunJob *actions_model.ActionRunJob, vars map[string]string, jobResults map[string]*jobparser.JobResult) (string, bool, error) { + if err := actionRunJob.LoadAttributes(ctx); err != nil { + return "", false, fmt.Errorf("job LoadAttributes: %w", err) + } + + rawConcurrency := &act_model.RawConcurrency{ + Group: actionRunJob.RawConcurrencyGroup, + CancelInProgress: actionRunJob.RawConcurrencyCancel, + } + + gitCtx := GenerateGiteaContext(run, actionRunJob) + if jobResults == nil { + jobResults = map[string]*jobparser.JobResult{} + } + jobResults[actionRunJob.JobID] = &jobparser.JobResult{ + Needs: actionRunJob.Needs, + } + inputs, err := getInputsFromRun(run) + if err != nil { + return "", false, fmt.Errorf("get inputs: %w", err) + } + + singleWorkflows, err := jobparser.Parse(actionRunJob.WorkflowPayload) + if err != nil { + return "", false, fmt.Errorf("parse single workflow: %w", err) + } else if len(singleWorkflows) != 1 { + return "", false, fmt.Errorf("not single workflow") + } + _, singleWorkflowJob := singleWorkflows[0].Job() + + concurrencyGroup, concurrencyCancel, err := jobparser.EvaluateConcurrency(rawConcurrency, actionRunJob.JobID, singleWorkflowJob, gitCtx, jobResults, vars, inputs) + if err != nil { + return "", false, fmt.Errorf("evaluate concurrency: %w", err) + } + + return concurrencyGroup, concurrencyCancel, nil +} + +func getInputsFromRun(run *actions_model.ActionRun) (map[string]any, error) { + if run.Event != "workflow_dispatch" { + return map[string]any{}, nil + } + var payload api.WorkflowDispatchPayload + if err := json.Unmarshal([]byte(run.EventPayload), &payload); err != nil { + return nil, err + } + return payload.Inputs, nil +} diff --git a/services/actions/job_emitter.go b/services/actions/job_emitter.go index 1f859fcf70506..4515e40424b11 100644 --- a/services/actions/job_emitter.go +++ b/services/actions/job_emitter.go @@ -10,7 +10,9 @@ import ( actions_model "code.gitea.io/gitea/models/actions" "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/modules/container" "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/queue" "github.com/nektos/act/pkg/jobparser" @@ -37,25 +39,146 @@ func jobEmitterQueueHandler(items ...*jobUpdate) []*jobUpdate { ctx := graceful.GetManager().ShutdownContext() var ret []*jobUpdate for _, update := range items { - if err := 
checkJobsOfRun(ctx, update.RunID); err != nil { + if err := checkJobsByRunID(ctx, update.RunID); err != nil { + log.Error("check run %d: %v", update.RunID, err) ret = append(ret, update) } } return ret } -func checkJobsOfRun(ctx context.Context, runID int64) error { - jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: runID}) +func checkJobsByRunID(ctx context.Context, runID int64) error { + run, err := actions_model.GetRunByID(ctx, runID) if err != nil { + return fmt.Errorf("get action run: %w", err) + } + + var jobsToCreateCommitStatus []*actions_model.ActionRunJob + + if err := db.WithTx(ctx, func(ctx context.Context) error { + // check jobs of the current run + if jobs, err := checkJobsOfRun(ctx, run); err != nil { + return err + } else { + jobsToCreateCommitStatus = append(jobsToCreateCommitStatus, jobs...) + } + + // check run (workflow-level) concurrency + concurrentRunIDs := make(container.Set[int64]) + concurrentRunIDs.Add(run.ID) + if run.ConcurrencyGroup != "" { + concurrentRuns, err := db.Find[actions_model.ActionRun](ctx, actions_model.FindRunOptions{ + RepoID: run.RepoID, + ConcurrencyGroup: run.ConcurrencyGroup, + Status: []actions_model.Status{actions_model.StatusBlocked}, + }) + if err != nil { + return err + } + for _, concurrentRun := range concurrentRuns { + if concurrentRunIDs.Contains(concurrentRun.ID) { + continue + } + concurrentRunIDs.Add(concurrentRun.ID) + if concurrentRun.NeedApproval { + continue + } + if jobs, err := checkJobsOfRun(ctx, concurrentRun); err != nil { + return err + } else { + jobsToCreateCommitStatus = append(jobsToCreateCommitStatus, jobs...) + } + updatedRun, err := actions_model.GetRunByID(ctx, concurrentRun.ID) + if err != nil { + return err + } + if updatedRun.Status == actions_model.StatusWaiting { + // only run one blocked action run in the same concurrency group + break + } + } + } + + // check job concurrency + runJobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: run.ID}) + if err != nil { + return err + } + for _, job := range runJobs { + if job.Status.IsDone() && job.ConcurrencyGroup != "" { + waitingConcurrentJobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{ + RepoID: job.RepoID, + ConcurrencyGroup: job.ConcurrencyGroup, + Statuses: []actions_model.Status{actions_model.StatusWaiting}, + }) + if err != nil { + return err + } + if len(waitingConcurrentJobs) == 0 { + blockedConcurrentJobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{ + RepoID: job.RepoID, + ConcurrencyGroup: job.ConcurrencyGroup, + Statuses: []actions_model.Status{actions_model.StatusBlocked}, + }) + if err != nil { + return err + } + for _, concurrentJob := range blockedConcurrentJobs { + if concurrentRunIDs.Contains(concurrentJob.RunID) { + continue + } + concurrentRunIDs.Add(concurrentJob.RunID) + concurrentRun, err := actions_model.GetRunByID(ctx, concurrentJob.RunID) + if err != nil { + return err + } + if concurrentRun.NeedApproval { + continue + } + if jobs, err := checkJobsOfRun(ctx, concurrentRun); err != nil { + return err + } else { + jobsToCreateCommitStatus = append(jobsToCreateCommitStatus, jobs...) + } + updatedJob, err := actions_model.GetRunJobByID(ctx, concurrentJob.ID) + if err != nil { + return err + } + if updatedJob.Status == actions_model.StatusWaiting { + break + } + } + } + } + } + + return nil + }); err != nil { return err } + + CreateCommitStatus(ctx, jobsToCreateCommitStatus...) 
+ + return nil +} + +func checkJobsOfRun(ctx context.Context, run *actions_model.ActionRun) ([]*actions_model.ActionRunJob, error) { + jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: run.ID}) + if err != nil { + return nil, err + } + + vars, err := actions_model.GetVariablesOfRun(ctx, run) + if err != nil { + return nil, fmt.Errorf("get run %d variables: %w", run.ID, err) + } + if err := db.WithTx(ctx, func(ctx context.Context) error { - idToJobs := make(map[string][]*actions_model.ActionRunJob, len(jobs)) for _, job := range jobs { - idToJobs[job.JobID] = append(idToJobs[job.JobID], job) + job.Run = run } - updates := newJobStatusResolver(jobs).Resolve() + updates := newJobStatusResolver(jobs, vars).Resolve(ctx) for _, job := range jobs { if status, ok := updates[job.ID]; ok { job.Status = status @@ -68,19 +191,19 @@ func checkJobsOfRun(ctx context.Context, runID int64) error { } return nil }); err != nil { - return err + return nil, err } - CreateCommitStatus(ctx, jobs...) - return nil + return jobs, nil } type jobStatusResolver struct { statuses map[int64]actions_model.Status needs map[int64][]int64 jobMap map[int64]*actions_model.ActionRunJob + vars map[string]string } -func newJobStatusResolver(jobs actions_model.ActionJobList) *jobStatusResolver { +func newJobStatusResolver(jobs actions_model.ActionJobList, vars map[string]string) *jobStatusResolver { idToJobs := make(map[string][]*actions_model.ActionRunJob, len(jobs)) jobMap := make(map[int64]*actions_model.ActionRunJob) for _, job := range jobs { @@ -102,13 +225,14 @@ func newJobStatusResolver(jobs actions_model.ActionJobList) *jobStatusResolver { statuses: statuses, needs: needs, jobMap: jobMap, + vars: vars, } } -func (r *jobStatusResolver) Resolve() map[int64]actions_model.Status { +func (r *jobStatusResolver) Resolve(ctx context.Context) map[int64]actions_model.Status { ret := map[int64]actions_model.Status{} for i := 0; i < len(r.statuses); i++ { - updated := r.resolve() + updated := r.resolve(ctx) if len(updated) == 0 { return ret } @@ -120,7 +244,7 @@ func (r *jobStatusResolver) Resolve() map[int64]actions_model.Status { return ret } -func (r *jobStatusResolver) resolve() map[int64]actions_model.Status { +func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model.Status { ret := map[int64]actions_model.Status{} for id, status := range r.statuses { if status != actions_model.StatusBlocked { @@ -137,6 +261,17 @@ func (r *jobStatusResolver) resolve() map[int64]actions_model.Status { } } if allDone { + // check concurrency + blockedByJobConcurrency, err := checkConcurrencyForJobWithNeeds(ctx, r.jobMap[id], r.vars) + if err != nil { + log.Error("Check job %d concurrency: %v. 
This job will stay blocked.", id, err) + continue + } + + if blockedByJobConcurrency { + continue + } + if allSucceed { ret[id] = actions_model.StatusWaiting } else { @@ -160,3 +295,39 @@ func (r *jobStatusResolver) resolve() map[int64]actions_model.Status { } return ret } + +func checkConcurrencyForJobWithNeeds(ctx context.Context, actionRunJob *actions_model.ActionRunJob, vars map[string]string) (bool, error) { + if actionRunJob.RawConcurrencyGroup == "" { + return false, nil + } + if err := actionRunJob.LoadAttributes(ctx); err != nil { + return false, err + } + + if !actionRunJob.IsConcurrencyEvaluated { + taskNeeds, err := FindTaskNeeds(ctx, actionRunJob) + if err != nil { + return false, fmt.Errorf("find task needs: %w", err) + } + jobResults := make(map[string]*jobparser.JobResult, len(taskNeeds)) + for jobID, taskNeed := range taskNeeds { + jobResult := &jobparser.JobResult{ + Result: taskNeed.Result.String(), + Outputs: taskNeed.Outputs, + } + jobResults[jobID] = jobResult + } + + actionRunJob.ConcurrencyGroup, actionRunJob.ConcurrencyCancel, err = EvaluateJobConcurrency(ctx, actionRunJob.Run, actionRunJob, vars, jobResults) + if err != nil { + return false, fmt.Errorf("evaluate job concurrency: %w", err) + } + actionRunJob.IsConcurrencyEvaluated = true + + if _, err := actions_model.UpdateRunJob(ctx, actionRunJob, nil, "concurrency_group", "concurrency_cancel", "is_concurrency_evaluated"); err != nil { + return false, fmt.Errorf("update run job: %w", err) + } + } + + return actions_model.ShouldBlockJobByConcurrency(ctx, actionRunJob) +} diff --git a/services/actions/job_emitter_test.go b/services/actions/job_emitter_test.go index 58c2dc3b242bb..5fe9c59dc32d9 100644 --- a/services/actions/job_emitter_test.go +++ b/services/actions/job_emitter_test.go @@ -4,6 +4,7 @@ package actions import ( + "context" "testing" actions_model "code.gitea.io/gitea/models/actions" @@ -129,8 +130,8 @@ jobs: } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - r := newJobStatusResolver(tt.jobs) - assert.Equal(t, tt.want, r.Resolve()) + r := newJobStatusResolver(tt.jobs, nil) + assert.Equal(t, tt.want, r.Resolve(context.Background())) }) } } diff --git a/services/actions/notifier_helper.go b/services/actions/notifier_helper.go index 2d8885dc323ab..1024568b8d700 100644 --- a/services/actions/notifier_helper.go +++ b/services/actions/notifier_helper.go @@ -332,27 +332,30 @@ func handleWorkflows( continue } - jobs, err := jobparser.Parse(dwf.Content, jobparser.WithVars(vars)) + wfRawConcurrency, err := jobparser.ReadWorkflowRawConcurrency(dwf.Content) if err != nil { - log.Error("jobparser.Parse: %v", err) + log.Error("ReadWorkflowRawConcurrency: %v", err) continue } - - // cancel running jobs if the event is push or pull_request_sync - if run.Event == webhook_module.HookEventPush || - run.Event == webhook_module.HookEventPullRequestSync { - if err := actions_model.CancelPreviousJobs( - ctx, - run.RepoID, - run.Ref, - run.WorkflowID, - run.Event, - ); err != nil { - log.Error("CancelPreviousJobs: %v", err) + if wfRawConcurrency != nil { + wfConcurrencyGroup, wfConcurrencyCancel, err := EvaluateWorkflowConcurrency(ctx, run, wfRawConcurrency, vars) + if err != nil { + log.Error("EvaluateWorkflowConcurrency: %v", err) + continue } + if wfConcurrencyGroup != "" { + run.ConcurrencyGroup = wfConcurrencyGroup + run.ConcurrencyCancel = wfConcurrencyCancel + } + } + + jobs, err := jobparser.Parse(dwf.Content, jobparser.WithVars(vars)) + if err != nil { + log.Error("jobparser.Parse: %v", err) + 
continue } - if err := actions_model.InsertRun(ctx, run, jobs); err != nil { + if err := InsertRun(ctx, run, jobs); err != nil { log.Error("InsertRun: %v", err) continue } diff --git a/services/actions/run.go b/services/actions/run.go new file mode 100644 index 0000000000000..006952a5347c9 --- /dev/null +++ b/services/actions/run.go @@ -0,0 +1,138 @@ +// Copyright 2025 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + "fmt" + + actions_model "code.gitea.io/gitea/models/actions" + "code.gitea.io/gitea/models/db" + repo_model "code.gitea.io/gitea/models/repo" + "code.gitea.io/gitea/modules/util" + + "github.com/nektos/act/pkg/jobparser" +) + +// InsertRun inserts a run +// The title will be cut off at 255 characters if it's longer than 255 characters. +func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobparser.SingleWorkflow) error { + ctx, committer, err := db.TxContext(ctx) + if err != nil { + return err + } + defer committer.Close() + + index, err := db.GetNextResourceIndex(ctx, "action_run_index", run.RepoID) + if err != nil { + return err + } + run.Index = index + run.Title = util.EllipsisDisplayString(run.Title, 255) + + // check run (workflow-level) concurrency + blockRunByConcurrency, err := actions_model.ShouldBlockRunByConcurrency(ctx, run) + if err != nil { + return err + } + if blockRunByConcurrency { + run.Status = actions_model.StatusBlocked + } + + if err := db.Insert(ctx, run); err != nil { + return err + } + + if run.Repo == nil { + repo, err := repo_model.GetRepositoryByID(ctx, run.RepoID) + if err != nil { + return err + } + run.Repo = repo + } + + if err := actions_model.UpdateRepoRunsNumbers(ctx, run.Repo); err != nil { + return err + } + + // query vars for evaluating job concurrency groups + vars, err := actions_model.GetVariablesOfRun(ctx, run) + if err != nil { + return fmt.Errorf("get run %d variables: %w", run.ID, err) + } + + runJobs := make([]*actions_model.ActionRunJob, 0, len(jobs)) + var hasWaiting bool + for _, v := range jobs { + id, job := v.Job() + needs := job.Needs() + if err := v.SetJob(id, job.EraseNeeds()); err != nil { + return err + } + payload, _ := v.Marshal() + status := actions_model.StatusWaiting + if len(needs) > 0 || run.NeedApproval || run.Status == actions_model.StatusBlocked { + status = actions_model.StatusBlocked + } else { + hasWaiting = true + } + job.Name = util.EllipsisDisplayString(job.Name, 255) + runJob := &actions_model.ActionRunJob{ + RunID: run.ID, + RepoID: run.RepoID, + OwnerID: run.OwnerID, + CommitSHA: run.CommitSHA, + IsForkPullRequest: run.IsForkPullRequest, + Name: job.Name, + WorkflowPayload: payload, + JobID: id, + Needs: needs, + RunsOn: job.RunsOn(), + Status: status, + } + + // check job concurrency + if job.RawConcurrency != nil && job.RawConcurrency.Group != "" { + runJob.RawConcurrencyGroup = job.RawConcurrency.Group + runJob.RawConcurrencyCancel = job.RawConcurrency.CancelInProgress + // we do not need to evaluate job concurrency if the job is blocked because it will be checked by job emitter + if runJob.Status != actions_model.StatusBlocked { + var err error + runJob.ConcurrencyGroup, runJob.ConcurrencyCancel, err = EvaluateJobConcurrency(ctx, run, runJob, vars, nil) + if err != nil { + return fmt.Errorf("evaluate job concurrency: %w", err) + } + runJob.IsConcurrencyEvaluated = true + // check if the job should be blocked by job concurrency + blockByConcurrency, err := actions_model.ShouldBlockJobByConcurrency(ctx, 
runJob) + if err != nil { + return err + } + if blockByConcurrency { + runJob.Status = actions_model.StatusBlocked + } + } + } + + if err := db.Insert(ctx, runJob); err != nil { + return err + } + + runJobs = append(runJobs, runJob) + } + + run.Status = actions_model.AggregateJobStatus(runJobs) + if err := actions_model.UpdateRun(ctx, run, "status"); err != nil { + return err + } + + // if there is a job in the waiting status, increase tasks version. + if hasWaiting { + if err := actions_model.IncreaseTaskVersion(ctx, run.OwnerID, run.RepoID); err != nil { + return err + } + } + + return committer.Commit() +} diff --git a/services/actions/schedule_tasks.go b/services/actions/schedule_tasks.go index 18f3324fd2c26..48d35bf3f4b8e 100644 --- a/services/actions/schedule_tasks.go +++ b/services/actions/schedule_tasks.go @@ -52,20 +52,6 @@ func startTasks(ctx context.Context) error { // Loop through each spec and create a schedule task for it for _, row := range specs { - // cancel running jobs if the event is push - if row.Schedule.Event == webhook_module.HookEventPush { - // cancel running jobs of the same workflow - if err := actions_model.CancelPreviousJobs( - ctx, - row.RepoID, - row.Schedule.Ref, - row.Schedule.WorkflowID, - webhook_module.HookEventSchedule, - ); err != nil { - log.Error("CancelPreviousJobs: %v", err) - } - } - if row.Repo.IsArchived { // Skip if the repo is archived continue @@ -143,9 +129,23 @@ func CreateScheduleTask(ctx context.Context, cron *actions_model.ActionSchedule) if err != nil { return err } + wfRawConcurrency, err := jobparser.ReadWorkflowRawConcurrency(cron.Content) + if err != nil { + return err + } + if wfRawConcurrency != nil { + wfConcurrencyGroup, wfConcurrencyCancel, err := EvaluateWorkflowConcurrency(ctx, run, wfRawConcurrency, vars) + if err != nil { + return err + } + if wfConcurrencyGroup != "" { + run.ConcurrencyGroup = wfConcurrencyGroup + run.ConcurrencyCancel = wfConcurrencyCancel + } + } // Insert the action run and its associated jobs into the database - if err := actions_model.InsertRun(ctx, run, workflows); err != nil { + if err := InsertRun(ctx, run, workflows); err != nil { return err } diff --git a/services/actions/workflow.go b/services/actions/workflow.go index 9aecad171b4c1..29dc7693fbfdc 100644 --- a/services/actions/workflow.go +++ b/services/actions/workflow.go @@ -190,7 +190,10 @@ func DispatchActionWorkflow(ctx reqctx.RequestContext, doer *user_model.User, re } // find workflow from commit - var workflows []*jobparser.SingleWorkflow + var ( + workflows []*jobparser.SingleWorkflow + wfRawConcurrency *model.RawConcurrency + ) for _, entry := range entries { if entry.Name() != workflowID { continue @@ -204,6 +207,10 @@ func DispatchActionWorkflow(ctx reqctx.RequestContext, doer *user_model.User, re if err != nil { return err } + wfRawConcurrency, err = jobparser.ReadWorkflowRawConcurrency(content) + if err != nil { + return err + } break } @@ -255,19 +262,23 @@ func DispatchActionWorkflow(ctx reqctx.RequestContext, doer *user_model.User, re Status: actions_model.StatusWaiting, } - // cancel running jobs of the same workflow - if err := actions_model.CancelPreviousJobs( - ctx, - run.RepoID, - run.Ref, - run.WorkflowID, - run.Event, - ); err != nil { - log.Error("CancelRunningJobs: %v", err) + if wfRawConcurrency != nil { + vars, err := actions_model.GetVariablesOfRun(ctx, run) + if err != nil { + return err + } + wfConcurrencyGroup, wfConcurrencyCancel, err := EvaluateWorkflowConcurrency(ctx, run, wfRawConcurrency, vars) + if err != 
nil { + return err + } + if wfConcurrencyGroup != "" { + run.ConcurrencyGroup = wfConcurrencyGroup + run.ConcurrencyCancel = wfConcurrencyCancel + } } // Insert the action run and its associated jobs into the database - if err := actions_model.InsertRun(ctx, run, workflows); err != nil { + if err := InsertRun(ctx, run, workflows); err != nil { return fmt.Errorf("InsertRun: %w", err) } diff --git a/tests/integration/actions_concurrency_test.go b/tests/integration/actions_concurrency_test.go new file mode 100644 index 0000000000000..2fd5e689e24ea --- /dev/null +++ b/tests/integration/actions_concurrency_test.go @@ -0,0 +1,855 @@ +// Copyright 2025 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package integration + +import ( + "context" + "encoding/base64" + "fmt" + "net/http" + "net/url" + "slices" + "testing" + "time" + + actions_model "code.gitea.io/gitea/models/actions" + auth_model "code.gitea.io/gitea/models/auth" + repo_model "code.gitea.io/gitea/models/repo" + "code.gitea.io/gitea/models/unittest" + user_model "code.gitea.io/gitea/models/user" + api "code.gitea.io/gitea/modules/structs" + "code.gitea.io/gitea/modules/timeutil" + "code.gitea.io/gitea/modules/util" + webhook_module "code.gitea.io/gitea/modules/webhook" + actions_service "code.gitea.io/gitea/services/actions" + + runnerv1 "code.gitea.io/actions-proto-go/runner/v1" + "github.com/stretchr/testify/assert" +) + +func TestWorkflowConcurrency(t *testing.T) { + onGiteaRun(t, func(t *testing.T, u *url.URL) { + user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) + session := loginUser(t, user2.Name) + token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser) + + apiRepo := createActionsTestRepo(t, token, "actions-concurrency", false) + repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiRepo.ID}) + httpContext := NewAPITestContext(t, user2.Name, repo.Name, auth_model.AccessTokenScopeWriteRepository) + defer doAPIDeleteRepository(httpContext)(t) + + runner := newMockRunner() + runner.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner", []string{"ubuntu-latest"}) + + // add a variable for test + req := NewRequestWithJSON(t, "POST", + fmt.Sprintf("/api/v1/repos/%s/%s/actions/variables/myvar", user2.Name, repo.Name), &api.CreateVariableOption{ + Value: "abc123", + }). 
+ AddTokenAuth(token) + MakeRequest(t, req, http.StatusNoContent) + + wf1TreePath := ".gitea/workflows/concurrent-workflow-1.yml" + wf1FileContent := `name: concurrent-workflow-1 +on: + push: + paths: + - '.gitea/workflows/concurrent-workflow-1.yml' +concurrency: + group: workflow-main-abc123-user2 +jobs: + wf1-job: + runs-on: ubuntu-latest + steps: + - run: echo 'job from workflow1' +` + wf2TreePath := ".gitea/workflows/concurrent-workflow-2.yml" + wf2FileContent := `name: concurrent-workflow-2 +on: + push: + paths: + - '.gitea/workflows/concurrent-workflow-2.yml' +concurrency: + group: workflow-${{ gitea.ref_name }}-${{ vars.myvar }}-${{ gitea.event.pusher.username }} +jobs: + wf2-job: + runs-on: ubuntu-latest + steps: + - run: echo 'job from workflow2' +` + wf3TreePath := ".gitea/workflows/concurrent-workflow-3.yml" + wf3FileContent := `name: concurrent-workflow-3 +on: + push: + paths: + - '.gitea/workflows/concurrent-workflow-3.yml' +concurrency: + group: workflow-main-abc${{ 123 }}-${{ gitea.event.pusher.username }} +jobs: + wf3-job: + runs-on: ubuntu-latest + steps: + - run: echo 'job from workflow3' +` + opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, fmt.Sprintf("create %s", wf1TreePath), wf1FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf1TreePath, opts1) + opts2 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, fmt.Sprintf("create %s", wf2TreePath), wf2FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf2TreePath, opts2) + opts3 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, fmt.Sprintf("create %s", wf3TreePath), wf3FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf3TreePath, opts3) + + // fetch and exec workflow1, workflow2 and workflow3 are blocked + task := runner.fetchTask(t) + _, _, run := getTaskAndJobAndRunByTaskID(t, task.Id) + assert.Equal(t, "workflow-main-abc123-user2", run.ConcurrencyGroup) + assert.Equal(t, "concurrent-workflow-1.yml", run.WorkflowID) + runner.fetchNoTask(t) + runner.execTask(t, task, &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + }) + + // fetch workflow2 or workflow3 + workflowNames := []string{"concurrent-workflow-2.yml", "concurrent-workflow-3.yml"} + task = runner.fetchTask(t) + _, _, run = getTaskAndJobAndRunByTaskID(t, task.Id) + assert.Contains(t, workflowNames, run.WorkflowID) + workflowNames = slices.DeleteFunc(workflowNames, func(wfn string) bool { return wfn == run.WorkflowID }) + assert.Equal(t, "workflow-main-abc123-user2", run.ConcurrencyGroup) + runner.fetchNoTask(t) + runner.execTask(t, task, &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + }) + + // fetch the last workflow (workflow2 or workflow3) + task = runner.fetchTask(t) + _, _, run = getTaskAndJobAndRunByTaskID(t, task.Id) + assert.Equal(t, "workflow-main-abc123-user2", run.ConcurrencyGroup) + assert.Equal(t, workflowNames[0], run.WorkflowID) + runner.fetchNoTask(t) + runner.execTask(t, task, &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + }) + }) +} + +func TestPullRequestWorkflowConcurrency(t *testing.T) { + onGiteaRun(t, func(t *testing.T, u *url.URL) { + // user2 is the owner of the base repo + user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) + user2Session := loginUser(t, user2.Name) + user2Token := getTokenForLoggedInUser(t, user2Session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser) + // user4 is the owner of the forked repo + user4 := unittest.AssertExistsAndLoadBean(t, 
&user_model.User{ID: 4}) + user4Token := getTokenForLoggedInUser(t, loginUser(t, user4.Name), auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser) + + apiBaseRepo := createActionsTestRepo(t, user2Token, "actions-concurrency", false) + baseRepo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiBaseRepo.ID}) + user2APICtx := NewAPITestContext(t, baseRepo.OwnerName, baseRepo.Name, auth_model.AccessTokenScopeWriteRepository) + defer doAPIDeleteRepository(user2APICtx)(t) + + runner := newMockRunner() + runner.registerAsRepoRunner(t, baseRepo.OwnerName, baseRepo.Name, "mock-runner", []string{"ubuntu-latest"}) + + // init the workflow + wfTreePath := ".gitea/workflows/pull.yml" + wfFileContent := `name: Pull Request +on: pull_request +concurrency: + group: pull-request-test + cancel-in-progress: ${{ !startsWith(gitea.head_ref, 'do-not-cancel/') }} +jobs: + wf1-job: + runs-on: ubuntu-latest + steps: + - run: echo 'test the pull' +` + opts1 := getWorkflowCreateFileOptions(user2, baseRepo.DefaultBranch, fmt.Sprintf("create %s", wfTreePath), wfFileContent) + createWorkflowFile(t, user2Token, baseRepo.OwnerName, baseRepo.Name, wfTreePath, opts1) + // user2 creates a pull request + doAPICreateFile(user2APICtx, "user2-fix.txt", &api.CreateFileOptions{ + FileOptions: api.FileOptions{ + NewBranchName: "bugfix/aaa", + Message: "create user2-fix.txt", + Author: api.Identity{ + Name: user4.Name, + Email: user4.Email, + }, + Committer: api.Identity{ + Name: user4.Name, + Email: user4.Email, + }, + Dates: api.CommitDateOptions{ + Author: time.Now(), + Committer: time.Now(), + }, + }, + ContentBase64: base64.StdEncoding.EncodeToString([]byte("user2-fix")), + })(t) + doAPICreatePullRequest(user2APICtx, baseRepo.OwnerName, baseRepo.Name, baseRepo.DefaultBranch, "bugfix/aaa")(t) + pr1Task1 := runner.fetchTask(t) + _, _, pr1Run1 := getTaskAndJobAndRunByTaskID(t, pr1Task1.Id) + assert.Equal(t, "pull-request-test", pr1Run1.ConcurrencyGroup) + assert.True(t, pr1Run1.ConcurrencyCancel) + assert.Equal(t, actions_model.StatusRunning, pr1Run1.Status) + + // user4 forks the repo + req := NewRequestWithJSON(t, "POST", fmt.Sprintf("/api/v1/repos/%s/%s/forks", baseRepo.OwnerName, baseRepo.Name), + &api.CreateForkOption{ + Name: util.ToPointer("actions-concurrency-fork"), + }).AddTokenAuth(user4Token) + resp := MakeRequest(t, req, http.StatusAccepted) + var apiForkRepo api.Repository + DecodeJSON(t, resp, &apiForkRepo) + forkRepo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiForkRepo.ID}) + user4APICtx := NewAPITestContext(t, user4.Name, forkRepo.Name, auth_model.AccessTokenScopeWriteRepository) + defer doAPIDeleteRepository(user4APICtx)(t) + + // user4 creates a pull request from branch "bugfix/bbb" + doAPICreateFile(user4APICtx, "user4-fix.txt", &api.CreateFileOptions{ + FileOptions: api.FileOptions{ + NewBranchName: "bugfix/bbb", + Message: "create user4-fix.txt", + Author: api.Identity{ + Name: user4.Name, + Email: user4.Email, + }, + Committer: api.Identity{ + Name: user4.Name, + Email: user4.Email, + }, + Dates: api.CommitDateOptions{ + Author: time.Now(), + Committer: time.Now(), + }, + }, + ContentBase64: base64.StdEncoding.EncodeToString([]byte("user4-fix")), + })(t) + doAPICreatePullRequest(user4APICtx, baseRepo.OwnerName, baseRepo.Name, baseRepo.DefaultBranch, fmt.Sprintf("%s:bugfix/bbb", user4.Name))(t) + // cannot fetch the task because an approval is required + runner.fetchNoTask(t) + // user2 approves the run + pr2Run1 := 
unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{RepoID: baseRepo.ID, TriggerUserID: user4.ID}) + req = NewRequestWithValues(t, "POST", + fmt.Sprintf("/%s/%s/actions/runs/%d/approve", baseRepo.OwnerName, baseRepo.Name, pr2Run1.Index), + map[string]string{ + "_csrf": GetUserCSRFToken(t, user2Session), + }) + user2Session.MakeRequest(t, req, http.StatusOK) + // fetch the task and the previous task has been cancelled + pr2Task1 := runner.fetchTask(t) + _, _, pr2Run1 = getTaskAndJobAndRunByTaskID(t, pr2Task1.Id) + assert.Equal(t, "pull-request-test", pr2Run1.ConcurrencyGroup) + assert.True(t, pr2Run1.ConcurrencyCancel) + assert.Equal(t, actions_model.StatusRunning, pr2Run1.Status) + pr1Run1 = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: pr1Run1.ID}) + assert.Equal(t, actions_model.StatusCancelled, pr1Run1.Status) + + // user4 creates another pull request from branch "do-not-cancel/ccc" + doAPICreateFile(user4APICtx, "user4-fix2.txt", &api.CreateFileOptions{ + FileOptions: api.FileOptions{ + NewBranchName: "do-not-cancel/ccc", + Message: "create user4-fix2.txt", + Author: api.Identity{ + Name: user4.Name, + Email: user4.Email, + }, + Committer: api.Identity{ + Name: user4.Name, + Email: user4.Email, + }, + Dates: api.CommitDateOptions{ + Author: time.Now(), + Committer: time.Now(), + }, + }, + ContentBase64: base64.StdEncoding.EncodeToString([]byte("user4-fix2")), + })(t) + doAPICreatePullRequest(user4APICtx, baseRepo.OwnerName, baseRepo.Name, baseRepo.DefaultBranch, fmt.Sprintf("%s:do-not-cancel/ccc", user4.Name))(t) + // cannot fetch the task because cancel-in-progress is false + runner.fetchNoTask(t) + runner.execTask(t, pr2Task1, &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + }) + pr2Run1 = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: pr2Run1.ID}) + assert.Equal(t, actions_model.StatusSuccess, pr2Run1.Status) + // fetch the task + pr3Task1 := runner.fetchTask(t) + _, _, pr3Run1 := getTaskAndJobAndRunByTaskID(t, pr3Task1.Id) + assert.Equal(t, "pull-request-test", pr3Run1.ConcurrencyGroup) + assert.False(t, pr3Run1.ConcurrencyCancel) + assert.Equal(t, actions_model.StatusRunning, pr3Run1.Status) + }) +} + +func TestJobConcurrency(t *testing.T) { + onGiteaRun(t, func(t *testing.T, u *url.URL) { + user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) + session := loginUser(t, user2.Name) + token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser) + + apiRepo := createActionsTestRepo(t, token, "actions-concurrency", false) + repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiRepo.ID}) + httpContext := NewAPITestContext(t, user2.Name, repo.Name, auth_model.AccessTokenScopeWriteRepository) + defer doAPIDeleteRepository(httpContext)(t) + + runner1 := newMockRunner() + runner1.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner-1", []string{"runner1"}) + runner2 := newMockRunner() + runner2.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner-2", []string{"runner2"}) + + // add a variable for test + req := NewRequestWithJSON(t, "POST", + fmt.Sprintf("/api/v1/repos/%s/%s/actions/variables/version_var", user2.Name, repo.Name), &api.CreateVariableOption{ + Value: "v1.23.0", + }). 
+ AddTokenAuth(token) + MakeRequest(t, req, http.StatusNoContent) + + wf1TreePath := ".gitea/workflows/concurrent-workflow-1.yml" + wf1FileContent := `name: concurrent-workflow-1 +on: + push: + paths: + - '.gitea/workflows/concurrent-workflow-1.yml' +jobs: + wf1-job1: + runs-on: runner1 + concurrency: + group: job-main-${{ vars.version_var }} + steps: + - run: echo 'wf1-job1' +` + wf2TreePath := ".gitea/workflows/concurrent-workflow-2.yml" + wf2FileContent := `name: concurrent-workflow-2 +on: + push: + paths: + - '.gitea/workflows/concurrent-workflow-2.yml' +jobs: + wf2-job1: + runs-on: runner2 + outputs: + version: ${{ steps.version_step.outputs.app_version }} + steps: + - id: version_step + run: echo "app_version=v1.23.0" >> "$GITHUB_OUTPUT" + wf2-job2: + runs-on: runner1 + needs: [wf2-job1] + concurrency: + group: job-main-${{ needs.wf2-job1.outputs.version }} + steps: + - run: echo 'wf2-job2' +` + wf3TreePath := ".gitea/workflows/concurrent-workflow-3.yml" + wf3FileContent := `name: concurrent-workflow-3 +on: + push: + paths: + - '.gitea/workflows/concurrent-workflow-3.yml' +jobs: + wf3-job1: + runs-on: runner1 + concurrency: + group: job-main-${{ vars.version_var }} + cancel-in-progress: ${{ vars.version_var == 'v1.23.0' }} + steps: + - run: echo 'wf3-job1' +` + + opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, fmt.Sprintf("create %s", wf1TreePath), wf1FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf1TreePath, opts1) + opts2 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, fmt.Sprintf("create %s", wf2TreePath), wf2FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf2TreePath, opts2) + + // fetch wf1-job1 + wf1Job1Task := runner1.fetchTask(t) + _, wf1Job1ActionJob, _ := getTaskAndJobAndRunByTaskID(t, wf1Job1Task.Id) + assert.Equal(t, "job-main-v1.23.0", wf1Job1ActionJob.ConcurrencyGroup) + assert.Equal(t, actions_model.StatusRunning, wf1Job1ActionJob.Status) + // fetch and exec wf2-job1 + wf2Job1Task := runner2.fetchTask(t) + runner2.execTask(t, wf2Job1Task, &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + outputs: map[string]string{ + "version": "v1.23.0", + }, + }) + // cannot fetch wf2-job2 because wf1-job1 is running + runner1.fetchNoTask(t) + // exec wf1-job1 + runner1.execTask(t, wf1Job1Task, &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + }) + // fetch wf2-job2 + wf2Job2Task := runner1.fetchTask(t) + _, wf2Job2ActionJob, _ := getTaskAndJobAndRunByTaskID(t, wf2Job2Task.Id) + assert.Equal(t, "job-main-v1.23.0", wf2Job2ActionJob.ConcurrencyGroup) + assert.Equal(t, actions_model.StatusRunning, wf2Job2ActionJob.Status) + // push workflow3 to trigger wf3-job1 + opts3 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, fmt.Sprintf("create %s", wf3TreePath), wf3FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf3TreePath, opts3) + // fetch wf3-job1 + wf3Job1Task := runner1.fetchTask(t) + _, wf3Job1ActionJob, _ := getTaskAndJobAndRunByTaskID(t, wf3Job1Task.Id) + assert.Equal(t, "job-main-v1.23.0", wf3Job1ActionJob.ConcurrencyGroup) + assert.Equal(t, actions_model.StatusRunning, wf3Job1ActionJob.Status) + // wf2-job2 has been cancelled + _, wf2Job2ActionJob, _ = getTaskAndJobAndRunByTaskID(t, wf2Job2Task.Id) + assert.Equal(t, actions_model.StatusCancelled, wf2Job2ActionJob.Status) + }) +} + +func TestMatrixConcurrency(t *testing.T) { + onGiteaRun(t, func(t *testing.T, u *url.URL) { + user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) + session := 
+		token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser)
+
+		apiRepo := createActionsTestRepo(t, token, "actions-concurrency", false)
+		repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiRepo.ID})
+		httpContext := NewAPITestContext(t, user2.Name, repo.Name, auth_model.AccessTokenScopeWriteRepository)
+		defer doAPIDeleteRepository(httpContext)(t)
+
+		linuxRunner := newMockRunner()
+		linuxRunner.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-linux-runner", []string{"linux-runner"})
+		windowsRunner := newMockRunner()
+		windowsRunner.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-windows-runner", []string{"windows-runner"})
+		darwinRunner := newMockRunner()
+		darwinRunner.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-darwin-runner", []string{"darwin-runner"})
+
+		wf1TreePath := ".gitea/workflows/concurrent-workflow-1.yml"
+		wf1FileContent := `name: concurrent-workflow-1
+on:
+  push:
+    paths:
+      - '.gitea/workflows/concurrent-workflow-1.yml'
+jobs:
+  job1:
+    runs-on: ${{ matrix.os }}-runner
+    strategy:
+      matrix:
+        os: [windows, linux]
+    concurrency:
+      group: job-os-${{ matrix.os }}
+    steps:
+      - run: echo 'job1'
+  job2:
+    runs-on: ${{ matrix.os }}-runner
+    strategy:
+      matrix:
+        os: [darwin, windows, linux]
+    concurrency:
+      group: job-os-${{ matrix.os }}
+    steps:
+      - run: echo 'job2'
+`
+
+		opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, fmt.Sprintf("create %s", wf1TreePath), wf1FileContent)
+		createWorkflowFile(t, token, user2.Name, repo.Name, wf1TreePath, opts1)
+
+		job1WinTask := windowsRunner.fetchTask(t)
+		job1LinuxTask := linuxRunner.fetchTask(t)
+		windowsRunner.fetchNoTask(t)
+		linuxRunner.fetchNoTask(t)
+		job2DarwinTask := darwinRunner.fetchTask(t)
+		_, job1WinJob, _ := getTaskAndJobAndRunByTaskID(t, job1WinTask.Id)
+		assert.Equal(t, "job1 (windows)", job1WinJob.Name)
+		assert.Equal(t, "job-os-windows", job1WinJob.ConcurrencyGroup)
+		_, job1LinuxJob, _ := getTaskAndJobAndRunByTaskID(t, job1LinuxTask.Id)
+		assert.Equal(t, "job1 (linux)", job1LinuxJob.Name)
+		assert.Equal(t, "job-os-linux", job1LinuxJob.ConcurrencyGroup)
+		_, job2DarwinJob, _ := getTaskAndJobAndRunByTaskID(t, job2DarwinTask.Id)
+		assert.Equal(t, "job2 (darwin)", job2DarwinJob.Name)
+		assert.Equal(t, "job-os-darwin", job2DarwinJob.ConcurrencyGroup)
+		windowsRunner.execTask(t, job1WinTask, &mockTaskOutcome{
+			result: runnerv1.Result_RESULT_SUCCESS,
+		})
+		linuxRunner.execTask(t, job1LinuxTask, &mockTaskOutcome{
+			result: runnerv1.Result_RESULT_SUCCESS,
+		})
+
+		job2WinTask := windowsRunner.fetchTask(t)
+		job2LinuxTask := linuxRunner.fetchTask(t)
+		_, job2WinJob, _ := getTaskAndJobAndRunByTaskID(t, job2WinTask.Id)
+		assert.Equal(t, "job2 (windows)", job2WinJob.Name)
+		assert.Equal(t, "job-os-windows", job2WinJob.ConcurrencyGroup)
+		_, job2LinuxJob, _ := getTaskAndJobAndRunByTaskID(t, job2LinuxTask.Id)
+		assert.Equal(t, "job2 (linux)", job2LinuxJob.Name)
+		assert.Equal(t, "job-os-linux", job2LinuxJob.ConcurrencyGroup)
+	})
+}
+
+func TestWorkflowDispatchConcurrency(t *testing.T) {
+	onGiteaRun(t, func(t *testing.T, u *url.URL) {
+		user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2})
+		session := loginUser(t, user2.Name)
+		token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser)
+
+		apiRepo := createActionsTestRepo(t, token, "actions-concurrency", false)
+		repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiRepo.ID})
+		httpContext := NewAPITestContext(t, user2.Name, repo.Name, auth_model.AccessTokenScopeWriteRepository)
+		defer doAPIDeleteRepository(httpContext)(t)
+
+		runner := newMockRunner()
+		runner.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner", []string{"ubuntu-latest"})
+
+		wf1TreePath := ".gitea/workflows/workflow-dispatch-concurrency.yml"
+		wf1FileContent := `name: workflow-dispatch-concurrency
+on:
+  workflow_dispatch:
+    inputs:
+      appVersion:
+        description: 'APP version'
+        required: true
+        default: 'v1.23'
+        type: choice
+        options:
+          - v1.21
+          - v1.22
+          - v1.23
+      cancel:
+        description: 'Cancel running workflows'
+        required: false
+        type: boolean
+        default: false
+concurrency:
+  group: workflow-dispatch-${{ inputs.appVersion }}
+  cancel-in-progress: ${{ inputs.cancel }}
+jobs:
+  job:
+    runs-on: ubuntu-latest
+    steps:
+      - run: echo 'workflow dispatch job'
+`
+
+		opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, fmt.Sprintf("create %s", wf1TreePath), wf1FileContent)
+		createWorkflowFile(t, token, user2.Name, repo.Name, wf1TreePath, opts1)
+
+		// run the workflow with appVersion=v1.21 and cancel=false
+		urlStr := fmt.Sprintf("/%s/%s/actions/run?workflow=%s", user2.Name, repo.Name, "workflow-dispatch-concurrency.yml")
+		req := NewRequestWithValues(t, "POST", urlStr, map[string]string{
+			"_csrf":      GetUserCSRFToken(t, session),
+			"ref":        "refs/heads/main",
+			"appVersion": "v1.21",
+		})
+		session.MakeRequest(t, req, http.StatusSeeOther)
+		task1 := runner.fetchTask(t)
+		_, _, run1 := getTaskAndJobAndRunByTaskID(t, task1.Id)
+		assert.Equal(t, "workflow-dispatch-v1.21", run1.ConcurrencyGroup)
+
+		// run the workflow with appVersion=v1.22 and cancel=false
+		req = NewRequestWithValues(t, "POST", urlStr, map[string]string{
+			"_csrf":      GetUserCSRFToken(t, session),
+			"ref":        "refs/heads/main",
+			"appVersion": "v1.22",
+		})
+		session.MakeRequest(t, req, http.StatusSeeOther)
+		task2 := runner.fetchTask(t)
+		_, _, run2 := getTaskAndJobAndRunByTaskID(t, task2.Id)
+		assert.Equal(t, "workflow-dispatch-v1.22", run2.ConcurrencyGroup)
+
+		// run the workflow with appVersion=v1.22 and cancel=false again
+		req = NewRequestWithValues(t, "POST", urlStr, map[string]string{
+			"_csrf":      GetUserCSRFToken(t, session),
+			"ref":        "refs/heads/main",
+			"appVersion": "v1.22",
+		})
+		session.MakeRequest(t, req, http.StatusSeeOther)
+		runner.fetchNoTask(t) // cannot fetch task because task2 is not completed
+
+		// run the workflow with appVersion=v1.22 and cancel=true
+		req = NewRequestWithValues(t, "POST", urlStr, map[string]string{
+			"_csrf":      GetUserCSRFToken(t, session),
+			"ref":        "refs/heads/main",
+			"appVersion": "v1.22",
+			"cancel":     "on",
+		})
+		session.MakeRequest(t, req, http.StatusSeeOther)
+		task4 := runner.fetchTask(t)
+		_, _, run4 := getTaskAndJobAndRunByTaskID(t, task4.Id)
+		assert.Equal(t, "workflow-dispatch-v1.22", run4.ConcurrencyGroup)
+		_, _, run2 = getTaskAndJobAndRunByTaskID(t, task2.Id)
+		assert.Equal(t, actions_model.StatusCancelled, run2.Status)
+	})
+}
+
+func TestScheduleConcurrency(t *testing.T) {
+	onGiteaRun(t, func(t *testing.T, u *url.URL) {
+		user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2})
+		session := loginUser(t, user2.Name)
+		token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser)
+
+		apiRepo := createActionsTestRepo(t, token, "actions-concurrency", false)
+		repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiRepo.ID})
+		httpContext := NewAPITestContext(t, user2.Name, repo.Name, auth_model.AccessTokenScopeWriteRepository)
+		defer doAPIDeleteRepository(httpContext)(t)
+
+		runner := newMockRunner()
+		runner.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner", []string{"ubuntu-latest"})
+
+		wf1TreePath := ".gitea/workflows/schedule-concurrency.yml"
+		wf1FileContent := `name: schedule-concurrency
+on:
+  push:
+  schedule:
+    - cron: '@every 1m'
+concurrency:
+  group: schedule-concurrency
+  cancel-in-progress: ${{ gitea.event_name == 'push' }}
+jobs:
+  job:
+    runs-on: ubuntu-latest
+    steps:
+      - run: echo 'schedule workflow'
+`
+
+		opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, fmt.Sprintf("create %s", wf1TreePath), wf1FileContent)
+		createWorkflowFile(t, token, user2.Name, repo.Name, wf1TreePath, opts1)
+
+		// fetch the task triggered by push
+		task1 := runner.fetchTask(t)
+		_, _, run1 := getTaskAndJobAndRunByTaskID(t, task1.Id)
+		assert.Equal(t, "schedule-concurrency", run1.ConcurrencyGroup)
+		assert.True(t, run1.ConcurrencyCancel)
+		assert.Equal(t, string(webhook_module.HookEventPush), run1.TriggerEvent)
+		assert.Equal(t, actions_model.StatusRunning, run1.Status)
+
+		// trigger the task by schedule
+		spec := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionScheduleSpec{RepoID: repo.ID})
+		spec.Next = timeutil.TimeStampNow() // manually update "Next"
+		assert.NoError(t, actions_model.UpdateScheduleSpec(context.Background(), spec, "next"))
+		assert.NoError(t, actions_service.StartScheduleTasks(context.Background()))
+		runner.fetchNoTask(t) // cannot fetch because task1 is not completed
+		runner.execTask(t, task1, &mockTaskOutcome{
+			result: runnerv1.Result_RESULT_SUCCESS,
+		})
+		_, _, run1 = getTaskAndJobAndRunByTaskID(t, task1.Id)
+		assert.Equal(t, actions_model.StatusSuccess, run1.Status)
+		task2 := runner.fetchTask(t)
+		_, _, run2 := getTaskAndJobAndRunByTaskID(t, task2.Id)
+		assert.Equal(t, "schedule-concurrency", run2.ConcurrencyGroup)
+		assert.False(t, run2.ConcurrencyCancel)
+		assert.Equal(t, string(webhook_module.HookEventSchedule), run2.TriggerEvent)
+		assert.Equal(t, actions_model.StatusRunning, run2.Status)
+
+		// trigger the task by schedule again
+		spec = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionScheduleSpec{RepoID: repo.ID})
+		spec.Next = timeutil.TimeStampNow() // manually update "Next"
+		assert.NoError(t, actions_model.UpdateScheduleSpec(context.Background(), spec, "next"))
+		assert.NoError(t, actions_service.StartScheduleTasks(context.Background()))
+		runner.fetchNoTask(t) // cannot fetch because task2 is not completed
+		run3 := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{RepoID: repo.ID, Status: actions_model.StatusBlocked})
+		assert.Equal(t, "schedule-concurrency", run3.ConcurrencyGroup)
+		assert.False(t, run3.ConcurrencyCancel)
+		assert.Equal(t, string(webhook_module.HookEventSchedule), run3.TriggerEvent)
+
+		// trigger the task by push
+		doAPICreateFile(httpContext, "doc.txt", &api.CreateFileOptions{
+			FileOptions: api.FileOptions{
+				NewBranchName: "main",
+				Message:       "create doc.txt",
+				Author: api.Identity{
+					Name:  user2.Name,
+					Email: user2.Email,
+				},
+				Committer: api.Identity{
+					Name:  user2.Name,
+					Email: user2.Email,
+				},
+				Dates: api.CommitDateOptions{
+					Author:    time.Now(),
+					Committer: time.Now(),
+				},
+			},
+			ContentBase64: base64.StdEncoding.EncodeToString([]byte("doc")),
+		})(t)
+
+		task4 := runner.fetchTask(t)
+		_, _, run4 := getTaskAndJobAndRunByTaskID(t, task4.Id)
+		assert.Equal(t, "schedule-concurrency", run4.ConcurrencyGroup)
+		assert.True(t, run4.ConcurrencyCancel)
+		assert.Equal(t, string(webhook_module.HookEventPush), run4.TriggerEvent)
+		run3 = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: run3.ID})
+		assert.Equal(t, actions_model.StatusCancelled, run3.Status)
+	})
+}
+
+func TestWorkflowAndJobConcurrency(t *testing.T) {
+	onGiteaRun(t, func(t *testing.T, u *url.URL) {
+		user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2})
+		session := loginUser(t, user2.Name)
+		token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser)
+
+		apiRepo := createActionsTestRepo(t, token, "actions-concurrency", false)
+		repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiRepo.ID})
+
+		httpContext := NewAPITestContext(t, user2.Name, repo.Name, auth_model.AccessTokenScopeWriteRepository)
+		defer doAPIDeleteRepository(httpContext)(t)
+
+		runner1 := newMockRunner()
+		runner1.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner-1", []string{"runner1"})
+		runner2 := newMockRunner()
+		runner2.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner-2", []string{"runner2"})
+
+		wf1TreePath := ".gitea/workflows/concurrent-workflow-1.yml"
+		wf1FileContent := `name: concurrent-workflow-1
+on:
+  push:
+    paths:
+      - '.gitea/workflows/concurrent-workflow-1.yml'
+concurrency:
+  group: workflow-group-1
+jobs:
+  wf1-job1:
+    runs-on: runner1
+    concurrency:
+      group: job-group-1
+    steps:
+      - run: echo 'wf1-job1'
+  wf1-job2:
+    runs-on: runner2
+    concurrency:
+      group: job-group-2
+    steps:
+      - run: echo 'wf1-job2'
+`
+		wf2TreePath := ".gitea/workflows/concurrent-workflow-2.yml"
+		wf2FileContent := `name: concurrent-workflow-2
+on:
+  push:
+    paths:
+      - '.gitea/workflows/concurrent-workflow-2.yml'
+concurrency:
+  group: workflow-group-1
+jobs:
+  wf2-job1:
+    runs-on: runner1
+    concurrency:
+      group: job-group-1
+    steps:
+      - run: echo 'wf2-job1'
+  wf2-job2:
+    runs-on: runner2
+    concurrency:
+      group: job-group-2
+    steps:
+      - run: echo 'wf2-job2'
+`
+		wf3TreePath := ".gitea/workflows/concurrent-workflow-3.yml"
+		wf3FileContent := `name: concurrent-workflow-3
+on:
+  push:
+    paths:
+      - '.gitea/workflows/concurrent-workflow-3.yml'
+concurrency:
+  group: workflow-group-2
+jobs:
+  wf3-job1:
+    runs-on: runner1
+    concurrency:
+      group: job-group-1
+    steps:
+      - run: echo 'wf3-job1'
+`
+
+		wf4TreePath := ".gitea/workflows/concurrent-workflow-4.yml"
+		wf4FileContent := `name: concurrent-workflow-4
+on:
+  push:
+    paths:
+      - '.gitea/workflows/concurrent-workflow-4.yml'
+concurrency:
+  group: workflow-group-2
+jobs:
+  wf4-job1:
+    runs-on: runner2
+    concurrency:
+      group: job-group-2
+      cancel-in-progress: true
+    steps:
+      - run: echo 'wf4-job1'
+`
+
+		// push workflow 1, 2 and 3
+		opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, fmt.Sprintf("create %s", wf1TreePath), wf1FileContent)
+		createWorkflowFile(t, token, user2.Name, repo.Name, wf1TreePath, opts1)
+		opts2 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, fmt.Sprintf("create %s", wf2TreePath), wf2FileContent)
+		createWorkflowFile(t, token, user2.Name, repo.Name, wf2TreePath, opts2)
+		opts3 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, fmt.Sprintf("create %s", wf3TreePath), wf3FileContent)
+		createWorkflowFile(t, token, user2.Name, repo.Name, wf3TreePath, opts3)
+		// fetch wf1-job1 and wf1-job2
+		w1j1Task := runner1.fetchTask(t)
+		w1j2Task := runner2.fetchTask(t)
+		// cannot fetch wf2-job1 and wf2-job2 because workflow-2 is blocked by workflow-1's concurrency group "workflow-group-1"
+		// cannot fetch wf3-job1 because it is blocked by wf1-job1's concurrency group "job-group-1"
+		runner1.fetchNoTask(t)
+		runner2.fetchNoTask(t)
+		_, w1j1Job, w1Run := getTaskAndJobAndRunByTaskID(t, w1j1Task.Id)
+		assert.Equal(t, "job-group-1", w1j1Job.ConcurrencyGroup)
+		assert.Equal(t, "workflow-group-1", w1Run.ConcurrencyGroup)
+		assert.Equal(t, "concurrent-workflow-1.yml", w1Run.WorkflowID)
+		_, w1j2Job, _ := getTaskAndJobAndRunByTaskID(t, w1j2Task.Id)
+		assert.Equal(t, "job-group-2", w1j2Job.ConcurrencyGroup)
+		// exec wf1-job1
+		runner1.execTask(t, w1j1Task, &mockTaskOutcome{
+			result: runnerv1.Result_RESULT_SUCCESS,
+		})
+		// fetch wf3-job1
+		w3j1Task := runner1.fetchTask(t)
+		// cannot fetch wf2-job1 and wf2-job2 because workflow-2 is blocked by workflow-1's concurrency group "workflow-group-1"
+		runner1.fetchNoTask(t)
+		runner2.fetchNoTask(t)
+		_, w3j1Job, w3Run := getTaskAndJobAndRunByTaskID(t, w3j1Task.Id)
+		assert.Equal(t, "job-group-1", w3j1Job.ConcurrencyGroup)
+		assert.Equal(t, "workflow-group-2", w3Run.ConcurrencyGroup)
+		assert.Equal(t, "concurrent-workflow-3.yml", w3Run.WorkflowID)
+		// push workflow-4
+		opts4 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, fmt.Sprintf("create %s", wf4TreePath), wf4FileContent)
+		createWorkflowFile(t, token, user2.Name, repo.Name, wf4TreePath, opts4)
+		// exec wf1-job2
+		runner2.execTask(t, w1j2Task, &mockTaskOutcome{
+			result: runnerv1.Result_RESULT_SUCCESS,
+		})
+		// fetch wf2-job2
+		w2j2Task := runner2.fetchTask(t)
+		// cannot fetch wf2-job1 because it is blocked by wf3-job1's concurrency group "job-group-1"
+		// cannot fetch wf4-job1 because it is blocked by workflow-3's concurrency group "workflow-group-2"
+		runner1.fetchNoTask(t)
+		runner2.fetchNoTask(t)
+		_, w2j2Job, w2Run := getTaskAndJobAndRunByTaskID(t, w2j2Task.Id)
+		assert.Equal(t, "job-group-2", w2j2Job.ConcurrencyGroup)
+		assert.Equal(t, "workflow-group-1", w2Run.ConcurrencyGroup)
+		assert.Equal(t, "concurrent-workflow-2.yml", w2Run.WorkflowID)
+		// exec wf3-job1
+		runner1.execTask(t, w3j1Task, &mockTaskOutcome{
+			result: runnerv1.Result_RESULT_SUCCESS,
+		})
+		// fetch wf2-job1
+		w2j1Task := runner1.fetchTask(t)
+		// fetch wf4-job1
+		w4j1Task := runner2.fetchTask(t)
+		// all tasks have been fetched
+		runner1.fetchNoTask(t)
+		runner2.fetchNoTask(t)
+		_, w2j1Job, _ := getTaskAndJobAndRunByTaskID(t, w2j1Task.Id)
+		assert.Equal(t, "job-group-1", w2j1Job.ConcurrencyGroup)
+		assert.Equal(t, actions_model.StatusRunning, w2j1Job.Status)
+		_, w2j2Job, w2Run = getTaskAndJobAndRunByTaskID(t, w2j2Task.Id)
+		// wf2-job2 is cancelled because wf4-job1's cancel-in-progress is true
+		assert.Equal(t, actions_model.StatusCancelled, w2j2Job.Status)
+		assert.Equal(t, actions_model.StatusCancelled, w2Run.Status)
+		_, w4j1Job, w4Run := getTaskAndJobAndRunByTaskID(t, w4j1Task.Id)
+		assert.Equal(t, "job-group-2", w4j1Job.ConcurrencyGroup)
+		assert.Equal(t, "workflow-group-2", w4Run.ConcurrencyGroup)
+		assert.Equal(t, "concurrent-workflow-4.yml", w4Run.WorkflowID)
+	})
+}
+
+func getTaskAndJobAndRunByTaskID(t *testing.T, taskID int64) (*actions_model.ActionTask, *actions_model.ActionRunJob, *actions_model.ActionRun) {
+	actionTask := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionTask{ID: taskID})
+	actionRunJob := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{ID: actionTask.JobID})
+	actionRun := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: actionRunJob.RunID})
+	return actionTask, actionRunJob, actionRun
+}
diff --git a/tests/integration/actions_runner_test.go b/tests/integration/actions_runner_test.go
index 355ea1705e23c..74b75df6022df 100644
--- a/tests/integration/actions_runner_test.go
+++ b/tests/integration/actions_runner_test.go
@@ -92,7 +92,20 @@ func (r *mockRunner) registerAsRepoRunner(t *testing.T, ownerName, repoName, run
 }
 
 func (r *mockRunner) fetchTask(t *testing.T, timeout ...time.Duration) *runnerv1.Task {
-	fetchTimeout := 10 * time.Second
+	task := r.tryFetchTask(t, timeout...)
+	assert.NotNil(t, task, "failed to fetch a task")
+	return task
+}
+
+func (r *mockRunner) fetchNoTask(t *testing.T, timeout ...time.Duration) {
+	task := r.tryFetchTask(t, timeout...)
+	assert.Nil(t, task, "a task is fetched")
+}
+
+const defaultFetchTaskTimeout = 1 * time.Second
+
+func (r *mockRunner) tryFetchTask(t *testing.T, timeout ...time.Duration) *runnerv1.Task {
+	fetchTimeout := defaultFetchTaskTimeout
 	if len(timeout) > 0 {
 		fetchTimeout = timeout[0]
 	}
@@ -107,9 +120,9 @@ func (r *mockRunner) fetchTask(t *testing.T, timeout ...time.Duration) *runnerv1
 			task = resp.Msg.Task
 			break
 		}
-		time.Sleep(time.Second)
+		time.Sleep(200 * time.Millisecond)
 	}
-	assert.NotNil(t, task, "failed to fetch a task")
+	return task
 }