Update balancer to be completely cluster aware
Here we change the balancer to build its internal state of the
world cluster-first, instead of leader-first.

This confines leader step-downs and the calculation of the balanced
distribution to individual clusters, improving the accuracy of our
rebalancing algorithm.
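A minimal sketch of this cluster-first grouping, using simplified
stand-in types and made-up names (the real balancer builds its state
from jsm.go ClusterInfo, as in the diff below):

package main

import "fmt"

// Simplified stand-ins for the balancer's internal types.
type peer struct {
	name     string
	entities []string // names of the streams/consumers this peer leads
	offset   int
}

type cluster struct {
	name                 string
	peers                map[string]*peer
	balancedDistribution int
}

func main() {
	clusters := map[string]*cluster{}
	// Group entities by cluster first, then by current leader within it.
	add := func(clusterName, leader, entity string) {
		c, ok := clusters[clusterName]
		if !ok {
			c = &cluster{name: clusterName, peers: map[string]*peer{}}
			clusters[clusterName] = c
		}
		p, ok := c.peers[leader]
		if !ok {
			p = &peer{name: leader}
			c.peers[leader] = p
		}
		p.entities = append(p.entities, entity)
	}
	add("east", "n1", "ORDERS")
	add("east", "n1", "EVENTS")
	add("east", "n2", "METRICS")
	for _, c := range clusters {
		fmt.Printf("cluster %s: %d peers\n", c.name, len(c.peers))
	}
}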
ploubser committed Jan 23, 2025
1 parent 049c6fd commit 973f240
Showing 1 changed file with 121 additions and 58 deletions.
179 changes: 121 additions & 58 deletions balancer/balancer.go
@@ -49,9 +49,16 @@ type peer struct {
name string
entities []balanceEntity
offset int
clusters map[string]bool
}

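// cluster tracks the peers of a single Nats cluster, along with the
// per-peer leader count that would be considered balanced for it.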
type cluster struct {
name string
peers map[string]*peer
balancedDistribution int
}

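// BALANCERUNS bounds how many rebalancing passes balance() makes over
// a cluster's peers.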
const BALANCERUNS = 5

// New returns a new instance of the Balancer
func New(nc *nats.Conn, log api.Logger) (*Balancer, error) {
mgr, err := jsm.New(nc)
@@ -80,79 +87,68 @@ func (b *Balancer) calcOffset(servers *map[string]*peer, evenDistribution int) {
}
}

// We consider a balanced stream to be one with an offset of 0 or less
func (b *Balancer) isBalanced(servers map[string]*peer) bool {
for _, s := range servers {
if s.offset > 0 {
return false
}
}

return true
}

func (b *Balancer) mapEntityToServers(entity balanceEntity, serverMap map[string]*peer) (map[string]*peer, error) {
info, err := entity.ClusterInfo()
func (b *Balancer) createClusterMappings(e balanceEntity, clusterMap map[string]*cluster) (map[string]*cluster, error) {
info, err := e.ClusterInfo()
if err != nil {
return nil, err
}
clustername := info.Name
_, ok := clusterMap[clustername]
if !ok {
tmp := cluster{
name: clustername,
peers: map[string]*peer{},
balancedDistribution: 0,
}
clusterMap[clustername] = &tmp
}

leadername := info.Leader
active := clusterMap[clustername]

leaderName := info.Leader
_, ok := serverMap[leaderName]
_, ok = active.peers[leadername]
if !ok {
tmp := peer{
name: leaderName,
name: leadername,
entities: []balanceEntity{},
clusters: map[string]bool{},
offset: 0,
}
serverMap[leaderName] = &tmp
active.peers[leadername] = &tmp
}
serverMap[leaderName].entities = append(serverMap[leaderName].entities, entity)
serverMap[leaderName].clusters[info.Name] = true
active.peers[leadername].entities = append(active.peers[leadername].entities, e)

for _, replica := range info.Replicas {
_, ok = serverMap[replica.Name]
_, ok = active.peers[replica.Name]
if !ok {
tmp := &peer{
name: replica.Name,
entities: []balanceEntity{},
clusters: map[string]bool{info.Name: true},
offset: 0,
}
serverMap[replica.Name] = tmp
active.peers[replica.Name] = tmp
}
}

return serverMap, nil
return clusterMap, nil
}

func (b *Balancer) calcDistribution(entities, servers int) int {
evenDistributionf := float64(entities) / float64(servers)
if evenDistributionf == float64(int64(evenDistributionf)) {
return int(evenDistributionf)
}
return int(math.Ceil(evenDistributionf + 0.5))
}

func (b *Balancer) balance(servers map[string]*peer, evenDistribution int, typeHint string) (int, error) {
func (b *Balancer) balance(servers map[string]*peer, evenDistribution int, cluster string, typeHint string) (int, error) {
steppedDown := 0
for !b.isBalanced(servers) {
for i := 1; i <= BALANCERUNS; i++ {
for _, s := range servers {
if s.offset > 0 {
b.log.Infof("Found server '%s' with offset of %d. Rebalancing", s.name, s.offset)
b.log.Infof("Rebalancing server '%s' with offset of %d", s.name, s.offset)
retries := 0
for s.offset > 0 {
// find a random stream (or consumer) to move to another server
randomIndex := rand.Intn(len(s.entities))
entity := s.entities[randomIndex]
clusterinfo, err := entity.ClusterInfo()
if err != nil {
return 0, fmt.Errorf("unable to get clusterinfo for %s '%s'. %s", typeHint, entity.Name(), err)
}

b.log.Infof("Moving %s to available server in cluster %s", entity.Name(), cluster)
for _, ns := range servers {
if ns.offset < 0 && ns.clusters[clusterinfo.Name] {
if ns.offset < 0 {
b.log.Infof("Requesting leader '%s' step down for %s '%s'. New preferred leader is %s.", s.name, typeHint, entity.Name(), ns.name)
placement := api.Placement{Preferred: ns.name, Cluster: clusterinfo.Name}
placement := api.Placement{Preferred: ns.name, Cluster: cluster}
err := entity.LeaderStepDown(&placement)
if err != nil {
b.log.Errorf("Unable to step down leader for %s - %s", entity.Name(), err)
@@ -185,46 +181,113 @@ func (b *Balancer) balance(servers map[string]*peer, evenDistribution int, typeHint string) (int, error) {
return steppedDown, nil
}

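// Worked example: 7 entities on 3 peers gives 7/3 ≈ 2.33, so the
// function below returns int(math.Ceil(2.33 + 0.5)) = 3; 6 entities
// on 3 peers divides evenly and returns exactly 2.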
func (b *Balancer) calcClusterDistribution(c *cluster) int {
servercount := len(c.peers)
entitycount := 0

for _, e := range c.peers {
entitycount += len(e.entities)
}

evenDistributionf := float64(entitycount) / float64(servercount)
if evenDistributionf == float64(int64(evenDistributionf)) {
return int(evenDistributionf)
}
return int(math.Ceil(evenDistributionf + 0.5))
}

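// For example, with a balanced distribution of 3, a peer leading 5
// entities gets offset 2 (overloaded) while a peer leading 1 gets
// offset -2 (spare capacity); balance() moves leaders from positive-
// to negative-offset peers.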
func (b *Balancer) calcLeaderOffset(p map[string]*peer, distribution int) {
for _, v := range p {
v.offset = len(v.entities) - distribution
}
}

func (b Balancer) logClusterStats(clusterMap map[string]*cluster) {
for k, v := range clusterMap {
b.log.Infof("")
b.log.Infof("Found cluster %s with a balanced distribution of %d", k, v.balancedDistribution)
b.log.Infof("Cluster %s has %d available peers", k, len(v.peers))
for _, server := range v.peers {
b.log.Infof("Nats server '%s' is the lead of %d entities with a offset of %d", server.name, len(server.entities), server.offset)
}
b.log.Infof("")
}
}

// BalanceStreams finds the expected distribution of stream leaders over servers
// and forces leader election on any with an uneven distribution
func (b *Balancer) BalanceStreams(streams []*jsm.Stream) (int, error) {
var err error
servers := map[string]*peer{}
clusterMap := map[string]*cluster{}
balanced := 0

// Formulate our view of the world and all its clusters
b.log.Debugf("building relationship between clusters, leaders and streams")
for _, s := range streams {
servers, err = b.mapEntityToServers(s, servers)
clusterMap, err = b.createClusterMappings(s, clusterMap)
if err != nil {
return 0, err
}
}

b.log.Debugf("found %d streams on %d servers\n", len(streams), len(servers))
evenDistribution := b.calcDistribution(len(streams), len(servers))
b.log.Debugf("even distribution is %d\n", evenDistribution)
b.log.Debugf("calculating offset for each server")
b.calcOffset(&servers, evenDistribution)
// Figure out what the balanced distribution for each cluster looks like
for k, v := range clusterMap {
b.log.Debugf("calculating balanced distribution for Nats cluster %s", k)
v.balancedDistribution = b.calcClusterDistribution(v)

b.log.Debugf("calculating offset for each server in Nats cluster %s", k)
b.calcLeaderOffset(v.peers, v.balancedDistribution)
}

// Balance each cluster
for k, v := range clusterMap {
b.logClusterStats(clusterMap)
b.log.Debugf("balancing streams on cluster %s", k)
b, err := b.balance(v.peers, v.balancedDistribution, k, "stream")
if err != nil {
return 0, err
}
balanced += b
}

return b.balance(servers, evenDistribution, "stream")
return balanced, nil
}

// BalanceConsumers finds the expected distribution of consumer leaders over servers
// and forces leader election on any with an uneven distribution
func (b *Balancer) BalanceConsumers(consumers []*jsm.Consumer) (int, error) {
var err error
servers := map[string]*peer{}
clusterMap := map[string]*cluster{}
balanced := 0

// Formulate our view of the world and all its clusters
b.log.Debugf("building relationship between clusters, leaders and consumers")
for _, s := range consumers {
servers, err = b.mapEntityToServers(s, servers)
clusterMap, err = b.createClusterMappings(s, clusterMap)
if err != nil {
return 0, err
}
}

b.log.Debugf("found %d consumers on %d servers\n", len(consumers), len(servers))
evenDistribution := b.calcDistribution(len(consumers), len(servers))
b.log.Debugf("even distribution is %d\n", evenDistribution)
b.log.Debugf("calculating offset for each server")
b.calcOffset(&servers, evenDistribution)
// Figure out what the balanced distribution for each cluster looks like
for k, v := range clusterMap {
b.log.Debugf("calculating balanced distribution for Nats cluster %s", k)
v.balancedDistribution = b.calcClusterDistribution(v)

b.log.Debugf("calculating offset for each server in Nats cluster %s", k)
b.calcLeaderOffset(v.peers, v.balancedDistribution)
}

// Balance each cluster
for k, v := range clusterMap {
b.logClusterStats(clusterMap)
b.log.Debugf("balancing consumers on cluster %s", k)
b, err := b.balance(v.peers, v.balancedDistribution, k, "consumer")
if err != nil {
return 0, err
}
balanced += b
}

return b.balance(servers, evenDistribution, "consumer")
return balanced, nil
}
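
For reference, a minimal usage sketch of this file's API after the
change. The balancer import path is hypothetical, the logger is an
assumed stand-in for api.Logger (adjust its method set to the real
interface), and listing the streams is elided:

package main

import (
	"fmt"
	"log"

	"github.com/nats-io/jsm.go"
	"github.com/nats-io/nats.go"

	balancer "example.com/balancer" // hypothetical import path
)

// stdLogger is assumed to satisfy api.Logger.
type stdLogger struct{}

func (stdLogger) Tracef(f string, a ...any) { log.Printf("TRC "+f, a...) }
func (stdLogger) Debugf(f string, a ...any) { log.Printf("DBG "+f, a...) }
func (stdLogger) Infof(f string, a ...any)  { log.Printf("INF "+f, a...) }
func (stdLogger) Warnf(f string, a ...any)  { log.Printf("WRN "+f, a...) }
func (stdLogger) Errorf(f string, a ...any) { log.Printf("ERR "+f, a...) }

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	b, err := balancer.New(nc, stdLogger{})
	if err != nil {
		log.Fatal(err)
	}

	// Stream listing elided; obtain these via the jsm manager.
	var streams []*jsm.Stream

	moved, err := b.BalanceStreams(streams)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("stepped down %d stream leaders\n", moved)
}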
