Skip to content

Commit d4ba9c7

Browse files
authored
feat: pause internal controllers (#1670)
* feat: pause internal controllers * improve controller active logic
1 parent 2bddfb6 commit d4ba9c7

File tree

3 files changed

+73
-40
lines changed

3 files changed

+73
-40
lines changed

cmd/hatchet-engine/engine/run.go

Lines changed: 59 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -786,57 +786,61 @@ func runV1Config(ctx context.Context, sc *server.ServerConfig) ([]Teardown, erro
786786
Fn: cleanupRetention,
787787
})
788788

789-
tasks, err := task.New(
790-
task.WithAlerter(sc.Alerter),
791-
task.WithMessageQueue(sc.MessageQueueV1),
792-
task.WithRepository(sc.EngineRepository),
793-
task.WithV1Repository(sc.V1),
794-
task.WithLogger(sc.Logger),
795-
task.WithPartition(p),
796-
task.WithQueueLoggerConfig(&sc.AdditionalLoggers.Queue),
797-
task.WithPgxStatsLoggerConfig(&sc.AdditionalLoggers.PgxStats),
798-
)
789+
if isControllerActive(sc.PausedControllers, TaskController) {
790+
tasks, err := task.New(
791+
task.WithAlerter(sc.Alerter),
792+
task.WithMessageQueue(sc.MessageQueueV1),
793+
task.WithRepository(sc.EngineRepository),
794+
task.WithV1Repository(sc.V1),
795+
task.WithLogger(sc.Logger),
796+
task.WithPartition(p),
797+
task.WithQueueLoggerConfig(&sc.AdditionalLoggers.Queue),
798+
task.WithPgxStatsLoggerConfig(&sc.AdditionalLoggers.PgxStats),
799+
)
800+
801+
if err != nil {
802+
return nil, fmt.Errorf("could not create tasks controller: %w", err)
803+
}
799804

800-
if err != nil {
801-
return nil, fmt.Errorf("could not create tasks controller: %w", err)
802-
}
805+
cleanupTasks, err := tasks.Start()
803806

804-
cleanupTasks, err := tasks.Start()
807+
if err != nil {
808+
return nil, fmt.Errorf("could not start tasks controller: %w", err)
809+
}
805810

806-
if err != nil {
807-
return nil, fmt.Errorf("could not start tasks controller: %w", err)
811+
teardown = append(teardown, Teardown{
812+
Name: "tasks controller",
813+
Fn: cleanupTasks,
814+
})
808815
}
809816

810-
teardown = append(teardown, Teardown{
811-
Name: "tasks controller",
812-
Fn: cleanupTasks,
813-
})
817+
if isControllerActive(sc.PausedControllers, OLAPController) {
818+
olap, err := olap.New(
819+
olap.WithAlerter(sc.Alerter),
820+
olap.WithMessageQueue(sc.MessageQueueV1),
821+
olap.WithRepository(sc.V1),
822+
olap.WithLogger(sc.Logger),
823+
olap.WithPartition(p),
824+
olap.WithTenantAlertManager(sc.TenantAlerter),
825+
olap.WithSamplingConfig(sc.Sampling),
826+
)
814827

815-
olap, err := olap.New(
816-
olap.WithAlerter(sc.Alerter),
817-
olap.WithMessageQueue(sc.MessageQueueV1),
818-
olap.WithRepository(sc.V1),
819-
olap.WithLogger(sc.Logger),
820-
olap.WithPartition(p),
821-
olap.WithTenantAlertManager(sc.TenantAlerter),
822-
olap.WithSamplingConfig(sc.Sampling),
823-
)
828+
if err != nil {
829+
return nil, fmt.Errorf("could not create olap controller: %w", err)
830+
}
824831

825-
if err != nil {
826-
return nil, fmt.Errorf("could not create olap controller: %w", err)
827-
}
832+
cleanupOlap, err := olap.Start()
828833

829-
cleanupOlap, err := olap.Start()
834+
if err != nil {
835+
return nil, fmt.Errorf("could not start olap controller: %w", err)
836+
}
830837

831-
if err != nil {
832-
return nil, fmt.Errorf("could not start olap controller: %w", err)
838+
teardown = append(teardown, Teardown{
839+
Name: "olap controller",
840+
Fn: cleanupOlap,
841+
})
833842
}
834843

835-
teardown = append(teardown, Teardown{
836-
Name: "olap controller",
837-
Fn: cleanupOlap,
838-
})
839-
840844
cleanup1, err := p.StartTenantWorkerPartition(ctx)
841845

842846
if err != nil {
@@ -1125,3 +1129,18 @@ func startPrometheus(l *zerolog.Logger, c shared.PrometheusConfigFile) Teardown
11251129
},
11261130
}
11271131
}
1132+
1133+
type ControllerName string
1134+
1135+
const (
1136+
OLAPController ControllerName = "olap"
1137+
TaskController ControllerName = "task"
1138+
)
1139+
1140+
func isControllerActive(pausedControllers map[string]bool, controllerName ControllerName) bool {
1141+
if isPaused, ok := pausedControllers[string(controllerName)]; !ok || !isPaused {
1142+
return true
1143+
}
1144+
1145+
return false
1146+
}

pkg/config/loader/loader.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -608,6 +608,14 @@ func createControllerLayer(dc *database.Layer, cf *server.ServerConfigFile, vers
608608
services = strings.Split(cf.ServicesString, " ")
609609
}
610610

611+
pausedControllers := make(map[string]bool)
612+
613+
if cf.PausedControllers != "" {
614+
for _, controller := range strings.Split(cf.PausedControllers, " ") {
615+
pausedControllers[controller] = true
616+
}
617+
}
618+
611619
if cf.Runtime.Monitoring.TLSRootCAFile == "" {
612620
cf.Runtime.Monitoring.TLSRootCAFile = cf.TLS.TLSRootCAFile
613621
}
@@ -630,6 +638,7 @@ func createControllerLayer(dc *database.Layer, cf *server.ServerConfigFile, vers
630638
MessageQueue: mq,
631639
MessageQueueV1: mqv1,
632640
Services: services,
641+
PausedControllers: pausedControllers,
633642
InternalClientFactory: internalClientFactory,
634643
Logger: &l,
635644
TLSConfig: tls,

pkg/config/server/server.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ type ServerConfigFile struct {
4747
// Used to bind the environment variable, since the array is not well supported
4848
ServicesString string `mapstructure:"servicesString" json:"servicesString,omitempty"`
4949

50+
PausedControllers string `mapstructure:"pausedControllers" json:"pausedControllers,omitempty"`
51+
5052
EnableDataRetention bool `mapstructure:"enableDataRetention" json:"enableDataRetention,omitempty" default:"true"`
5153

5254
EnableWorkerRetention bool `mapstructure:"enableWorkerRetention" json:"enableWorkerRetention,omitempty" default:"false"`
@@ -481,6 +483,8 @@ type ServerConfig struct {
481483

482484
Services []string
483485

486+
PausedControllers map[string]bool
487+
484488
EnableDataRetention bool
485489

486490
EnableWorkerRetention bool
@@ -548,6 +552,7 @@ func BindAllEnv(v *viper.Viper) {
548552
_ = v.BindEnv("runtime.grpcRateLimit", "SERVER_GRPC_RATE_LIMIT")
549553
_ = v.BindEnv("runtime.shutdownWait", "SERVER_SHUTDOWN_WAIT")
550554
_ = v.BindEnv("servicesString", "SERVER_SERVICES")
555+
_ = v.BindEnv("pausedControllers", "SERVER_PAUSED_CONTROLLERS")
551556
_ = v.BindEnv("enableDataRetention", "SERVER_ENABLE_DATA_RETENTION")
552557
_ = v.BindEnv("enableWorkerRetention", "SERVER_ENABLE_WORKER_RETENTION")
553558
_ = v.BindEnv("runtime.enforceLimits", "SERVER_ENFORCE_LIMITS")

0 commit comments

Comments
 (0)