Skip to content

Commit 07cffcb

Browse files
committed
fix: revert orchestrator changes
1 parent ff2bfb9 commit 07cffcb

File tree

2 files changed

+254
-25
lines changed

2 files changed

+254
-25
lines changed

internal/orchestrator/orchestrator.go

+34-25
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
"github.com/garethgeorge/backrest/internal/config"
1414
"github.com/garethgeorge/backrest/internal/hook"
1515
"github.com/garethgeorge/backrest/internal/oplog"
16-
"github.com/garethgeorge/backrest/internal/queue"
1716
"github.com/garethgeorge/backrest/internal/rotatinglog"
1817
"go.uber.org/zap"
1918
"google.golang.org/protobuf/proto"
@@ -41,10 +40,14 @@ type Orchestrator struct {
4140
config *v1.Config
4241
OpLog *oplog.OpLog
4342
repoPool *resticRepoPool
44-
taskQueue *queue.TimePriorityQueue[scheduledTask]
43+
taskQueue taskQueue
4544
hookExecutor *hook.HookExecutor
4645
logStore *rotatinglog.RotatingLog
47-
runningTask atomic.Pointer[taskExecutionInfo]
46+
47+
// now for the purpose of testing; used by Run() to get the current time.
48+
now func() time.Time
49+
50+
runningTask atomic.Pointer[taskExecutionInfo]
4851
}
4952

5053
func NewOrchestrator(resticBin string, cfg *v1.Config, oplog *oplog.OpLog, logStore *rotatinglog.RotatingLog) (*Orchestrator, error) {
@@ -56,8 +59,10 @@ func NewOrchestrator(resticBin string, cfg *v1.Config, oplog *oplog.OpLog, logSt
5659
OpLog: oplog,
5760
config: cfg,
5861
// repoPool created with a memory store to ensure the config is updated in an atomic operation with the repo pool's config value.
59-
repoPool: newResticRepoPool(resticBin, &config.MemoryStore{Config: cfg}),
60-
taskQueue: queue.NewTimePriorityQueue[scheduledTask](),
62+
repoPool: newResticRepoPool(resticBin, &config.MemoryStore{Config: cfg}),
63+
taskQueue: newTaskQueue(func() time.Time {
64+
return o.curTime()
65+
}),
6166
hookExecutor: hook.NewHookExecutor(oplog, logStore),
6267
logStore: logStore,
6368
}
@@ -99,6 +104,13 @@ func NewOrchestrator(resticBin string, cfg *v1.Config, oplog *oplog.OpLog, logSt
99104
return o, nil
100105
}
101106

107+
func (o *Orchestrator) curTime() time.Time {
108+
if o.now != nil {
109+
return o.now()
110+
}
111+
return time.Now()
112+
}
113+
102114
func (o *Orchestrator) ApplyConfig(cfg *v1.Config) error {
103115
o.mu.Lock()
104116
defer o.mu.Unlock()
@@ -109,13 +121,12 @@ func (o *Orchestrator) ApplyConfig(cfg *v1.Config) error {
109121
return fmt.Errorf("failed to update repo pool config: %w", err)
110122
}
111123

112-
return o.scheduleDefaultTasks(cfg)
124+
return o.ScheduleDefaultTasks(cfg)
113125
}
114126

115127
// ScheduleDefaultTasks checks if any tasks need to be rescheduled based on config changes.
116-
func (o *Orchestrator) scheduleDefaultTasks(config *v1.Config) error {
128+
func (o *Orchestrator) ScheduleDefaultTasks(config *v1.Config) error {
117129
zap.L().Info("scheduling default tasks, waiting for task queue reset.")
118-
119130
removedTasks := o.taskQueue.Reset()
120131
for _, t := range removedTasks {
121132
if err := t.task.Cancel(v1.OperationStatus_STATUS_SYSTEM_CANCELLED); err != nil {
@@ -184,22 +195,28 @@ func (o *Orchestrator) CancelOperation(operationId int64, status v1.OperationSta
184195
}
185196

186197
tasks := o.taskQueue.Reset()
198+
remaining := make([]scheduledTask, 0, len(tasks))
199+
187200
for _, t := range tasks {
188201
if t.task.OperationId() == operationId {
189202
if err := t.task.Cancel(status); err != nil {
190203
return fmt.Errorf("cancel task %q: %w", t.task.Name(), err)
191204
}
192205

193-
nextTime := t.task.Next(t.runAt)
194-
if nextTime == nil {
195-
continue
206+
// check if the task has a next run after its current 'runAt' time; if it does, schedule that next run.
207+
if nextTime := t.task.Next(t.runAt); nextTime != nil {
208+
remaining = append(remaining, scheduledTask{
209+
task: t.task,
210+
runAt: *nextTime,
211+
})
196212
}
197-
198-
t.runAt = *nextTime
213+
} else {
214+
remaining = append(remaining, *t)
199215
}
200-
o.taskQueue.Enqueue(t.runAt, t.priority, t) // requeue the task.
201216
}
202217

218+
o.taskQueue.Push(remaining...)
219+
203220
return nil
204221
}
205222

@@ -214,7 +231,7 @@ func (o *Orchestrator) Run(mainCtx context.Context) {
214231
}
215232

216233
t := o.taskQueue.Dequeue(mainCtx)
217-
if t.task == nil {
234+
if t == nil {
218235
continue
219236
}
220237

@@ -250,12 +267,12 @@ func (o *Orchestrator) Run(mainCtx context.Context) {
250267
}
251268

252269
func (o *Orchestrator) ScheduleTask(t Task, priority int, callbacks ...func(error)) {
253-
nextRun := t.Next(time.Now())
270+
nextRun := t.Next(o.curTime())
254271
if nextRun == nil {
255272
return
256273
}
257274
zap.L().Info("scheduling task", zap.String("task", t.Name()), zap.String("runAt", nextRun.Format(time.RFC3339)))
258-
o.taskQueue.Enqueue(*nextRun, priority, scheduledTask{
275+
o.taskQueue.Push(scheduledTask{
259276
task: t,
260277
runAt: *nextRun,
261278
priority: priority,
@@ -324,11 +341,3 @@ type taskExecutionInfo struct {
324341
operationId int64
325342
cancel func()
326343
}
327-
328-
type scheduledTask struct {
329-
task Task
330-
runAt time.Time
331-
priority int
332-
callbacks []func(error)
333-
config *v1.Config
334-
}
+220
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
package orchestrator
2+
3+
import (
4+
"container/heap"
5+
"context"
6+
"sync"
7+
"time"
8+
9+
v1 "github.com/garethgeorge/backrest/gen/go/v1"
10+
)
11+
12+
// taskQueueDefaultPollInterval is the poll interval newTaskQueue gives to every
// queue it creates. Dequeue never sleeps longer than the queue's poll interval
// in one timer wait, which lets it notice clock changes.
var taskQueueDefaultPollInterval = 3 * time.Minute
13+
14+
type taskQueue struct {
15+
dequeueMu sync.Mutex
16+
mu sync.Mutex
17+
heap scheduledTaskHeapByTime
18+
notify chan struct{}
19+
ready scheduledTaskHeapByPriorityThenTime
20+
pollInterval time.Duration
21+
22+
Now func() time.Time
23+
}
24+
25+
func newTaskQueue(now func() time.Time) taskQueue {
26+
return taskQueue{
27+
heap: scheduledTaskHeapByTime{},
28+
ready: scheduledTaskHeapByPriorityThenTime{},
29+
pollInterval: taskQueueDefaultPollInterval,
30+
Now: now,
31+
}
32+
}
33+
34+
func (t *taskQueue) curTime() time.Time {
35+
if t.Now != nil {
36+
return t.Now()
37+
}
38+
return time.Now()
39+
}
40+
41+
func (t *taskQueue) Push(tasks ...scheduledTask) {
42+
t.mu.Lock()
43+
defer t.mu.Unlock()
44+
45+
for _, task := range tasks {
46+
task := task
47+
if task.task == nil {
48+
panic("task cannot be nil")
49+
}
50+
heap.Push(&t.heap, &task)
51+
}
52+
53+
if t.notify != nil {
54+
t.notify <- struct{}{}
55+
}
56+
}
57+
58+
func (t *taskQueue) Reset() []*scheduledTask {
59+
t.mu.Lock()
60+
defer t.mu.Unlock()
61+
62+
oldTasks := t.heap.tasks
63+
oldTasks = append(oldTasks, t.ready.tasks...)
64+
t.heap.tasks = nil
65+
t.ready.tasks = nil
66+
67+
if t.notify != nil {
68+
t.notify <- struct{}{}
69+
}
70+
return oldTasks
71+
}
72+
73+
func (t *taskQueue) Dequeue(ctx context.Context) *scheduledTask {
74+
t.dequeueMu.Lock()
75+
defer t.dequeueMu.Unlock()
76+
77+
t.mu.Lock()
78+
defer t.mu.Unlock()
79+
t.notify = make(chan struct{}, 10)
80+
defer func() {
81+
close(t.notify)
82+
t.notify = nil
83+
}()
84+
85+
for {
86+
first, ok := t.heap.Peek().(*scheduledTask)
87+
if !ok { // no tasks in heap.
88+
if t.ready.Len() > 0 {
89+
return heap.Pop(&t.ready).(*scheduledTask)
90+
}
91+
t.mu.Unlock()
92+
select {
93+
case <-ctx.Done():
94+
t.mu.Lock()
95+
return nil
96+
case <-t.notify:
97+
}
98+
t.mu.Lock()
99+
continue
100+
}
101+
102+
now := t.curTime()
103+
104+
// if there's a task in the ready queue AND the first task isn't ready yet then immediately return the ready task.
105+
ready, ok := t.ready.Peek().(*scheduledTask)
106+
if ok && now.Before(first.runAt) {
107+
heap.Pop(&t.ready)
108+
return ready
109+
}
110+
111+
t.mu.Unlock()
112+
d := first.runAt.Sub(now)
113+
if t.pollInterval > 0 && d > t.pollInterval {
114+
// A poll interval may be set to work around clock changes
115+
// e.g. when a laptop wakes from sleep or the system clock is adjusted.
116+
d = t.pollInterval
117+
}
118+
timer := time.NewTimer(d)
119+
120+
select {
121+
case <-timer.C:
122+
t.mu.Lock()
123+
if t.heap.Len() == 0 {
124+
break
125+
}
126+
127+
for {
128+
first, ok := t.heap.Peek().(*scheduledTask)
129+
if !ok {
130+
break
131+
}
132+
if first.runAt.After(t.curTime()) {
133+
// task is not yet ready to run
134+
break
135+
}
136+
heap.Pop(&t.heap) // remove the task from the heap
137+
heap.Push(&t.ready, first)
138+
}
139+
140+
if t.ready.Len() == 0 {
141+
break
142+
}
143+
return heap.Pop(&t.ready).(*scheduledTask)
144+
case <-t.notify: // new task was added, loop again to ensure we have the earliest task.
145+
t.mu.Lock()
146+
if !timer.Stop() {
147+
<-timer.C
148+
}
149+
case <-ctx.Done():
150+
t.mu.Lock()
151+
if !timer.Stop() {
152+
<-timer.C
153+
}
154+
return nil
155+
}
156+
}
157+
}
158+
159+
type scheduledTask struct {
160+
task Task
161+
runAt time.Time
162+
priority int
163+
callbacks []func(error)
164+
config *v1.Config
165+
}
166+
167+
type scheduledTaskHeap struct {
168+
tasks []*scheduledTask
169+
comparator func(i, j *scheduledTask) bool
170+
}
171+
172+
func (h *scheduledTaskHeap) Len() int {
173+
return len(h.tasks)
174+
}
175+
176+
func (h *scheduledTaskHeap) Swap(i, j int) {
177+
h.tasks[i], h.tasks[j] = h.tasks[j], h.tasks[i]
178+
}
179+
180+
func (h *scheduledTaskHeap) Push(x interface{}) {
181+
h.tasks = append(h.tasks, x.(*scheduledTask))
182+
}
183+
184+
func (h *scheduledTaskHeap) Pop() interface{} {
185+
old := h.tasks
186+
n := len(old)
187+
x := old[n-1]
188+
h.tasks = old[0 : n-1]
189+
return x
190+
}
191+
192+
func (h *scheduledTaskHeap) Peek() interface{} {
193+
if len(h.tasks) == 0 {
194+
return nil
195+
}
196+
return h.tasks[0]
197+
}
198+
199+
type scheduledTaskHeapByTime struct {
200+
scheduledTaskHeap
201+
}
202+
203+
var _ heap.Interface = &scheduledTaskHeapByTime{}
204+
205+
func (h *scheduledTaskHeapByTime) Less(i, j int) bool {
206+
return h.tasks[i].runAt.Before(h.tasks[j].runAt)
207+
}
208+
209+
type scheduledTaskHeapByPriorityThenTime struct {
210+
scheduledTaskHeap
211+
}
212+
213+
var _ heap.Interface = &scheduledTaskHeapByPriorityThenTime{}
214+
215+
func (h *scheduledTaskHeapByPriorityThenTime) Less(i, j int) bool {
216+
if h.tasks[i].priority != h.tasks[j].priority {
217+
return h.tasks[i].priority > h.tasks[j].priority
218+
}
219+
return h.tasks[i].runAt.Before(h.tasks[j].runAt)
220+
}

0 commit comments

Comments
 (0)