Skip to content

Commit ae58a8f

Browse files
authored
feat(fxcore): Added core tasks system (#326)
1 parent 9e3a6d0 commit ae58a8f

17 files changed

+799
-304
lines changed

fxcore/README.md

+3
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,9 @@ modules:
134134
liveness:
135135
expose: true # to expose health check liveness route, disabled by default
136136
path: /livez # health check liveness route path (default /livez)
137+
tasks:
138+
expose: true # to expose tasks route, disabled by default
139+
path: /tasks/:name # tasks route path (default /tasks/:name)
137140
debug:
138141
config:
139142
expose: true # to expose debug config route

fxcore/go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ require (
1414
github.com/ankorstore/yokai/healthcheck v1.1.0
1515
github.com/ankorstore/yokai/httpserver v1.6.0
1616
github.com/ankorstore/yokai/log v1.2.0
17-
github.com/ankorstore/yokai/trace v1.3.0
17+
github.com/ankorstore/yokai/trace v1.4.0
1818
github.com/arl/statsviz v0.6.0
1919
github.com/labstack/echo/v4 v4.13.3
2020
github.com/labstack/gommon v0.4.2

fxcore/go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ github.com/ankorstore/yokai/log v1.2.0 h1:jiuDiC0dtqIGIOsFQslUHYoFJ1qjI+rOMa6dI1
2222
github.com/ankorstore/yokai/log v1.2.0/go.mod h1:MVvUcms1AYGo0BT6l88B9KJdvtK6/qGKdgyKVXfbmyc=
2323
github.com/ankorstore/yokai/trace v1.3.0 h1:0ji32oymIcxTmH5h6GRWLo5ypwBbWrZkXRf9rWF9070=
2424
github.com/ankorstore/yokai/trace v1.3.0/go.mod h1:m7EL2MRBilgCtrly5gA4F0jkGSXR2EbG6LsotbTJ4nA=
25+
github.com/ankorstore/yokai/trace v1.4.0 h1:AdEQs/4TEuqOJ9p/EfsQmrtmkSG3pcmE7r/l+FQFxY8=
26+
github.com/ankorstore/yokai/trace v1.4.0/go.mod h1:m7EL2MRBilgCtrly5gA4F0jkGSXR2EbG6LsotbTJ4nA=
2527
github.com/arl/statsviz v0.6.0 h1:jbW1QJkEYQkufd//4NDYRSNBpwJNrdzPahF7ZmoGdyE=
2628
github.com/arl/statsviz v0.6.0/go.mod h1:0toboo+YGSUXDaS4g1D5TVS4dXs7S7YYT5J/qnW2h8s=
2729
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=

fxcore/info.go

+57
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package fxcore
22

33
import (
4+
"fmt"
5+
"sort"
6+
47
"github.com/ankorstore/yokai/config"
58
"github.com/ankorstore/yokai/log"
69
"github.com/ankorstore/yokai/trace"
@@ -129,3 +132,57 @@ func (i *FxCoreModuleInfo) Data() map[string]interface{} {
129132
"extra": i.ExtraInfos,
130133
}
131134
}
135+
136+
// FxModuleInfoRegistry is the registry collecting info about registered modules.
137+
type FxModuleInfoRegistry struct {
138+
infos map[string]FxModuleInfo
139+
}
140+
141+
// FxModuleInfoRegistryParam allows injection of the required dependencies in [NewFxModuleInfoRegistry].
142+
type FxModuleInfoRegistryParam struct {
143+
fx.In
144+
Infos []any `group:"core-module-infos"`
145+
}
146+
147+
// NewFxModuleInfoRegistry returns a new [FxModuleInfoRegistry].
148+
func NewFxModuleInfoRegistry(p FxModuleInfoRegistryParam) *FxModuleInfoRegistry {
149+
infos := make(map[string]FxModuleInfo)
150+
151+
for _, info := range p.Infos {
152+
if castInfo, ok := info.(FxModuleInfo); ok {
153+
infos[castInfo.Name()] = castInfo
154+
}
155+
}
156+
157+
return &FxModuleInfoRegistry{
158+
infos: infos,
159+
}
160+
}
161+
162+
func (r *FxModuleInfoRegistry) Names() []string {
163+
names := make([]string, len(r.infos))
164+
165+
i := 0
166+
for name := range r.infos {
167+
names[i] = name
168+
i++
169+
}
170+
171+
sort.Strings(names)
172+
173+
return names
174+
}
175+
176+
// All returns a map of all registered [FxModuleInfo].
177+
func (r *FxModuleInfoRegistry) All() map[string]FxModuleInfo {
178+
return r.infos
179+
}
180+
181+
// Find returns a [FxModuleInfo] by name.
182+
func (r *FxModuleInfoRegistry) Find(name string) (FxModuleInfo, error) {
183+
if info, ok := r.infos[name]; ok {
184+
return info, nil
185+
}
186+
187+
return nil, fmt.Errorf("fx module info with name %s was not found", name)
188+
}

fxcore/info_test.go

+78-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,17 @@ import (
88
"github.com/stretchr/testify/assert"
99
)
1010

11-
func TestNewFxCoreModuleInfo(t *testing.T) {
11+
type testModuleInfo struct{}
12+
13+
func (i *testModuleInfo) Name() string {
14+
return "test"
15+
}
16+
17+
func (i *testModuleInfo) Data() map[string]interface{} {
18+
return map[string]interface{}{}
19+
}
20+
21+
func TestFxCoreModuleInfo(t *testing.T) {
1222
t.Setenv("APP_ENV", "test")
1323

1424
cfg, err := config.NewDefaultConfigFactory().Create(
@@ -54,3 +64,70 @@ func TestNewFxCoreModuleInfo(t *testing.T) {
5464
info.Data(),
5565
)
5666
}
67+
68+
func TestFxModuleInfoRegistry(t *testing.T) {
69+
t.Parallel()
70+
71+
createRegistry := func(tb testing.TB) *fxcore.FxModuleInfoRegistry {
72+
tb.Helper()
73+
74+
cfg, err := config.NewDefaultConfigFactory().Create(
75+
config.WithFilePaths("./testdata/config"),
76+
)
77+
assert.NoError(tb, err)
78+
79+
return fxcore.NewFxModuleInfoRegistry(fxcore.FxModuleInfoRegistryParam{
80+
Infos: []interface{}{
81+
&testModuleInfo{},
82+
fxcore.NewFxCoreModuleInfo(fxcore.FxCoreModuleInfoParam{
83+
Config: cfg,
84+
ExtraInfos: []fxcore.FxExtraInfo{},
85+
}),
86+
"invalid",
87+
},
88+
})
89+
}
90+
91+
t.Run("test type", func(t *testing.T) {
92+
t.Parallel()
93+
94+
registry := createRegistry(t)
95+
96+
assert.IsType(t, &fxcore.FxModuleInfoRegistry{}, registry)
97+
})
98+
99+
t.Run("test all", func(t *testing.T) {
100+
t.Parallel()
101+
102+
registry := createRegistry(t)
103+
104+
assert.Len(t, registry.All(), 2)
105+
})
106+
107+
t.Run("test names", func(t *testing.T) {
108+
t.Parallel()
109+
110+
registry := createRegistry(t)
111+
112+
assert.Equal(t, []string{fxcore.ModuleName, "test"}, registry.Names())
113+
})
114+
115+
t.Run("test find", func(t *testing.T) {
116+
t.Parallel()
117+
118+
registry := createRegistry(t)
119+
120+
testInfo, err := registry.Find("test")
121+
assert.NoError(t, err)
122+
assert.Equal(t, "test", testInfo.Name())
123+
124+
coreInfo, err := registry.Find(fxcore.ModuleName)
125+
assert.NoError(t, err)
126+
assert.Equal(t, fxcore.ModuleName, coreInfo.Name())
127+
128+
invalidInfo, err := registry.Find("invalid")
129+
assert.Error(t, err)
130+
assert.Equal(t, "fx module info with name invalid was not found", err.Error())
131+
assert.Nil(t, invalidInfo)
132+
})
133+
}

fxcore/module.go

+56-5
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"embed"
66
"fmt"
7+
"io"
78
"net/http"
89
"strconv"
910

@@ -37,6 +38,7 @@ const (
3738
DefaultHealthCheckStartupPath = "/healthz"
3839
DefaultHealthCheckLivenessPath = "/livez"
3940
DefaultHealthCheckReadinessPath = "/readyz"
41+
DefaultTasksPath = "/tasks"
4042
DefaultDebugConfigPath = "/debug/config"
4143
DefaultDebugPProfPath = "/debug/pprof"
4244
DefaultDebugBuildPath = "/debug/build"
@@ -63,6 +65,7 @@ var FxCoreModule = fx.Module(
6365
fxhealthcheck.FxHealthcheckModule,
6466
fx.Provide(
6567
NewFxModuleInfoRegistry,
68+
NewTaskRegistry,
6669
NewFxCore,
6770
fx.Annotate(
6871
NewFxCoreModuleInfo,
@@ -92,7 +95,8 @@ type FxCoreParam struct {
9295
Checker *healthcheck.Checker
9396
Config *config.Config
9497
Logger *log.Logger
95-
Registry *FxModuleInfoRegistry
98+
InfoRegistry *FxModuleInfoRegistry
99+
TaskRegistry *TaskRegistry
96100
MetricsRegistry *prometheus.Registry
97101
}
98102

@@ -232,7 +236,7 @@ func withHandlers(coreServer *echo.Echo, p FxCoreParam) (*echo.Echo, error) {
232236
dashboardEnabled := p.Config.GetBool("modules.core.server.dashboard.enabled")
233237

234238
// dashboard overview
235-
overviewInfo, err := p.Registry.Find(ModuleName)
239+
overviewInfo, err := p.InfoRegistry.Find(ModuleName)
236240
if err != nil {
237241
return nil, err
238242
}
@@ -248,6 +252,7 @@ func withHandlers(coreServer *echo.Echo, p FxCoreParam) (*echo.Echo, error) {
248252
overviewTraceProcessorExpose := p.Config.GetBool("modules.core.server.dashboard.overview.trace_processor")
249253

250254
// template expositions
255+
tasksExpose := p.Config.GetBool("modules.core.server.tasks.expose")
251256
metricsExpose := p.Config.GetBool("modules.core.server.metrics.expose")
252257
startupExpose := p.Config.GetBool("modules.core.server.healthcheck.startup.expose")
253258
livenessExpose := p.Config.GetBool("modules.core.server.healthcheck.liveness.expose")
@@ -260,6 +265,7 @@ func withHandlers(coreServer *echo.Echo, p FxCoreParam) (*echo.Echo, error) {
260265
modulesExpose := p.Config.GetBool("modules.core.server.debug.modules.expose")
261266

262267
// template paths
268+
tasksPath := p.Config.GetString("modules.core.server.tasks.path")
263269
metricsPath := p.Config.GetString("modules.core.server.metrics.path")
264270
startupPath := p.Config.GetString("modules.core.server.healthcheck.startup.path")
265271
livenessPath := p.Config.GetString("modules.core.server.healthcheck.liveness.path")
@@ -271,6 +277,48 @@ func withHandlers(coreServer *echo.Echo, p FxCoreParam) (*echo.Echo, error) {
271277
buildPath := p.Config.GetString("modules.core.server.debug.build.path")
272278
modulesPath := p.Config.GetString("modules.core.server.debug.modules.path")
273279

280+
// tasks
281+
if tasksExpose {
282+
if tasksPath == "" {
283+
tasksPath = DefaultTasksPath
284+
}
285+
286+
coreServer.POST(fmt.Sprintf("%s/:name", tasksPath), func(c echo.Context) error {
287+
ctx := c.Request().Context()
288+
289+
logger := log.CtxLogger(ctx)
290+
291+
name := c.Param("name")
292+
293+
input, err := io.ReadAll(c.Request().Body)
294+
if err != nil {
295+
logger.Error().Err(err).Str("task", name).Msg("request body read error")
296+
297+
return echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("cannot read request body: %v", err.Error()))
298+
}
299+
300+
err = c.Request().Body.Close()
301+
if err != nil {
302+
logger.Error().Err(err).Str("task", name).Msg("request body close error")
303+
304+
return echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("cannot close request body: %v", err.Error()))
305+
}
306+
307+
res := p.TaskRegistry.Run(ctx, name, input)
308+
if !res.Success {
309+
logger.Error().Err(err).Str("task", name).Msg("task execution error")
310+
311+
return c.JSON(http.StatusInternalServerError, res)
312+
}
313+
314+
logger.Info().Str("task", name).Msg("task execution success")
315+
316+
return c.JSON(http.StatusOK, res)
317+
})
318+
319+
coreServer.Logger.Debug("registered tasks handler")
320+
}
321+
274322
// metrics
275323
if metricsExpose {
276324
if metricsPath == "" {
@@ -393,14 +441,14 @@ func withHandlers(coreServer *echo.Echo, p FxCoreParam) (*echo.Echo, error) {
393441
coreServer.Logger.Debug("registered debug build handler")
394442
}
395443

396-
// debug modules
444+
// modules
397445
if modulesExpose || appDebug {
398446
if modulesPath == "" {
399447
modulesPath = DefaultDebugModulesPath
400448
}
401449

402450
coreServer.GET(fmt.Sprintf("%s/:name", modulesPath), func(c echo.Context) error {
403-
info, err := p.Registry.Find(c.Param("name"))
451+
info, err := p.InfoRegistry.Find(c.Param("name"))
404452
if err != nil {
405453
return echo.NewHTTPError(http.StatusNotFound, err.Error())
406454
}
@@ -466,6 +514,9 @@ func withHandlers(coreServer *echo.Echo, p FxCoreParam) (*echo.Echo, error) {
466514
"overviewLogOutputExpose": overviewLogOutputExpose,
467515
"overviewTraceSamplerExpose": overviewTraceSamplerExpose,
468516
"overviewTraceProcessorExpose": overviewTraceProcessorExpose,
517+
"tasksExpose": tasksExpose,
518+
"tasksPath": tasksPath,
519+
"tasksNames": p.TaskRegistry.Names(),
469520
"metricsExpose": metricsExpose,
470521
"metricsPath": metricsPath,
471522
"startupExpose": startupExpose,
@@ -486,7 +537,7 @@ func withHandlers(coreServer *echo.Echo, p FxCoreParam) (*echo.Echo, error) {
486537
"buildPath": buildPath,
487538
"modulesExpose": modulesExpose || appDebug,
488539
"modulesPath": modulesPath,
489-
"modulesNames": p.Registry.Names(),
540+
"modulesNames": p.InfoRegistry.Names(),
490541
"theme": theme,
491542
})
492543
})

0 commit comments

Comments
 (0)