Skip to content

Commit 0743352

Browse files
fix(backup): run native backup upload in parallel
Fixes #4388
1 parent 4b7e582 commit 0743352

File tree

1 file changed

+95
-68
lines changed

1 file changed

+95
-68
lines changed

pkg/service/backup/worker_upload.go

Lines changed: 95 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@ package backup
44

55
import (
66
"context"
7+
stdErr "errors"
78
"time"
89

910
"github.com/pkg/errors"
1011
"github.com/scylladb/scylla-manager/backupspec"
1112
"github.com/scylladb/scylla-manager/v3/pkg/scheduler"
1213
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
1314
"github.com/scylladb/scylla-manager/v3/pkg/util/parallel"
14-
1515
"github.com/scylladb/scylla-manager/v3/pkg/util/timeutc"
1616
)
1717

@@ -33,88 +33,115 @@ func (w *worker) Upload(ctx context.Context, hosts []hostInfo, limits []DCLimit)
3333
}
3434

3535
func (w *worker) uploadHost(ctx context.Context, h hostInfo) error {
36+
rclone, native, err := w.hostSnapshotDirsByMethod(ctx, h)
37+
if err != nil {
38+
return errors.Wrap(err, "ensure backup method")
39+
}
40+
41+
var retErr error
42+
if len(native) > 0 {
43+
err := w.uploadSnapshotDirsInParallel(ctx, h, native, w.nativeBackup, parallel.NoLimit)
44+
retErr = stdErr.Join(retErr, err)
45+
}
46+
if len(rclone) > 0 {
47+
err := w.uploadSnapshotDirsInParallel(ctx, h, rclone, w.rcloneBackup, 1)
48+
retErr = stdErr.Join(retErr, err)
49+
}
50+
return retErr
51+
}
52+
53+
func (w *worker) hostSnapshotDirsByMethod(ctx context.Context, h hostInfo) (rclone, native []snapshotDir, err error) {
3654
dirs := w.hostSnapshotDirs(h)
37-
hostNativeBackupSupportErr := w.hostNativeBackupSupport(ctx, h.IP, h.NodeConfig.NodeInfo, h.Location)
38-
if hostNativeBackupSupportErr == nil {
39-
reset, err := w.Client.ScyllaControlTaskUserTTL(ctx, h.IP)
40-
if err != nil {
41-
return err
42-
}
43-
defer reset()
55+
56+
if w.Method == methodRclone {
57+
return dirs, nil, nil
4458
}
4559

46-
f := func(i int) (err error) {
47-
d := dirs[i]
48-
49-
// Skip snapshots that are empty.
50-
if d.Progress.Size == 0 {
51-
w.Logger.Info(ctx, "Table is empty skipping", "host", h.IP, "keyspace", d.Keyspace, "table", d.Table)
52-
now := timeutc.Now()
53-
d.Progress.StartedAt = &now
54-
d.Progress.CompletedAt = &now
55-
w.onRunProgress(ctx, d.Progress)
56-
return nil
57-
}
58-
// Skip snapshots that are already uploaded.
59-
if d.Progress.IsUploaded() {
60-
w.Logger.Info(ctx, "Snapshot already uploaded skipping", "host", h.IP, "keyspace", d.Keyspace, "table", d.Table)
61-
return nil
60+
// Handle lack of native backup support on the host level
61+
if err := w.hostNativeBackupSupport(ctx, h.IP, h.NodeConfig.NodeInfo, h.Location); err != nil {
62+
if w.Method == methodNative {
63+
return nil, nil, err
6264
}
65+
return dirs, nil, nil
66+
}
6367

64-
// NOTE that defers are executed in LIFO order
65-
// Abort on cancel.
66-
defer func() {
67-
if scheduler.IsTaskInterrupted(ctx) {
68-
err = parallel.Abort(err)
68+
if w.Method == methodNative {
69+
for _, d := range dirs {
70+
if err := w.snapshotDirNativeBackupSupport(ctx, h.IP, d); err != nil {
71+
return nil, nil, errors.Wrapf(err, "%s.%s: ensure native backup support", d.Keyspace, d.Table)
6972
}
70-
}()
71-
// Add keyspace table info to error mgs.
72-
defer func() {
73-
err = errors.Wrapf(err, "%s.%s", d.Keyspace, d.Table)
74-
}()
75-
// Delete table backupspec.
76-
defer func() {
77-
if err != nil {
78-
return
79-
}
80-
err = errors.Wrap(w.deleteTableSnapshot(ctx, h, d), "delete table snapshot")
81-
}()
82-
83-
nativeBackupSupportErr := hostNativeBackupSupportErr
84-
if nativeBackupSupportErr == nil {
85-
nativeBackupSupportErr = w.snapshotDirNativeBackupSupport(ctx, h.IP, d)
8673
}
74+
return nil, dirs, nil
75+
}
8776

88-
switch w.Method {
89-
case methodNative:
90-
if nativeBackupSupportErr != nil {
91-
return errors.Wrap(nativeBackupSupportErr, "ensure native backup")
92-
}
93-
return errors.Wrap(w.nativeBackup(ctx, h, d), "native backup")
94-
case methodRclone:
95-
return errors.Wrap(w.rcloneBackup(ctx, h, d), "rclone backup")
96-
case methodAuto:
97-
if nativeBackupSupportErr != nil {
98-
return errors.Wrap(w.rcloneBackup(ctx, h, d), "auto rclone backup")
77+
if w.Method == methodAuto {
78+
for _, d := range dirs {
79+
if err := w.snapshotDirNativeBackupSupport(ctx, h.IP, d); err != nil {
80+
rclone = append(rclone, d)
9981
} else {
100-
return errors.Wrap(w.nativeBackup(ctx, h, d), "auto native backup")
82+
native = append(native, d)
10183
}
102-
default:
103-
return errors.New("unknown method: " + string(w.Method))
10484
}
85+
return rclone, native, nil
10586
}
10687

107-
notify := func(i int, err error) {
108-
d := dirs[i]
109-
w.Logger.Error(ctx, "Failed to upload host",
110-
"host", d.Host,
111-
"keyspace", d.Keyspace,
112-
"table", d.Table,
113-
"error", err,
114-
)
88+
return nil, nil, errors.New("unknown method: " + string(w.Method))
89+
}
90+
91+
type uploadSnapshotDirFunc func(ctx context.Context, h hostInfo, d snapshotDir) error
92+
93+
func (w *worker) uploadSnapshotDirsInParallel(ctx context.Context, h hostInfo, dirs []snapshotDir, f uploadSnapshotDirFunc, limit int) error {
94+
return parallel.Run(len(dirs), limit,
95+
func(i int) error {
96+
return w.uploadSnapshotDirWrapper(ctx, h, dirs[i], f)
97+
},
98+
func(i int, err error) {
99+
d := dirs[i]
100+
w.Logger.Error(ctx, "Failed to upload host",
101+
"host", d.Host,
102+
"keyspace", d.Keyspace,
103+
"table", d.Table,
104+
"error", err,
105+
)
106+
})
107+
}
108+
109+
func (w *worker) uploadSnapshotDirWrapper(ctx context.Context, h hostInfo, d snapshotDir, f uploadSnapshotDirFunc) (err error) {
110+
// Skip snapshots that are empty.
111+
if d.Progress.Size == 0 {
112+
w.Logger.Info(ctx, "Table is empty skipping", "host", h.IP, "keyspace", d.Keyspace, "table", d.Table)
113+
now := timeutc.Now()
114+
d.Progress.StartedAt = &now
115+
d.Progress.CompletedAt = &now
116+
w.onRunProgress(ctx, d.Progress)
117+
return nil
118+
}
119+
// Skip snapshots that are already uploaded.
120+
if d.Progress.IsUploaded() {
121+
w.Logger.Info(ctx, "Snapshot already uploaded skipping", "host", h.IP, "keyspace", d.Keyspace, "table", d.Table)
122+
return nil
115123
}
116124

117-
return parallel.Run(len(dirs), 1, f, notify)
125+
// NOTE that defers are executed in LIFO order
126+
// Abort on cancel.
127+
defer func() {
128+
if scheduler.IsTaskInterrupted(ctx) {
129+
err = parallel.Abort(err)
130+
}
131+
}()
132+
// Add keyspace table info to error mgs.
133+
defer func() {
134+
err = errors.Wrapf(err, "%s.%s", d.Keyspace, d.Table)
135+
}()
136+
// Delete table backupspec.
137+
defer func() {
138+
if err != nil {
139+
return
140+
}
141+
err = errors.Wrap(w.deleteTableSnapshot(ctx, h, d), "delete table snapshot")
142+
}()
143+
144+
return f(ctx, h, d)
118145
}
119146

120147
func (w *worker) rcloneBackup(ctx context.Context, h hostInfo, d snapshotDir) error {

0 commit comments

Comments
 (0)