Fix concurrent access #2 #719

Open · wants to merge 1 commit into base: main
Changes from all commits
60 changes: 46 additions & 14 deletions pkg/hook/controller/kubernetes_bindings_controller.go
@@ -3,6 +3,7 @@ package controller
import (
"fmt"
"log/slog"
"sync"

"github.com/deckhouse/deckhouse/pkg/log"

@@ -41,16 +42,17 @@ type KubernetesBindingsController interface {

// kubernetesHooksController is a main implementation of KubernetesHooksController
type kubernetesBindingsController struct {
// bindings configurations
KubernetesBindings []htypes.OnKubernetesEventConfig

// dependencies
kubeEventsManager kubeeventsmanager.KubeEventsManager

logger *log.Logger

l sync.RWMutex
// All hooks with 'kubernetes' bindings
BindingMonitorLinks map[string]*KubernetesBindingToMonitorLink
}

// kubernetesHooksController should implement the KubernetesHooksController
@@ -83,10 +85,10 @@ func (c *kubernetesBindingsController) EnableKubernetesBindings() ([]BindingExec
if err != nil {
return nil, fmt.Errorf("run monitor: %s", err)
}
c.setBindingMonitorLinks(config.Monitor.Metadata.MonitorId, &KubernetesBindingToMonitorLink{
MonitorId: config.Monitor.Metadata.MonitorId,
BindingConfig: config,
})
// Start monitor's informers to fill the cache.
c.kubeEventsManager.StartMonitor(config.Monitor.Metadata.MonitorId)

@@ -102,7 +104,7 @@

func (c *kubernetesBindingsController) UpdateMonitor(monitorId string, kind, apiVersion string) error {
// Find binding for monitorId
link, ok := c.getBindingMonitorLinksById(monitorId)
if !ok {
return nil
}
@@ -152,10 +154,11 @@ func (c *kubernetesBindingsController) UpdateMonitor(monitorId string, kind, api

// UnlockEvents turns on eventCb for all monitors to emit events after Synchronization.
func (c *kubernetesBindingsController) UnlockEvents() {
c.iterateBindingMonitorLinks(func(monitorID string) bool {
m := c.kubeEventsManager.GetMonitor(monitorID)
m.EnableKubeEventCb()
return false
})
}

// UnlockEventsFor turns on eventCb for matched monitor to emit events after Synchronization.
@@ -171,24 +174,53 @@ func (c *kubernetesBindingsController) UnlockEventsFor(monitorID string) {
// StopMonitors stops all monitors for the hook.
// TODO handle error!
func (c *kubernetesBindingsController) StopMonitors() {
c.iterateBindingMonitorLinks(func(monitorID string) bool {
_ = c.kubeEventsManager.StopMonitor(monitorID)
return false
})
}

func (c *kubernetesBindingsController) CanHandleEvent(kubeEvent kemtypes.KubeEvent) bool {
var canHandleEvent bool

c.iterateBindingMonitorLinks(func(monitorID string) bool {
if monitorID == kubeEvent.MonitorId {
canHandleEvent = true
return true
}
return false
})

return canHandleEvent
}

func (c *kubernetesBindingsController) iterateBindingMonitorLinks(doFn func(monitorID string) bool) {
c.l.RLock()
for monitorID := range c.BindingMonitorLinks {
if exit := doFn(monitorID); exit {
break
}
}
c.l.RUnlock()
}

func (c *kubernetesBindingsController) getBindingMonitorLinksById(monitorId string) (*KubernetesBindingToMonitorLink, bool) {
c.l.RLock()
link, found := c.BindingMonitorLinks[monitorId]
c.l.RUnlock()
return link, found
}

func (c *kubernetesBindingsController) setBindingMonitorLinks(monitorId string, link *KubernetesBindingToMonitorLink) {
c.l.Lock()
c.BindingMonitorLinks[monitorId] = link
c.l.Unlock()
}

// HandleEvent receives event from KubeEventManager and returns a BindingExecutionInfo
// to help create a new task to run a hook.
func (c *kubernetesBindingsController) HandleEvent(kubeEvent kemtypes.KubeEvent) BindingExecutionInfo {
link, hasKey := c.getBindingMonitorLinksById(kubeEvent.MonitorId)
if !hasKey {
log.Error("Possible bug!!! Unknown kube event: no such monitor id registered", slog.String("monitorID", kubeEvent.MonitorId))
return BindingExecutionInfo{
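The pattern introduced above (an RWMutex guarding BindingMonitorLinks, with writes under Lock and iteration under RLock) is straightforward to exercise with Go's race detector. A minimal test sketch, not part of this PR and with all names invented here, that fails under go test -race if the locking is removed:

// race_sketch_test.go: hypothetical test, not part of this PR. It mirrors
// the mutex-around-map pattern of kubernetesBindingsController.
package controller

import (
	"strconv"
	"sync"
	"testing"
)

// miniController mimics the guarded-map shape: an RWMutex protecting a map
// that readers iterate while writers insert.
type miniController struct {
	l     sync.RWMutex
	links map[string]struct{}
}

func (c *miniController) set(id string) {
	c.l.Lock()
	c.links[id] = struct{}{}
	c.l.Unlock()
}

func (c *miniController) iterate(doFn func(id string) bool) {
	c.l.RLock()
	defer c.l.RUnlock() // defer releases the lock even if doFn panics
	for id := range c.links {
		if doFn(id) {
			break
		}
	}
}

func TestConcurrentAccess(t *testing.T) {
	c := &miniController{links: make(map[string]struct{})}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			c.set(strconv.Itoa(n))
			c.iterate(func(string) bool { return false })
		}(i)
	}
	wg.Wait()
}

One difference worth noting: the sketch defers RUnlock, while iterateBindingMonitorLinks above unlocks explicitly at the end, which behaves the same only as long as doFn never panics.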
19 changes: 16 additions & 3 deletions pkg/hook/controller/schedule_bindings_controller.go
@@ -1,6 +1,8 @@
package controller

import (
"sync"

bctx "github.com/flant/shell-operator/pkg/hook/binding_context"
htypes "github.com/flant/shell-operator/pkg/hook/types"
schedulemanager "github.com/flant/shell-operator/pkg/schedule_manager"
@@ -29,14 +31,15 @@ type ScheduleBindingsController interface {

// scheduleHooksController is a main implementation of KubernetesHooksController
type scheduleBindingsController struct {
// dependencies
scheduleManager schedulemanager.ScheduleManager

l sync.RWMutex
// All hooks with 'kubernetes' bindings
ScheduleLinks map[string]*ScheduleBindingToCrontabLink

// bindings configurations
ScheduleBindings []htypes.ScheduleConfig
}

A Contributor comment on lines +34 to 42 proposes reordering the struct fields:

Suggested change:
// All hooks with 'kubernetes' bindings
ScheduleLinks map[string]*ScheduleBindingToCrontabLink

// bindings configurations
ScheduleBindings []htypes.ScheduleConfig

l sync.RWMutex

// dependencies
scheduleManager schedulemanager.ScheduleManager

// kubernetesHooksController should implement the KubernetesHooksController
@@ -54,10 +57,14 @@ func (c *scheduleBindingsController) WithScheduleBindings(bindings []htypes.Sche
}

func (c *scheduleBindingsController) WithScheduleManager(scheduleManager schedulemanager.ScheduleManager) {
c.l.Lock()
c.scheduleManager = scheduleManager
c.l.Unlock()
}

func (c *scheduleBindingsController) CanHandleEvent(crontab string) bool {
c.l.RLock()
defer c.l.RUnlock()
for _, link := range c.ScheduleLinks {
if link.Crontab == crontab {
return true
@@ -69,6 +76,7 @@ func (c *scheduleBindingsController) CanHandleEvent(crontab string) bool {
func (c *scheduleBindingsController) HandleEvent(crontab string) []BindingExecutionInfo {
res := []BindingExecutionInfo{}

c.l.RLock()
for _, link := range c.ScheduleLinks {
if link.Crontab == crontab {
bc := bctx.BindingContext{
@@ -89,11 +97,13 @@ func (c *scheduleBindingsController) HandleEvent(crontab string) []BindingExecut
res = append(res, info)
}
}
c.l.RUnlock()

return res
}

func (c *scheduleBindingsController) EnableScheduleBindings() {
c.l.Lock()
for _, config := range c.ScheduleBindings {
c.ScheduleLinks[config.ScheduleEntry.Id] = &ScheduleBindingToCrontabLink{
BindingName: config.BindingName,
@@ -105,11 +115,14 @@ func (c *scheduleBindingsController) EnableScheduleBindings() {
}
c.scheduleManager.Add(config.ScheduleEntry)
}
c.l.Unlock()
}

func (c *scheduleBindingsController) DisableScheduleBindings() {
c.l.Lock()
for _, config := range c.ScheduleBindings {
c.scheduleManager.Remove(config.ScheduleEntry)
delete(c.ScheduleLinks, config.ScheduleEntry.Id)
}
c.l.Unlock()
}
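HandleEvent above builds every BindingExecutionInfo while holding the read lock, so a slow binding-context build delays writers such as EnableScheduleBindings. A hypothetical alternative, not in this PR (the helper name is invented; the types are the file's own), is to snapshot matching links under RLock and do the heavier work after unlocking:

// matchingLinks is a hypothetical helper, not part of this PR: it copies
// matching links under RLock so callers can build binding contexts
// outside the critical section.
func (c *scheduleBindingsController) matchingLinks(crontab string) []*ScheduleBindingToCrontabLink {
	c.l.RLock()
	defer c.l.RUnlock()
	var out []*ScheduleBindingToCrontabLink
	for _, link := range c.ScheduleLinks {
		if link.Crontab == crontab {
			out = append(out, link)
		}
	}
	return out
}

The trade-off is that callers then read link fields outside the lock, which is only safe while links are never mutated in place after insertion.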
13 changes: 6 additions & 7 deletions pkg/kube_events_manager/resource_informer.go
@@ -48,12 +48,11 @@ type resourceInformer struct {

// Events buffer for "Synchronization" mode: it stores events between CachedObjects call and enableKubeEventCb call
// to replay them when "Synchronization" hook is done.
eventBuf []kemtypes.KubeEvent
// A callback function that define custom handling of Kubernetes events.
eventCb func(kemtypes.KubeEvent)
eventCbEnabled bool
eventBufLock sync.Mutex

// TODO resourceInformer should be stoppable (think of deleted namespaces and disabled modules in addon-operator)
ctx context.Context
@@ -163,20 +162,20 @@ func (ei *resourceInformer) getCachedObjects() []kemtypes.ObjectAndFilterResult
ei.cacheLock.RUnlock()

// Reset eventBuf if needed.
ei.eventBufLock.Lock()
if !ei.eventCbEnabled {
ei.eventBuf = nil
}
ei.eventBufLock.Unlock()
return res
}

func (ei *resourceInformer) enableKubeEventCb() {
ei.eventBufLock.Lock()
defer ei.eventBufLock.Unlock()
if ei.eventCbEnabled {
return
}
ei.eventCbEnabled = true
for _, kubeEvent := range ei.eventBuf {
// Handle saved kube events.
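The two hunks above fix a check-then-act race: previously the eventCbEnabled flag was read before eventBufLock was taken, so two goroutines could both observe it unset. A self-contained sketch of the before and after shapes, with all names hypothetical:

// Package sketch illustrates the check-then-act hazard; nothing here is
// from the shell-operator codebase.
package sketch

import "sync"

type notifier struct {
	mu      sync.Mutex
	enabled bool
	buf     []string
}

// enableUnsafe reads the flag outside the lock: two goroutines can both
// pass the check, and the unsynchronized read is itself a data race.
func (n *notifier) enableUnsafe() {
	if n.enabled {
		return
	}
	n.mu.Lock()
	defer n.mu.Unlock()
	n.enabled = true
	n.buf = nil // stands in for replaying buffered events
}

// enableSafe checks and transitions atomically under the lock, matching
// the reordering applied to enableKubeEventCb above.
func (n *notifier) enableSafe() {
	n.mu.Lock()
	defer n.mu.Unlock()
	if n.enabled {
		return
	}
	n.enabled = true
	n.buf = nil
}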
6 changes: 3 additions & 3 deletions pkg/task/dump/dump.go
@@ -42,7 +42,7 @@ func TaskMainQueue(tqs *queue.TaskQueueSet, format string) interface{} {
dq = dumpQueue{
Name: q.Name,
TasksCount: q.Length(),
Status: q.GetStatus(),
Tasks: tasks,
}
}
@@ -97,7 +97,7 @@ func TaskQueues(tqs *queue.TaskQueueSet, format string, showEmpty bool) interfac
mainQueue := dumpQueue{
Name: queue.Name,
TasksCount: queue.Length(),
Status: queue.Status,
Status: queue.GetStatus(),
Tasks: tasks,
}
result.Active = append(result.Active, mainQueue)
@@ -119,7 +119,7 @@ func TaskQueues(tqs *queue.TaskQueueSet, format string, showEmpty bool) interfac
result.Active = append(result.Active, dumpQueue{
Name: queue.Name,
TasksCount: queue.Length(),
Status: queue.Status,
Status: queue.GetStatus(),
Tasks: tasks,
})
}
39 changes: 26 additions & 13 deletions pkg/task/queue/task_queue.go
@@ -130,6 +130,19 @@ func (q *TaskQueue) MeasureActionTime(action string) func() {
return q.measureActionFn
}

func (q *TaskQueue) GetStatus() string {
defer q.MeasureActionTime("GetStatus")()
q.m.RLock()
defer q.m.RUnlock()
return q.Status
}

func (q *TaskQueue) SetStatus(status string) {
q.m.Lock()
q.Status = status
q.m.Unlock()
}

func (q *TaskQueue) IsEmpty() bool {
defer q.MeasureActionTime("IsEmpty")()
q.m.RLock()
@@ -400,18 +413,18 @@ func (q *TaskQueue) Start() {

if q.Handler == nil {
log.Error("should set handler before start in queue", slog.String("name", q.Name))
q.Status = "no handler set"
q.SetStatus("no handler set")
return
}

go func() {
q.Status = ""
q.SetStatus("")
var sleepDelay time.Duration
for {
q.debugf("queue %s: wait for task, delay %d", q.Name, sleepDelay)
t := q.waitForTask(sleepDelay)
if t == nil {
q.Status = "stop"
q.SetStatus("stop")
log.Info("queue stopped", slog.String("name", q.Name))
return
}
@@ -422,14 +435,14 @@

// Now the task can be handled!
var nextSleepDelay time.Duration
q.Status = "run first task"
q.SetStatus("run first task")
taskRes := q.Handler(t)

// Check Done channel after long-running operation.
select {
case <-q.ctx.Done():
log.Info("queue stopped after task handling", slog.String("name", q.Name))
q.Status = "stop"
q.SetStatus("stop")
return
default:
}
@@ -439,7 +452,7 @@
// Exponential backoff delay before retry.
nextSleepDelay = q.ExponentialBackoffFn(t.GetFailureCount())
t.IncrementFailureCount()
q.SetStatus(fmt.Sprintf("sleep after fail for %s", nextSleepDelay.String()))
case Success, Keep:
// Insert new tasks right after the current task in reverse order.
q.withLock(func() {
Expand All @@ -461,16 +474,16 @@ func (q *TaskQueue) Start() {
q.addLast(newTask)
}
})
q.Status = ""
q.SetStatus("")
case Repeat:
// repeat a current task after a small delay
nextSleepDelay = q.DelayOnRepeat
q.Status = "repeat head task"
q.SetStatus("repeat head task")
}

if taskRes.DelayBeforeNextTask != 0 {
nextSleepDelay = taskRes.DelayBeforeNextTask
q.SetStatus(fmt.Sprintf("sleep for %s", nextSleepDelay.String()))
}

sleepDelay = nextSleepDelay
@@ -514,15 +527,15 @@ func (q *TaskQueue) waitForTask(sleepDelay time.Duration) task.Task {
q.cancelDelay = false
q.waitMu.Unlock()

origStatus := q.GetStatus()

defer func() {
checkTicker.Stop()
q.waitMu.Lock()
q.waitInProgress = false
q.cancelDelay = false
q.waitMu.Unlock()
q.SetStatus(origStatus)
}()

// Wait for the queued task with some delay.
@@ -565,10 +578,10 @@ func (q *TaskQueue) waitForTask(sleepDelay time.Duration) task.Task {
// Wait loop still in progress: update queue status.
waitTime := time.Since(waitBegin).Truncate(time.Second)
if sleepDelay == 0 {
q.Status = fmt.Sprintf("waiting for task %s", waitTime.String())
q.SetStatus(fmt.Sprintf("waiting for task %s", waitTime.String()))
} else {
delay := sleepDelay.Truncate(time.Second)
q.Status = fmt.Sprintf("%s (%s left of %s delay)", origStatus, (delay - waitTime).String(), delay.String())
q.SetStatus(fmt.Sprintf("%s (%s left of %s delay)", origStatus, (delay - waitTime).String(), delay.String()))
}
}
}
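GetStatus and SetStatus serialize access, but Status apparently remains an exported field, so direct reads like the ones dump.go used before this PR still compile. A possible follow-up hardening, sketched here under that assumption and not part of the PR, is to unexport the field so every caller must go through the guarded accessors:

// Illustrative fragment, not part of this PR: with the field unexported,
// bypassing the mutex no longer compiles outside the queue package.
type TaskQueue struct {
	m      sync.RWMutex
	status string
	// ... remaining fields unchanged and elided ...
}

func (q *TaskQueue) GetStatus() string {
	q.m.RLock()
	defer q.m.RUnlock()
	return q.status
}

func (q *TaskQueue) SetStatus(status string) {
	q.m.Lock()
	q.status = status
	q.m.Unlock()
}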