From bd2f6561e3846f3103ee926ec5aa140554a024fe Mon Sep 17 00:00:00 2001 From: taraspos Date: Mon, 3 Feb 2025 15:22:40 +0000 Subject: [PATCH] Integrate changes from https://github.com/mcuadros/ofelia/pull/137 --- cli/config.go | 220 ++++++++++++------ cli/config_test.go | 14 +- cli/daemon.go | 47 ++-- cli/{docker-labels.go => docker_handler.go} | 82 +++++-- ...-labels_test.go => docker_handler_test.go} | 43 +++- cli/validate.go | 27 ++- core/common.go | 2 + core/cron_utils.go | 18 ++ core/execjob.go | 2 +- core/job.go | 18 ++ core/scheduler.go | 39 ++-- core/scheduler_test.go | 2 - go.mod | 5 +- go.sum | 10 +- ofelia.go | 17 +- 15 files changed, 389 insertions(+), 157 deletions(-) rename cli/{docker-labels.go => docker_handler.go} (72%) rename cli/{docker-labels_test.go => docker_handler_test.go} (78%) create mode 100644 core/cron_utils.go diff --git a/cli/config.go b/cli/config.go index 42a5f4528..48ea09f2d 100644 --- a/cli/config.go +++ b/cli/config.go @@ -1,27 +1,22 @@ package cli import ( - "os" + "fmt" - docker "github.com/fsouza/go-dockerclient" "github.com/mcuadros/ofelia/core" "github.com/mcuadros/ofelia/middlewares" - logging "github.com/op/go-logging" defaults "github.com/mcuadros/go-defaults" gcfg "gopkg.in/gcfg.v1" ) const ( - logFormat = "%{time} %{color} %{shortfile} ▶ %{level}%{color:reset} %{message}" jobExec = "job-exec" jobRun = "job-run" jobServiceRun = "job-service-run" jobLocal = "job-local" ) -var IsDockerEnv bool - // Config contains the configuration type Config struct { Global struct { @@ -33,113 +28,93 @@ type Config struct { RunJobs map[string]*RunJobConfig `gcfg:"job-run" mapstructure:"job-run,squash"` ServiceJobs map[string]*RunServiceConfig `gcfg:"job-service-run" mapstructure:"job-service-run,squash"` LocalJobs map[string]*LocalJobConfig `gcfg:"job-local" mapstructure:"job-local,squash"` + + sh *core.Scheduler + dockerHandler *DockerHandler + logger core.Logger } -// BuildFromDockerLabels builds a scheduler using the config from a docker labels -func BuildFromDockerLabels(filterFlags ...string) (*core.Scheduler, error) { +func NewConfig(logger core.Logger) *Config { + // Initialize c := &Config{} - - d, err := c.buildDockerClient() - if err != nil { - return nil, err - } - - labels, err := getLabels(d, filterFlags) - if err != nil { - return nil, err - } - - if err := c.buildFromDockerLabels(labels); err != nil { - return nil, err - } - - return c.build() + c.ExecJobs = make(map[string]*ExecJobConfig) + c.RunJobs = make(map[string]*RunJobConfig) + c.ServiceJobs = make(map[string]*RunServiceConfig) + c.LocalJobs = make(map[string]*LocalJobConfig) + c.logger = logger + defaults.SetDefaults(c) + return c } // BuildFromFile builds a scheduler using the config from a file -func BuildFromFile(filename string) (*core.Scheduler, error) { - c := &Config{} - if err := gcfg.ReadFileInto(c, filename); err != nil { - return nil, err - } - - return c.build() +func BuildFromFile(filename string, logger core.Logger) (*Config, error) { + c := NewConfig(logger) + err := gcfg.ReadFileInto(c, filename) + return c, err } // BuildFromString builds a scheduler using the config from a string -func BuildFromString(config string) (*core.Scheduler, error) { - c := &Config{} +func BuildFromString(config string, logger core.Logger) (*Config, error) { + c := NewConfig(logger) if err := gcfg.ReadStringInto(c, config); err != nil { return nil, err } - - return c.build() + return c, nil } -func (c *Config) build() (*core.Scheduler, error) { - defaults.SetDefaults(c) +// Call this only once at app init +func (c *Config) InitializeApp() error { + if c.sh == nil { + return fmt.Errorf("scheduler is not initialized yet") + } - d, err := c.buildDockerClient() + // In order to support non dynamic job types such as Local or Run using labels + // lets parse the labels and merge the job lists + dockerLabels, err := c.dockerHandler.GetDockerLabels() if err != nil { - return nil, err + return err } - sh := core.NewScheduler(c.buildLogger()) - c.buildSchedulerMiddlewares(sh) + if err := c.buildFromDockerLabels(dockerLabels); err != nil { + return err + } for name, j := range c.ExecJobs { defaults.SetDefaults(j) - - j.Client = d + j.Client = c.dockerHandler.GetInternalDockerClient() j.Name = name j.buildMiddlewares() - sh.AddJob(j) + c.sh.AddJob(j) } for name, j := range c.RunJobs { defaults.SetDefaults(j) - - j.Client = d + j.Client = c.dockerHandler.GetInternalDockerClient() j.Name = name j.buildMiddlewares() - sh.AddJob(j) + c.sh.AddJob(j) } for name, j := range c.LocalJobs { defaults.SetDefaults(j) - j.Name = name j.buildMiddlewares() - sh.AddJob(j) + c.sh.AddJob(j) } for name, j := range c.ServiceJobs { defaults.SetDefaults(j) j.Name = name - j.Client = d + j.Client = c.dockerHandler.GetInternalDockerClient() j.buildMiddlewares() - sh.AddJob(j) - } - - return sh, nil -} - -func (c *Config) buildDockerClient() (*docker.Client, error) { - d, err := docker.NewClientFromEnv() - if err != nil { - return nil, err + c.sh.AddJob(j) } - return d, nil + return nil } -func (c *Config) buildLogger() core.Logger { - stdout := logging.NewLogBackend(os.Stdout, "", 0) - // Set the backends to be used. - logging.SetBackend(stdout) - logging.SetFormatter(logging.MustStringFormatter(logFormat)) - - return logging.MustGetLogger("ofelia") +func (c *Config) JobsCount() int { + return len(c.ExecJobs) + len(c.RunJobs) + len(c.LocalJobs) + len(c.ServiceJobs) } func (c *Config) buildSchedulerMiddlewares(sh *core.Scheduler) { @@ -148,6 +123,115 @@ func (c *Config) buildSchedulerMiddlewares(sh *core.Scheduler) { sh.Use(middlewares.NewMail(&c.Global.MailConfig)) } +func (c *Config) dockerLabelsUpdate(labels map[string]map[string]string) { + // Get the current labels + var parsedLabelConfig Config + parsedLabelConfig.buildFromDockerLabels(labels) + + // Calculate the delta execJobs + for name, j := range c.ExecJobs { + found := false + for newJobsName, newJob := range parsedLabelConfig.ExecJobs { + // Check if the schedule has changed + if name == newJobsName { + found = true + // There is a slight race condition were a job can be canceled / restarted with different params + // so, lets take care of it by simply restarting + // For the hash to work properly, we must fill the fields before calling it + defaults.SetDefaults(newJob) + newJob.Client = c.dockerHandler.GetInternalDockerClient() + newJob.Name = newJobsName + if newJob.Hash() != j.Hash() { + c.logger.Debugf("Job %s has changed, restarting", name) + // Remove from the scheduler + c.sh.RemoveJob(j) + // Add the job back to the scheduler + newJob.buildMiddlewares() + c.sh.AddJob(newJob) + // Update the job config + c.ExecJobs[name] = newJob + } + break + } + } + if !found { + c.logger.Debugf("Job %s is not found, Removing", name) + // Remove the job + c.sh.RemoveJob(j) + delete(c.ExecJobs, name) + } + } + + // Check for aditions + for newJobsName, newJob := range parsedLabelConfig.ExecJobs { + found := false + for name := range c.ExecJobs { + if name == newJobsName { + found = true + break + } + } + if !found { + defaults.SetDefaults(newJob) + newJob.Client = c.dockerHandler.GetInternalDockerClient() + newJob.Name = newJobsName + newJob.buildMiddlewares() + c.sh.AddJob(newJob) + c.ExecJobs[newJobsName] = newJob + } + } + + for name, j := range c.RunJobs { + found := false + for newJobsName, newJob := range parsedLabelConfig.RunJobs { + // Check if the schedule has changed + if name == newJobsName { + found = true + // There is a slight race condition were a job can be canceled / restarted with different params + // so, lets take care of it by simply restarting + // For the hash to work properly, we must fill the fields before calling it + defaults.SetDefaults(newJob) + newJob.Client = c.dockerHandler.GetInternalDockerClient() + newJob.Name = newJobsName + if newJob.Hash() != j.Hash() { + // Remove from the scheduler + c.sh.RemoveJob(j) + // Add the job back to the scheduler + newJob.buildMiddlewares() + c.sh.AddJob(newJob) + // Update the job config + c.RunJobs[name] = newJob + } + break + } + } + if !found { + // Remove the job + c.sh.RemoveJob(j) + delete(c.RunJobs, name) + } + } + + // Check for aditions + for newJobsName, newJob := range parsedLabelConfig.RunJobs { + found := false + for name := range c.RunJobs { + if name == newJobsName { + found = true + break + } + } + if !found { + defaults.SetDefaults(newJob) + newJob.Client = c.dockerHandler.GetInternalDockerClient() + newJob.Name = newJobsName + newJob.buildMiddlewares() + c.sh.AddJob(newJob) + c.RunJobs[newJobsName] = newJob + } + } +} + // ExecJobConfig contains all configuration params needed to build a ExecJob type ExecJobConfig struct { core.ExecJob `mapstructure:",squash"` diff --git a/cli/config_test.go b/cli/config_test.go index 65e04804b..140e17600 100644 --- a/cli/config_test.go +++ b/cli/config_test.go @@ -17,8 +17,16 @@ type SuiteConfig struct{} var _ = Suite(&SuiteConfig{}) +type TestLogger struct{} + +func (*TestLogger) Criticalf(format string, args ...interface{}) {} +func (*TestLogger) Debugf(format string, args ...interface{}) {} +func (*TestLogger) Errorf(format string, args ...interface{}) {} +func (*TestLogger) Noticef(format string, args ...interface{}) {} +func (*TestLogger) Warningf(format string, args ...interface{}) {} + func (s *SuiteConfig) TestBuildFromString(c *C) { - sh, err := BuildFromString(` + conf, err := BuildFromString(` [job-exec "foo"] schedule = @every 10s @@ -33,10 +41,10 @@ func (s *SuiteConfig) TestBuildFromString(c *C) { [job-service-run "bob"] schedule = @every 10s - `) + `, &TestLogger{}) c.Assert(err, IsNil) - c.Assert(sh.Jobs, HasLen, 5) + c.Assert(conf.JobsCount(), Equals, 5) } func (s *SuiteConfig) TestJobDefaultsSet(c *C) { diff --git a/cli/daemon.go b/cli/daemon.go index 1ec5ea9f9..e8659214f 100644 --- a/cli/daemon.go +++ b/cli/daemon.go @@ -10,21 +10,16 @@ import ( // DaemonCommand daemon process type DaemonCommand struct { - ConfigFile string `long:"config" description:"configuration file" default:"/etc/ofelia.conf"` - DockerLabelsConfig bool `short:"d" long:"docker" description:"read configurations from docker labels"` - DockerFilters []string `short:"f" long:"docker-filter" description:"filter to select docker containers"` - - config *Config - scheduler *core.Scheduler - signals chan os.Signal - done chan bool + ConfigFile string `long:"config" description:"configuration file" default:"/etc/ofelia.conf"` + DockerFilters []string `short:"f" long:"docker-filter" description:"filter to select docker containers. https://docs.docker.com/reference/cli/docker/container/ls/#filter"` + scheduler *core.Scheduler + signals chan os.Signal + done chan bool + Logger core.Logger } // Execute runs the daemon func (c *DaemonCommand) Execute(args []string) error { - _, err := os.Stat("/.dockerenv") - IsDockerEnv = !os.IsNotExist(err) - if err := c.boot(); err != nil { return err } @@ -41,13 +36,29 @@ func (c *DaemonCommand) Execute(args []string) error { } func (c *DaemonCommand) boot() (err error) { - if c.DockerLabelsConfig { - c.scheduler, err = BuildFromDockerLabels(c.DockerFilters...) - } else { - c.scheduler, err = BuildFromFile(c.ConfigFile) + // Always try to read the config file, as there are options such as globals or some tasks that can be specified there and not in docker + config, err := BuildFromFile(c.ConfigFile, c.Logger) + if err != nil { + c.Logger.Debugf("Config file: %v not found", c.ConfigFile) + } + scheduler := core.NewScheduler(c.Logger) + + config.sh = scheduler + config.buildSchedulerMiddlewares(scheduler) + + config.dockerHandler, err = NewDockerHandler(config, c.DockerFilters, c.Logger) + if err != nil { + return err + } + + err = config.InitializeApp() + if err != nil { + c.Logger.Criticalf("Can't start the app: %v", err) } - return + c.scheduler = config.sh + + return err } func (c *DaemonCommand) start() error { @@ -67,7 +78,7 @@ func (c *DaemonCommand) setSignals() { go func() { sig := <-c.signals - c.scheduler.Logger.Warningf( + c.Logger.Warningf( "Signal received: %s, shutting down the process\n", sig, ) @@ -81,6 +92,6 @@ func (c *DaemonCommand) shutdown() error { return nil } - c.scheduler.Logger.Warningf("Waiting running jobs.") + c.Logger.Warningf("Waiting running jobs.") return c.scheduler.Stop() } diff --git a/cli/docker-labels.go b/cli/docker_handler.go similarity index 72% rename from cli/docker-labels.go rename to cli/docker_handler.go index 663a11c6d..78bc757be 100644 --- a/cli/docker-labels.go +++ b/cli/docker_handler.go @@ -8,7 +8,8 @@ import ( "time" docker "github.com/fsouza/go-dockerclient" - "github.com/mitchellh/mapstructure" + "github.com/go-viper/mapstructure/v2" + "github.com/mcuadros/ofelia/core" ) const ( @@ -25,26 +26,71 @@ var ( errFailedToListContainers = errors.New("failed to list containers") ) -func parseFilter(filter string) (key, value string, err error) { - parts := strings.SplitN(filter, "=", 2) - if len(parts) != 2 { - return "", "", errInvalidDockerFilter +type DockerHandler struct { + dockerClient *docker.Client + notifier dockerLabelsUpdate + logger core.Logger + filters []string +} + +type dockerLabelsUpdate interface { + dockerLabelsUpdate(map[string]map[string]string) +} + +// TODO: Implement an interface so the code does not have to use third parties directly +func (c *DockerHandler) GetInternalDockerClient() *docker.Client { + return c.dockerClient +} + +func (c *DockerHandler) buildDockerClient() (*docker.Client, error) { + d, err := docker.NewClientFromEnv() + if err != nil { + return nil, err } - return parts[0], parts[1], nil + + return d, nil } -func getLabels(d *docker.Client, filterFlags []string) (map[string]map[string]string, error) { - // sleep before querying containers - // because docker not always propagating labels in time - // so ofelia app can't find it's own container - if IsDockerEnv { - time.Sleep(1 * time.Second) +func NewDockerHandler(notifier dockerLabelsUpdate, dockerFilters []string, logger core.Logger) (*DockerHandler, error) { + c := &DockerHandler{ + filters: dockerFilters, + } + var err error + c.dockerClient, err = c.buildDockerClient() + c.notifier = notifier + c.logger = logger + if err != nil { + return nil, err } + // Do a sanity check on docker + _, err = c.dockerClient.Info() + if err != nil { + return nil, err + } + + go c.watch() + return c, nil +} + +func (c *DockerHandler) watch() { + // Poll for changes + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + for range ticker.C { + labels, err := c.GetDockerLabels() + // Do not print or care if there is no container up right now + if err != nil && !errors.Is(err, errNoContainersMatchingFilters) { + c.logger.Debugf("%v", err) + } + c.notifier.dockerLabelsUpdate(labels) + } +} +func (c *DockerHandler) GetDockerLabels() (map[string]map[string]string, error) { var filters = map[string][]string{ "label": {requiredLabelFilter}, } - for _, f := range filterFlags { + for _, f := range c.filters { key, value, err := parseFilter(f) if err != nil { return nil, fmt.Errorf("%w: %s", err, f) @@ -52,7 +98,7 @@ func getLabels(d *docker.Client, filterFlags []string) (map[string]map[string]st filters[key] = append(filters[key], value) } - conts, err := d.ListContainers(docker.ListContainersOptions{Filters: filters}) + conts, err := c.dockerClient.ListContainers(docker.ListContainersOptions{Filters: filters}) if err != nil { return nil, fmt.Errorf("%w: %w", errFailedToListContainers, err) } else if len(conts) == 0 { @@ -79,6 +125,14 @@ func getLabels(d *docker.Client, filterFlags []string) (map[string]map[string]st return labels, nil } +func parseFilter(filter string) (key, value string, err error) { + parts := strings.SplitN(filter, "=", 2) + if len(parts) != 2 { + return "", "", errInvalidDockerFilter + } + return parts[0], parts[1], nil +} + func (c *Config) buildFromDockerLabels(labels map[string]map[string]string) error { execJobs := make(map[string]map[string]interface{}) localJobs := make(map[string]map[string]interface{}) diff --git a/cli/docker-labels_test.go b/cli/docker_handler_test.go similarity index 78% rename from cli/docker-labels_test.go rename to cli/docker_handler_test.go index 402138a6d..db84d226a 100644 --- a/cli/docker-labels_test.go +++ b/cli/docker_handler_test.go @@ -21,6 +21,26 @@ type TestDockerSuit struct { client *docker.Client } +func buildFromDockerLabels(dockerFilters ...string) (*Config, error) { + mockLogger := &TestLogger{} + c := &Config{ + sh: core.NewScheduler(mockLogger), + } + + var err error + c.dockerHandler, err = NewDockerHandler(c, dockerFilters, mockLogger) + if err != nil { + return nil, err + } + dockerLabels, err := c.dockerHandler.GetDockerLabels() + if err != nil { + return nil, err + } + + c.buildFromDockerLabels(dockerLabels) + return c, nil +} + func (s *TestDockerSuit) SetUpTest(c *check.C) { var err error s.server, err = testing.NewServer("127.0.0.1:0", nil, nil) @@ -60,11 +80,11 @@ func (s *TestDockerSuit) TestLabelsFilterJobsCount(c *check.C) { _, err := s.startTestContainersWithLabels(containersToStartWithLabels) c.Assert(err, check.IsNil) - scheduler, err := BuildFromDockerLabels("label=" + strings.Join(filterLabel, "=")) + conf, err := buildFromDockerLabels("label=" + strings.Join(filterLabel, "=")) c.Assert(err, check.IsNil) - c.Assert(scheduler, check.NotNil) + c.Assert(conf.sh, check.NotNil) - c.Assert(scheduler.Jobs, check.HasLen, 1) + c.Assert(conf.JobsCount(), check.Equals, 1) } func (s *TestDockerSuit) TestFilterErrorsLabel(c *check.C) { @@ -79,36 +99,35 @@ func (s *TestDockerSuit) TestFilterErrorsLabel(c *check.C) { c.Assert(err, check.IsNil) { - scheduler, err := BuildFromDockerLabels() - c.Assert(errors.Is(err, errNoContainersMatchingFilters), check.Equals, true) + conf, err := buildFromDockerLabels() c.Assert(strings.Contains(err.Error(), requiredLabelFilter), check.Equals, true) - c.Assert(scheduler, check.IsNil) + c.Assert(conf, check.IsNil) } customLabelFilter := []string{"label", "test=123"} { - scheduler, err := BuildFromDockerLabels(strings.Join(customLabelFilter, "=")) + conf, err := buildFromDockerLabels(strings.Join(customLabelFilter, "=")) c.Assert(errors.Is(err, errNoContainersMatchingFilters), check.Equals, true) c.Assert(err, check.ErrorMatches, fmt.Sprintf(`.*%s:.*%s.*`, "label", requiredLabel)) c.Assert(err, check.ErrorMatches, fmt.Sprintf(`.*%s:.*%s.*`, customLabelFilter[0], customLabelFilter[1])) - c.Assert(scheduler, check.IsNil) + c.Assert(conf, check.IsNil) } { customNameFilter := []string{"name", "test-name"} - scheduler, err := BuildFromDockerLabels(strings.Join(customLabelFilter, "="), strings.Join(customNameFilter, "=")) + conf, err := buildFromDockerLabels(strings.Join(customLabelFilter, "="), strings.Join(customNameFilter, "=")) c.Assert(errors.Is(err, errNoContainersMatchingFilters), check.Equals, true) c.Assert(err, check.ErrorMatches, fmt.Sprintf(`.*%s:.*%s.*`, "label", requiredLabel)) c.Assert(err, check.ErrorMatches, fmt.Sprintf(`.*%s:.*%s.*`, customLabelFilter[0], customLabelFilter[1])) c.Assert(err, check.ErrorMatches, fmt.Sprintf(`.*%s:.*%s.*`, customNameFilter[0], customNameFilter[1])) - c.Assert(scheduler, check.IsNil) + c.Assert(conf, check.IsNil) } { customBadFilter := "label-test" - scheduler, err := BuildFromDockerLabels(customBadFilter) + conf, err := buildFromDockerLabels(customBadFilter) c.Assert(errors.Is(err, errInvalidDockerFilter), check.Equals, true) - c.Assert(scheduler, check.IsNil) + c.Assert(conf, check.IsNil) } } diff --git a/cli/validate.go b/cli/validate.go index 85e4660a1..54017ad4f 100644 --- a/cli/validate.go +++ b/cli/validate.go @@ -1,30 +1,33 @@ package cli -import "fmt" +import ( + "github.com/mcuadros/ofelia/core" +) // ValidateCommand validates the config file type ValidateCommand struct { ConfigFile string `long:"config" description:"configuration file" default:"/etc/ofelia.conf"` + Logger core.Logger } // Execute runs the validation command func (c *ValidateCommand) Execute(args []string) error { - fmt.Printf("Validating %q ... ", c.ConfigFile) - config, err := BuildFromFile(c.ConfigFile) + c.Logger.Debugf("Validating %q ... ", c.ConfigFile) + config, err := BuildFromFile(c.ConfigFile, c.Logger) if err != nil { - fmt.Println("ERROR") + c.Logger.Errorf("ERROR") return err } - fmt.Println("OK") - fmt.Printf("Found %d jobs:\n", len(config.Jobs)) + c.Logger.Debugf("OK") + c.Logger.Debugf("Found %d jobs", len(config.sh.CronJobs())) - for _, j := range config.Jobs { - fmt.Printf( - "- name: %s schedule: %q command: %q\n", - j.GetName(), j.GetSchedule(), j.GetCommand(), - ) - } + // for _, j := range config.sh.CronJobs() { + // c.Logger.Debugf( + // "- name: %s schedule: %q command: %q\n", + // j.GetName(), j.GetSchedule(), j.GetCommand(), + // ) + // } return nil } diff --git a/core/common.go b/core/common.go index f9fc7f1c3..32bdb54d3 100644 --- a/core/common.go +++ b/core/common.go @@ -31,6 +31,8 @@ type Job interface { GetName() string GetSchedule() string GetCommand() string + GetCronJobID() int + SetCronJobID(int) Middlewares() []Middleware Use(...Middleware) Run(*Context) error diff --git a/core/cron_utils.go b/core/cron_utils.go new file mode 100644 index 000000000..88b52d786 --- /dev/null +++ b/core/cron_utils.go @@ -0,0 +1,18 @@ +package core + +// Implement the cron logger interface +type CronUtils struct { + Logger Logger +} + +func NewCronUtils(l Logger) *CronUtils { + return &CronUtils{Logger: l} +} + +func (c *CronUtils) Info(msg string, keysAndValues ...interface{}) { + c.Logger.Debugf(msg) // TODO, pass in the keysAndValues +} + +func (c *CronUtils) Error(err error, msg string, keysAndValues ...interface{}) { + c.Logger.Errorf("msg: %v, error: %v", msg, err) // TODO, pass in the keysAndValues +} diff --git a/core/execjob.go b/core/execjob.go index 9bea17bcc..d51b59b0e 100644 --- a/core/execjob.go +++ b/core/execjob.go @@ -9,7 +9,7 @@ import ( type ExecJob struct { BareJob `mapstructure:",squash"` - Client *docker.Client `json:"-"` + Client *docker.Client `json:"-" hash:"-"` Container string User string `default:"root"` TTY bool `default:"false"` diff --git a/core/job.go b/core/job.go index 90dcc8fd2..3b9563227 100644 --- a/core/job.go +++ b/core/job.go @@ -3,6 +3,8 @@ package core import ( "sync" "sync/atomic" + + "github.com/gohugoio/hashstructure" ) type BareJob struct { @@ -14,6 +16,7 @@ type BareJob struct { running int32 lock sync.Mutex history []*Execution + cronID int } func (j *BareJob) GetName() string { @@ -28,6 +31,14 @@ func (j *BareJob) GetCommand() string { return j.Command } +func (j *BareJob) GetCronJobID() int { + return j.cronID +} + +func (j *BareJob) SetCronJobID(id int) { + j.cronID = id +} + func (j *BareJob) Running() int32 { return atomic.LoadInt32(&j.running) } @@ -39,3 +50,10 @@ func (j *BareJob) NotifyStart() { func (j *BareJob) NotifyStop() { atomic.AddInt32(&j.running, -1) } + +// Returns a hash of all the job attributes. Used to detect changes +// unexported struct fields are ignored - https://pkg.go.dev/github.com/gohugoio/hashstructure#Hash +func (j *BareJob) Hash() uint64 { + hash, _ := hashstructure.Hash(j, nil) + return hash +} diff --git a/core/scheduler.go b/core/scheduler.go index d1ef8d9df..afafdf284 100644 --- a/core/scheduler.go +++ b/core/scheduler.go @@ -5,16 +5,15 @@ import ( "fmt" "sync" - "github.com/robfig/cron" + "github.com/robfig/cron/v3" ) var ( - ErrEmptyScheduler = errors.New("unable to start a empty scheduler.") - ErrEmptySchedule = errors.New("unable to add a job with a empty schedule.") + ErrEmptyScheduler = errors.New("unable to start a empty scheduler") + ErrEmptySchedule = errors.New("unable to add a job with a empty schedule") ) type Scheduler struct { - Jobs []Job Logger Logger middlewareContainer @@ -24,9 +23,10 @@ type Scheduler struct { } func NewScheduler(l Logger) *Scheduler { + cronUtils := NewCronUtils(l) return &Scheduler{ Logger: l, - cron: cron.New(), + cron: cron.New(cron.WithLogger(cronUtils), cron.WithChain(cron.Recover(cronUtils))), } } @@ -35,7 +35,7 @@ func (s *Scheduler) AddJob(j Job) error { return ErrEmptySchedule } - err := s.cron.AddJob(j.GetSchedule(), &jobWrapper{s, j}) + id, err := s.cron.AddJob(j.GetSchedule(), &jobWrapper{s, j}) if err != nil { s.Logger.Warningf("Failed to register job %q - %q - %q", j.GetName(), j.GetCommand(), j.GetSchedule()) return err @@ -43,29 +43,30 @@ func (s *Scheduler) AddJob(j Job) error { s.Logger.Noticef("New job registered %q - %q - %q", j.GetName(), j.GetCommand(), j.GetSchedule()) - s.Jobs = append(s.Jobs, j) + j.SetCronJobID(int(id)) // Cast to int in order to avoid pushing cron external to common + j.Use(s.Middlewares()...) + s.Logger.Noticef("New job registered %q - %q - %q - ID: %v", j.GetName(), j.GetCommand(), j.GetSchedule(), id) return nil } -func (s *Scheduler) Start() error { - if len(s.Jobs) == 0 { - return ErrEmptyScheduler - } +func (s *Scheduler) RemoveJob(j Job) error { + s.Logger.Noticef("Job deregistered (will not fire again) %q - %q - %q - ID: %v", j.GetName(), j.GetCommand(), j.GetSchedule(), j.GetCronJobID()) + s.cron.Remove(cron.EntryID(j.GetCronJobID())) + return nil +} + +func (s *Scheduler) CronJobs() []cron.Entry { + return s.cron.Entries() +} - s.Logger.Debugf("Starting scheduler with %d jobs", len(s.Jobs)) +func (s *Scheduler) Start() error { + s.Logger.Debugf("Starting scheduler with %d jobs", len(s.CronJobs())) - s.mergeMiddlewares() s.isRunning = true s.cron.Start() return nil } -func (s *Scheduler) mergeMiddlewares() { - for _, j := range s.Jobs { - j.Use(s.Middlewares()...) - } -} - func (s *Scheduler) Stop() error { s.wg.Wait() s.cron.Stop() diff --git a/core/scheduler_test.go b/core/scheduler_test.go index 31d957b3b..5be8b37b2 100644 --- a/core/scheduler_test.go +++ b/core/scheduler_test.go @@ -17,7 +17,6 @@ func (s *SuiteScheduler) TestAddJob(c *C) { sc := NewScheduler(&TestLogger{}) err := sc.AddJob(job) c.Assert(err, IsNil) - c.Assert(sc.Jobs, HasLen, 1) e := sc.cron.Entries() c.Assert(e, HasLen, 1) @@ -51,7 +50,6 @@ func (s *SuiteScheduler) TestMergeMiddlewaresSame(c *C) { sc := NewScheduler(&TestLogger{}) sc.Use(mA) sc.AddJob(job) - sc.mergeMiddlewares() m := job.Middlewares() c.Assert(m, HasLen, 1) diff --git a/go.mod b/go.mod index 6afa67068..030027c93 100644 --- a/go.mod +++ b/go.mod @@ -9,12 +9,13 @@ require ( github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625 github.com/docker/docker v27.5.1+incompatible github.com/fsouza/go-dockerclient v1.12.0 + github.com/go-viper/mapstructure/v2 v2.2.1 github.com/gobs/args v0.0.0-20210311043657-b8c0b223be93 + github.com/gohugoio/hashstructure v0.3.0 github.com/jessevdk/go-flags v1.6.1 github.com/mcuadros/go-defaults v1.2.0 - github.com/mitchellh/mapstructure v1.5.0 github.com/op/go-logging v0.0.0-20160315200505-970db520ece7 - github.com/robfig/cron v1.2.0 + github.com/robfig/cron/v3 v3.0.1 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c gopkg.in/gcfg.v1 v1.2.3 gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df diff --git a/go.sum b/go.sum index bc1583476..d1c40ebcf 100644 --- a/go.sum +++ b/go.sum @@ -24,10 +24,14 @@ github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4 github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/fsouza/go-dockerclient v1.12.0 h1:S2f2crEUbBNCFiF06kR/GvioEB8EMsb3Td/bpawD+aU= github.com/fsouza/go-dockerclient v1.12.0/go.mod h1:YWUtjg8japrqD/80L98nTtCoxQFp5B5wrSsnyeB5lFo= +github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss= +github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= github.com/gobs/args v0.0.0-20210311043657-b8c0b223be93 h1:70jFzur8/dg4E5NKFMOPLAxk4wSyGm3vQ+7PuBEoHzE= github.com/gobs/args v0.0.0-20210311043657-b8c0b223be93/go.mod h1:ZpqkpUmnBz2Jz7hMGSPRbHtYC82FP/IZ1Y7A2riYH0s= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/gohugoio/hashstructure v0.3.0 h1:orHavfqnBv0ffQmobOp41Y9HKEMcjrR/8EFAzpngmGs= +github.com/gohugoio/hashstructure v0.3.0/go.mod h1:8ohPTAfQLTs2WdzB6k9etmQYclDUeNsIHGPAFejbsEA= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= @@ -47,8 +51,6 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/mcuadros/go-defaults v1.2.0 h1:FODb8WSf0uGaY8elWJAkoLL0Ri6AlZ1bFlenk56oZtc= github.com/mcuadros/go-defaults v1.2.0/go.mod h1:WEZtHEVIGYVDqkKSWBdWKUVdRyKlMfulPaGDWIVeCWY= -github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= -github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0= github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo= github.com/moby/patternmatcher v0.6.0 h1:GmP9lR19aU5GqSSFko+5pRqHi+Ohk1O69aFiKkVGiPk= @@ -75,8 +77,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= -github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= diff --git a/ofelia.go b/ofelia.go index 731ae9a41..4a71d6cb2 100644 --- a/ofelia.go +++ b/ofelia.go @@ -6,15 +6,28 @@ import ( "github.com/jessevdk/go-flags" "github.com/mcuadros/ofelia/cli" + "github.com/mcuadros/ofelia/core" + "github.com/op/go-logging" ) var version string var build string +const logFormat = "%{time} %{color} %{shortfile} ▶ %{level} %{color:reset} %{message}" + +func buildLogger() core.Logger { + stdout := logging.NewLogBackend(os.Stdout, "", 0) + // Set the backends to be used. + logging.SetBackend(stdout) + logging.SetFormatter(logging.MustStringFormatter(logFormat)) + return logging.MustGetLogger("ofelia") +} + func main() { + logger := buildLogger() parser := flags.NewNamedParser("ofelia", flags.Default) - parser.AddCommand("daemon", "daemon process", "", &cli.DaemonCommand{}) - parser.AddCommand("validate", "validates the config file", "", &cli.ValidateCommand{}) + parser.AddCommand("daemon", "daemon process", "", &cli.DaemonCommand{Logger: logger}) + parser.AddCommand("validate", "validates the config file", "", &cli.ValidateCommand{Logger: logger}) if _, err := parser.Parse(); err != nil { if _, ok := err.(*flags.Error); ok {