diff --git a/balancer/balancer.go b/balancer/balancer.go index 8282a7df..aad25eec 100644 --- a/balancer/balancer.go +++ b/balancer/balancer.go @@ -49,9 +49,16 @@ type peer struct { name string entities []balanceEntity offset int - clusters map[string]bool } +type cluster struct { + name string + peers map[string]*peer + balancedDistribution int +} + +const BALANCERUNS = 5 + // New returns a new instance of the Balancer func New(nc *nats.Conn, log api.Logger) (*Balancer, error) { mgr, err := jsm.New(nc) @@ -80,79 +87,68 @@ func (b *Balancer) calcOffset(servers *map[string]*peer, evenDistribution int) { } } -// We consider a balanced stream to be one with an offset of 0 or less -func (b *Balancer) isBalanced(servers map[string]*peer) bool { - for _, s := range servers { - if s.offset > 0 { - return false - } - } - - return true -} - -func (b *Balancer) mapEntityToServers(entity balanceEntity, serverMap map[string]*peer) (map[string]*peer, error) { - info, err := entity.ClusterInfo() +func (b *Balancer) createClusterMappings(e balanceEntity, clusterMap map[string]*cluster) (map[string]*cluster, error) { + info, err := e.ClusterInfo() if err != nil { return nil, err } + clustername := info.Name + _, ok := clusterMap[clustername] + if !ok { + tmp := cluster{ + name: clustername, + peers: map[string]*peer{}, + balancedDistribution: 0, + } + clusterMap[clustername] = &tmp + } + + leadername := info.Leader + active := clusterMap[clustername] - leaderName := info.Leader - _, ok := serverMap[leaderName] + _, ok = active.peers[leadername] if !ok { tmp := peer{ - name: leaderName, + name: leadername, entities: []balanceEntity{}, - clusters: map[string]bool{}, + offset: 0, } - serverMap[leaderName] = &tmp + active.peers[leadername] = &tmp } - serverMap[leaderName].entities = append(serverMap[leaderName].entities, entity) - serverMap[leaderName].clusters[info.Name] = true + active.peers[leadername].entities = append(active.peers[leadername].entities, e) for _, replica := range info.Replicas { - _, ok = serverMap[replica.Name] + _, ok = active.peers[replica.Name] if !ok { tmp := &peer{ name: replica.Name, entities: []balanceEntity{}, - clusters: map[string]bool{info.Name: true}, + offset: 0, } - serverMap[replica.Name] = tmp + active.peers[replica.Name] = tmp } } - return serverMap, nil + return clusterMap, nil } -func (b *Balancer) calcDistribution(entities, servers int) int { - evenDistributionf := float64(entities) / float64(servers) - if evenDistributionf == float64(int64(evenDistributionf)) { - return int(evenDistributionf) - } - return int(math.Ceil(evenDistributionf + 0.5)) -} - -func (b *Balancer) balance(servers map[string]*peer, evenDistribution int, typeHint string) (int, error) { +func (b *Balancer) balance(servers map[string]*peer, evenDistribution int, cluster string, typeHint string) (int, error) { steppedDown := 0 - for !b.isBalanced(servers) { + for i := 1; i <= BALANCERUNS; i++ { for _, s := range servers { if s.offset > 0 { - b.log.Infof("Found server '%s' with offset of %d. Rebalancing", s.name, s.offset) + b.log.Infof("Rebalancing server '%s' with offset of %d", s.name, s.offset) retries := 0 for s.offset > 0 { // find a random stream (or consumer) to move to another server randomIndex := rand.Intn(len(s.entities)) entity := s.entities[randomIndex] - clusterinfo, err := entity.ClusterInfo() - if err != nil { - return 0, fmt.Errorf("unable to get clusterinfo for %s '%s'. %s", typeHint, entity.Name(), err) - } + b.log.Infof("Moving %s to available server in cluster %s", entity.Name(), cluster) for _, ns := range servers { - if ns.offset < 0 && ns.clusters[clusterinfo.Name] { + if ns.offset < 0 { b.log.Infof("Requesting leader '%s' step down for %s '%s'. New preferred leader is %s.", s.name, typeHint, entity.Name(), ns.name) - placement := api.Placement{Preferred: ns.name, Cluster: clusterinfo.Name} + placement := api.Placement{Preferred: ns.name, Cluster: cluster} err := entity.LeaderStepDown(&placement) if err != nil { b.log.Errorf("Unable to step down leader for %s - %s", entity.Name(), err) @@ -185,46 +181,113 @@ func (b *Balancer) balance(servers map[string]*peer, evenDistribution int, typeH return steppedDown, nil } +func (b *Balancer) calcClusterDistribution(c *cluster) int { + servercount := len(c.peers) + entitycount := 0 + + for _, e := range c.peers { + entitycount += len(e.entities) + } + + evenDistributionf := float64(entitycount) / float64(servercount) + if evenDistributionf == float64(int64(evenDistributionf)) { + return int(evenDistributionf) + } + return int(math.Ceil(evenDistributionf + 0.5)) +} + +func (b *Balancer) calcLeaderOffset(p map[string]*peer, distribution int) { + for _, v := range p { + v.offset = len(v.entities) - distribution + } +} + +func (b Balancer) logClusterStats(clusterMap map[string]*cluster) { + for k, v := range clusterMap { + b.log.Infof("") + b.log.Infof("Found cluster %s with a balanced distribution of %d", k, v.balancedDistribution) + b.log.Infof("Cluster %s has %d available peers", k, len(v.peers)) + for _, server := range v.peers { + b.log.Infof("Nats server '%s' is the lead of %d entities with a offset of %d", server.name, len(server.entities), server.offset) + } + b.log.Infof("") + } +} + // BalanceStreams finds the expected distribution of stream leaders over servers // and forces leader election on any with an uneven distribution func (b *Balancer) BalanceStreams(streams []*jsm.Stream) (int, error) { var err error - servers := map[string]*peer{} + clusterMap := map[string]*cluster{} + balanced := 0 + // Formulate our view of the world and all it's clusters + b.log.Debugf("building relationship between clusters, leaders and streams") for _, s := range streams { - servers, err = b.mapEntityToServers(s, servers) + clusterMap, err = b.createClusterMappings(s, clusterMap) if err != nil { return 0, err } } - b.log.Debugf("found %d streams on %d servers\n", len(streams), len(servers)) - evenDistribution := b.calcDistribution(len(streams), len(servers)) - b.log.Debugf("even distribution is %d\n", evenDistribution) - b.log.Debugf("calculating offset for each server") - b.calcOffset(&servers, evenDistribution) + // Figure out what the balanced distribution for each cluster looks like + for k, v := range clusterMap { + b.log.Debugf("calculating balanced distribution for Nats cluster %s", k) + v.balancedDistribution = b.calcClusterDistribution(v) + + b.log.Debugf("calculating offset for each server in Nats cluster %s", k) + b.calcLeaderOffset(v.peers, v.balancedDistribution) + } + + // Balance each cluster + for k, v := range clusterMap { + b.logClusterStats(clusterMap) + b.log.Debugf("balancing streams on cluster %s", k) + b, err := b.balance(v.peers, v.balancedDistribution, k, "stream") + if err != nil { + return 0, err + } + balanced += b + } - return b.balance(servers, evenDistribution, "stream") + return balanced, nil } // BalanceConsumers finds the expected distribution of consumer leaders over servers // and forces leader election on any with an uneven distribution func (b *Balancer) BalanceConsumers(consumers []*jsm.Consumer) (int, error) { var err error - servers := map[string]*peer{} + clusterMap := map[string]*cluster{} + balanced := 0 + // Formulate our view of the world and all it's clusters + b.log.Debugf("building relationship between clusters, leaders and consumers") for _, s := range consumers { - servers, err = b.mapEntityToServers(s, servers) + clusterMap, err = b.createClusterMappings(s, clusterMap) if err != nil { return 0, err } } - b.log.Debugf("found %d consumers on %d servers\n", len(consumers), len(servers)) - evenDistribution := b.calcDistribution(len(consumers), len(servers)) - b.log.Debugf("even distribution is %d\n", evenDistribution) - b.log.Debugf("calculating offset for each server") - b.calcOffset(&servers, evenDistribution) + // Figure out what the balanced distribution for each cluster looks like + for k, v := range clusterMap { + b.log.Debugf("calculating balanced distribution for Nats cluster %s", k) + v.balancedDistribution = b.calcClusterDistribution(v) + + b.log.Debugf("calculating offset for each server in Nats cluster %s", k) + b.calcLeaderOffset(v.peers, v.balancedDistribution) + } + + // Balance each cluster + for k, v := range clusterMap { + b.logClusterStats(clusterMap) + b.log.Debugf("balancing consumers on cluster %s", k) + b, err := b.balance(v.peers, v.balancedDistribution, k, "consumer") + if err != nil { + return 0, err + } + balanced += b + } - return b.balance(servers, evenDistribution, "consumer") + return balanced, nil }