
Commit 922e2b9

Replace S3 retry logic with AWS S3 Retryer

1 parent 0fab2e9 · commit 922e2b9

File tree

4 files changed: +133 -157 lines

cli/print.go
pipeline/collection/misc.go
storage/s3/retryer.go
storage/s3/s3.go
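At its core the commit swaps per-call retry loops for a session-level retryer: any implementation of the SDK's request.Retryer interface can be assigned to the session config, and every request made through that session is then retried by the SDK itself. A minimal, self-contained sketch of that pattern (not this repo's code: the SDK's stock client.DefaultRetryer stands in for the custom Retryer added below, and the region is a placeholder):

package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/client"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

func main() {
	// The retry policy is configured once, on the session...
	sess := session.Must(session.NewSession(&aws.Config{
		Region:  aws.String("us-east-1"), // placeholder
		Retryer: client.DefaultRetryer{NumMaxRetries: 3},
	}))
	svc := s3.New(sess)

	// ...so call sites need no hand-rolled retry loop: the SDK retries
	// internally and only surfaces an error once retries are exhausted.
	_, err := svc.ListBuckets(&s3.ListBucketsInput{})
	fmt.Println(err)
}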

cli/print.go

Lines changed: 13 additions & 13 deletions (whitespace only: gofmt field alignment and operator spacing)

@@ -28,39 +28,39 @@ func printFinalStats(syncGroup *pipeline.Group, status syncStatus) {
 	dur := time.Since(syncGroup.StartTime).Seconds()
 	for _, val := range syncGroup.GetStepsInfo() {
 		log.WithFields(logrus.Fields{
-			"stepNum": val.Num,
-			"stepName": val.Name,
-			"InputObj": val.Stats.Input,
-			"OutputObj": val.Stats.Output,
-			"ErrorObj": val.Stats.Error,
-			"InputObjSpeed": float64(val.Stats.Input)/dur,
-			"OutputObjSpeed": float64(val.Stats.Output)/dur,
+			"stepNum":        val.Num,
+			"stepName":       val.Name,
+			"InputObj":       val.Stats.Input,
+			"OutputObj":      val.Stats.Output,
+			"ErrorObj":       val.Stats.Error,
+			"InputObjSpeed":  float64(val.Stats.Input) / dur,
+			"OutputObjSpeed": float64(val.Stats.Output) / dur,
 		}).Info("Pipeline step finished")
 	}
 	log.WithFields(logrus.Fields{
-		"durationSec": time.Since(syncGroup.StartTime).Seconds(),
+		"durationSec": time.Since(syncGroup.StartTime).Seconds(),
 	}).Infof("Duration: %s", time.Since(syncGroup.StartTime).String())

 	switch status {
 	case syncStatusOk:
 		log.WithFields(logrus.Fields{
-			"status": status,
+			"status": status,
 		}).Infof("Sync Done")
 	case syncStatusFailed:
 		log.WithFields(logrus.Fields{
-			"status": status,
+			"status": status,
 		}).Error("Sync Failed")
 	case syncStatusAborted:
 		log.WithFields(logrus.Fields{
-			"status": status,
+			"status": status,
 		}).Warnf("Sync Aborted")
 	case syncStatusConfError:
 		log.WithFields(logrus.Fields{
-			"status": status,
+			"status": status,
 		}).Errorf("Sync Configuration error")
 	default:
 		log.WithFields(logrus.Fields{
-			"status": status,
+			"status": status,
 		}).Warnf("Sync Unknown status")
 	}
 }

pipeline/collection/misc.go

Lines changed: 2 additions & 2 deletions (whitespace only: gofmt field alignment)

@@ -27,8 +27,8 @@ var Logger pipeline.StepFn = func(group *pipeline.Group, stepNum int, input <-ch
 	for obj := range input {
 		if ok {
 			cfg.WithFields(logrus.Fields{
-				"key": *obj.Key,
-				"size": len(*obj.Content),
+				"key":          *obj.Key,
+				"size":         len(*obj.Content),
 				"Content-Type": *obj.ContentType,
 			}).Infof("Sync file")
 			output <- obj

storage/s3/retryer.go

Lines changed: 52 additions & 0 deletions

@@ -0,0 +1,52 @@
+package s3
+
+import (
+	"github.com/aws/aws-sdk-go/aws/request"
+	"time"
+)
+
+// Retryer implements basic retry logic.
+// It satisfies the request.Retryer interface.
+type Retryer struct {
+	// RetryCnt is the maximum number of retries that will be performed.
+	// By default, this is zero.
+	RetryCnt uint
+
+	// RetryDelay is the minimum delay after which a retry will be performed.
+	// If not set, the value is 0ns.
+	RetryDelay time.Duration
+}
+
+// MaxRetries returns the maximum number of retries the service will perform
+// for an individual API request.
+func (d Retryer) MaxRetries() int {
+	return int(d.RetryCnt)
+}
+
+// RetryRules returns the delay duration before retrying this request again.
+func (d Retryer) RetryRules(r *request.Request) time.Duration {
+
+	// If the max retry count is zero, no retries will be performed.
+	if d.RetryCnt == 0 {
+		return 0
+	}
+
+	return d.RetryDelay
+}
+
+// ShouldRetry returns true if the request should be retried.
+func (d Retryer) ShouldRetry(r *request.Request) bool {
+
+	// ShouldRetry returns false if the max retry count is 0.
+	if d.RetryCnt == 0 {
+		return false
+	}
+
+	// If one of the other handlers already set the retry state,
+	// we don't want to override it based on the service's state.
+	if r.Retryable != nil {
+		return *r.Retryable
+	}
+
+	return r.IsErrorRetryable() || r.IsErrorThrottle() || true
+}
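To see the session-level retryer doing the work the deleted loops used to do, a test could point a client at a stub endpoint that fails twice and then succeeds. A hedged sketch (the test and its file name retryer_test.go are hypothetical, not part of this commit; it assumes it sits in the same package as retryer.go):

package s3

import (
	"net/http"
	"net/http/httptest"
	"strings"
	"testing"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

func TestRetryerRetriesServerErrors(t *testing.T) {
	attempts := 0
	// Stub S3 endpoint: fail the first two attempts, then succeed.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		attempts++
		if attempts <= 2 {
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusOK)
	}))
	defer srv.Close()

	sess := session.Must(session.NewSession(&aws.Config{
		Region:           aws.String("us-east-1"),
		Endpoint:         aws.String(srv.URL),
		S3ForcePathStyle: aws.Bool(true),
		Credentials:      credentials.NewStaticCredentials("fake", "fake", ""),
	}))
	sess.Config.Retryer = &Retryer{RetryCnt: 3, RetryDelay: 10 * time.Millisecond}

	svc := s3.New(sess)
	_, err := svc.PutObject(&s3.PutObjectInput{
		Bucket: aws.String("bucket"),
		Key:    aws.String("key"),
		// The body is a ReadSeeker; the SDK rewinds it between attempts,
		// which is why the manual rlReader.Seek(0, 0) in the old PutObject
		// retry loop is no longer needed.
		Body: strings.NewReader("hello"),
	})
	if err != nil || attempts != 3 {
		t.Fatalf("want success on the 3rd attempt, got attempts=%d err=%v", attempts, err)
	}
}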

storage/s3/s3.go

Lines changed: 66 additions & 142 deletions

@@ -33,12 +33,13 @@ type S3Storage struct {
 // NewS3Storage return new configured S3 storage.
 //
 // You should always create new storage with this constructor.
-func NewS3Storage(awsAccessKey, awsSecretKey, awsToken, awsRegion, endpoint, bucketName, prefix string, keysPerReq int64, retryCnt uint, retryInterval time.Duration) *S3Storage {
+func NewS3Storage(awsAccessKey, awsSecretKey, awsToken, awsRegion, endpoint, bucketName, prefix string, keysPerReq int64, retryCnt uint, retryDelay time.Duration) *S3Storage {
 	sess := session.Must(session.NewSessionWithOptions(session.Options{
 		SharedConfigState: session.SharedConfigEnable,
 	}))
 	sess.Config.S3ForcePathStyle = aws.Bool(true)
 	sess.Config.Region = aws.String(awsRegion)
+	sess.Config.Retryer = &Retryer{RetryCnt: retryCnt, RetryDelay: retryDelay}

 	if awsAccessKey != "" || awsSecretKey != "" {
 		sess.Config.Credentials = credentials.NewStaticCredentials(awsAccessKey, awsSecretKey, awsToken)
@@ -66,7 +67,7 @@ func NewS3Storage(awsAccessKey, awsSecretKey, awsToken, awsRegion, endpoint, buc
 		prefix:        prefix,
 		keysPerReq:    keysPerReq,
 		retryCnt:      retryCnt,
-		retryInterval: retryInterval,
+		retryInterval: retryDelay,
 		ctx:           context.TODO(),
 		rlBucket:      ratelimit.NewFakeBucket(),
 	}
@@ -107,30 +108,20 @@ func (st *S3Storage) List(output chan<- *storage.Object) error {
 		return !lastPage // continue paging
 	}

-	for i := uint(0); ; i++ {
-		input := &s3.ListObjectsInput{
-			Bucket:       st.awsBucket,
-			Prefix:       aws.String(st.prefix),
-			MaxKeys:      aws.Int64(st.keysPerReq),
-			EncodingType: aws.String(s3.EncodingTypeUrl),
-			Marker:       st.listMarker,
-		}
+	input := &s3.ListObjectsInput{
+		Bucket:       st.awsBucket,
+		Prefix:       aws.String(st.prefix),
+		MaxKeys:      aws.Int64(st.keysPerReq),
+		EncodingType: aws.String(s3.EncodingTypeUrl),
+		Marker:       st.listMarker,
+	}

-		err := st.awsSvc.ListObjectsPagesWithContext(st.ctx, input, listObjectsFn)
-		if err == nil {
-			storage.Log.Debugf("Listing bucket finished")
-			return nil
-		} else if storage.IsAwsContextCanceled(err) {
-			return err
-		} else if (err != nil) && (i < st.retryCnt) {
-			storage.Log.Debugf("S3 listing failed with error: %s", err)
-			time.Sleep(st.retryInterval)
-			continue
-		} else if (err != nil) && (i == st.retryCnt) {
-			storage.Log.Debugf("S3 listing failed with error: %s", err)
-			return err
-		}
+	if err := st.awsSvc.ListObjectsPagesWithContext(st.ctx, input, listObjectsFn); err != nil {
+		return err
 	}
+	storage.Log.Debugf("Listing bucket finished")
+	return nil
 }

 // PutObject saves object to S3.
@@ -153,22 +144,8 @@ func (st *S3Storage) PutObject(obj *storage.Object) error {
 		StorageClass: obj.StorageClass,
 	}

-	for i := uint(0); ; i++ {
-		_, err := st.awsSvc.PutObjectWithContext(st.ctx, input)
-		if storage.IsAwsContextCanceled(err) {
-			return err
-		} else if err == nil {
-			break
-		} else if (err != nil) && (i < st.retryCnt) {
-			storage.Log.Debugf("S3 obj uploading failed with error: %s", err)
-			time.Sleep(st.retryInterval)
-			if _, err := rlReader.Seek(0, 0); err != nil {
-				return err
-			}
-			continue
-		} else if (err != nil) && (i == st.retryCnt) {
-			return err
-		}
+	if _, err := st.awsSvc.PutObjectWithContext(st.ctx, input); err != nil {
+		return err
 	}

 	if obj.AccessControlPolicy != nil {
@@ -178,19 +155,8 @@ func (st *S3Storage) PutObject(obj *storage.Object) error {
 			AccessControlPolicy: obj.AccessControlPolicy,
 		}

-		for i := uint(0); ; i++ {
-			_, err := st.awsSvc.PutObjectAclWithContext(st.ctx, inputAcl)
-			if storage.IsAwsContextCanceled(err) {
-				return err
-			} else if err == nil {
-				break
-			} else if (err != nil) && (i < st.retryCnt) {
-				storage.Log.Debugf("S3 ACL uploading failed with error: %s", err)
-				time.Sleep(st.retryInterval)
-				continue
-			} else if (err != nil) && (i == st.retryCnt) {
-				return err
-			}
+		if _, err := st.awsSvc.PutObjectAclWithContext(st.ctx, inputAcl); err != nil {
+			return err
 		}
 	}

@@ -205,44 +171,29 @@ func (st *S3Storage) GetObjectContent(obj *storage.Object) error {
 		VersionId: obj.VersionId,
 	}

-	for i := uint(0); ; i++ {
-		result, err := st.awsSvc.GetObjectWithContext(st.ctx, input)
-		if storage.IsAwsContextCanceled(err) {
-			return err
-		} else if (err != nil) && (i < st.retryCnt) {
-			storage.Log.Debugf("S3 obj content downloading request failed with error: %s", err)
-			time.Sleep(st.retryInterval)
-			continue
-		} else if (err != nil) && (i == st.retryCnt) {
-			return err
-		}
-
-		buf := bytes.NewBuffer(make([]byte, 0, aws.Int64Value(result.ContentLength)))
-		_, err = io.Copy(ratelimit.NewWriter(buf, st.rlBucket), result.Body)
-		if storage.IsAwsContextCanceled(err) {
-			return err
-		} else if (err != nil) && (i < st.retryCnt) {
-			storage.Log.Debugf("S3 obj content downloading failed with error: %s", err)
-			time.Sleep(st.retryInterval)
-			continue
-		} else if (err != nil) && (i == st.retryCnt) {
-			return err
-		}
+	result, err := st.awsSvc.GetObjectWithContext(st.ctx, input)
+	if err != nil {
+		return err
+	}

-		data := buf.Bytes()
-		obj.Content = &data
-		obj.ContentType = result.ContentType
-		obj.ContentDisposition = result.ContentDisposition
-		obj.ContentEncoding = result.ContentEncoding
-		obj.ContentLanguage = result.ContentLanguage
-		obj.ETag = storage.StrongEtag(result.ETag)
-		obj.Metadata = result.Metadata
-		obj.Mtime = result.LastModified
-		obj.CacheControl = result.CacheControl
-		obj.StorageClass = result.StorageClass
-
-		return nil
+	buf := bytes.NewBuffer(make([]byte, 0, aws.Int64Value(result.ContentLength)))
+	if _, err := io.Copy(ratelimit.NewWriter(buf, st.rlBucket), result.Body); err != nil {
+		return err
 	}
+
+	data := buf.Bytes()
+	obj.Content = &data
+	obj.ContentType = result.ContentType
+	obj.ContentDisposition = result.ContentDisposition
+	obj.ContentEncoding = result.ContentEncoding
+	obj.ContentLanguage = result.ContentLanguage
+	obj.ETag = storage.StrongEtag(result.ETag)
+	obj.Metadata = result.Metadata
+	obj.Mtime = result.LastModified
+	obj.CacheControl = result.CacheControl
+	obj.StorageClass = result.StorageClass
+
+	return nil
 }

 // GetObjectACL read object ACL from S3.
@@ -253,25 +204,17 @@ func (st *S3Storage) GetObjectACL(obj *storage.Object) error {
 		VersionId: obj.VersionId,
 	}

-	for i := uint(0); ; i++ {
-		result, err := st.awsSvc.GetObjectAclWithContext(st.ctx, input)
-		if storage.IsAwsContextCanceled(err) {
-			return err
-		} else if (err != nil) && (i < st.retryCnt) {
-			storage.Log.Debugf("S3 obj ACL downloading request failed with error: %s", err)
-			time.Sleep(st.retryInterval)
-			continue
-		} else if (err != nil) && (i == st.retryCnt) {
-			return err
-		}
-
-		obj.AccessControlPolicy = &s3.AccessControlPolicy{
-			Grants: result.Grants,
-			Owner:  result.Owner,
-		}
+	result, err := st.awsSvc.GetObjectAclWithContext(st.ctx, input)
+	if err != nil {
+		return err
+	}

-		return nil
+	obj.AccessControlPolicy = &s3.AccessControlPolicy{
+		Grants: result.Grants,
+		Owner:  result.Owner,
 	}
+
+	return nil
 }

 // GetObjectMeta update object metadata from S3.
@@ -282,30 +225,22 @@ func (st *S3Storage) GetObjectMeta(obj *storage.Object) error {
 		VersionId: obj.VersionId,
 	}

-	for i := uint(0); ; i++ {
-		result, err := st.awsSvc.HeadObjectWithContext(st.ctx, input)
-		if storage.IsAwsContextCanceled(err) {
-			return err
-		} else if (err != nil) && (i < st.retryCnt) {
-			storage.Log.Debugf("S3 obj meta downloading request failed with error: %s", err)
-			time.Sleep(st.retryInterval)
-			continue
-		} else if (err != nil) && (i == st.retryCnt) {
-			return err
-		}
-
-		obj.ContentType = result.ContentType
-		obj.ContentDisposition = result.ContentDisposition
-		obj.ContentEncoding = result.ContentEncoding
-		obj.ContentLanguage = result.ContentLanguage
-		obj.ETag = storage.StrongEtag(result.ETag)
-		obj.Metadata = result.Metadata
-		obj.Mtime = result.LastModified
-		obj.CacheControl = result.CacheControl
-		obj.StorageClass = result.StorageClass
-
-		return nil
+	result, err := st.awsSvc.HeadObjectWithContext(st.ctx, input)
+	if err != nil {
+		return err
 	}
+
+	obj.ContentType = result.ContentType
+	obj.ContentDisposition = result.ContentDisposition
+	obj.ContentEncoding = result.ContentEncoding
+	obj.ContentLanguage = result.ContentLanguage
+	obj.ETag = storage.StrongEtag(result.ETag)
+	obj.Metadata = result.Metadata
+	obj.Mtime = result.LastModified
+	obj.CacheControl = result.CacheControl
+	obj.StorageClass = result.StorageClass
+
+	return nil
 }

 // DeleteObject remove object from S3.
@@ -316,19 +251,8 @@ func (st *S3Storage) DeleteObject(obj *storage.Object) error {
 		VersionId: obj.VersionId,
 	}

-	for i := uint(0); ; i++ {
-		_, err := st.awsSvc.DeleteObjectWithContext(st.ctx, input)
-		if err == nil {
-			break
-		} else if storage.IsAwsContextCanceled(err) {
-			return err
-		} else if (err != nil) && (i < st.retryCnt) {
-			storage.Log.Debugf("S3 obj removing failed with error: %s", err)
-			time.Sleep(st.retryInterval)
-			continue
-		} else if (err != nil) && (i == st.retryCnt) {
-			return err
-		}
+	if _, err := st.awsSvc.DeleteObjectWithContext(st.ctx, input); err != nil {
+		return err
 	}
 	return nil
 }
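A note on semantics: the deleted loops special-cased storage.IsAwsContextCanceled to bail out immediately, while the new code leans on the SDK, which (as far as the SDK's retry handlers go) aborts the retry sleep and marks a request non-retryable when its context is canceled; the r.Retryable != nil branch of ShouldRetry then honors that decision. A small self-contained check of the contract (the Retryer type is copied from the diff above, comments trimmed, so the snippet compiles on its own):

package main

import (
	"fmt"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/request"
)

// Retryer is copied from storage/s3/retryer.go above.
type Retryer struct {
	RetryCnt   uint
	RetryDelay time.Duration
}

func (d Retryer) MaxRetries() int { return int(d.RetryCnt) }

func (d Retryer) RetryRules(r *request.Request) time.Duration {
	if d.RetryCnt == 0 {
		return 0
	}
	return d.RetryDelay
}

func (d Retryer) ShouldRetry(r *request.Request) bool {
	if d.RetryCnt == 0 {
		return false
	}
	if r.Retryable != nil {
		return *r.Retryable
	}
	return r.IsErrorRetryable() || r.IsErrorThrottle() || true
}

func main() {
	r := Retryer{RetryCnt: 3, RetryDelay: 2 * time.Second}

	fmt.Println(r.MaxRetries())                   // 3
	fmt.Println(r.RetryRules(&request.Request{})) // 2s: a constant delay, not exponential backoff

	// A decision already recorded on the request wins, e.g. when the SDK
	// marks a canceled request as non-retryable:
	fmt.Println(r.ShouldRetry(&request.Request{Retryable: aws.Bool(false)})) // false

	// Otherwise every error is retried: the trailing "|| true" makes the
	// retryable/throttle checks redundant, mirroring the old loops that
	// retried any error except context cancellation.
	fmt.Println(r.ShouldRetry(&request.Request{})) // true
}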
