Skip to content

Commit

Permalink
add logs for s3 state operations
Browse files Browse the repository at this point in the history
Relates to #221

---

Pull Request resolved: #222
commit_hash:2e96e735c73ea73f985f52d08152eb5b005319d6
  • Loading branch information
laskoviymishka authored and robot-piglet committed Feb 20, 2025
1 parent fd08e3c commit be77a53
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 8 deletions.
2 changes: 1 addition & 1 deletion cmd/trcli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func main() {
}
case "s3":
var err error
cp, err = s3coordinator.NewS3(coordinatorS3Bucket)
cp, err = s3coordinator.NewS3(coordinatorS3Bucket, logger.Log)
if err != nil {
return xerrors.Errorf("unable to load s3 coordinator: %w", err)
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/coordinator/s3coordinator/coordinator_s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type CoordinatorS3 struct {
state map[string]map[string]*coordinator.TransferStateData
s3Client *s3.S3
bucket string
lgr log.Logger
}

// GetTransferState fetches all state objects with a given transferID (prefix).
Expand Down Expand Up @@ -72,6 +73,7 @@ func (c *CoordinatorS3) GetTransferState(transferID string) (map[string]*coordin
}
state[strings.TrimSuffix(key, ".json")] = &transferData
}
c.lgr.Info("load transfer state", log.Any("transfer_id", transferID), log.Any("state", state))

return state, nil
}
Expand All @@ -94,6 +96,7 @@ func (c *CoordinatorS3) SetTransferState(transferID string, state map[string]*co
return xerrors.Errorf("failed to upload state object: %w", err)
}
}
c.lgr.Info("set transfer state", log.Any("transfer_id", transferID), log.Any("state", state))
return nil
}

Expand All @@ -110,6 +113,7 @@ func (c *CoordinatorS3) RemoveTransferState(transferID string, keys []string) er
return xerrors.Errorf("failed to delete state object: %w", err)
}
}
c.lgr.Info("remove transfer state keys", log.Any("transfer_id", transferID), log.Any("keys", keys))
return nil
}

Expand Down Expand Up @@ -381,7 +385,7 @@ func (c *CoordinatorS3) listObjects(prefix string) ([]*s3.Object, error) {
}

// NewS3 creates a new CoordinatorS3 with AWS SDK v1.
func NewS3(bucket string, cfgs ...*aws.Config) (*CoordinatorS3, error) {
func NewS3(bucket string, l log.Logger, cfgs ...*aws.Config) (*CoordinatorS3, error) {
sess, err := session.NewSession(cfgs...)
if err != nil {
return nil, xerrors.Errorf("unable to create AWS session: %w", err)
Expand All @@ -397,5 +401,6 @@ func NewS3(bucket string, cfgs ...*aws.Config) (*CoordinatorS3, error) {
state: map[string]map[string]*coordinator.TransferStateData{},
bucket: bucket,
s3Client: s3Client,
lgr: log.With(l, log.Any("component", "s3-coordinator")),
}, nil
}
16 changes: 10 additions & 6 deletions pkg/coordinator/s3coordinator/coordinator_s3_recipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,16 @@ func NewS3Recipe(bucket string) (*CoordinatorS3, error) {
// No need to check error because maybe the bucket already exists
logger.Log.Info("create bucket result", log.Any("res", res), log.Error(err))
}
cp, err := NewS3(bucket, &aws.Config{
Region: aws.String(region),
Credentials: credentials.NewStaticCredentials(accessKey, secret, ""),
Endpoint: aws.String(endpoint),
S3ForcePathStyle: aws.Bool(true), // Enable path-style access
})
cp, err := NewS3(
bucket,
logger.Log,
&aws.Config{
Region: aws.String(region),
Credentials: credentials.NewStaticCredentials(accessKey, secret, ""),
Endpoint: aws.String(endpoint),
S3ForcePathStyle: aws.Bool(true), // Enable path-style access
},
)
if err != nil {
return nil, xerrors.Errorf("unable to create s3 coordinator: %w", err)
}
Expand Down

0 comments on commit be77a53

Please sign in to comment.