Skip to content

Parallel PutChunk RPC calls in WriteMV #1806

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jun 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 121 additions & 55 deletions internal/dcache/clustermap/clustermap.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"path/filepath"
"strings"
"sync"
"time"

"github.com/Azure/azure-storage-fuse/v2/common"
"github.com/Azure/azure-storage-fuse/v2/common/log"
Expand All @@ -55,6 +56,11 @@ func Update() {
clusterMap.loadLocalMap()
}

// GetEpoch returns the epoch of the locally cached clustermap.
func GetEpoch() int64 {
	epoch := clusterMap.getEpoch()
	return epoch
}

// It will return online MVs Map <mvName, MV> as per local cache copy of cluster map.
func GetActiveMVs() map[string]dcache.MirroredVolume {
return clusterMap.getActiveMVs()
Expand Down Expand Up @@ -95,6 +101,14 @@ func GetRVs(mvName string) map[string]dcache.StateEnum {
return clusterMap.getRVs(mvName)
}

// GetRVsEx is like GetRVs() but additionally returns the clusterMap epoch that corresponds to the
// component RVs returned.
//
// Callers that receive the NeedToRefreshClusterMap error from the server can use the returned epoch to
// keep refreshing the clusterMap until they observe an epoch higher than the one whose component RVs
// were dismissed by the server.
func GetRVsEx(mvName string) (map[string]dcache.StateEnum, int64) {
	rvs, epoch := clusterMap.getRVsEx(mvName)
	return rvs, epoch
}

// Return the state of the given RV from the local cache copy of cluster map.
func GetRVState(rvName string) dcache.StateEnum {
return clusterMap.getRVState(rvName)
Expand Down Expand Up @@ -150,17 +164,60 @@ func IsClusterReadonly() bool {
// Refresh clustermap local copy from the metadata store.
// Once RefreshClusterMap() completes successfully, any clustermap call made would return results from the
// updated clustermap.
// Note: Usually you will not need to work on the most up-to-date clustermap, the last periodically refreshed copy
// higherThanEpoch is typically the current clustermap epoch value that solicited a NeedToRefreshClusterMap
// error from the server, so the caller is interested in a clustermap having epoch value higher than this.
// Note that it's not guaranteed that the next higher epoch would have the changes the caller expects; it's
// up to the caller to retry until it gets the required clusterMap.
// If you do not care about any specific clusterMap epoch but just want it to be refreshed once, pass 0 for
// higherThanEpoch.
//
// of clustermap should be fine for most users. This API must be used by callers which cannot safely proceed
// w/o knowing the latest clustermap. This should not be a common requirement and codepaths calling it should
// be very infrequently executed.
func RefreshClusterMap() error {
// Note: Usually you will not need to work on the most up-to-date clustermap; the last periodically refreshed copy
// of clustermap should be fine for most users. This API must be used by callers which cannot safely proceed
// without knowing the latest clustermap. This should not be a common requirement and codepaths calling it should
// be very infrequently executed.

func RefreshClusterMap(higherThanEpoch int64) error {
// Clustermanager must call RegisterClusterMapSyncRefresher() in startup, so we don't expect this to be nil.
common.Assert(clusterMapRefresher != nil)
log.Debug("RefreshClusterMap: Fetching latest clustermap from metadata store")

return clusterMapRefresher()
//
// A NeedToRefreshClusterMap return from the server typically means that the global clusterMap is already
// updated and the client can simply refresh and get that, but sometimes the server may update the global
// clusterMap after returning NeedToRefreshClusterMap, so we retry for a short time.
//
startTime := time.Now()
maxWait := 5 * time.Second

for {
// Time check.
elapsed := time.Since(startTime)
if elapsed > maxWait {
common.Assert(false)
return fmt.Errorf("RefreshClusterMap: timed out waiting for epoch %d, got %d",
higherThanEpoch+1, GetEpoch())
}

log.Debug("RefreshClusterMap: Fetching latest clustermap from metadata store")

err := clusterMapRefresher()
if err != nil {
common.Assert(false)
return fmt.Errorf("RefreshClusterMap: failed to fetch clusterMap: %v", err)
}

//
// Break if we got the desired epoch, else try after a small wait.
//
if GetEpoch() > higherThanEpoch {
break
}

log.Warn("RefreshClusterMap: Got epoch %d, while waiting for %d, retrying...",
GetEpoch(), higherThanEpoch+1)
time.Sleep(1 * time.Second)
}

return nil
}

// RegisterClusterMapRefresher is how the cluster_manager registers its real implementation.
Expand Down Expand Up @@ -204,13 +261,28 @@ var (
// methods for querying clustermap.
type ClusterMap struct {
// Pointer to the local copy of the clustermap. Read it via getLocalMap(); loadLocalMap() replaces it.
localMap *dcache.ClusterMap
mu sync.RWMutex // Synchronizes access to localMap.
// Path of the file that loadLocalMap() reads.
localClusterMapPath string
wg sync.WaitGroup // wait group for the processEvents() goroutine
}

// stop is a no-op: its body is empty.
func (c *ClusterMap) stop() {
}

// getLocalMap returns the current local clustermap pointer. The pointer is read under the read lock,
// so it is safe against a concurrent replacement by loadLocalMap().
// Note: Always go through this accessor; do not use c.localMap directly.
func (c *ClusterMap) getLocalMap() *dcache.ClusterMap {
	//
	// TODO: Evaluate if atomic.Pointer is faster than RWMutex.
	//       Read access can be heavy while write access is very infrequent, so RWMutex seems to
	//       be the better fit, but this needs to be evaluated under extreme load.
	//
	c.mu.RLock()
	current := c.localMap
	c.mu.RUnlock()

	common.Assert(current != nil)
	return current
}

func (c *ClusterMap) loadLocalMap() {
data, err := os.ReadFile(c.localClusterMapPath)
if err != nil {
Expand All @@ -226,14 +298,19 @@ func (c *ClusterMap) loadLocalMap() {
return
}

c.mu.Lock()
defer c.mu.Unlock()

c.localMap = &newClusterMap
}

func (c *ClusterMap) getActiveMVs() map[string]dcache.MirroredVolume {
common.Assert(c.localMap != nil)
// getEpoch returns the Epoch field of the current local clustermap.
func (c *ClusterMap) getEpoch() int64 {
	localMap := c.getLocalMap()
	return localMap.Epoch
}

func (c *ClusterMap) getActiveMVs() map[string]dcache.MirroredVolume {
activeMVs := make(map[string]dcache.MirroredVolume)
for mvName, mv := range c.localMap.MVMap {
for mvName, mv := range c.getLocalMap().MVMap {
if mv.State == dcache.StateOnline {
activeMVs[mvName] = mv
}
Expand All @@ -242,11 +319,10 @@ func (c *ClusterMap) getActiveMVs() map[string]dcache.MirroredVolume {
}

func (c *ClusterMap) getActiveMVNames() []string {
common.Assert(c.localMap != nil)

localMap := c.getLocalMap()
i := 0
activeMVNames := make([]string, len(c.localMap.MVMap))
for mvName, mv := range c.localMap.MVMap {
activeMVNames := make([]string, len(localMap.MVMap))
for mvName, mv := range localMap.MVMap {
if mv.State == dcache.StateOnline {
activeMVNames[i] = mvName
i++
Expand All @@ -256,10 +332,8 @@ func (c *ClusterMap) getActiveMVNames() []string {
}

func (c *ClusterMap) getDegradedMVs() map[string]dcache.MirroredVolume {
common.Assert(c.localMap != nil)

degradedMVs := make(map[string]dcache.MirroredVolume)
for mvName, mv := range c.localMap.MVMap {
for mvName, mv := range c.getLocalMap().MVMap {
if mv.State == dcache.StateDegraded {
degradedMVs[mvName] = mv
}
Expand All @@ -268,10 +342,8 @@ func (c *ClusterMap) getDegradedMVs() map[string]dcache.MirroredVolume {
}

func (c *ClusterMap) getOfflineMVs() map[string]dcache.MirroredVolume {
common.Assert(c.localMap != nil)

offlineMVs := make(map[string]dcache.MirroredVolume)
for mvName, mv := range c.localMap.MVMap {
for mvName, mv := range c.getLocalMap().MVMap {
if mv.State == dcache.StateOffline {
offlineMVs[mvName] = mv
}
Expand All @@ -281,43 +353,34 @@ func (c *ClusterMap) getOfflineMVs() map[string]dcache.MirroredVolume {

// Scan through the RV list and return the set of all nodes which have contributed at least one RV.
func (c *ClusterMap) getAllNodes() map[string]struct{} {
common.Assert(c.localMap != nil)

nodesMap := make(map[string]struct{})

for _, rv := range c.localMap.RVMap {
for _, rv := range c.getLocalMap().RVMap {
nodesMap[rv.NodeId] = struct{}{}
}

return nodesMap
}

func (c *ClusterMap) isClusterReadonly() bool {
common.Assert(c.localMap != nil)

return c.localMap.Readonly
return c.getLocalMap().Readonly
}

func (c *ClusterMap) getCacheConfig() *dcache.DCacheConfig {
common.Assert(c.localMap != nil)

return &c.localMap.Config
return &c.getLocalMap().Config
}

func (c *ClusterMap) getClusterMap() dcache.ClusterMap {
common.Assert(c.localMap != nil)
return *c.localMap
return *c.getLocalMap()
}

// Get RVs belonging to this node.
func (c *ClusterMap) getMyRVs() map[string]dcache.RawVolume {
common.Assert(c.localMap != nil)

nodeId, err := common.GetNodeUUID()
common.Assert(err == nil, fmt.Sprintf("Error getting nodeId: %v", err))

myRvs := make(map[string]dcache.RawVolume)
for name, rv := range c.localMap.RVMap {
for name, rv := range c.getLocalMap().RVMap {
if rv.NodeId == nodeId {
myRvs[name] = rv
}
Expand All @@ -334,16 +397,31 @@ func (c *ClusterMap) isMyRV(rvName string) bool {

// Get component RVs for the given MV.
func (c *ClusterMap) getRVs(mvName string) map[string]dcache.StateEnum {
mv, ok := c.localMap.MVMap[mvName]
mv, ok := c.getLocalMap().MVMap[mvName]
if !ok {
log.Err("ClusterMap::getRVs: no mirrored volume named %s", mvName)
return nil
}
return mv.RVs
}

// getRVsEx returns the component RVs of the given MV, along with the clustermap epoch.
//
// Both values are read through a single saved copy of the clusterMap pointer, so the RVs and the
// epoch always correspond to the same instance of clusterMap.
//
// If there is no MV named mvName, it logs an error and returns (nil, -1).
// The returned map is the one held by the clustermap entry, not a copy.
func (c *ClusterMap) getRVsEx(mvName string) (map[string]dcache.StateEnum, int64) {
	localMap := c.getLocalMap()

	mv, ok := localMap.MVMap[mvName]
	if !ok {
		// Log under this function's own name (not getRVs) so the failing call path is identifiable.
		log.Err("ClusterMap::getRVsEx: no mirrored volume named %s", mvName)
		return nil, -1
	}

	return mv.RVs, localMap.Epoch
}

func (c *ClusterMap) getRVState(rvName string) dcache.StateEnum {
rv, ok := c.localMap.RVMap[rvName]
rv, ok := c.getLocalMap().RVMap[rvName]
if !ok {
log.Err("ClusterMap::getRVState: no raw volume named %s", rvName)
common.Assert(false, rvName)
Expand All @@ -356,9 +434,7 @@ func (c *ClusterMap) getRVState(rvName string) dcache.StateEnum {
}

func (c *ClusterMap) isOnline(nodeId string) bool {
common.Assert(c.localMap != nil)

for _, rv := range c.localMap.RVMap {
for _, rv := range c.getLocalMap().RVMap {
if rv.NodeId == nodeId {
return rv.State == dcache.StateOnline
}
Expand Down Expand Up @@ -395,9 +471,7 @@ func (c *ClusterMap) lowestIndexOnlineRV(mv dcache.MirroredVolume) string {
}

func (c *ClusterMap) nodeIdToIP(nodeId string) string {
common.Assert(c.localMap != nil)

for _, rv := range c.localMap.RVMap {
for _, rv := range c.getLocalMap().RVMap {
if rv.NodeId == nodeId {
return rv.IPAddress
}
Expand All @@ -411,9 +485,7 @@ func (c *ClusterMap) nodeIdToIP(nodeId string) string {
}

func (c *ClusterMap) rVNameToNodeId(rvName string) string {
common.Assert(c.localMap != nil)

rv, ok := c.localMap.RVMap[rvName]
rv, ok := c.getLocalMap().RVMap[rvName]
if !ok {
log.Debug("ClusterMap::rvNameToId: rvName %s not found", rvName)
// Callers should not call for non-existent RV.
Expand All @@ -425,9 +497,7 @@ func (c *ClusterMap) rVNameToNodeId(rvName string) string {
}

func (c *ClusterMap) rvIdToName(rvId string) string {
common.Assert(c.localMap != nil)

for rvName, rv := range c.localMap.RVMap {
for rvName, rv := range c.getLocalMap().RVMap {
if rv.RvId == rvId {
// TODO: Uncomment once we move IsValidRVName() and other utility functions to clustermap package.
//common.Assert(IsValidRVName(rvName))
Expand All @@ -443,9 +513,7 @@ func (c *ClusterMap) rvIdToName(rvId string) string {
}

func (c *ClusterMap) rvNameToId(rvName string) string {
common.Assert(c.localMap != nil)

rv, ok := c.localMap.RVMap[rvName]
rv, ok := c.getLocalMap().RVMap[rvName]
if !ok {
log.Debug("ClusterMap::rvNameToId: rvName %s not found", rvName)
// Callers should not call for non-existent RV.
Expand All @@ -456,9 +524,7 @@ func (c *ClusterMap) rvNameToId(rvName string) string {
}

func (c *ClusterMap) rVNameToIp(rvName string) string {
common.Assert(c.localMap != nil)

rv, ok := c.localMap.RVMap[rvName]
rv, ok := c.getLocalMap().RVMap[rvName]
if !ok {
log.Debug("ClusterMap::rVNameToIp: rvName %s not found", rvName)
// Callers should not call for non-existent RV.
Expand Down
Loading