Skip to content

Commit 1cef3cf

Browse files
authored
[Feature] Parallel backup uploads limit (#859)
1 parent 3903543 commit 1cef3cf

File tree

7 files changed

+172
-8
lines changed

7 files changed

+172
-8
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
- Fix Exporter in Deployments without authentication
99
- Allow to disable ClusterScalingIntegration and add proper Scheduled label to pods
1010
- Add additional timeout parameters and kubernetes batch size
11+
- Limit parallel Backup uploads
1112

1213
## [1.2.5](https://github.com/arangodb/kube-arangodb/tree/1.2.5) (2021-10-25)
1314
- Split & Unify Lifecycle management functionality

main.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,9 @@ var (
126126
operatorKubernetesOptions struct {
127127
maxBatchSize int64
128128
}
129+
operatorBackup struct {
130+
concurrentUploads int
131+
}
129132
operatorTimeouts struct {
130133
k8s time.Duration
131134
arangoD time.Duration
@@ -167,6 +170,7 @@ func init() {
167170
f.DurationVar(&operatorTimeouts.reconciliation, "timeout.reconciliation", globals.DefaultReconciliationTimeout, "The reconciliation timeout to the ArangoDB CR")
168171
f.BoolVar(&operatorOptions.scalingIntegrationEnabled, "internal.scaling-integration", false, "Enable Scaling Integration")
169172
f.Int64Var(&operatorKubernetesOptions.maxBatchSize, "kubernetes.max-batch-size", globals.DefaultKubernetesRequestBatchSize, "Size of batch during objects read")
173+
f.IntVar(&operatorBackup.concurrentUploads, "backup-concurrent-uploads", globals.DefaultBackupConcurrentUploads, "Number of concurrent uploads per deployment")
170174
features.Init(&cmdMain)
171175
}
172176

@@ -196,6 +200,7 @@ func cmdMainRun(cmd *cobra.Command, args []string) {
196200
globals.GetGlobalTimeouts().ArangoD().Set(operatorTimeouts.arangoD)
197201
globals.GetGlobalTimeouts().Reconciliation().Set(operatorTimeouts.reconciliation)
198202
globals.GetGlobals().Kubernetes().RequestBatchSize().Set(operatorKubernetesOptions.maxBatchSize)
203+
globals.GetGlobals().Backup().ConcurrentUploads().Set(operatorBackup.concurrentUploads)
199204

200205
// Prepare log service
201206
var err error

pkg/backup/handlers/arango/backup/state_ready_test.go

Lines changed: 104 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ import (
2626
"sync"
2727
"testing"
2828

29+
"github.com/arangodb/kube-arangodb/pkg/util/globals"
30+
2931
"github.com/arangodb/go-driver"
3032
"github.com/arangodb/kube-arangodb/pkg/util"
3133
"k8s.io/apimachinery/pkg/util/uuid"
@@ -464,8 +466,8 @@ func Test_State_Ready_KeepPendingWithForcedRunning(t *testing.T) {
464466
}
465467
}
466468

467-
require.Equal(t, size, upload)
468-
require.Equal(t, 0, ready)
469+
require.Equal(t, globals.DefaultBackupConcurrentUploads, upload)
470+
require.Equal(t, size-globals.DefaultBackupConcurrentUploads, ready)
469471
}
470472

471473
func Test_State_Ready_KeepPendingWithForcedRunningSameId(t *testing.T) {
@@ -531,3 +533,103 @@ func Test_State_Ready_KeepPendingWithForcedRunningSameId(t *testing.T) {
531533
require.Equal(t, 1, upload)
532534
require.Equal(t, size-1, ready)
533535
}
536+
537+
func Test_State_Ready_Concurrent_Queued(t *testing.T) {
538+
// Arrange
539+
handler, mock := newErrorsFakeHandler(mockErrorsArangoClientBackup{})
540+
541+
createResponse, err := mock.Create()
542+
require.NoError(t, err)
543+
544+
obj, deployment := newObjectSet(backupApi.ArangoBackupStateReady)
545+
546+
backupMeta, err := mock.Get(createResponse.ID)
547+
require.NoError(t, err)
548+
549+
obj.Status.Backup = createBackupFromMeta(backupMeta, nil)
550+
obj.Spec.Upload = &backupApi.ArangoBackupSpecOperation{
551+
RepositoryURL: "Any",
552+
}
553+
554+
size := globals.DefaultBackupConcurrentUploads
555+
objects := make([]*backupApi.ArangoBackup, size)
556+
for id := range objects {
557+
createResponse, err := mock.Create()
558+
require.NoError(t, err)
559+
560+
backupMeta, err := mock.Get(createResponse.ID)
561+
require.NoError(t, err)
562+
563+
obj := newArangoBackup(deployment.GetName(), deployment.GetNamespace(), string(uuid.NewUUID()), backupApi.ArangoBackupStateUploading)
564+
565+
obj.Status.Backup = createBackupFromMeta(backupMeta, nil)
566+
obj.Spec.Upload = &backupApi.ArangoBackupSpecOperation{
567+
RepositoryURL: "s3://test",
568+
}
569+
obj.Status.Available = true
570+
571+
objects[id] = obj
572+
}
573+
574+
// Act
575+
createArangoDeployment(t, handler, deployment)
576+
createArangoBackup(t, handler, objects...)
577+
createArangoBackup(t, handler, obj)
578+
579+
require.NoError(t, handler.Handle(newItemFromBackup(operation.Update, obj)))
580+
581+
// Assert
582+
newObj := refreshArangoBackup(t, handler, obj)
583+
checkBackup(t, newObj, backupApi.ArangoBackupStateReady, true)
584+
compareBackupMeta(t, backupMeta, newObj)
585+
}
586+
587+
func Test_State_Ready_Concurrent_Started(t *testing.T) {
588+
// Arrange
589+
handler, mock := newErrorsFakeHandler(mockErrorsArangoClientBackup{})
590+
591+
createResponse, err := mock.Create()
592+
require.NoError(t, err)
593+
594+
obj, deployment := newObjectSet(backupApi.ArangoBackupStateReady)
595+
596+
backupMeta, err := mock.Get(createResponse.ID)
597+
require.NoError(t, err)
598+
599+
obj.Status.Backup = createBackupFromMeta(backupMeta, nil)
600+
obj.Spec.Upload = &backupApi.ArangoBackupSpecOperation{
601+
RepositoryURL: "Any",
602+
}
603+
604+
size := globals.DefaultBackupConcurrentUploads - 1
605+
objects := make([]*backupApi.ArangoBackup, size)
606+
for id := range objects {
607+
createResponse, err := mock.Create()
608+
require.NoError(t, err)
609+
610+
backupMeta, err := mock.Get(createResponse.ID)
611+
require.NoError(t, err)
612+
613+
obj := newArangoBackup(deployment.GetName(), deployment.GetNamespace(), string(uuid.NewUUID()), backupApi.ArangoBackupStateUploading)
614+
615+
obj.Status.Backup = createBackupFromMeta(backupMeta, nil)
616+
obj.Spec.Upload = &backupApi.ArangoBackupSpecOperation{
617+
RepositoryURL: "s3://test",
618+
}
619+
obj.Status.Available = true
620+
621+
objects[id] = obj
622+
}
623+
624+
// Act
625+
createArangoDeployment(t, handler, deployment)
626+
createArangoBackup(t, handler, objects...)
627+
createArangoBackup(t, handler, obj)
628+
629+
require.NoError(t, handler.Handle(newItemFromBackup(operation.Update, obj)))
630+
631+
// Assert
632+
newObj := refreshArangoBackup(t, handler, obj)
633+
checkBackup(t, newObj, backupApi.ArangoBackupStateUpload, true)
634+
compareBackupMeta(t, backupMeta, newObj)
635+
}

pkg/backup/handlers/arango/backup/util.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ import (
2626
"context"
2727
"strings"
2828

29+
"github.com/arangodb/kube-arangodb/pkg/util/globals"
30+
2931
clientBackup "github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned/typed/backup/v1"
3032

3133
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -62,6 +64,8 @@ func isBackupRunning(backup *backupApi.ArangoBackup, client clientBackup.ArangoB
6264
return false, newTemporaryError(err)
6365
}
6466

67+
currentUploads := 0
68+
6569
for _, existingBackup := range backups.Items {
6670
if existingBackup.Name == backup.Name {
6771
continue
@@ -70,6 +74,7 @@ func isBackupRunning(backup *backupApi.ArangoBackup, client clientBackup.ArangoB
7074
// We can upload multiple uploads from same deployment in same time
7175
if backup.Status.State == backupApi.ArangoBackupStateReady &&
7276
(existingBackup.Status.State == backupApi.ArangoBackupStateUpload || existingBackup.Status.State == backupApi.ArangoBackupStateUploading) {
77+
currentUploads++
7378
if backupUpload := backup.Status.Backup; backupUpload != nil {
7479
if existingBackupUpload := existingBackup.Status.Backup; existingBackupUpload != nil {
7580
if strings.EqualFold(backupUpload.ID, existingBackupUpload.ID) {
@@ -88,5 +93,9 @@ func isBackupRunning(backup *backupApi.ArangoBackup, client clientBackup.ArangoB
8893
}
8994
}
9095

96+
if backup.Status.State == backupApi.ArangoBackupStateReady {
97+
return currentUploads >= globals.GetGlobals().Backup().ConcurrentUploads().Get(), nil
98+
}
99+
91100
return false, nil
92101
}

pkg/util/globals/global.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ const (
2828
DefaultReconciliationTimeout = time.Minute
2929

3030
DefaultKubernetesRequestBatchSize = 256
31+
32+
DefaultBackupConcurrentUploads = 4
3133
)
3234

3335
var globalObj = &globals{
@@ -37,7 +39,10 @@ var globalObj = &globals{
3739
reconciliation: NewTimeout(DefaultReconciliationTimeout),
3840
},
3941
kubernetes: &globalKubernetes{
40-
requestBatchSize: NewInt(DefaultKubernetesRequestBatchSize),
42+
requestBatchSize: NewInt64(DefaultKubernetesRequestBatchSize),
43+
},
44+
backup: &globalBackup{
45+
concurrentUploads: NewInt(DefaultBackupConcurrentUploads),
4146
},
4247
}
4348

@@ -52,11 +57,17 @@ func GetGlobalTimeouts() GlobalTimeouts {
5257
type Globals interface {
5358
Timeouts() GlobalTimeouts
5459
Kubernetes() GlobalKubernetes
60+
Backup() GlobalBackup
5561
}
5662

5763
type globals struct {
5864
timeouts *globalTimeouts
5965
kubernetes *globalKubernetes
66+
backup *globalBackup
67+
}
68+
69+
func (g globals) Backup() GlobalBackup {
70+
return g.backup
6071
}
6172

6273
func (g globals) Kubernetes() GlobalKubernetes {
@@ -79,6 +90,18 @@ func (g *globalKubernetes) RequestBatchSize() Int64 {
7990
return g.requestBatchSize
8091
}
8192

93+
type GlobalBackup interface {
94+
ConcurrentUploads() Int
95+
}
96+
97+
type globalBackup struct {
98+
concurrentUploads Int
99+
}
100+
101+
func (g *globalBackup) ConcurrentUploads() Int {
102+
return g.concurrentUploads
103+
}
104+
82105
type GlobalTimeouts interface {
83106
Reconciliation() Timeout
84107

pkg/util/globals/globals_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,19 +32,22 @@ func Test_Globals(t *testing.T) {
3232
require.EqualValues(t, DefaultKubernetesTimeout, GetGlobals().Timeouts().Kubernetes().Get())
3333
require.EqualValues(t, DefaultArangoDTimeout, GetGlobals().Timeouts().ArangoD().Get())
3434
require.EqualValues(t, DefaultReconciliationTimeout, GetGlobals().Timeouts().Reconciliation().Get())
35+
require.EqualValues(t, DefaultBackupConcurrentUploads, GetGlobals().Backup().ConcurrentUploads().Get())
3536
})
3637

3738
t.Run("Override", func(t *testing.T) {
3839
GetGlobals().Kubernetes().RequestBatchSize().Set(0)
3940
GetGlobals().Timeouts().Kubernetes().Set(0)
4041
GetGlobals().Timeouts().ArangoD().Set(0)
4142
GetGlobals().Timeouts().Reconciliation().Set(0)
43+
GetGlobals().Backup().ConcurrentUploads().Set(0)
4244
})
4345

4446
t.Run("Check", func(t *testing.T) {
4547
require.EqualValues(t, 0, GetGlobals().Kubernetes().RequestBatchSize().Get())
4648
require.EqualValues(t, 0, GetGlobals().Timeouts().Kubernetes().Get())
4749
require.EqualValues(t, 0, GetGlobals().Timeouts().ArangoD().Get())
4850
require.EqualValues(t, 0, GetGlobals().Timeouts().Reconciliation().Get())
51+
require.EqualValues(t, 0, GetGlobals().Backup().ConcurrentUploads().Get())
4952
})
5053
}

pkg/util/globals/int.go

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,23 +20,44 @@
2020

2121
package globals
2222

23+
type Int interface {
24+
Set(in int)
25+
Get() int
26+
}
27+
28+
func NewInt(def int) Int {
29+
return &intObj{i: def}
30+
}
31+
32+
type intObj struct {
33+
i int
34+
}
35+
36+
func (i *intObj) Set(in int) {
37+
i.i = in
38+
}
39+
40+
func (i *intObj) Get() int {
41+
return i.i
42+
}
43+
2344
type Int64 interface {
2445
Set(in int64)
2546
Get() int64
2647
}
2748

28-
func NewInt(def int64) Int64 {
29-
return &intObj{i: def}
49+
func NewInt64(def int64) Int64 {
50+
return &int64Obj{i: def}
3051
}
3152

32-
type intObj struct {
53+
type int64Obj struct {
3354
i int64
3455
}
3556

36-
func (i *intObj) Set(in int64) {
57+
func (i *int64Obj) Set(in int64) {
3758
i.i = in
3859
}
3960

40-
func (i *intObj) Get() int64 {
61+
func (i *int64Obj) Get() int64 {
4162
return i.i
4263
}

0 commit comments

Comments
 (0)