Skip to content

Commit

Permalink
Merge pull request #143 from 0chain/s3migration-cloud
Browse files Browse the repository at this point in the history
S3migration cloud
  • Loading branch information
Jayashsatolia403 authored Feb 16, 2025
2 parents 8214ce3 + 39401a5 commit 27b6438
Show file tree
Hide file tree
Showing 19 changed files with 1,562 additions and 241 deletions.
170 changes: 170 additions & 0 deletions azure/core.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package azure

import (
"context"
"fmt"
"io"
"os"
"path"
"time"

zlogger "github.com/0chain/s3migration/logger"
T "github.com/0chain/s3migration/types"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
)

type AzureClient struct {
service *azblob.Client
workDir string
newerThan *time.Time
olderThan *time.Time
containerName string
}

func NewAzureClient(workDir, accountName, connectionString, containerName string, newerThan *time.Time, olderThan *time.Time) (*AzureClient, error) {
// Create the client
blobURL := fmt.Sprintf("https://%s.blob.core.windows.net", accountName)
zlogger.Logger.Info("bloburl", blobURL, accountName)

client, err := azblob.NewClientFromConnectionString(connectionString, &azblob.ClientOptions{Audience: blobURL})
if err != nil {
return nil, fmt.Errorf("failed to create blob client: %v", err)
}
zlogger.Logger.Info("bloburl", blobURL)
pager := client.NewListContainersPager(&azblob.ListContainersOptions{})

_, err = pager.NextPage(context.Background())
if err != nil {
return nil, fmt.Errorf("failed to validate client access: %v", err)
}

return &AzureClient{
service: client,
workDir: workDir,
newerThan: newerThan,
olderThan: olderThan,
containerName: containerName,
}, nil
}
func (g *AzureClient) ListFiles(ctx context.Context) (<-chan *T.ObjectMeta, <-chan error) {
objectChan := make(chan *T.ObjectMeta)
errChan := make(chan error)

go func() {
defer func() {
close(objectChan)
close(errChan)
}()
pager := g.service.NewListBlobsFlatPager(g.containerName, &azblob.ListBlobsFlatOptions{
Include: azblob.ListBlobsInclude{Snapshots: true, Versions: true},
})

for pager.More() {
fmt.Println("Pager has more data:", pager.More())
resp, err := pager.NextPage(ctx)
if err != nil {
zlogger.Logger.Error("Error fetching page", err)
errChan <- err
return
}

for _, blob := range resp.Segment.BlobItems {
fmt.Println(*blob.Name)
lastModified := blob.Properties.LastModified
if (g.newerThan == nil || g.newerThan.Unix() == 0 || lastModified.Unix() >= g.newerThan.Unix()) &&
(g.olderThan == nil || g.olderThan.Unix() == 0 || lastModified.Unix() <= g.olderThan.Unix()) {
fmt.Println("Pushing object to channel:", *blob.Name)
objectChan <- &T.ObjectMeta{
Key: *blob.Name,
Size: *blob.Properties.ContentLength,
ContentType: *blob.Properties.ContentType,
Ext: path.Ext(*blob.Name),
}
}
}
}

}()

return objectChan, errChan
}

func (g *AzureClient) GetFileContent(ctx context.Context, fileID string) (*T.Object, error) {
resp, err := g.service.DownloadStream(ctx, g.containerName, fileID, nil)
if err != nil {
return nil, err
}

obj := &T.Object{
Body: resp.Body,
ContentType: *resp.ContentType,
ContentLength: *resp.ContentLength,
}

return obj, nil
}

func (g *AzureClient) DeleteFile(ctx context.Context, fileID string) error {
_, err := g.service.DeleteBlob(ctx, g.containerName, fileID, nil)
if err != nil {
return err
}
return nil
}

func (g *AzureClient) DownloadToFile(ctx context.Context, fileID string) (string, error) {
resp, err := g.service.DownloadStream(ctx, g.containerName, fileID, nil)
if err != nil {
return "", err
}

zlogger.Logger.Info(fmt.Sprintf("Original File Name: %s", fileID))
destinationPath := fileID

out, err := os.Create(destinationPath)
if err != nil {
return "", err
}

defer out.Close()

_, err = io.Copy(out, resp.Body)
if err != nil {
return "", err
}

zlogger.Logger.Info(fmt.Sprintf("Downloaded file ID: %s to %s\n", fileID, destinationPath))
return destinationPath, nil
}

func (g *AzureClient) DownloadToMemory(ctx context.Context, fileID string, offset int64, chunkSize, fileSize int64) ([]byte, error) {
limit := offset + chunkSize - 1
if limit > fileSize {
limit = fileSize
}

resp, err := g.service.DownloadStream(ctx, g.containerName, fileID, &azblob.DownloadStreamOptions{
Range: azblob.HTTPRange{ // Pass by value, no `&` needed here
Offset: offset,
Count: chunkSize,
},
})

if err != nil {
return nil, fmt.Errorf("failed to download chunk: %w", err)
}
defer resp.Body.Close()

data := make([]byte, chunkSize)

n, err := io.ReadFull(resp.Body, data)
if err != nil && err != io.ErrUnexpectedEOF {
return nil, fmt.Errorf("error reading blob content: %w", err)
}

if int64(n) < chunkSize && fileSize != chunkSize {
data = data[:n]
}

return data, nil
}
137 changes: 137 additions & 0 deletions azure/test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package azure

import (
"context"
"fmt"
"testing"

zlogger "github.com/0chain/s3migration/logger"
)

var (
connectionString = ""
testFileID = ""
workDir = ""
accountName = ""
)

func TestAzureClient_ListFiles(t *testing.T) {
client, err := NewAzureClient(workDir, accountName, connectionString, workDir, nil, nil)
if err != nil {
zlogger.Logger.Error(fmt.Sprintf("err while creating Google Drive client: %v", err))
return
}

ctx := context.Background()
objectChan, errChan := client.ListFiles(ctx)

go func() {
for err := range errChan {
zlogger.Logger.Error(fmt.Sprintf("err while list files: %v", err))
}
}()

for object := range objectChan {
zlogger.Logger.Info(fmt.Sprintf("file:%s, size: %d bytes, type: %s", object.Key, object.Size, object.ContentType))
}
zlogger.Logger.Info("DATA")
}

func TestAzureClient_GetFileContent(t *testing.T) {
client, err := NewAzureClient(workDir, accountName, connectionString, workDir, nil, nil)
if err != nil {
zlogger.Logger.Error(fmt.Sprintf("Failed to creating Google Drive client: %v", err))
return
}

ctx := context.Background()
fileID := testFileID
obj, err := client.GetFileContent(ctx, fileID)

if err != nil {
zlogger.Logger.Error(fmt.Sprintf("err while getting file content: %v", err))
return
}

defer obj.Body.Close()

zlogger.Logger.Info(fmt.Sprintf("file content type: %s, length: %d", obj.ContentType, obj.ContentLength))

if (obj.Body == nil) || (obj.ContentLength == 0) {
zlogger.Logger.Info("empty file content")
return
}

buf := make([]byte, obj.ContentLength)
n, err := obj.Body.Read(buf)
if err != nil {
zlogger.Logger.Error(fmt.Sprintf("err while read file content: %v", err))
return
}
zlogger.Logger.Info(fmt.Sprintf("read data: %s", buf[:n]))
}

func TestAzureClient_DeleteFile(t *testing.T) {
client, err := NewAzureClient(workDir, accountName, connectionString, workDir, nil, nil)
if err != nil {
zlogger.Logger.Error(fmt.Sprintf("err while creating Google Drive client: %v", err))
return
}

ctx := context.Background()
fileID := testFileID
err = client.DeleteFile(ctx, fileID)
if err != nil {
zlogger.Logger.Error(fmt.Sprintf("err while delete file: %v", err))
return
}
zlogger.Logger.Error(fmt.Sprintf("file: %s deleted successfully", fileID))
}

func TestAzureClient_DownloadToFile(t *testing.T) {
client, err := NewAzureClient(workDir, accountName, connectionString, workDir, nil, nil)
if err != nil {
zlogger.Logger.Error(fmt.Sprintf("err while creating Google Drive client: %v", err))
}

ctx := context.Background()
fileID := testFileID
destinationPath, err := client.DownloadToFile(ctx, fileID)
if err != nil {
zlogger.Logger.Error(fmt.Sprintf("err while downloading file: %v", err))
return
}
zlogger.Logger.Info(fmt.Sprintf("downloaded to: %s", destinationPath))
}

func TestAzureClient_DownloadToMemory(t *testing.T) {
client, err := NewAzureClient(workDir, accountName, connectionString, workDir, nil, nil)
if err != nil {
zlogger.Logger.Error(fmt.Sprintf("err while creating Google Drive client: %v", err))
}

ctx := context.Background()

fileID := testFileID

offset := int64(0)

// download only half chunk for testing
chunkSize := int64(313)

fileSize := int64(626)

if err != nil {
zlogger.Logger.Error(fmt.Sprintf("err while getting file size: %v", err))
return
}

data, err := client.DownloadToMemory(ctx, fileID, offset, chunkSize, fileSize)

if err != nil {
zlogger.Logger.Error(fmt.Sprintf("err while downloading file: %v", err))
return
}

zlogger.Logger.Info(fmt.Sprintf("downloaded data: %s", data))
}
Loading

0 comments on commit 27b6438

Please sign in to comment.