Skip to content

Commit

Permalink
Implemented caching of the webdav clients for OCM, with fallback to v…
Browse files Browse the repository at this point in the history
…1.0 access
  • Loading branch information
glpatcern committed Feb 20, 2025
1 parent 6af8c62 commit 2076af9
Showing 1 changed file with 59 additions and 5 deletions.
64 changes: 59 additions & 5 deletions pkg/ocm/storage/received/ocm.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"net/url"
"path/filepath"
"strings"
"sync"
"time"

gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
Expand All @@ -49,9 +51,17 @@ func init() {
registry.Register("ocmreceived", New)
}

type cachedClient struct {
client *gowebdav.Client
share *ocmpb.ReceivedShare
expiresAt time.Time
}

type driver struct {
c *config
gateway gateway.GatewayAPIClient
ccache map[string]*cachedClient
mu sync.Mutex
}

type config struct {
Expand All @@ -78,8 +88,9 @@ func New(ctx context.Context, m map[string]interface{}) (storage.FS, error) {
d := &driver{
c: &c,
gateway: gateway,
ccache: make(map[string]*cachedClient), // this is a cache of webdav clients
}

go d.ccacheCleanupThread()
return d, nil
}

Expand All @@ -106,7 +117,6 @@ func shareInfoFromReference(ref *provider.Reference) (*ocmpb.ShareId, string) {
}

func (d *driver) getWebDAVFromShare(ctx context.Context, shareID *ocmpb.ShareId) (*ocmpb.ReceivedShare, string, string, error) {
// TODO: we may want to cache the share
res, err := d.gateway.GetReceivedOCMShare(ctx, &ocmpb.GetReceivedOCMShareRequest{
Ref: &ocmpb.ShareReference{
Spec: &ocmpb.ShareReference_Id{
Expand Down Expand Up @@ -143,13 +153,22 @@ func getWebDAVProtocol(protocols []*ocmpb.Protocol) (*ocmpb.WebDAVProtocol, bool
}

func (d *driver) webdavClient(ctx context.Context, ref *provider.Reference) (*gowebdav.Client, *ocmpb.ReceivedShare, string, error) {
log := appctx.GetLogger(ctx)
id, rel := shareInfoFromReference(ref)

// check first if we have a cached webdav client
d.mu.Lock()
defer d.mu.Unlock()
if entry, found := d.ccache[id.OpaqueId]; found {
log.Info().Interface("share", entry.share).Str("rel", rel).Msg("Using cached client to access OCM share")
return entry.client, entry.share, rel, nil
}

// we don't, build a webdav client
share, endpoint, secret, err := d.getWebDAVFromShare(ctx, id)
if err != nil {
return nil, nil, "", err
}

endpoint, err = url.PathUnescape(endpoint)
if err != nil {
return nil, nil, "", err
Expand All @@ -158,9 +177,21 @@ func (d *driver) webdavClient(ctx context.Context, ref *provider.Reference) (*go
// use the secret as bearer authentication according to OCM v1.1+
c := gowebdav.NewClient(endpoint, "", "")
c.SetHeader("Authorization", "Bearer "+secret)
_, err = c.Stat(rel)
if err != nil {
// if we got an error, try to use OCM v1.0 basic auth
log.Info().Str("endpoint", endpoint).Interface("share", share).Str("rel", rel).Str("secret", secret).Err(err).Msg("falling back to OCM v1.0 access")
c.SetHeader("Authorization", "Basic "+secret+":")
} else {
log.Info().Str("endpoint", endpoint).Interface("share", share).Str("rel", rel).Str("secret", secret).Msg("using OCM v1.1 access")
}

log := appctx.GetLogger(ctx)
log.Info().Str("endpoint", endpoint).Interface("share", share).Str("rel", rel).Str("secret", secret).Msg("Accessing OCM share")
// add to cache and return
d.ccache[id.OpaqueId] = &cachedClient{
client: c,
expiresAt: time.Now().Add(1 * time.Hour),
share: share,
}
return c, share, rel, nil
}

Expand Down Expand Up @@ -414,3 +445,26 @@ func (d *driver) CreateStorageSpace(ctx context.Context, req *provider.CreateSto
func (d *driver) UpdateStorageSpace(ctx context.Context, req *provider.UpdateStorageSpaceRequest) (*provider.UpdateStorageSpaceResponse, error) {
return nil, errtypes.NotSupported("operation not supported")
}

// Cleanup function to remove expired cache entries
func (d *driver) cleanupCache() {
d.mu.Lock()
defer d.mu.Unlock()

now := time.Now()
for key, entry := range d.ccache {
if now.After(entry.expiresAt) {
delete(d.ccache, key)
}
}
}

// Periodic cache cleanup goroutine
func (d *driver) ccacheCleanupThread() {
ticker := time.NewTicker(1 * time.Hour)
defer ticker.Stop()

for range ticker.C {
d.cleanupCache()
}
}

0 comments on commit 2076af9

Please sign in to comment.