Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migration from onedrive #139

Closed
wants to merge 37 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
45c1ea9
default cloud source
pewssh Oct 27, 2024
f3690f2
updated gosdk version
dabasov Nov 1, 2024
da9ef10
updated
pewssh Oct 28, 2024
8018518
* Added test for onedrive
pewssh Nov 6, 2024
298e89c
use pointer
pewssh Nov 6, 2024
10c86ac
updated for migrate.go file
pewssh Nov 6, 2024
8afe654
Updated for including refresh token
pewssh Nov 6, 2024
fdd7081
updated go.sum
pewssh Nov 8, 2024
940ef52
reset back main.go
pewssh Nov 8, 2024
b9d4863
merge changes
pewssh Nov 8, 2024
005c3a2
format files with gofmt
pewssh Nov 10, 2024
4864c96
updated gosdk version
dabasov Nov 13, 2024
79bafde
Updated for creating file will need .create access
pewssh Nov 20, 2024
3c1c4f2
Azure Microsoft Blob
pewssh Nov 22, 2024
71f8cf8
Code changes for Google cloud
pewssh Nov 27, 2024
798350e
formatted
pewssh Nov 27, 2024
f66ba99
use path package
Hitenjain14 Nov 28, 2024
349cbf5
Optional client credentials
pewssh Nov 29, 2024
bde4e48
Merge pull request #140 from 0chain/fix/path-windows
dabasov Nov 29, 2024
29e3436
tidy go mod
pewssh Dec 4, 2024
8a816b7
updated onedrive version
pewssh Dec 4, 2024
c9941ec
changed repository gor drive;
pewssh Dec 4, 2024
e504751
Merge conflict
pewssh Dec 4, 2024
dee57a7
single return
pewssh Dec 4, 2024
e6ff839
optional client credentials
pewssh Dec 4, 2024
6b45e23
Listing the file based upon name
pewssh Dec 5, 2024
cdb134d
fix drive name
pewssh Dec 5, 2024
e8776a0
Added marking and modified for filename
pewssh Dec 9, 2024
8214ce3
Update gosdk
Jayashsatolia403 Dec 10, 2024
2c4fc97
Removed the upload functionality
pewssh Dec 12, 2024
a6c9794
Merge branch 'staging' into Fix-drive-name
pewssh Dec 13, 2024
34c98d2
*Azure connection string and other method for access
pewssh Dec 24, 2024
f19cda1
Changes from Fix-drive-name;
pewssh Dec 24, 2024
1baa481
newer than in dropbox
pewssh Dec 24, 2024
ad89770
update for test
pewssh Dec 24, 2024
ad6191f
updated for older and newer than
pewssh Dec 24, 2024
7d008a6
updated for newerthan and olderthan
pewssh Dec 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 152 additions & 0 deletions azure/core.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package azure

import (
"context"
"fmt"
"io"
"log"
"os"
"path"

zlogger "github.com/0chain/s3migration/logger"
T "github.com/0chain/s3migration/types"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
)

// AzureClient wraps an Azure Blob Storage client for the migration tool.
// Note: workDir is passed as the container name to every blob operation
// below (ListFiles, DownloadStream, DeleteBlob).
type AzureClient struct {
	service *azblob.Client // underlying Azure Blob Storage service client
	workDir string         // used as the Azure container name in all methods
}

// NewAzureClient creates an AzureClient from an Azure Storage connection
// string. workDir is used as the container name by the other methods;
// accountName is used to build the blob endpoint audience URL.
func NewAzureClient(workDir, accountName, connectionString string) (*AzureClient, error) {
	blobURL := fmt.Sprintf("https://%s.blob.core.windows.net", accountName)
	client, err := azblob.NewClientFromConnectionString(connectionString, &azblob.ClientOptions{Audience: blobURL})
	if err != nil {
		// Return the error instead of log.Fatalf: a constructor must not
		// terminate the whole process, and the original Fatalf made the
		// (nil, error) return contract unreachable.
		return nil, fmt.Errorf("creating blob client: %w", err)
	}

	return &AzureClient{
		service: client,
		workDir: workDir,
	}, nil
}

// ListFiles streams metadata for every blob (including snapshots and
// versions) in the container named by g.workDir. Results arrive on the
// first channel; any paging error arrives on the second. Both channels are
// closed when listing finishes or the context is cancelled.
func (g *AzureClient) ListFiles(ctx context.Context) (<-chan *T.ObjectMeta, <-chan error) {
	objectChan := make(chan *T.ObjectMeta)
	errChan := make(chan error)
	containerName := g.workDir

	go func() {
		defer func() {
			close(objectChan)
			close(errChan)
		}()

		pager := g.service.NewListBlobsFlatPager(containerName, &azblob.ListBlobsFlatOptions{
			Include: azblob.ListBlobsInclude{Snapshots: true, Versions: true},
		})

		for pager.More() {
			// Use the caller's ctx so cancellation stops paging (the
			// original passed context.TODO() and ignored ctx entirely).
			resp, err := pager.NextPage(ctx)
			if err != nil {
				errChan <- err
				return
			}

			for _, blob := range resp.Segment.BlobItems {
				meta := &T.ObjectMeta{
					Key:         *blob.Name,
					Size:        *blob.Properties.ContentLength,
					ContentType: *blob.Properties.ContentType,
					Ext:         path.Ext(*blob.Name),
				}
				// Honor cancellation while blocked on an unbuffered send.
				select {
				case objectChan <- meta:
				case <-ctx.Done():
					errChan <- ctx.Err()
					return
				}
			}
		}
	}()

	return objectChan, errChan
}

// GetFileContent opens a download stream for the blob fileID in the
// container g.workDir. The caller owns obj.Body and must close it.
func (g *AzureClient) GetFileContent(ctx context.Context, fileID string) (*T.Object, error) {
	resp, err := g.service.DownloadStream(ctx, g.workDir, fileID, nil)
	if err != nil {
		return nil, err
	}

	obj := &T.Object{Body: resp.Body}
	// ContentType/ContentLength are optional pointer-typed response headers;
	// guard against nil instead of dereferencing unconditionally, which
	// panicked on blobs missing either header.
	if resp.ContentType != nil {
		obj.ContentType = *resp.ContentType
	}
	if resp.ContentLength != nil {
		obj.ContentLength = *resp.ContentLength
	}

	return obj, nil
}

// DeleteFile removes the blob fileID from the container g.workDir.
func (g *AzureClient) DeleteFile(ctx context.Context, fileID string) error {
	// The delete response carries no data we need; propagate only the error.
	_, err := g.service.DeleteBlob(ctx, g.workDir, fileID, nil)
	return err
}

// DownloadToFile downloads the blob fileID into a local file whose path is
// the blob name itself, and returns that path.
// NOTE(review): blob names containing '/' will fail os.Create unless the
// parent directories already exist — confirm expected blob naming upstream.
func (g *AzureClient) DownloadToFile(ctx context.Context, fileID string) (string, error) {
	resp, err := g.service.DownloadStream(ctx, g.workDir, fileID, nil)
	if err != nil {
		return "", err
	}
	// Fix: the original never closed the response body, leaking the
	// underlying connection on every download.
	defer resp.Body.Close()

	zlogger.Logger.Info(fmt.Sprintf("Original File Name: %s", fileID))
	destinationPath := fileID

	out, err := os.Create(destinationPath)
	if err != nil {
		return "", err
	}
	defer out.Close()

	if _, err = io.Copy(out, resp.Body); err != nil {
		return "", err
	}

	zlogger.Logger.Info(fmt.Sprintf("Downloaded file ID: %s to %s\n", fileID, destinationPath))
	return destinationPath, nil
}

// DownloadToMemory reads up to chunkSize bytes of blob fileID starting at
// offset, clamped to fileSize, and returns exactly the bytes read.
func (g *AzureClient) DownloadToMemory(ctx context.Context, fileID string, offset int64, chunkSize, fileSize int64) ([]byte, error) {
	// Fix: the original computed a `limit` but never used it, requesting a
	// full chunkSize range even past the end of the blob. Clamp the count
	// to the bytes actually remaining.
	count := chunkSize
	if remaining := fileSize - offset; remaining < count {
		count = remaining
	}
	if count <= 0 {
		return nil, fmt.Errorf("offset %d out of range for file size %d", offset, fileSize)
	}

	resp, err := g.service.DownloadStream(ctx, g.workDir, fileID, &azblob.DownloadStreamOptions{
		Range: azblob.HTTPRange{ // pass by value, no `&` needed here
			Offset: offset,
			Count:  count,
		},
	})
	if err != nil {
		return nil, fmt.Errorf("failed to download chunk: %w", err)
	}
	defer resp.Body.Close()

	data := make([]byte, count)
	n, err := io.ReadFull(resp.Body, data)
	if err != nil && err != io.ErrUnexpectedEOF {
		return nil, fmt.Errorf("error reading blob content: %w", err)
	}

	// Short read (e.g. the final chunk): return only what was read. The
	// original's `fileSize != chunkSize` guard could return zero-padded
	// garbage when fileSize happened to equal chunkSize.
	return data[:n], nil
}
135 changes: 135 additions & 0 deletions azure/test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package azure

import (
"context"
"fmt"
"testing"

zlogger "github.com/0chain/s3migration/logger"
)

// Manual test fixtures — fill these in before running the tests against a
// real storage account; with empty values the tests exit early.
var (
	connectionString = "" // Azure Storage connection string for the test account
	testFileID       = "" // name of an existing blob in the test container
)

func TestAzureClient_ListFiles(t *testing.T) {
client, err := NewAzureClient("testing", "climigration", connectionString)
if err != nil {
zlogger.Logger.Error(fmt.Sprintf("err while creating Google Drive client: %v", err))
return
}

ctx := context.Background()
objectChan, errChan := client.ListFiles(ctx)

go func() {
for err := range errChan {
zlogger.Logger.Error(fmt.Sprintf("err while list files: %v", err))
}
}()

for object := range objectChan {
zlogger.Logger.Info(fmt.Sprintf("file:%s, size: %d bytes, type: %s", object.Key, object.Size, object.ContentType))
}
zlogger.Logger.Info("DATA")
}

func TestAzureClient_GetFileContent(t *testing.T) {
client, err := NewAzureClient("testing", "climigration", connectionString)
if err != nil {
zlogger.Logger.Error(fmt.Sprintf("Failed to creating Google Drive client: %v", err))
return
}

ctx := context.Background()
fileID := testFileID
obj, err := client.GetFileContent(ctx, fileID)

if err != nil {
zlogger.Logger.Error(fmt.Sprintf("err while getting file content: %v", err))
return
}

defer obj.Body.Close()

zlogger.Logger.Info(fmt.Sprintf("file content type: %s, length: %d", obj.ContentType, obj.ContentLength))

if (obj.Body == nil) || (obj.ContentLength == 0) {
zlogger.Logger.Info("empty file content")
return
}

buf := make([]byte, obj.ContentLength)
n, err := obj.Body.Read(buf)
if err != nil {
zlogger.Logger.Error(fmt.Sprintf("err while read file content: %v", err))
return
}
zlogger.Logger.Info(fmt.Sprintf("read data: %s", buf[:n]))
}

func TestAzureClient_DeleteFile(t *testing.T) {
client, err := NewAzureClient("testing", "climigration", connectionString)
if err != nil {
zlogger.Logger.Error(fmt.Sprintf("err while creating Google Drive client: %v", err))
return
}

ctx := context.Background()
fileID := testFileID
err = client.DeleteFile(ctx, fileID)
if err != nil {
zlogger.Logger.Error(fmt.Sprintf("err while delete file: %v", err))
return
}
zlogger.Logger.Error(fmt.Sprintf("file: %s deleted successfully", fileID))
}

func TestAzureClient_DownloadToFile(t *testing.T) {
client, err := NewAzureClient("testing", "climigration", connectionString)
if err != nil {
zlogger.Logger.Error(fmt.Sprintf("err while creating Google Drive client: %v", err))
}

ctx := context.Background()
fileID := testFileID
destinationPath, err := client.DownloadToFile(ctx, fileID)
if err != nil {
zlogger.Logger.Error(fmt.Sprintf("err while downloading file: %v", err))
return
}
zlogger.Logger.Info(fmt.Sprintf("downloaded to: %s", destinationPath))
}

func TestAzureClient_DownloadToMemory(t *testing.T) {
client, err := NewAzureClient("testing", "climigration", connectionString)
if err != nil {
zlogger.Logger.Error(fmt.Sprintf("err while creating Google Drive client: %v", err))
}

ctx := context.Background()

fileID := testFileID

offset := int64(0)

// download only half chunk for testing
chunkSize := int64(313)

fileSize := int64(626)

if err != nil {
zlogger.Logger.Error(fmt.Sprintf("err while getting file size: %v", err))
return
}

data, err := client.DownloadToMemory(ctx, fileID, offset, chunkSize, fileSize)

if err != nil {
zlogger.Logger.Error(fmt.Sprintf("err while downloading file: %v", err))
return
}

zlogger.Logger.Info(fmt.Sprintf("downloaded data: %s", data))
}
57 changes: 46 additions & 11 deletions cmd/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,37 @@ var (
chunkNumber int
batchSize int
source string
clientId string
clientSecret string
clientId string
clientSecret string
connectionString string
accountName string
)

// azureCredentials maps human-readable field names to the Azure flag
// variables; validateCredentials requires all of them to be non-empty when
// the source is "azure".
var azureCredentials = map[string]*string{
	"connection string": &connectionString,
	"account name":      &accountName,
}

// Credentials maps field names to the access credentials validated for
// non-azure sources (presumably the S3 access/secret key flags declared
// above — declaration is outside this view, confirm).
var Credentials = map[string]*string{
	"access key": &accessKey,
	"secret key": &secretKey,
}

// validateCredentials checks that every credential value in the map is
// non-empty. The map keys are human-readable field names; the values point
// at the corresponding flag variables. It returns an error naming every
// missing field, or nil when all are set.
func validateCredentials(credentials map[string]*string) error {
	var missingFields []string

	for key, value := range credentials {
		if *value == "" {
			missingFields = append(missingFields, key)
		}
	}
	if len(missingFields) > 0 {
		// Lowercase per Go convention (error strings are not capitalized).
		// NOTE: map iteration order is random, so the field order in the
		// message can vary between runs.
		return errors.New("missing fields: " + strings.Join(missingFields, ", "))
	}

	return nil
}

// migrateCmd is the migrateFromS3 sub command to migrate whole objects from some buckets.
func init() {
rootCmd.AddCommand(migrateCmd)
Expand Down Expand Up @@ -74,9 +101,13 @@ func init() {
migrateCmd.Flags().Int64Var(&chunkSize, "chunk-size", 50*1024*1024, "chunk size in bytes")
migrateCmd.Flags().IntVar(&chunkNumber, "chunk-number", 250, "number of chunks to upload")
migrateCmd.Flags().IntVar(&batchSize, "batch-size", 20, "number of files to upload in a batch")
migrateCmd.Flags().StringVar(&source, "source", "s3", "s3 or google_drive or dropbox")
migrateCmd.Flags().StringVar(&source, "source", "s3", "s3 or google_drive or dropbox or azure or google_cloud_storage")
migrateCmd.Flags().StringVar(&clientId, "client-id", "", "Client id for Google app console")
migrateCmd.Flags().StringVar(&clientSecret, "client-secret", "", "Client secret for Google app console")

// in case of azure it takes connectionString as param
migrateCmd.PersistentFlags().StringVar(&connectionString, "connection-string", "", "connection string for azure")
migrateCmd.PersistentFlags().StringVar(&accountName, "account-name", "", "account name for azure")
}

var migrateCmd = &cobra.Command{
Expand Down Expand Up @@ -116,8 +147,8 @@ var migrateCmd = &cobra.Command{
}
}

if source == "" {
source = "s3"
if source == "" || (source != "google_drive" && source != "s3" && source != "dropbox" && source != "onedrive" && source != "azure" && source != "google_cloud_storage") {
source = "s3" // Default to "s3"
}

if (accessKey == "" || secretKey == "") && source == "s3" {
Expand All @@ -130,11 +161,6 @@ var migrateCmd = &cobra.Command{
}
}
}
// check if client id and secret exist for google drive

if (clientId == "" && clientSecret == "" && source=="google_drive") {
return fmt.Errorf("missing google client credentials")
}

if bucket == "" && source == "s3" {
bucket, region, prefix, err = util.GetBucketRegionPrefixFromFile(awsCredPath)
Expand All @@ -143,6 +169,15 @@ var migrateCmd = &cobra.Command{
}
}

if err := validateCredentials(func() map[string]*string {
if source == "azure" {
return azureCredentials
}
return Credentials
}()); err != nil {
return err
}

if skip < 0 || skip > 2 {
return fmt.Errorf("skip value not in range 0-2. Provided value is %v", skip)
}
Expand Down Expand Up @@ -245,7 +280,7 @@ var migrateCmd = &cobra.Command{
ChunkSize: chunkSize,
ChunkNumber: chunkNumber,
BatchSize: batchSize,
Source: source,
Source: source,
}

if err := migration.InitMigration(&mConfig); err != nil {
Expand Down
Loading
Loading