Skip to content

Commit c1f5f0b

Browse files
committed
fix: use new orchestrator queue
1 parent aacdf9b commit c1f5f0b

File tree

8 files changed

+288
-57
lines changed

8 files changed

+288
-57
lines changed

.github/workflows/build.yml

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# This workflow will build a golang project
2+
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go
3+
4+
name: Build Snapshot Relaese
5+
6+
on:
7+
push:
8+
branches: ["main"]
9+
workflow_dispatch:
10+
11+
jobs:
12+
build:
13+
runs-on: ubuntu-latest
14+
steps:
15+
- name: Build
16+
uses: goreleaser/goreleaser-action@v5
17+
with:
18+
distribution: goreleaser
19+
version: latest
20+
args: release --snapshot --clean
21+
22+
- name: Upload Artifacts
23+
uses: actions/upload-artifact@v3
24+
with:
25+
name: backrest-snapshot-builds
26+
path: |
27+
dist/*.tar.gz
28+
dist/*.zip

.github/workflows/build-and-test.yml .github/workflows/test.yml

+7-8
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# This workflow will build a golang project
22
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go
33

4-
name: Build and Test
4+
name: Test
55

66
on:
77
push:
@@ -11,7 +11,7 @@ on:
1111
workflow_dispatch:
1212

1313
jobs:
14-
build-nix:
14+
test-nix:
1515
runs-on: ubuntu-latest
1616
steps:
1717
- uses: actions/checkout@v3
@@ -29,17 +29,16 @@ jobs:
2929
with:
3030
node-version: "20"
3131

32+
- name: Generate
33+
run: go generate ./...
34+
3235
- name: Build
33-
uses: goreleaser/goreleaser-action@v5
34-
with:
35-
distribution: goreleaser
36-
version: latest
37-
args: release --snapshot --clean
36+
run: go build ./...
3837

3938
- name: Test
4039
run: PATH=$(pwd):$PATH go test ./... --race
4140

42-
build-win:
41+
test-win:
4342
runs-on: windows-latest
4443
steps:
4544
- uses: actions/checkout@v3

internal/orchestrator/orchestrator.go

+21-28
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/garethgeorge/backrest/internal/config"
1414
"github.com/garethgeorge/backrest/internal/hook"
1515
"github.com/garethgeorge/backrest/internal/oplog"
16+
"github.com/garethgeorge/backrest/internal/queue"
1617
"github.com/garethgeorge/backrest/internal/rotatinglog"
1718
"go.uber.org/zap"
1819
"google.golang.org/protobuf/proto"
@@ -40,7 +41,7 @@ type Orchestrator struct {
4041
config *v1.Config
4142
OpLog *oplog.OpLog
4243
repoPool *resticRepoPool
43-
taskQueue taskQueue
44+
taskQueue *queue.TimePriorityQueue[scheduledTask]
4445
hookExecutor *hook.HookExecutor
4546
logStore *rotatinglog.RotatingLog
4647

@@ -59,10 +60,8 @@ func NewOrchestrator(resticBin string, cfg *v1.Config, oplog *oplog.OpLog, logSt
5960
OpLog: oplog,
6061
config: cfg,
6162
// repoPool created with a memory store to ensure the config is updated in an atomic operation with the repo pool's config value.
62-
repoPool: newResticRepoPool(resticBin, &config.MemoryStore{Config: cfg}),
63-
taskQueue: newTaskQueue(func() time.Time {
64-
return o.curTime()
65-
}),
63+
repoPool: newResticRepoPool(resticBin, &config.MemoryStore{Config: cfg}),
64+
taskQueue: queue.NewTimePriorityQueue[scheduledTask](),
6665
hookExecutor: hook.NewHookExecutor(oplog, logStore),
6766
logStore: logStore,
6867
}
@@ -194,29 +193,23 @@ func (o *Orchestrator) CancelOperation(operationId int64, status v1.OperationSta
194193
running.cancel()
195194
}
196195

197-
tasks := o.taskQueue.Reset()
198-
remaining := make([]scheduledTask, 0, len(tasks))
199-
200-
for _, t := range tasks {
201-
if t.task.OperationId() == operationId {
202-
if err := t.task.Cancel(status); err != nil {
203-
return fmt.Errorf("cancel task %q: %w", t.task.Name(), err)
204-
}
205-
206-
// check if the task has a next after it's current 'runAt' time, if it does then we will schedule the next run.
207-
if nextTime := t.task.Next(t.runAt); nextTime != nil {
208-
remaining = append(remaining, scheduledTask{
209-
task: t.task,
210-
runAt: *nextTime,
211-
})
212-
}
213-
} else {
214-
remaining = append(remaining, *t)
215-
}
196+
allTasks := o.taskQueue.GetAll()
197+
idx := slices.IndexFunc(allTasks, func(t scheduledTask) bool {
198+
return t.task.OperationId() == operationId
199+
})
200+
if idx == -1 {
201+
return nil
216202
}
217203

218-
o.taskQueue.Push(remaining...)
219-
204+
t := allTasks[idx]
205+
o.taskQueue.Remove(t)
206+
if err := t.task.Cancel(status); err != nil {
207+
return fmt.Errorf("cancel task %q: %w", t.task.Name(), err)
208+
}
209+
if nextTime := t.task.Next(t.runAt.Add(1 * time.Second)); nextTime != nil {
210+
t.runAt = *nextTime
211+
o.taskQueue.Enqueue(*nextTime, t.priority, t)
212+
}
220213
return nil
221214
}
222215

@@ -231,7 +224,7 @@ func (o *Orchestrator) Run(mainCtx context.Context) {
231224
}
232225

233226
t := o.taskQueue.Dequeue(mainCtx)
234-
if t == nil {
227+
if t.task == nil {
235228
continue
236229
}
237230

@@ -272,7 +265,7 @@ func (o *Orchestrator) ScheduleTask(t Task, priority int, callbacks ...func(erro
272265
return
273266
}
274267
zap.L().Info("scheduling task", zap.String("task", t.Name()), zap.String("runAt", nextRun.Format(time.RFC3339)))
275-
o.taskQueue.Push(scheduledTask{
268+
o.taskQueue.Enqueue(*nextRun, priority, scheduledTask{
276269
task: t,
277270
runAt: *nextRun,
278271
priority: priority,

internal/orchestrator/scheduledtaskheap.go

+11
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,17 @@ type scheduledTask struct {
164164
config *v1.Config
165165
}
166166

167+
func (s *scheduledTask) Less(other *scheduledTask) bool {
168+
if s.priority != other.priority {
169+
return s.priority > other.priority
170+
}
171+
return s.runAt.Before(other.runAt)
172+
}
173+
174+
func (s scheduledTask) Eq(other scheduledTask) bool {
175+
return s.task == other.task && s.runAt.Equal(other.runAt) && s.priority == other.priority && s.config == other.config
176+
}
177+
167178
type scheduledTaskHeap struct {
168179
tasks []*scheduledTask
169180
comparator func(i, j *scheduledTask) bool

internal/queue/genheap_test.go

+6
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,12 @@ func (v val) Less(other val) bool {
1313
return v.v < other.v
1414
}
1515

16+
func (v val) Eq(other val) bool {
17+
return v.v == other.v
18+
}
19+
1620
func TestGenericHeapInit(t *testing.T) {
21+
t.Parallel()
1722
genHeap := genericHeap[val]{{v: 3}, {v: 2}, {v: 1}}
1823
heap.Init(&genHeap)
1924

@@ -30,6 +35,7 @@ func TestGenericHeapInit(t *testing.T) {
3035
}
3136

3237
func TestGenericHeapPushPop(t *testing.T) {
38+
t.Parallel()
3339
genHeap := genericHeap[val]{} // empty heap
3440
heap.Push(&genHeap, val{v: 3})
3541
heap.Push(&genHeap, val{v: 2})

internal/queue/timepriorityqueue.go

+60-7
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@ import (
88
)
99

1010
// TimePriorityQueue is a priority queue that dequeues elements at (or after) a specified time, and prioritizes elements based on a priority value. It is safe for concurrent use.
11-
type TimePriorityQueue[T any] struct {
11+
type TimePriorityQueue[T equals[T]] struct {
1212
mu sync.Mutex
1313
tqueue TimeQueue[priorityEntry[T]]
1414
ready genericHeap[priorityEntry[T]]
1515
}
1616

17-
func NewTimePriorityQueue[T any]() *TimePriorityQueue[T] {
17+
func NewTimePriorityQueue[T equals[T]]() *TimePriorityQueue[T] {
1818
return &TimePriorityQueue[T]{
1919
tqueue: TimeQueue[priorityEntry[T]]{},
2020
ready: genericHeap[priorityEntry[T]]{},
@@ -23,33 +23,80 @@ func NewTimePriorityQueue[T any]() *TimePriorityQueue[T] {
2323

2424
func (t *TimePriorityQueue[T]) Len() int {
2525
t.mu.Lock()
26+
t.tqueue.mu.Lock()
2627
defer t.mu.Unlock()
27-
return t.tqueue.Len() + t.ready.Len()
28+
defer t.tqueue.mu.Unlock()
29+
return t.tqueue.heap.Len() + t.ready.Len()
2830
}
2931

3032
func (t *TimePriorityQueue[T]) Peek() T {
3133
t.mu.Lock()
34+
t.tqueue.mu.Lock()
3235
defer t.mu.Unlock()
36+
defer t.tqueue.mu.Unlock()
3337

3438
if t.ready.Len() > 0 {
3539
return t.ready.Peek().v
3640
}
37-
return t.tqueue.Peek().v
41+
if t.tqueue.heap.Len() > 0 {
42+
return t.tqueue.heap.Peek().v.v
43+
}
44+
var zero T
45+
return zero
3846
}
3947

4048
func (t *TimePriorityQueue[T]) Reset() []T {
4149
t.mu.Lock()
50+
t.tqueue.mu.Lock()
4251
defer t.mu.Unlock()
52+
defer t.tqueue.mu.Unlock()
53+
4354
var res []T
4455
for t.ready.Len() > 0 {
4556
res = append(res, heap.Pop(&t.ready).(priorityEntry[T]).v)
4657
}
47-
for t.tqueue.Len() > 0 {
58+
for t.tqueue.heap.Len() > 0 {
4859
res = append(res, heap.Pop(&t.tqueue.heap).(timeQueueEntry[priorityEntry[T]]).v.v)
4960
}
5061
return res
5162
}
5263

64+
func (t *TimePriorityQueue[T]) GetAll() []T {
65+
t.mu.Lock()
66+
t.tqueue.mu.Lock()
67+
defer t.mu.Unlock()
68+
defer t.tqueue.mu.Unlock()
69+
res := make([]T, 0, t.tqueue.heap.Len()+t.ready.Len())
70+
for _, entry := range t.tqueue.heap {
71+
res = append(res, entry.v.v)
72+
}
73+
for _, entry := range t.ready {
74+
res = append(res, entry.v)
75+
}
76+
return res
77+
}
78+
79+
func (t *TimePriorityQueue[T]) Remove(v T) {
80+
t.mu.Lock()
81+
t.tqueue.mu.Lock()
82+
defer t.mu.Unlock()
83+
defer t.tqueue.mu.Unlock()
84+
85+
for idx := 0; idx < t.tqueue.heap.Len(); idx++ {
86+
if t.tqueue.heap[idx].v.v.Eq(v) {
87+
heap.Remove(&t.tqueue.heap, idx)
88+
return
89+
}
90+
}
91+
92+
for idx := 0; idx < t.ready.Len(); idx++ {
93+
if t.ready[idx].v.Eq(v) {
94+
heap.Remove(&t.ready, idx)
95+
return
96+
}
97+
}
98+
}
99+
53100
func (t *TimePriorityQueue[T]) Enqueue(at time.Time, priority int, v T) {
54101
t.mu.Lock()
55102
t.tqueue.Enqueue(at, priorityEntry[T]{at, priority, v})
@@ -59,15 +106,17 @@ func (t *TimePriorityQueue[T]) Enqueue(at time.Time, priority int, v T) {
59106
func (t *TimePriorityQueue[T]) Dequeue(ctx context.Context) T {
60107
t.mu.Lock()
61108
for {
109+
t.tqueue.mu.Lock()
62110
for t.tqueue.heap.Len() > 0 {
63-
thead := t.tqueue.Peek() // peek at the head of the time queue
111+
thead := t.tqueue.heap.Peek() // peek at the head of the time queue
64112
if thead.at.Before(time.Now()) {
65113
tqe := heap.Pop(&t.tqueue.heap).(timeQueueEntry[priorityEntry[T]])
66114
heap.Push(&t.ready, tqe.v)
67115
} else {
68116
break
69117
}
70118
}
119+
t.tqueue.mu.Unlock()
71120
if t.ready.Len() > 0 {
72121
defer t.mu.Unlock()
73122
return heap.Pop(&t.ready).(priorityEntry[T]).v
@@ -80,7 +129,7 @@ func (t *TimePriorityQueue[T]) Dequeue(ctx context.Context) T {
80129
}
81130
}
82131

83-
type priorityEntry[T any] struct {
132+
type priorityEntry[T equals[T]] struct {
84133
at time.Time
85134
priority int
86135
v T
@@ -89,3 +138,7 @@ type priorityEntry[T any] struct {
89138
func (t priorityEntry[T]) Less(other priorityEntry[T]) bool {
90139
return t.priority > other.priority
91140
}
141+
142+
func (t priorityEntry[T]) Eq(other priorityEntry[T]) bool {
143+
return t.at == other.at && t.priority == other.priority && t.v.Eq(other.v)
144+
}

0 commit comments

Comments
 (0)