Skip to content

Commit 4c04176

Browse files
committed
Refactor graceful for serial stops
1 parent efaf529 commit 4c04176

File tree

5 files changed

+581
-409
lines changed

5 files changed

+581
-409
lines changed

graceful/go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
module github.com/wafer-bw/go-toolbox/graceful
22

3-
go 1.22.4
3+
go 1.24.2
44

55
require (
66
github.com/stretchr/testify v1.10.0
7-
golang.org/x/sync v0.11.0
7+
golang.org/x/sync v0.14.0
88
)
99

1010
require (

graceful/go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
44
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
55
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
66
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
7-
golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w=
8-
golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
7+
golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ=
8+
golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
99
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
1010
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
1111
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=

graceful/graceful.go

Lines changed: 114 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,14 @@
1-
// Package graceful provides mechanisms for starting and stopping groups of
2-
// services, primarily used to accomplish a graceful shutdown.
1+
// Package graceful provides mechanisms for asynchronously starting &
2+
// synchronously stopping trees of long running tasks enabling graceful
3+
// shutdowns.
4+
//
5+
// This package handles the common case of starting several things in parallel
6+
// then stopping them gracefully in series. It does not act as a full directed
7+
// acyclic graph (https://en.wikipedia.org/wiki/Directed_acyclic_graph).
38
package graceful
49

510
import (
11+
"cmp"
612
"context"
713
"os"
814
"os/signal"
@@ -13,73 +19,137 @@ import (
1319

1420
// Runner is capable of starting and stopping itself.
1521
type Runner interface {
16-
// Start must either complete within the lifetime of the context passed to
17-
// it respecting the context's deadline or terminate when Stop is called.
22+
// Start must terminate when the passed context is canceled, [Runner.Stop]
23+
// is called, or it completes without error (whichever happens first). It
24+
// must not panic if [Runner.Stop] was called first.
1825
Start(context.Context) error
1926

20-
// Stop must complete within the lifetime of the context passed to it
21-
// respecting the context's deadline.
27+
// Stop must terminate when the passed context is canceled or it completes
28+
// without error (whichever happens first). It must not panic if
29+
// [Runner.Start] was not called.
2230
Stop(context.Context) error
2331
}
2432

25-
// Group of [Runner] that should run concurrently together via a single call
26-
// point and stop gracefully should one of them encounter an error or the
27-
// application receive a signal.
28-
type Group []Runner
33+
type RunOption func(*RunConfig)
34+
35+
func WithStopTimeout(d time.Duration) RunOption {
36+
return func(cfg *RunConfig) {
37+
cfg.stopTimeout = d
38+
}
39+
}
40+
41+
func WithStopSignals(signals ...os.Signal) RunOption {
42+
return func(cfg *RunConfig) {
43+
cfg.signals = signals
44+
}
45+
}
46+
47+
func WithStoppingCh(ch chan<- struct{}) RunOption {
48+
return func(cfg *RunConfig) {
49+
cfg.stoppingCh = ch
50+
}
51+
}
2952

30-
// Run is a convenience method that calls [Group.Start] & [Group.Stop] in
31-
// sequence returning the error (if any) from [Group.Start] and ignoring the
32-
// error (if any) from [Group.Stop].
33-
func (g Group) Run(ctx context.Context, timeout time.Duration, signals ...os.Signal) error {
34-
defer g.Stop(ctx, timeout) //nolint:errcheck // intentionally ignored.
35-
return g.Start(ctx, signals...)
53+
type RunConfig struct {
54+
stopTimeout time.Duration
55+
signals []os.Signal
56+
stoppingCh chan<- struct{}
3657
}
3758

38-
// Start all [Runner] concurrently, blocking until either a Runner.Start call
39-
// encounters an error, one of the provided signals is received via
40-
// [signal.NotifyContext], or the context provided to it is canceled, then
41-
// returns the first non-nil error (if any) or nil if a signal was received.
59+
// Group of [Runner] which can be started in parallel & stopped in series.
4260
//
43-
// An error returned from Start does not indicate that all runners have stopped,
44-
// you must call [Group.Stop] to stop all runners.
45-
func (g Group) Start(ctx context.Context, signals ...os.Signal) error {
46-
eg, errCtx := errgroup.WithContext(ctx)
47-
signalCtx, stop := signal.NotifyContext(ctx, signals...)
48-
defer stop()
61+
// Group satisfies [Runner] and thus it can be nested within itself to create
62+
// a tree.
63+
type Group []Runner
4964

65+
// Start all [Runner] in parallel. Blocks until all [Runner.Start] have returned
66+
// normally, then returns the first non-nil errror (if any) from them.
67+
func (g Group) Start(ctx context.Context) error {
68+
eg := new(errgroup.Group)
5069
for _, r := range g {
5170
if r == nil {
5271
continue
5372
}
5473
eg.Go(func() error { return r.Start(ctx) })
5574
}
5675

57-
select {
58-
case <-errCtx.Done():
59-
return context.Cause(errCtx)
60-
case <-signalCtx.Done():
61-
return ctx.Err()
62-
}
76+
return eg.Wait()
6377
}
6478

65-
// Stop all [Runner] concurrently, blocking until all Runner.Stop calls have
66-
// returned, then returns the first non-nil error (if any) from them.
67-
//
68-
// If a Runner.Stop does not complete before timeout the context passed to
69-
// it will cancel with [ShutdownTimeoutError] as the [context.Cause].
70-
func (g Group) Stop(ctx context.Context, timeout time.Duration) error {
71-
ctx, cancel := context.WithTimeoutCause(ctx, timeout, ShutdownTimeoutError{})
72-
defer cancel()
73-
74-
eg := new(errgroup.Group)
79+
// Stop all [Runner] in series. Blocks until all [Runner.Stop] have returned
80+
// normally, then returns the first non-nil errror (if any) from them.
81+
func (g Group) Stop(ctx context.Context) error {
82+
var firstErr error
7583
for _, r := range g {
7684
if r == nil {
7785
continue
7886
}
79-
eg.Go(func() error { return r.Stop(ctx) })
87+
if err := r.Stop(ctx); err != nil && firstErr == nil {
88+
firstErr = err
89+
}
8090
}
8191

82-
return eg.Wait()
92+
return firstErr
93+
}
94+
95+
// Run starts all [Runner] in parallel and stops them in series.
96+
//
97+
// Stopping is initiated when any of the following occurs:
98+
// - the passed context is canceled
99+
// - a signal passed via [WithStopSignals] is received
100+
// - a [Runner.Start] returns an error
101+
//
102+
// When stopping is initiated, the channel passed via [WithStoppingCh] will be
103+
// closed. It will use the timeout passed via [WithStopTimeout] as the deadline
104+
// for the [context.Context] passed to each [Runner.Stop].
105+
//
106+
// The first encountered error (either [Runner.Start] error,
107+
// [context.Context.Err], or [Runner.Stop] error) will be returned. However, all
108+
// [Runner.Stop] are guaranteed to be called.
109+
func (g Group) Run(ctx context.Context, opts ...RunOption) error {
110+
cfg := &RunConfig{}
111+
for _, opt := range opts {
112+
if opt == nil {
113+
continue
114+
}
115+
opt(cfg)
116+
}
117+
118+
var startErr, runErr error
119+
startErrCh := make(chan error)
120+
go func() {
121+
if err := g.Start(ctx); err != nil {
122+
startErrCh <- err
123+
}
124+
}()
125+
126+
signalCh := make(chan os.Signal, 1)
127+
if len(cfg.signals) != 0 {
128+
signal.Notify(signalCh, cfg.signals...)
129+
}
130+
select {
131+
case <-signalCh:
132+
// received signal
133+
case err := <-startErrCh:
134+
startErr = err
135+
case <-ctx.Done():
136+
runErr = ctx.Err()
137+
}
138+
signal.Stop(signalCh)
139+
140+
if cfg.stoppingCh != nil {
141+
close(cfg.stoppingCh)
142+
}
143+
144+
stopCtx, cancel := context.WithTimeout(ctx, cfg.stopTimeout)
145+
if cfg.stopTimeout == 0 {
146+
stopCtx = ctx
147+
}
148+
defer cancel()
149+
150+
stopErr := g.Stop(stopCtx)
151+
152+
return cmp.Or(startErr, stopErr, runErr)
83153
}
84154

85155
// RunnerType is an adapter type to allow the use of ordinary start and stop
@@ -104,16 +174,3 @@ func (r RunnerType) Stop(ctx context.Context) error {
104174
}
105175
return r.StopFunc(ctx)
106176
}
107-
108-
type ShutdownTimeoutError struct{}
109-
110-
func (ShutdownTimeoutError) Error() string {
111-
return "graceful shutdown timed out"
112-
}
113-
114-
// Timeout returns true if the error is a timeout error, this allows callers
115-
// to identify the nature of the error without needing to match the error
116-
// based on equality.
117-
func (ShutdownTimeoutError) Timeout() bool {
118-
return true
119-
}

0 commit comments

Comments
 (0)