Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Dynamic docker labels handling #319

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 152 additions & 68 deletions cli/config.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,22 @@
package cli

import (
"os"
"fmt"

docker "github.com/fsouza/go-dockerclient"
"github.com/mcuadros/ofelia/core"
"github.com/mcuadros/ofelia/middlewares"
logging "github.com/op/go-logging"

defaults "github.com/mcuadros/go-defaults"
gcfg "gopkg.in/gcfg.v1"
)

const (
logFormat = "%{time} %{color} %{shortfile} ▶ %{level}%{color:reset} %{message}"
jobExec = "job-exec"
jobRun = "job-run"
jobServiceRun = "job-service-run"
jobLocal = "job-local"
)

var IsDockerEnv bool

// Config contains the configuration
type Config struct {
Global struct {
@@ -33,113 +28,93 @@ type Config struct {
RunJobs map[string]*RunJobConfig `gcfg:"job-run" mapstructure:"job-run,squash"`
ServiceJobs map[string]*RunServiceConfig `gcfg:"job-service-run" mapstructure:"job-service-run,squash"`
LocalJobs map[string]*LocalJobConfig `gcfg:"job-local" mapstructure:"job-local,squash"`

sh *core.Scheduler
dockerHandler *DockerHandler
logger core.Logger
}

// BuildFromDockerLabels builds a scheduler using the config from a docker labels
func BuildFromDockerLabels(filterFlags ...string) (*core.Scheduler, error) {
func NewConfig(logger core.Logger) *Config {
// Initialize
c := &Config{}

d, err := c.buildDockerClient()
if err != nil {
return nil, err
}

labels, err := getLabels(d, filterFlags)
if err != nil {
return nil, err
}

if err := c.buildFromDockerLabels(labels); err != nil {
return nil, err
}

return c.build()
c.ExecJobs = make(map[string]*ExecJobConfig)
c.RunJobs = make(map[string]*RunJobConfig)
c.ServiceJobs = make(map[string]*RunServiceConfig)
c.LocalJobs = make(map[string]*LocalJobConfig)
c.logger = logger
defaults.SetDefaults(c)
return c
}

// BuildFromFile builds a scheduler using the config from a file
func BuildFromFile(filename string) (*core.Scheduler, error) {
c := &Config{}
if err := gcfg.ReadFileInto(c, filename); err != nil {
return nil, err
}

return c.build()
func BuildFromFile(filename string, logger core.Logger) (*Config, error) {
c := NewConfig(logger)
err := gcfg.ReadFileInto(c, filename)
return c, err
}

// BuildFromString builds a scheduler using the config from a string
func BuildFromString(config string) (*core.Scheduler, error) {
c := &Config{}
func BuildFromString(config string, logger core.Logger) (*Config, error) {
c := NewConfig(logger)
if err := gcfg.ReadStringInto(c, config); err != nil {
return nil, err
}

return c.build()
return c, nil
}

func (c *Config) build() (*core.Scheduler, error) {
defaults.SetDefaults(c)
// Call this only once at app init
func (c *Config) InitializeApp() error {
if c.sh == nil {
return fmt.Errorf("scheduler is not initialized yet")
}

d, err := c.buildDockerClient()
// In order to support non dynamic job types such as Local or Run using labels
// lets parse the labels and merge the job lists
dockerLabels, err := c.dockerHandler.GetDockerLabels()
if err != nil {
return nil, err
return err
}

sh := core.NewScheduler(c.buildLogger())
c.buildSchedulerMiddlewares(sh)
if err := c.buildFromDockerLabels(dockerLabels); err != nil {
return err
}

for name, j := range c.ExecJobs {
defaults.SetDefaults(j)

j.Client = d
j.Client = c.dockerHandler.GetInternalDockerClient()
j.Name = name
j.buildMiddlewares()
sh.AddJob(j)
c.sh.AddJob(j)
}

for name, j := range c.RunJobs {
defaults.SetDefaults(j)

j.Client = d
j.Client = c.dockerHandler.GetInternalDockerClient()
j.Name = name
j.buildMiddlewares()
sh.AddJob(j)
c.sh.AddJob(j)
}

for name, j := range c.LocalJobs {
defaults.SetDefaults(j)

j.Name = name
j.buildMiddlewares()
sh.AddJob(j)
c.sh.AddJob(j)
}

for name, j := range c.ServiceJobs {
defaults.SetDefaults(j)
j.Name = name
j.Client = d
j.Client = c.dockerHandler.GetInternalDockerClient()
j.buildMiddlewares()
sh.AddJob(j)
}

return sh, nil
}

func (c *Config) buildDockerClient() (*docker.Client, error) {
d, err := docker.NewClientFromEnv()
if err != nil {
return nil, err
c.sh.AddJob(j)
}

return d, nil
return nil
}

func (c *Config) buildLogger() core.Logger {
stdout := logging.NewLogBackend(os.Stdout, "", 0)
// Set the backends to be used.
logging.SetBackend(stdout)
logging.SetFormatter(logging.MustStringFormatter(logFormat))

return logging.MustGetLogger("ofelia")
func (c *Config) JobsCount() int {
return len(c.ExecJobs) + len(c.RunJobs) + len(c.LocalJobs) + len(c.ServiceJobs)
}

func (c *Config) buildSchedulerMiddlewares(sh *core.Scheduler) {
@@ -148,6 +123,115 @@ func (c *Config) buildSchedulerMiddlewares(sh *core.Scheduler) {
sh.Use(middlewares.NewMail(&c.Global.MailConfig))
}

func (c *Config) dockerLabelsUpdate(labels map[string]map[string]string) {
// Get the current labels
var parsedLabelConfig Config
parsedLabelConfig.buildFromDockerLabels(labels)

// Calculate the delta execJobs
for name, j := range c.ExecJobs {
found := false
for newJobsName, newJob := range parsedLabelConfig.ExecJobs {
// Check if the schedule has changed
if name == newJobsName {
found = true
// There is a slight race condition were a job can be canceled / restarted with different params
// so, lets take care of it by simply restarting
// For the hash to work properly, we must fill the fields before calling it
defaults.SetDefaults(newJob)
newJob.Client = c.dockerHandler.GetInternalDockerClient()
newJob.Name = newJobsName
if newJob.Hash() != j.Hash() {
c.logger.Debugf("Job %s has changed, restarting", name)
// Remove from the scheduler
c.sh.RemoveJob(j)
// Add the job back to the scheduler
newJob.buildMiddlewares()
c.sh.AddJob(newJob)
// Update the job config
c.ExecJobs[name] = newJob
}
break
}
}
if !found {
c.logger.Debugf("Job %s is not found, Removing", name)
// Remove the job
c.sh.RemoveJob(j)
delete(c.ExecJobs, name)
}
}

// Check for aditions
for newJobsName, newJob := range parsedLabelConfig.ExecJobs {
found := false
for name := range c.ExecJobs {
if name == newJobsName {
found = true
break
}
}
if !found {
defaults.SetDefaults(newJob)
newJob.Client = c.dockerHandler.GetInternalDockerClient()
newJob.Name = newJobsName
newJob.buildMiddlewares()
c.sh.AddJob(newJob)
c.ExecJobs[newJobsName] = newJob
}
}

for name, j := range c.RunJobs {
found := false
for newJobsName, newJob := range parsedLabelConfig.RunJobs {
// Check if the schedule has changed
if name == newJobsName {
found = true
// There is a slight race condition were a job can be canceled / restarted with different params
// so, lets take care of it by simply restarting
// For the hash to work properly, we must fill the fields before calling it
defaults.SetDefaults(newJob)
newJob.Client = c.dockerHandler.GetInternalDockerClient()
newJob.Name = newJobsName
if newJob.Hash() != j.Hash() {
// Remove from the scheduler
c.sh.RemoveJob(j)
// Add the job back to the scheduler
newJob.buildMiddlewares()
c.sh.AddJob(newJob)
// Update the job config
c.RunJobs[name] = newJob
}
break
}
}
if !found {
// Remove the job
c.sh.RemoveJob(j)
delete(c.RunJobs, name)
}
}

// Check for aditions
for newJobsName, newJob := range parsedLabelConfig.RunJobs {
found := false
for name := range c.RunJobs {
if name == newJobsName {
found = true
break
}
}
if !found {
defaults.SetDefaults(newJob)
newJob.Client = c.dockerHandler.GetInternalDockerClient()
newJob.Name = newJobsName
newJob.buildMiddlewares()
c.sh.AddJob(newJob)
c.RunJobs[newJobsName] = newJob
}
}
}

// ExecJobConfig contains all configuration params needed to build a ExecJob
type ExecJobConfig struct {
core.ExecJob `mapstructure:",squash"`
14 changes: 11 additions & 3 deletions cli/config_test.go
Original file line number Diff line number Diff line change
@@ -17,8 +17,16 @@ type SuiteConfig struct{}

var _ = Suite(&SuiteConfig{})

type TestLogger struct{}

func (*TestLogger) Criticalf(format string, args ...interface{}) {}
func (*TestLogger) Debugf(format string, args ...interface{}) {}
func (*TestLogger) Errorf(format string, args ...interface{}) {}
func (*TestLogger) Noticef(format string, args ...interface{}) {}
func (*TestLogger) Warningf(format string, args ...interface{}) {}

func (s *SuiteConfig) TestBuildFromString(c *C) {
sh, err := BuildFromString(`
conf, err := BuildFromString(`
[job-exec "foo"]
schedule = @every 10s

@@ -33,10 +41,10 @@ func (s *SuiteConfig) TestBuildFromString(c *C) {

[job-service-run "bob"]
schedule = @every 10s
`)
`, &TestLogger{})

c.Assert(err, IsNil)
c.Assert(sh.Jobs, HasLen, 5)
c.Assert(conf.JobsCount(), Equals, 5)
}

func (s *SuiteConfig) TestJobDefaultsSet(c *C) {
Loading
Loading