@@ -18,15 +18,16 @@ package builder
18
18
19
19
import (
20
20
"context"
21
+ "fmt"
21
22
"strings"
22
- "sync"
23
23
24
24
"github.com/containerd/accelerated-container-image/cmd/convertor/database"
25
25
"github.com/containerd/containerd/reference"
26
26
"github.com/containerd/containerd/remotes/docker"
27
27
v1 "github.com/opencontainers/image-spec/specs-go/v1"
28
28
"github.com/pkg/errors"
29
29
"github.com/sirupsen/logrus"
30
+ "golang.org/x/sync/errgroup"
30
31
)
31
32
32
33
type Builder interface {
@@ -95,112 +96,93 @@ func (b *overlaybdBuilder) Build(ctx context.Context) error {
95
96
alreadyConverted := make ([]chan * v1.Descriptor , b .layers )
96
97
downloaded := make ([]chan error , b .layers )
97
98
converted := make ([]chan error , b .layers )
98
- var uploaded sync.WaitGroup
99
-
100
- errCh := make (chan error )
101
- defer close (errCh )
102
- ctx , cancel := context .WithCancel (ctx )
103
- defer cancel ()
104
-
105
- // collect error and kill all builder goroutines
106
- var retErr error
107
- retErr = nil
108
- go func () {
109
- select {
110
- case <- ctx .Done ():
111
- case retErr = <- errCh :
112
- }
113
- if retErr != nil {
114
- cancel ()
115
- }
116
- }()
99
+ // Errgroups will close the context after wait returns so the operations need their own
100
+ // derived context.
101
+ g , rctx := errgroup .WithContext (ctx )
117
102
118
103
for i := 0 ; i < b .layers ; i ++ {
119
- downloaded [i ] = make (chan error )
120
- converted [i ] = make (chan error )
121
- alreadyConverted [i ] = make (chan * v1.Descriptor )
104
+ idx := i
105
+ downloaded [idx ] = make (chan error )
106
+ converted [idx ] = make (chan error )
107
+ alreadyConverted [idx ] = make (chan * v1.Descriptor )
122
108
123
109
// deduplication Goroutine
124
- go func (idx int ) {
110
+ g . Go ( func () error {
125
111
defer close (alreadyConverted [idx ])
126
112
// try to find chainID -> converted digest conversion if available
127
- desc , err := b .engine .CheckForConvertedLayer (ctx , idx )
113
+ desc , err := b .engine .CheckForConvertedLayer (rctx , idx )
128
114
if err != nil {
129
115
// in the event of failure fallback to regular process
130
- return
116
+ return nil
131
117
}
132
118
alreadyConverted [idx ] <- & desc
133
- }(i )
119
+ return nil
120
+ })
134
121
135
122
// download goroutine
136
- go func (idx int ) {
123
+ g . Go ( func () error {
137
124
var cachedLayer * v1.Descriptor
138
125
select {
139
- case <- ctx .Done ():
126
+ case <- rctx .Done ():
140
127
case cachedLayer = <- alreadyConverted [idx ]:
141
128
}
142
129
143
130
defer close (downloaded [idx ])
144
131
if cachedLayer != nil {
145
132
// download the converted layer
146
- err := b .engine .DownloadConvertedLayer (ctx , idx , * cachedLayer )
133
+ err := b .engine .DownloadConvertedLayer (rctx , idx , * cachedLayer )
147
134
if err == nil {
148
135
logrus .Infof ("downloaded cached layer %d" , idx )
149
- sendToChannel (ctx , downloaded [idx ], nil )
150
- return
136
+ return err
151
137
}
152
138
logrus .Infof ("failed to download cached layer %d falling back to conversion : %s" , idx , err )
153
139
}
154
140
155
- if err := b .engine .DownloadLayer (ctx , idx ); err != nil {
156
- sendToChannel (ctx , errCh , errors .Wrapf (err , "failed to download layer %d" , idx ))
157
- return
141
+ if err := b .engine .DownloadLayer (rctx , idx ); err != nil {
142
+ return err
158
143
}
159
144
logrus .Infof ("downloaded layer %d" , idx )
160
- sendToChannel (ctx , downloaded [idx ], nil )
161
- }(i )
145
+ sendToChannel (rctx , downloaded [idx ], nil )
146
+ return nil
147
+ })
162
148
163
149
// convert goroutine
164
- go func (idx int ) {
150
+ g . Go ( func () error {
165
151
defer close (converted [idx ])
166
- if waitForChannel (ctx , downloaded [idx ]); ctx .Err () != nil {
167
- return
152
+ if waitForChannel (rctx , downloaded [idx ]); rctx .Err () != nil {
153
+ return rctx . Err ()
168
154
}
169
155
if idx > 0 {
170
- if waitForChannel (ctx , converted [idx - 1 ]); ctx .Err () != nil {
171
- return
156
+ if waitForChannel (rctx , converted [idx - 1 ]); rctx .Err () != nil {
157
+ return rctx . Err ()
172
158
}
173
159
}
174
- if err := b .engine .BuildLayer (ctx , idx ); err != nil {
175
- sendToChannel (ctx , errCh , errors .Wrapf (err , "failed to convert layer %d" , idx ))
176
- return
160
+ if err := b .engine .BuildLayer (rctx , idx ); err != nil {
161
+ return fmt .Errorf ("failed to convert layer %d: %w" , idx , err )
177
162
}
178
163
logrus .Infof ("layer %d converted" , idx )
179
164
// send to upload(idx) and convert(idx+1) once each
180
- sendToChannel (ctx , converted [idx ], nil )
165
+ sendToChannel (rctx , converted [idx ], nil )
181
166
if idx + 1 < b .layers {
182
- sendToChannel (ctx , converted [idx ], nil )
167
+ sendToChannel (rctx , converted [idx ], nil )
183
168
}
184
- }(i )
185
-
186
- // upload goroutine
187
- uploaded .Add (1 )
188
- go func (idx int ) {
189
- defer uploaded .Done ()
190
- if waitForChannel (ctx , converted [idx ]); ctx .Err () != nil {
191
- return
169
+ return nil
170
+ })
171
+
172
+ g .Go (func () error {
173
+ if waitForChannel (rctx , converted [idx ]); rctx .Err () != nil {
174
+ return rctx .Err ()
192
175
}
193
- if err := b .engine .UploadLayer (ctx , idx ); err != nil {
194
- sendToChannel (ctx , errCh , errors .Wrapf (err , "failed to upload layer %d" , idx ))
195
- return
176
+ if err := b .engine .UploadLayer (rctx , idx ); err != nil {
177
+ return fmt .Errorf ("failed to upload layer %d: %w" , idx , err )
196
178
}
197
- b .engine .StoreConvertedLayerDetails (ctx , idx )
179
+ b .engine .StoreConvertedLayerDetails (rctx , idx )
198
180
logrus .Infof ("layer %d uploaded" , idx )
199
- }(i )
181
+ return nil
182
+ })
200
183
}
201
- uploaded .Wait ()
202
- if retErr != nil {
203
- return retErr
184
+ if err := g .Wait (); err != nil {
185
+ return err
204
186
}
205
187
206
188
if err := b .engine .UploadImage (ctx ); err != nil {
0 commit comments