implement crashLoopBackOff for failed processes #64

Merged 7 commits on Aug 10, 2023

12 changes: 12 additions & 0 deletions README.md
@@ -119,6 +119,18 @@ job "foo" {
}
```

Setting `maxAttempts` to `-1` will restart the process "forever":

```hcl
job "foo" {
command = "/usr/local/bin/foo"
args = ["bar"]
maxAttempts = -1
canFail = false
workingDirectory = "/some/path"
}
```

You can append a custom environment to the process by setting `env`:

```hcl
2 changes: 1 addition & 1 deletion cmd/mittnitectl/job_status.go
@@ -61,7 +61,7 @@ var jobStatusCmd = cobra.Command{
lipgloss.Left,
styleStatusLeftColumn.Render("allowed to fail:"),
styleHighlight.Render(fmt.Sprintf("%t", resp.Body.Config.CanFail)),
styleStatusAddendum.Render("(max restart attempts: "), styleHighlight.Render(fmt.Sprintf("%d", resp.Body.Config.MaxAttempts)), ")",
styleStatusAddendum.Render("(max restart attempts: "), styleHighlight.Render(fmt.Sprintf("%d", resp.Body.Config.GetMaxAttempts())), ")",
),
lipgloss.JoinHorizontal(
lipgloss.Left,
2 changes: 1 addition & 1 deletion internal/config/ignitionconfig.go
@@ -35,7 +35,7 @@ func (ignitionConfig *Ignition) GenerateFromConfigDir(configDir string) error {
}

for _, job := range ignitionConfig.Jobs {
if job.MaxAttempts_ != 0 {
if job.MaxAttempts_ != nil {
log.Warnf("field max_attempts in job %s is deprecated in favor of maxAttempts", job.Name)
job.MaxAttempts = job.MaxAttempts_
}
17 changes: 15 additions & 2 deletions internal/config/types.go
@@ -109,15 +109,28 @@ type JobConfig struct {
// optional fields for "normal" jobs
// these will be ignored if fields for lazy jobs are set
Watches []Watch `hcl:"watch" json:"watch"`
MaxAttempts_ int `hcl:"max_attempts" json:"-"` // deprecated
MaxAttempts int `hcl:"maxAttempts" json:"maxAttempts"`
MaxAttempts_ *int `hcl:"max_attempts" json:"-"` // deprecated
MaxAttempts *int `hcl:"maxAttempts" json:"maxAttempts,omitempty"`
OneTime bool `hcl:"oneTime" json:"oneTime"`

// fields required for lazy activation
Laziness *Laziness `hcl:"lazy" json:"lazy"`
Listeners []Listener `hcl:"listen" json:"listen"`
}

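// GetMaxAttempts returns the effective restart limit: the default of 3 when
// maxAttempts is unset, and -1 (restart forever) for any negative value.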
func (jc *JobConfig) GetMaxAttempts() int {
maxAttempts := 3
if jc.MaxAttempts == nil {
return maxAttempts
}

maxAttempts = *jc.MaxAttempts
if maxAttempts < 0 {
maxAttempts = -1
}
return maxAttempts
}

type BootJobConfig struct {
BaseJobConfig `hcl:",squash"`

4 changes: 4 additions & 0 deletions pkg/proc/basejob.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ func (job *baseJob) Signal(sig os.Signal) {
)
}

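// Reset clears the job's recorded phase back to its zero value.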
func (job *baseJob) Reset() {
job.phase = JobPhase{}
}

func (job *baseJob) MarkForRestart() {
job.restart = true
}
17 changes: 17 additions & 0 deletions pkg/proc/crashloopbackoff.go
@@ -0,0 +1,17 @@
package proc

import (
"time"
)

// calculateNextBackOff doubles the current back-off, starting at 2s, and caps the result at maxBackoff
func calculateNextBackOff(currBackOff, maxBackoff time.Duration) time.Duration {
if currBackOff.Seconds() <= 1 {
return 2 * time.Second
}
next := time.Duration(2*currBackOff.Seconds()) * time.Second
if next.Seconds() > maxBackoff.Seconds() {
return maxBackoff
}
return next
}
51 changes: 44 additions & 7 deletions pkg/proc/job_common.go
@@ -14,6 +14,11 @@ import (
log "github.com/sirupsen/logrus"
)

const (
// longest duration between two restarts
maxBackOff = 300 * time.Second
)

func (job *CommonJob) Init() {
job.restart = false
job.stop = false
@@ -45,12 +50,9 @@ func (job *CommonJob) Run(ctx context.Context, _ chan<- error) error {

l := log.WithField("job.name", job.Config.Name)

backOff := 1 * time.Second
attempts := 0
maxAttempts := job.Config.MaxAttempts

if maxAttempts <= 0 {
maxAttempts = 3
}
maxAttempts := job.Config.GetMaxAttempts()

p := make(chan *os.Process)
defer close(p)
@@ -62,6 +64,11 @@
}()

for { // restart failed jobs for as long as mittnite is running
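// if Stop() was called, e.g. while waiting out a back-off in crashLoopSleep, do not start the process again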
if job.stop {
return nil
}

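// a fresh context per attempt lets Stop() and Restart() interrupt a crash-loop back-off via job.interrupt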
job.ctx, job.interrupt = context.WithCancel(context.Background())
err := job.startOnce(ctx, p)
switch err {
case nil:
@@ -81,9 +88,18 @@
}

attempts++
if attempts < maxAttempts {
if maxAttempts == -1 || attempts < maxAttempts {
currBackOff := backOff
backOff = calculateNextBackOff(currBackOff, maxBackOff)

job.phase.Set(JobPhaseReasonCrashLooping)
l.WithField("job.maxAttempts", maxAttempts).WithField("job.usedAttempts", attempts).Info("remaining attempts")
l.
WithField("job.maxAttempts", maxAttempts).
WithField("job.usedAttempts", attempts).
WithField("job.nextRestartIn", currBackOff.String()).
Info("remaining attempts")

job.crashLoopSleep(currBackOff)
continue
}

@@ -175,11 +191,13 @@ func (job *CommonJob) IsRunning() bool {
func (job *CommonJob) Restart() {
job.restart = true
job.SignalAll(syscall.SIGTERM)
job.interrupt()
}

func (job *CommonJob) Stop() {
job.stop = true
job.SignalAll(syscall.SIGTERM)
job.interrupt()
}

func (job *CommonJob) Status() *CommonJobStatus {
@@ -213,3 +231,22 @@ func (job *CommonJob) executeWatchCommand(watchCmd *config.WatchCommand) error {
Info("executing watch command")
return cmd.Run()
}

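// crashLoopSleep waits for the given back-off duration, returning early if the
// job's context is cancelled (i.e. the job is stopped or restarted).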
func (job *CommonJob) crashLoopSleep(duration time.Duration) {
timer := time.NewTimer(duration)
defer timer.Stop()

select {
case <-timer.C:
case <-job.ctx.Done():
}
}
9 changes: 5 additions & 4 deletions pkg/proc/runner.go
@@ -145,7 +145,7 @@ func (r *Runner) Init() error {

func (r *Runner) exec() {
for i := range r.jobs {
r.startJob(r.jobs[i])
r.startJob(r.jobs[i], JobPhaseReasonUnknown)
}
}

@@ -155,7 +155,7 @@ func (r *Runner) jobExistsAndIsControllable(job *CommonJob) bool {

func (r *Runner) addAndStartJob(job Job) {
r.addJobIfNotExists(job)
r.startJob(job)
r.startJob(job, job.GetPhase().Reason)
}

func (r *Runner) addJobIfNotExists(job Job) {
@@ -167,9 +167,10 @@ func (r *Runner) addJobIfNotExists(job Job) {
r.jobs = append(r.jobs, job)
}

func (r *Runner) startJob(job Job) {
func (r *Runner) startJob(job Job, initialPhase JobPhaseReason) {
job.GetPhase().Set(initialPhase)
phase := job.GetPhase()
if phase.Is(JobPhaseReasonStopped) || phase.Is(JobPhaseReasonFailed) {
if phase.Is(JobPhaseReasonStopped) || phase.Is(JobPhaseReasonFailed) || phase.Is(JobPhaseReasonCrashLooping) {
return
}

4 changes: 2 additions & 2 deletions pkg/proc/runner_api_v1.go
@@ -63,15 +63,15 @@ func (r *Runner) apiV1JobMiddleware(next http.Handler) http.Handler {
func (r *Runner) apiV1StartJob(writer http.ResponseWriter, req *http.Request) {
job := req.Context().Value(contextKeyJob).(*CommonJob)
if !job.IsRunning() {
r.startJob(job)
r.startJob(job, JobPhaseReasonUnknown)
}
writer.WriteHeader(http.StatusOK)
}

func (r *Runner) apiV1RestartJob(writer http.ResponseWriter, req *http.Request) {
job := req.Context().Value(contextKeyJob).(*CommonJob)
if !job.IsRunning() {
r.startJob(job)
r.startJob(job, JobPhaseReasonUnknown)
} else {
job.Restart()
}
4 changes: 4 additions & 0 deletions pkg/proc/types.go
@@ -50,6 +50,9 @@ func NewApi(listenAddress string) *Api {
type baseJob struct {
Config *config.BaseJobConfig

ctx context.Context
interrupt context.CancelFunc

cmd *exec.Cmd
restart bool
stop bool
@@ -97,6 +100,7 @@ type Job interface {
Init()
Run(context.Context, chan<- error) error
Watch()
Reset()

GetPhase() *JobPhase
GetName() string