@@ -20,28 +20,33 @@ import (
20
20
"context"
21
21
"crypto/tls"
22
22
"crypto/x509"
23
+ "encoding/json"
23
24
"fmt"
25
+ "io"
24
26
"net"
25
27
"net/http"
26
28
"os"
27
29
"path/filepath"
28
30
"runtime"
29
31
"strings"
32
+ "sync"
33
+ "sync/atomic"
30
34
"time"
31
35
32
36
"github.com/containerd/accelerated-container-image/cmd/convertor/database"
37
+ "github.com/containerd/containerd/images"
38
+ "github.com/containerd/containerd/log"
39
+ "github.com/containerd/containerd/platforms"
33
40
"github.com/containerd/containerd/reference"
41
+ "github.com/containerd/containerd/remotes"
34
42
"github.com/containerd/containerd/remotes/docker"
43
+ "github.com/opencontainers/go-digest"
35
44
v1 "github.com/opencontainers/image-spec/specs-go/v1"
36
45
"github.com/pkg/errors"
37
46
"github.com/sirupsen/logrus"
38
47
"golang.org/x/sync/errgroup"
39
48
)
40
49
41
- type Builder interface {
42
- Build (ctx context.Context ) error
43
- }
44
-
45
50
type BuilderOptions struct {
46
51
Ref string
47
52
TargetRef string
@@ -57,18 +62,214 @@ type BuilderOptions struct {
57
62
Reserve bool
58
63
NoUpload bool
59
64
DumpManifest bool
65
+
66
+ // ConcurrencyLimit limits the number of manifests that can be built at once
67
+ // 0 means no limit
68
+ ConcurrencyLimit int
60
69
}
61
70
62
- type overlaybdBuilder struct {
63
- layers int
64
- config v1.Image
65
- engine builderEngine
71
+ type graphBuilder struct {
72
+ // required
73
+ Resolver remotes.Resolver
74
+
75
+ // options
76
+ BuilderOptions
77
+
78
+ // private
79
+ fetcher remotes.Fetcher
80
+ pusher remotes.Pusher
81
+ tagPusher remotes.Pusher
82
+ group * errgroup.Group
83
+ sem chan struct {}
84
+ id atomic.Int32
85
+ }
86
+
87
+ func (b * graphBuilder ) Build (ctx context.Context ) error {
88
+ fetcher , err := b .Resolver .Fetcher (ctx , b .Ref )
89
+ if err != nil {
90
+ return fmt .Errorf ("failed to obtain new fetcher: %w" , err )
91
+ }
92
+ pusher , err := b .Resolver .Pusher (ctx , b .TargetRef + "@" ) // append '@' to avoid tag
93
+ if err != nil {
94
+ return fmt .Errorf ("failed to obtain new pusher: %w" , err )
95
+ }
96
+ tagPusher , err := b .Resolver .Pusher (ctx , b .TargetRef ) // append '@' to avoid tag
97
+ if err != nil {
98
+ return fmt .Errorf ("failed to obtain new tag pusher: %w" , err )
99
+ }
100
+ b .fetcher = fetcher
101
+ b .pusher = pusher
102
+ b .tagPusher = tagPusher
103
+ _ , src , err := b .Resolver .Resolve (ctx , b .Ref )
104
+ if err != nil {
105
+ return fmt .Errorf ("failed to resolve: %w" , err )
106
+ }
107
+
108
+ g , gctx := errgroup .WithContext (ctx )
109
+ b .group = g
110
+ if b .ConcurrencyLimit > 0 {
111
+ b .sem = make (chan struct {}, b .ConcurrencyLimit )
112
+ }
113
+ g .Go (func () error {
114
+ target , err := b .process (gctx , src , true )
115
+ if err != nil {
116
+ return fmt .Errorf ("failed to build %q: %w" , src .Digest , err )
117
+ }
118
+ log .G (gctx ).Infof ("converted to %q, digest: %q" , b .TargetRef , target .Digest )
119
+ return nil
120
+ })
121
+ return g .Wait ()
66
122
}
67
123
68
- func NewOverlayBDBuilder (ctx context.Context , opt BuilderOptions ) (Builder , error ) {
124
+ func (b * graphBuilder ) process (ctx context.Context , src v1.Descriptor , tag bool ) (v1.Descriptor , error ) {
125
+ switch src .MediaType {
126
+ case v1 .MediaTypeImageManifest , images .MediaTypeDockerSchema2Manifest :
127
+ return b .buildOne (ctx , src , tag )
128
+ case v1 .MediaTypeImageIndex , images .MediaTypeDockerSchema2ManifestList :
129
+ var index v1.Index
130
+ rc , err := b .fetcher .Fetch (ctx , src )
131
+ if err != nil {
132
+ return v1.Descriptor {}, fmt .Errorf ("failed to fetch index: %w" , err )
133
+ }
134
+ defer rc .Close ()
135
+ indexBytes , err := io .ReadAll (rc )
136
+ if err != nil {
137
+ return v1.Descriptor {}, fmt .Errorf ("failed to read index: %w" , err )
138
+ }
139
+ if err := json .Unmarshal (indexBytes , & index ); err != nil {
140
+ return v1.Descriptor {}, fmt .Errorf ("failed to unmarshal index: %w" , err )
141
+ }
142
+ var wg sync.WaitGroup
143
+ for _i , _m := range index .Manifests {
144
+ i := _i
145
+ m := _m
146
+ wg .Add (1 )
147
+ b .group .Go (func () error {
148
+ defer wg .Done ()
149
+ target , err := b .process (ctx , m , false )
150
+ if err != nil {
151
+ return fmt .Errorf ("failed to build %q: %w" , m .Digest , err )
152
+ }
153
+ index .Manifests [i ] = target
154
+ return nil
155
+ })
156
+ }
157
+ wg .Wait ()
158
+ if ctx .Err () != nil {
159
+ return v1.Descriptor {}, ctx .Err ()
160
+ }
161
+
162
+ // upload index
163
+ indexBytes , err = json .Marshal (index )
164
+ if err != nil {
165
+ return v1.Descriptor {}, fmt .Errorf ("failed to marshal index: %w" , err )
166
+ }
167
+ expected := src
168
+ expected .Digest = digest .FromBytes (indexBytes )
169
+ expected .Size = int64 (len (indexBytes ))
170
+ var pusher remotes.Pusher
171
+ if tag {
172
+ pusher = b .tagPusher
173
+ } else {
174
+ pusher = b .pusher
175
+ }
176
+ if err := uploadBytes (ctx , pusher , expected , indexBytes ); err != nil {
177
+ return v1.Descriptor {}, fmt .Errorf ("failed to upload index: %w" , err )
178
+ }
179
+ return expected , nil
180
+ default :
181
+ return v1.Descriptor {}, fmt .Errorf ("unsupported media type %q" , src .MediaType )
182
+ }
183
+ }
184
+
185
+ func (b * graphBuilder ) buildOne (ctx context.Context , src v1.Descriptor , tag bool ) (v1.Descriptor , error ) {
186
+ if b .sem != nil {
187
+ select {
188
+ case <- ctx .Done ():
189
+ return v1.Descriptor {}, ctx .Err ()
190
+ case b .sem <- struct {}{}:
191
+ }
192
+ }
193
+ defer func () {
194
+ if b .sem != nil {
195
+ select {
196
+ case <- ctx .Done ():
197
+ case <- b .sem :
198
+ }
199
+ }
200
+ }()
201
+ id := b .id .Add (1 )
202
+
203
+ var platform string
204
+ if src .Platform == nil {
205
+ platform = ""
206
+ } else {
207
+ platform = platforms .Format (* src .Platform )
208
+ ctx = log .WithLogger (ctx , log .G (ctx ).WithField ("platform" , platform ))
209
+ }
210
+ workdir := filepath .Join (b .WorkDir , fmt .Sprintf ("%d-%s-%s" , id , strings .ReplaceAll (platform , "/" , "_" ), src .Digest .Encoded ()))
211
+ log .G (ctx ).Infof ("building %s ..." , workdir )
212
+
213
+ // init build engine
214
+ manifest , config , err := fetchManifestAndConfig (ctx , b .fetcher , src )
215
+ if err != nil {
216
+ return v1.Descriptor {}, fmt .Errorf ("failed to fetch manifest and config: %w" , err )
217
+ }
218
+ var pusher remotes.Pusher
219
+ if tag {
220
+ pusher = b .tagPusher
221
+ } else {
222
+ pusher = b .pusher
223
+ }
224
+ engineBase := & builderEngineBase {
225
+ resolver : b .Resolver ,
226
+ fetcher : b .fetcher ,
227
+ pusher : pusher ,
228
+ manifest : * manifest ,
229
+ config : * config ,
230
+ inputDesc : src ,
231
+ }
232
+ engineBase .workDir = workdir
233
+ engineBase .oci = b .OCI
234
+ engineBase .mkfs = b .Mkfs
235
+ engineBase .vsize = b .Vsize
236
+ engineBase .db = b .DB
237
+ refspec , err := reference .Parse (b .Ref )
238
+ if err != nil {
239
+ return v1.Descriptor {}, err
240
+ }
241
+ engineBase .host = refspec .Hostname ()
242
+ engineBase .repository = strings .TrimPrefix (refspec .Locator , engineBase .host + "/" )
243
+ engineBase .reserve = b .Reserve
244
+ engineBase .noUpload = b .NoUpload
245
+ engineBase .dumpManifest = b .DumpManifest
246
+
247
+ var engine builderEngine
248
+ switch b .Engine {
249
+ case Overlaybd :
250
+ engine = NewOverlayBDBuilderEngine (engineBase )
251
+ case TurboOCI :
252
+ engine = NewTurboOCIBuilderEngine (engineBase )
253
+ }
254
+
255
+ // build
256
+ builder := & overlaybdBuilder {
257
+ layers : len (engineBase .manifest .Layers ),
258
+ engine : engine ,
259
+ }
260
+ desc , err := builder .Build (ctx )
261
+ if err != nil {
262
+ return v1.Descriptor {}, fmt .Errorf ("failed to build %s: %w" , workdir , err )
263
+ }
264
+ src .Digest = desc .Digest
265
+ src .Size = desc .Size
266
+ return src , nil
267
+ }
268
+
269
+ func Build (ctx context.Context , opt BuilderOptions ) error {
69
270
tlsConfig , err := loadTLSConfig (opt .CertOption )
70
271
if err != nil {
71
- return nil , fmt .Errorf ("failed to load certifications: %w" , err )
272
+ return fmt .Errorf ("failed to load certifications: %w" , err )
72
273
}
73
274
transport := & http.Transport {
74
275
DialContext : (& net.Dialer {
@@ -106,41 +307,21 @@ func NewOverlayBDBuilder(ctx context.Context, opt BuilderOptions) (Builder, erro
106
307
}),
107
308
),
108
309
})
109
- engineBase , err := getBuilderEngineBase (ctx , resolver , opt .Ref , opt .TargetRef )
110
- if err != nil {
111
- return nil , err
112
- }
113
- engineBase .workDir = opt .WorkDir
114
- engineBase .oci = opt .OCI
115
- engineBase .mkfs = opt .Mkfs
116
- engineBase .vsize = opt .Vsize
117
- engineBase .db = opt .DB
118
310
119
- refspec , err := reference .Parse (opt .Ref )
120
- if err != nil {
121
- return nil , err
122
- }
123
- engineBase .host = refspec .Hostname ()
124
- engineBase .repository = strings .TrimPrefix (refspec .Locator , engineBase .host + "/" )
125
- engineBase .reserve = opt .Reserve
126
- engineBase .noUpload = opt .NoUpload
127
- engineBase .dumpManifest = opt .DumpManifest
311
+ return (& graphBuilder {
312
+ Resolver : resolver ,
313
+ BuilderOptions : opt ,
314
+ }).Build (ctx )
315
+ }
128
316
129
- var engine builderEngine
130
- switch opt .Engine {
131
- case Overlaybd :
132
- engine = NewOverlayBDBuilderEngine (engineBase )
133
- case TurboOCI :
134
- engine = NewTurboOCIBuilderEngine (engineBase )
135
- }
136
- return & overlaybdBuilder {
137
- layers : len (engineBase .manifest .Layers ),
138
- engine : engine ,
139
- config : engineBase .config ,
140
- }, nil
317
+ type overlaybdBuilder struct {
318
+ layers int
319
+ engine builderEngine
141
320
}
142
321
143
- func (b * overlaybdBuilder ) Build (ctx context.Context ) error {
322
+ // Build return a descriptor of the converted target, as the caller may need it
323
+ // to tag or compose an index
324
+ func (b * overlaybdBuilder ) Build (ctx context.Context ) (v1.Descriptor , error ) {
144
325
defer b .engine .Cleanup ()
145
326
alreadyConverted := make ([]chan * v1.Descriptor , b .layers )
146
327
downloaded := make ([]chan error , b .layers )
@@ -150,7 +331,7 @@ func (b *overlaybdBuilder) Build(ctx context.Context) error {
150
331
// when errors are encountered fallback to regular conversion
151
332
if convertedDesc , err := b .engine .CheckForConvertedManifest (ctx ); err == nil && convertedDesc .Digest != "" {
152
333
logrus .Infof ("Image found already converted in registry with digest %s" , convertedDesc .Digest )
153
- return nil
334
+ return convertedDesc , nil
154
335
}
155
336
156
337
// Errgroups will close the context after wait returns so the operations need their own
@@ -244,15 +425,16 @@ func (b *overlaybdBuilder) Build(ctx context.Context) error {
244
425
})
245
426
}
246
427
if err := g .Wait (); err != nil {
247
- return err
428
+ return v1. Descriptor {}, err
248
429
}
249
430
250
- if err := b .engine .UploadImage (ctx ); err != nil {
251
- return errors .Wrap (err , "failed to upload manifest or config" )
431
+ targetDesc , err := b .engine .UploadImage (ctx )
432
+ if err != nil {
433
+ return v1.Descriptor {}, errors .Wrap (err , "failed to upload manifest or config" )
252
434
}
253
435
b .engine .StoreConvertedManifestDetails (ctx )
254
436
logrus .Info ("convert finished" )
255
- return nil
437
+ return targetDesc , nil
256
438
}
257
439
258
440
// block until ctx.Done() or sent
0 commit comments