Initial Glacier Support #599

Open · wants to merge 2 commits into master
19 changes: 19 additions & 0 deletions Glacier.md
@@ -0,0 +1,19 @@
# Beta COLD Storage (AKA Glacier) Support

## Objective

* Back up directly to Glacier
* Restore directly from Glacier

## Usage

* Add a new option `"storageclass": "GLACIER"` to your preference file (see the example below)
* Use a compatible S3CStorage driver. Currently only tested on Scaleway's Object Storage and C14 Glacier
* Data chunks will go directly to Glacier, while snapshot chunks will stay in Standard storage. One can, however, also move snapshot chunks to Glacier
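
A minimal preference entry might look like the following (the `name`, `id`, and `storage` values are illustrative placeholders; only the `"storageclass"` key is new in this change):

```json
[
    {
        "name": "default",
        "id": "my-backup",
        "storage": "s3c://fr-par@s3.fr-par.scw.cloud/my-bucket",
        "encrypted": false,
        "storageclass": "GLACIER"
    }
]
```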

## Caveats

* Corner cases have not been tested
* When restoring, duplicacy will try to move chunks from Glacier to Standard storage *ONE BY ONE*, which can be very slow. One can speed this up by moving all chunks to Standard at once (see the sketch after this list)
* You cannot move the config file or the snapshots folder to Glacier
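
For the bulk approach, one could request restores for every chunk up front instead of waiting on each download. A minimal sketch, assuming the `RestoreFile` helper added by this change and duplicacy's usual `ListFiles` signature (this helper is not part of the change itself):

```go
// restoreAllChunks asks the backend to thaw every chunk in one pass.
// ListFiles is assumed to return chunk paths relative to the given directory.
func restoreAllChunks(storage *S3CStorage, days int) error {
	files, _, err := storage.ListFiles(0, "chunks/")
	if err != nil {
		return err
	}
	for _, file := range files {
		if err := storage.RestoreFile(0, "chunks/"+file, days); err != nil {
			return err
		}
	}
	return nil
}
```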

1 change: 1 addition & 0 deletions README.md
@@ -44,6 +44,7 @@ Duplicacy currently provides the following storage backends:
* WebDAV (under beta testing)
* pcloud (via WebDAV)
* Box.com (via WebDAV)
* Scaleway with initial [Glacier](Glacier.md) support

Please consult the [wiki page](https://github.com/gilbertchen/duplicacy/wiki/Storage-Backends) on how to set up Duplicacy to work with each cloud storage.

2 changes: 1 addition & 1 deletion src/duplicacy_acdstorage.go
@@ -408,7 +408,7 @@ func (storage *ACDStorage) DownloadFile(threadIndex int, filePath string, chunk
}

// UploadFile writes 'content' to the file at 'filePath'.
func (storage *ACDStorage) UploadFile(threadIndex int, filePath string, content []byte) (err error) {
func (storage *ACDStorage) UploadFile(threadIndex int, filePath string, content []byte, storageOption StorageOption) (err error) {
parent := path.Dir(filePath)
if parent == "." {
parent = ""
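
Every backend's `UploadFile` gains a trailing `storageOption StorageOption` parameter, as in the `ACDStorage` change above. The `StorageOption` type itself is not shown in this diff; judging from the type assertion in `S3CStorage.UploadFile` further down, it is presumably an opaque, backend-specific value along these lines:

```go
// Presumed declaration (not part of this diff): an opaque upload option.
// Only backends that understand it (here, S3CStorage) inspect the value;
// callers pass nil when no option applies.
type StorageOption interface{}
```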
2 changes: 1 addition & 1 deletion src/duplicacy_azurestorage.go
@@ -165,7 +165,7 @@ func (storage *AzureStorage) DownloadFile(threadIndex int, filePath string, chun
}

// UploadFile writes 'content' to the file at 'filePath'.
func (storage *AzureStorage) UploadFile(threadIndex int, filePath string, content []byte) (err error) {
func (storage *AzureStorage) UploadFile(threadIndex int, filePath string, content []byte, storageOption StorageOption) (err error) {

tries := 0

2 changes: 1 addition & 1 deletion src/duplicacy_b2storage.go
@@ -216,7 +216,7 @@ func (storage *B2Storage) DownloadFile(threadIndex int, filePath string, chunk *
}

// UploadFile writes 'content' to the file at 'filePath'.
func (storage *B2Storage) UploadFile(threadIndex int, filePath string, content []byte) (err error) {
func (storage *B2Storage) UploadFile(threadIndex int, filePath string, content []byte, storageOption StorageOption) (err error) {
return storage.client.UploadFile(threadIndex, filePath, content, storage.UploadRateLimit/storage.client.Threads)
}

2 changes: 1 addition & 1 deletion src/duplicacy_benchmark.go
@@ -186,7 +186,7 @@ func Benchmark(localDirectory string, storage Storage, fileSize int64, chunkSize

startTime = float64(time.Now().UnixNano()) / 1e9
benchmarkRun(uploadThreads, chunkCount, func(threadIndex int, chunkIndex int) {
err := storage.UploadFile(threadIndex, fmt.Sprintf("benchmark/chunk%d", chunkIndex), chunks[chunkIndex])
err := storage.UploadFile(threadIndex, fmt.Sprintf("benchmark/chunk%d", chunkIndex), chunks[chunkIndex], nil)
if err != nil {
LOG_ERROR("BENCHMARK_UPLOAD", "Failed to upload the chunk: %v", err)
return
37 changes: 30 additions & 7 deletions src/duplicacy_chunkdownloader.go
@@ -8,6 +8,8 @@ import (
"io"
"sync/atomic"
"time"

"github.com/gilbertchen/goamz/s3"
)

// ChunkDownloadTask encapsulates information need to download a chunk.
@@ -268,7 +270,7 @@ func (downloader *ChunkDownloader) WaitForCompletion() {
}

// Looping until there isn't a download task in progress
for downloader.numberOfActiveChunks > 0 || downloader.lastChunkIndex + 1 < len(downloader.taskList) {
for downloader.numberOfActiveChunks > 0 || downloader.lastChunkIndex+1 < len(downloader.taskList) {

// Wait for a completion event first
if downloader.numberOfActiveChunks > 0 {
@@ -280,8 +282,8 @@ }
}

// Pass the tasks one by one to the download queue
if downloader.lastChunkIndex + 1 < len(downloader.taskList) {
task := &downloader.taskList[downloader.lastChunkIndex + 1]
if downloader.lastChunkIndex+1 < len(downloader.taskList) {
task := &downloader.taskList[downloader.lastChunkIndex+1]
if task.isDownloading {
downloader.lastChunkIndex++
continue
@@ -358,6 +360,7 @@ func (downloader *ChunkDownloader) Download(threadIndex int, task ChunkDownloadT
chunk.Reset(false)

const MaxDownloadAttempts = 3
const GlacierDelay = 1 // minutes to wait between GLACIER restore polls
for downloadAttempt := 0; ; downloadAttempt++ {

// Find the chunk by ID first.
@@ -417,15 +420,35 @@ func (downloader *ChunkDownloader) Download(threadIndex int, task ChunkDownloadT
err = downloader.storage.DownloadFile(threadIndex, chunkPath, chunk)
if err != nil {
_, isHubic := downloader.storage.(*HubicStorage)
s3c, isS3C := downloader.storage.(*S3CStorage)
// Retry on EOF or if it is a Hubic backend as it may return 404 even when the chunk exists
if (err == io.ErrUnexpectedEOF || isHubic) && downloadAttempt < MaxDownloadAttempts {
LOG_WARN("DOWNLOAD_RETRY", "Failed to download the chunk %s: %v; retrying", chunkID, err)
chunk.Reset(false)
continue
} else {
LOG_ERROR("DOWNLOAD_CHUNK", "Failed to download the chunk %s: %v", chunkID, err)
return false
}
if s3err, ok := err.(*s3.Error); ok {
// Hit by Glacier: a 403 from the S3C backend means the object has not been restored yet
if s3err.StatusCode == 403 && isS3C {
if downloadAttempt == 0 {
LOG_DEBUG("TITANIC", "Requestion restore %v from GLACIER", chunkID)
err = s3c.RestoreFile(threadIndex, chunkPath, 1) // XXX FIXME : hardcoded 1 day retention
if err != nil {
LOG_WARN("DOWNLOAD_RETRY", "Restore %v from GLACIER failed: %v", chunkID, err)
return false
}
}
// retry up to MaxDownloadAttempts * 3 times in total
if downloadAttempt <= MaxDownloadAttempts*3 {
LOG_WARN("DOWNLOAD_RETRY", "Unable to get chunk %s from GLACIER; retrying in %v minute(s)", chunkID, GlacierDelay)
chunk.Reset(false)
time.Sleep(GlacierDelay * time.Minute) // XXX: blocks this download thread while waiting
continue
}
}
}
LOG_ERROR("DOWNLOAD_CHUNK", "Failed to download the chunk %s: %v", chunkID, err)
return false
}

err = chunk.Decrypt(downloader.config.ChunkKey, task.chunkHash)
@@ -457,7 +480,7 @@ func (downloader *ChunkDownloader) Download(threadIndex int, task ChunkDownloadT

if len(cachedPath) > 0 {
// Save a copy to the local snapshot cache
err := downloader.snapshotCache.UploadFile(threadIndex, cachedPath, chunk.GetBytes())
err := downloader.snapshotCache.UploadFile(threadIndex, cachedPath, chunk.GetBytes(), nil)
if err != nil {
LOG_WARN("DOWNLOAD_CACHE", "Failed to add the chunk %s to the snapshot cache: %v", chunkID, err)
}
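
Distilled from the retry loop above, the Glacier path amounts to a single restore request followed by fixed-delay polling. A self-contained sketch of that pattern (the names here are illustrative, not from this change):

```go
package glacier

import "time"

// requestThenPoll issues one restore request, then retries the download at a
// fixed delay until it succeeds or the attempt budget runs out. As in the
// change above, the sleep blocks the calling download thread.
func requestThenPoll(restore, fetch func() error, attempts int, delay time.Duration) error {
	if err := restore(); err != nil {
		return err
	}
	var lastErr error
	for i := 0; i < attempts; i++ {
		if lastErr = fetch(); lastErr == nil {
			return nil
		}
		time.Sleep(delay)
	}
	return lastErr
}
```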
14 changes: 12 additions & 2 deletions src/duplicacy_chunkuploader.go
@@ -91,10 +91,20 @@ func (uploader *ChunkUploader) Upload(threadIndex int, task ChunkUploadTask) boo
chunk := task.chunk
chunkSize := chunk.GetLength()
chunkID := chunk.GetID()
var stropt StorageOption

// For a snapshot chunk, verify that its chunk id is correct
if uploader.snapshotCache != nil {
chunk.VerifyID()
// select the storage option for metadata chunks
if st, ok := uploader.storage.(*S3CStorage); ok {
stropt = st.stOptionMeta
}
} else {
// select the storage option for data chunks
if st, ok := uploader.storage.(*S3CStorage); ok {
stropt = st.stOptionData
}
}

if uploader.snapshotCache != nil && uploader.storage.IsCacheNeeded() {
@@ -104,7 +114,7 @@
LOG_WARN("UPLOAD_CACHE", "Failed to find the cache path for the chunk %s: %v", chunkID, err)
} else if exist {
LOG_DEBUG("CHUNK_CACHE", "Chunk %s already exists in the snapshot cache", chunkID)
} else if err = uploader.snapshotCache.UploadFile(threadIndex, chunkPath, chunk.GetBytes()); err != nil {
} else if err = uploader.snapshotCache.UploadFile(threadIndex, chunkPath, chunk.GetBytes(), stropt); err != nil {
LOG_WARN("UPLOAD_CACHE", "Failed to save the chunk %s to the snapshot cache: %v", chunkID, err)
} else {
LOG_DEBUG("CHUNK_CACHE", "Chunk %s has been saved to the snapshot cache", chunkID)
@@ -135,7 +145,7 @@
}

if !uploader.config.dryRun {
err = uploader.storage.UploadFile(threadIndex, chunkPath, chunk.GetBytes())
err = uploader.storage.UploadFile(threadIndex, chunkPath, chunk.GetBytes(), stropt)
if err != nil {
LOG_ERROR("UPLOAD_CHUNK", "Failed to upload the chunk %s: %v", chunkID, err)
return false
22 changes: 11 additions & 11 deletions src/duplicacy_config.go
@@ -8,21 +8,21 @@ import (
"bytes"
"crypto/hmac"
"crypto/rand"
"crypto/sha256"
"crypto/rsa"
"crypto/sha256"
"crypto/x509"
"encoding/binary"
"encoding/hex"
"encoding/json"
"encoding/pem"
"fmt"
"hash"
"io/ioutil"
"os"
"reflect"
"runtime"
"runtime/debug"
"sync/atomic"
"io/ioutil"
"reflect"

blake2 "github.com/minio/blake2b-simd"
)
@@ -72,7 +72,7 @@ type Config struct {

// for RSA encryption
rsaPrivateKey *rsa.PrivateKey
rsaPublicKey *rsa.PublicKey
rsaPublicKey *rsa.PublicKey

chunkPool chan *Chunk
numberOfChunks int32
@@ -84,17 +84,17 @@ type aliasedConfig Config

type jsonableConfig struct {
*aliasedConfig
ChunkSeed string `json:"chunk-seed"`
HashKey string `json:"hash-key"`
IDKey string `json:"id-key"`
ChunkKey string `json:"chunk-key"`
FileKey string `json:"file-key"`
ChunkSeed string `json:"chunk-seed"`
HashKey string `json:"hash-key"`
IDKey string `json:"id-key"`
ChunkKey string `json:"chunk-key"`
FileKey string `json:"file-key"`
RSAPublicKey string `json:"rsa-public-key"`
}

func (config *Config) MarshalJSON() ([]byte, error) {

publicKey := []byte {}
publicKey := []byte{}
if config.rsaPublicKey != nil {
publicKey, _ = x509.MarshalPKIXPublicKey(config.rsaPublicKey)
}
@@ -504,7 +504,7 @@ func UploadConfig(storage Storage, config *Config, password string, iterations i
}
}

err = storage.UploadFile(0, "config", chunk.GetBytes())
err = storage.UploadFile(0, "config", chunk.GetBytes(), nil)
if err != nil {
LOG_ERROR("CONFIG_INIT", "Failed to configure the storage: %v", err)
return false
2 changes: 1 addition & 1 deletion src/duplicacy_dropboxstorage.go
@@ -208,7 +208,7 @@ func (storage *DropboxStorage) DownloadFile(threadIndex int, filePath string, ch
}

// UploadFile writes 'content' to the file at 'filePath'.
func (storage *DropboxStorage) UploadFile(threadIndex int, filePath string, content []byte) (err error) {
func (storage *DropboxStorage) UploadFile(threadIndex int, filePath string, content []byte, storageOption StorageOption) (err error) {
if filePath != "" && filePath[0] != '/' {
filePath = "/" + filePath
}
2 changes: 1 addition & 1 deletion src/duplicacy_filestorage.go
@@ -147,7 +147,7 @@ func (storage *FileStorage) DownloadFile(threadIndex int, filePath string, chunk
}

// UploadFile writes 'content' to the file at 'filePath'
func (storage *FileStorage) UploadFile(threadIndex int, filePath string, content []byte) (err error) {
func (storage *FileStorage) UploadFile(threadIndex int, filePath string, content []byte, storageOption StorageOption) (err error) {

fullPath := path.Join(storage.storageDir, filePath)

6 changes: 3 additions & 3 deletions src/duplicacy_gcdstorage.go
@@ -38,8 +38,8 @@ type GCDStorage struct {
service *drive.Service
idCache map[string]string // only directories are saved in this cache
idCacheLock sync.Mutex
backoffs []int // desired backoff time in seconds for each thread
attempts []int // number of failed attempts since last success for each thread
backoffs []int // desired backoff time in seconds for each thread
attempts []int // number of failed attempts since last success for each thread
driveID string // the ID of the shared drive or 'root' (GCDUserDrive) if the user's drive

createDirectoryLock sync.Mutex
@@ -715,7 +715,7 @@ func (storage *GCDStorage) DownloadFile(threadIndex int, filePath string, chunk
}

// UploadFile writes 'content' to the file at 'filePath'.
func (storage *GCDStorage) UploadFile(threadIndex int, filePath string, content []byte) (err error) {
func (storage *GCDStorage) UploadFile(threadIndex int, filePath string, content []byte, storageOption StorageOption) (err error) {

// We never upload a fossil so there is no need to convert the path
parent := path.Dir(filePath)
2 changes: 1 addition & 1 deletion src/duplicacy_gcsstorage.go
@@ -251,7 +251,7 @@ func (storage *GCSStorage) DownloadFile(threadIndex int, filePath string, chunk
}

// UploadFile writes 'content' to the file at 'filePath'.
func (storage *GCSStorage) UploadFile(threadIndex int, filePath string, content []byte) (err error) {
func (storage *GCSStorage) UploadFile(threadIndex int, filePath string, content []byte, storageOption StorageOption) (err error) {

backoff := 1
for {
2 changes: 1 addition & 1 deletion src/duplicacy_hubicstorage.go
@@ -175,7 +175,7 @@ func (storage *HubicStorage) DownloadFile(threadIndex int, filePath string, chun
}

// UploadFile writes 'content' to the file at 'filePath'.
func (storage *HubicStorage) UploadFile(threadIndex int, filePath string, content []byte) (err error) {
func (storage *HubicStorage) UploadFile(threadIndex int, filePath string, content []byte, storageOption StorageOption) (err error) {
return storage.client.UploadFile(storage.storageDir+"/"+filePath, content, storage.UploadRateLimit/storage.numberOfThreads)
}

2 changes: 1 addition & 1 deletion src/duplicacy_onestorage.go
@@ -216,7 +216,7 @@ func (storage *OneDriveStorage) DownloadFile(threadIndex int, filePath string, c
}

// UploadFile writes 'content' to the file at 'filePath'.
func (storage *OneDriveStorage) UploadFile(threadIndex int, filePath string, content []byte) (err error) {
func (storage *OneDriveStorage) UploadFile(threadIndex int, filePath string, content []byte, storageOption StorageOption) (err error) {
err = storage.client.UploadFile(storage.storageDir+"/"+filePath, content, storage.UploadRateLimit/storage.numberOfThread)

if e, ok := err.(OneDriveError); ok && e.Status == 409 {
3 changes: 2 additions & 1 deletion src/duplicacy_preference.go
@@ -25,7 +25,8 @@ type Preference struct {
DoNotSavePassword bool `json:"no_save_password"`
NobackupFile string `json:"nobackup_file"`
Keys map[string]string `json:"keys"`
FiltersFile string `json:"filters"`
FiltersFile string `json:"filters"`
StorageClass string `json:"storageclass"`
}

var preferencePath string
24 changes: 20 additions & 4 deletions src/duplicacy_s3cstorage.go
@@ -17,6 +17,9 @@ type S3CStorage struct {

buckets []*s3.Bucket
storageDir string

stOptionMeta s3.Options // Option to use for metadata
stOptionData s3.Options // Option to use for file data
}

// CreateS3CStorage creates a amazon s3 storage object.
@@ -54,8 +57,10 @@ func CreateS3CStorage(regionName string, endpoint string, bucketName string, sto
}

storage = &S3CStorage{
buckets: buckets,
storageDir: storageDir,
buckets: buckets,
storageDir: storageDir,
stOptionMeta: s3.Options{Tagging: "TYPE=META"},
stOptionData: s3.Options{Tagging: "TYPE=DATA"},
}

storage.DerivedStorage = storage
@@ -172,13 +177,24 @@ func (storage *S3CStorage) DownloadFile(threadIndex int, filePath string, chunk
}

// UploadFile writes 'content' to the file at 'filePath'.
func (storage *S3CStorage) UploadFile(threadIndex int, filePath string, content []byte) (err error) {
func (storage *S3CStorage) UploadFile(threadIndex int, filePath string, content []byte, storageOption StorageOption) (err error) {

options := s3.Options{}
var options s3.Options
if sto, ok := storageOption.(s3.Options); ok {
options = sto
}
LOG_DEBUG("debug", "s3cOption %+v", options)
reader := CreateRateLimitedReader(content, storage.UploadRateLimit/len(storage.buckets))
return storage.buckets[threadIndex].PutReader(storage.storageDir+filePath, reader, int64(len(content)), "application/duplicacy", s3.Private, options)
}

// RestoreFile requests that filePath be restored from GLACIER to STANDARD storage, with a retention of 'days' days
func (storage *S3CStorage) RestoreFile(threadIndex int, filePath string, days int) (err error) {

LOG_DEBUG("RestoreObject", "RestoreObject %v", filePath)
return storage.buckets[threadIndex].RestoreObject(storage.storageDir+filePath, days)
}

// If a local snapshot cache is needed for the storage to avoid downloading/uploading chunks too often when
// managing snapshots.
func (storage *S3CStorage) IsCacheNeeded() bool { return true }
2 changes: 1 addition & 1 deletion src/duplicacy_s3storage.go
@@ -216,7 +216,7 @@ func (storage *S3Storage) DownloadFile(threadIndex int, filePath string, chunk *
}

// UploadFile writes 'content' to the file at 'filePath'.
func (storage *S3Storage) UploadFile(threadIndex int, filePath string, content []byte) (err error) {
func (storage *S3Storage) UploadFile(threadIndex int, filePath string, content []byte, storageOption StorageOption) (err error) {

attempts := 0
