Skip to content

Commit dd81d67

Browse files
committed
Add Exist/NotExist filters and refactor few files
1 parent 2caf297 commit dd81d67

File tree

6 files changed

+79
-29
lines changed

6 files changed

+79
-29
lines changed

cli/cli.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ type args struct {
7171
FilterMtimeAfter int64 `arg:"--filter-after-mtime" help:"Sync only files modified after given unix timestamp"`
7272
FilterMtimeBefore int64 `arg:"--filter-before-mtime" help:"Sync only files modified before given unix timestamp"`
7373
FilterModified bool `arg:"--filter-modified" help:"Sync only modified files"`
74+
FilterExist bool `arg:"--filter-exist" help:"Sync only files, that exist in target storage'"`
75+
FilterExistNot bool `arg:"--filter-not-exist" help:"Sync only files, that doesn't exist in target storage'"`
7476
// Misc
7577
Workers uint `arg:"-w" help:"Workers count"`
7678
Debug bool `arg:"-d" help:"Show debug logging"`

cli/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,15 +98,15 @@ WaitLoop:
9898
continue WaitLoop
9999
}
100100

101-
if cli.ErrorHandlingMask.Has(storage.HandleErrNotExist) && isErrNotExist(err) {
101+
if cli.ErrorHandlingMask.Has(storage.HandleErrNotExist) && storage.IsErrNotExist(err) {
102102
var objErr *pipeline.ObjectError
103103
if errors.As(err, &objErr) {
104104
log.Warnf("Skip missing object: %s", *objErr.Object.Key)
105105
} else {
106106
log.Warnf("Skip missing object, err: %s", err)
107107
}
108108
continue WaitLoop
109-
} else if cli.ErrorHandlingMask.Has(storage.HandleErrPermission) && isErrPermission(err) {
109+
} else if cli.ErrorHandlingMask.Has(storage.HandleErrPermission) && storage.IsErrPermission(err) {
110110
var objErr *pipeline.ObjectError
111111
if errors.As(err, &objErr) {
112112
log.Warnf("Skip permission denied object: %s", *objErr.Object.Key)

cli/setup.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,20 @@ func setupPipeline(syncGroup *pipeline.Group, cli *argsParsed) {
127127
})
128128
}
129129

130+
if cli.FilterExist {
131+
syncGroup.AddPipeStep(pipeline.Step{
132+
Name: "FilterObjectsExist",
133+
Fn: collection.FilterObjectsExist,
134+
})
135+
}
136+
137+
if cli.FilterExistNot {
138+
syncGroup.AddPipeStep(pipeline.Step{
139+
Name: "FilterObjectsExistNot",
140+
Fn: collection.FilterObjectsExistNot,
141+
})
142+
}
143+
130144
syncGroup.AddPipeStep(pipeline.Step{
131145
Name: "LoadObjData",
132146
Fn: collection.LoadObjectData,

pipeline/collection/filter.go

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ var FilterObjectsByMtimeBefore pipeline.StepFn = func(group *pipeline.Group, ste
157157
}
158158

159159
// FilterObjectsModified accepts an input object and checks if it matches the filter
160-
// This filter gets object meta from target storage and compare object ETags. If Etags are equal object will be skipped
160+
// This filter read object meta from target storage and compare object ETags. If Etags are equal object will be skipped
161161
// For FS storage xattr support are required for proper work.
162162
var FilterObjectsModified pipeline.StepFn = func(group *pipeline.Group, stepNum int, input <-chan *storage.Object, output chan<- *storage.Object, errChan chan<- error) {
163163
for obj := range input {
@@ -171,3 +171,39 @@ var FilterObjectsModified pipeline.StepFn = func(group *pipeline.Group, stepNum
171171
}
172172
}
173173
}
174+
175+
// FilterObjectsExist accepts an input object and checks if it exist in target storage
176+
// This filter read object meta from target storage. Object will be processed only when it exist in target storage.
177+
var FilterObjectsExist pipeline.StepFn = func(group *pipeline.Group, stepNum int, input <-chan *storage.Object, output chan<- *storage.Object, errChan chan<- error) {
178+
for obj := range input {
179+
destObj := &storage.Object{
180+
Key: obj.Key,
181+
}
182+
err := group.Target.GetObjectMeta(destObj)
183+
if err == nil {
184+
output <- obj
185+
} else if storage.IsErrNotExist(err) {
186+
continue
187+
} else {
188+
errChan <- &pipeline.ObjectError{Object: obj, Err: err}
189+
}
190+
}
191+
}
192+
193+
// FilterObjectsExist accepts an input object and checks if it exist in target storage
194+
// This filter read object meta from target storage. Object will be processed only when it doesn't exist in target storage.
195+
var FilterObjectsExistNot pipeline.StepFn = func(group *pipeline.Group, stepNum int, input <-chan *storage.Object, output chan<- *storage.Object, errChan chan<- error) {
196+
for obj := range input {
197+
destObj := &storage.Object{
198+
Key: obj.Key,
199+
}
200+
err := group.Target.GetObjectMeta(destObj)
201+
if err == nil {
202+
continue
203+
} else if storage.IsErrNotExist(err) {
204+
output <- obj
205+
} else {
206+
errChan <- &pipeline.ObjectError{Object: obj, Err: err}
207+
}
208+
}
209+
}

cli/error.go renamed to storage/error_handling.go

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
1-
package main
1+
package storage
22

33
import (
4+
"context"
45
"errors"
56
"github.com/aws/aws-sdk-go/aws/awserr"
7+
"github.com/aws/aws-sdk-go/aws/request"
68
"github.com/aws/aws-sdk-go/service/s3"
79
"os"
810
)
911

10-
func isErrNotExist(err error) bool {
12+
func IsErrNotExist(err error) bool {
1113
var aErr awserr.Error
1214
if errors.As(err, &aErr) {
1315
if (aErr.Code() == s3.ErrCodeNoSuchKey) || (aErr.Code() == "NotFound") {
@@ -21,7 +23,7 @@ func isErrNotExist(err error) bool {
2123
return false
2224
}
2325

24-
func isErrPermission(err error) bool {
26+
func IsErrPermission(err error) bool {
2527
var aErr awserr.Error
2628
if errors.As(err, &aErr) {
2729
if aErr.Code() == "AccessDenied" {
@@ -34,3 +36,22 @@ func isErrPermission(err error) bool {
3436
}
3537
return false
3638
}
39+
40+
func IsAwsContextCanceled(err error) bool {
41+
if err == nil {
42+
return false
43+
}
44+
45+
if errors.Is(err, context.Canceled) {
46+
return true
47+
}
48+
49+
var aErr awserr.Error
50+
if ok := errors.As(err, &aErr); ok && aErr.OrigErr() == context.Canceled {
51+
return true
52+
} else if ok && aErr.Code() == request.CanceledErrorCode {
53+
return true
54+
}
55+
56+
return false
57+
}

storage/utils.go

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,6 @@
11
package storage
22

33
import (
4-
"context"
5-
"errors"
6-
"github.com/aws/aws-sdk-go/aws/awserr"
7-
"github.com/aws/aws-sdk-go/aws/request"
84
"strings"
95
)
106

@@ -30,22 +26,3 @@ const (
3026
HandleErrPermission
3127
HandleErrOther = 64
3228
)
33-
34-
func IsAwsContextCanceled(err error) bool {
35-
if err == nil {
36-
return false
37-
}
38-
39-
if errors.Is(err, context.Canceled) {
40-
return true
41-
}
42-
43-
var aErr awserr.Error
44-
if ok := errors.As(err, &aErr); ok && aErr.OrigErr() == context.Canceled {
45-
return true
46-
} else if ok && aErr.Code() == request.CanceledErrorCode {
47-
return true
48-
}
49-
50-
return false
51-
}

0 commit comments

Comments
 (0)