Skip to content

Commit

Permalink
fix: gossip into channels instead of callbacks (#1159)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?

- Gossip uses callbacks. A callback approach adds extra code we don't
need if we use channels. This is a refactoring to change it to channels.
Each commit is standalone.

## Short description of the changes

- [x] Split the gossip interface file from the inmem implementation
- [x] Add a SubscribeChan function that lives beside the Subscribe
function
- [x] Add test code for chan version
- [x] Convert stress relief to use the channel model
- [x] Remove callback code
- [x] Rename remaining functions to simpler names again

My desire is to merge this before #1158 and then fix that one (which
does a bit more gossiping) to use channels.
  • Loading branch information
kentquirk authored May 20, 2024
1 parent 02738c8 commit e782204
Show file tree
Hide file tree
Showing 5 changed files with 231 additions and 139 deletions.
90 changes: 44 additions & 46 deletions collect/stressRelief/stress_relief_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type StressRelief struct {
stayOnUntil time.Time
minDuration time.Duration
identification string
stressGossipCh chan []byte

eg *errgroup.Group

Expand Down Expand Up @@ -104,46 +105,59 @@ func (s *StressRelief) Start() error {
s.stressLevels = make(map[string]stressReport)
s.done = make(chan struct{})

s.eg = &errgroup.Group{}

s.Health.Register(stressReliefHealthSource, 5*calculationInterval)

s.RefineryMetrics.Register("cluster_stress_level", "gauge")
s.RefineryMetrics.Register("individual_stress_level", "gauge")
s.RefineryMetrics.Register("stress_relief_activated", "gauge")

if err := s.Gossip.Subscribe("stress_level", s.onStressLevelMessage); err != nil {
return err
}
s.stressGossipCh = s.Gossip.Subscribe("stress_level", 20)
s.eg = &errgroup.Group{}
s.eg.Go(s.monitor)

return nil
}

// start our monitor goroutine that periodically calls recalc
s.eg.Go(func() error {
tick := time.NewTicker(calculationInterval)
defer tick.Stop()
for {
select {
case <-tick.C:
currentLevel := s.Recalc()
// publish the stress level to the rest of the cluster
msg := stressLevelMessage{
level: currentLevel,
id: s.identification,
}
err := s.Gossip.Publish("stress_level", msg.ToBytes())
if err != nil {
s.Logger.Error().Logf("error publishing stress level: %s", err)
} else {
s.Health.Ready(stressReliefHealthSource, true)
}
case <-s.done:
s.Logger.Debug().Logf("Stopping StressRelief system")
return nil
func (s *StressRelief) monitor() error {
tick := time.NewTicker(calculationInterval)
defer tick.Stop()
for {
select {
case <-tick.C:
currentLevel := s.Recalc()
// publish the stress level to the rest of the cluster
msg := stressLevelMessage{
level: currentLevel,
id: s.identification,
}
err := s.Gossip.Publish("stress_level", msg.ToBytes())
if err != nil {
s.Logger.Error().Logf("error publishing stress level: %s", err)
} else {
s.Health.Ready(stressReliefHealthSource, true)
}
}

})
case data := <-s.stressGossipCh:
msg, err := newMessageFromBytes(data)
if err != nil {
s.Logger.Error().Logf("error parsing stress level message: %s", err)
continue
}

s.lock.Lock()
s.stressLevels[msg.id] = stressReport{
key: msg.id,
level: msg.level,
timestamp: s.Clock.Now(),
}
s.lock.Unlock()

case <-s.done:
s.Logger.Debug().Logf("Stopping StressRelief system")
return nil
}
}

return nil
}

func (s *StressRelief) Stop() error {
Expand Down Expand Up @@ -311,22 +325,6 @@ func (s *StressRelief) deterministicFraction() uint {
return uint(float64(s.overallStressLevel-s.activateLevel)/float64(100-s.activateLevel)*100 + 0.5)
}

func (s *StressRelief) onStressLevelMessage(data []byte) {
msg, err := newMessageFromBytes(data)
if err != nil {
s.Logger.Error().Logf("error parsing stress level message: %s", err)
return
}

s.lock.Lock()
s.stressLevels[msg.id] = stressReport{
key: msg.id,
level: msg.level,
timestamp: s.Clock.Now(),
}
s.lock.Unlock()
}

type stressReport struct {
key string
level uint
Expand Down
75 changes: 4 additions & 71 deletions internal/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package gossip

import (
"bytes"
"errors"

"github.com/facebookgo/startstop"
"golang.org/x/sync/errgroup"
)

// Gossiper is an interface for broadcasting messages to all receivers
Expand All @@ -14,80 +12,15 @@ type Gossiper interface {
// Publish sends a message to all peers listening on the channel
Publish(channel string, value []byte) error

// Subscribe listens for messages on the channel
Subscribe(channel string, callback func(data []byte)) error
// Subscribe returns a Go channel that will receive messages from the Gossip channel
// (Redis already called the thing we listen to a channel, so we have to live with that)
// The channel has a buffer of depth; if the buffer is full, messages will be dropped.
Subscribe(channel string, depth int) chan []byte

startstop.Starter
startstop.Stopper
}

var _ Gossiper = &InMemoryGossip{}

// InMemoryGossip is a Gossiper that uses an in-memory channel
type InMemoryGossip struct {
channel chan []byte
subscribers map[string][]func(data []byte)

done chan struct{}
eg *errgroup.Group
}

func (g *InMemoryGossip) Publish(channel string, value []byte) error {
msg := message{
key: channel,
data: value,
}

select {
case <-g.done:
return errors.New("gossip has been stopped")
case g.channel <- msg.ToBytes():
default:
}
return nil
}

func (g *InMemoryGossip) Subscribe(channel string, callback func(data []byte)) error {
select {
case <-g.done:
return errors.New("gossip has been stopped")
default:
}

g.subscribers[channel] = append(g.subscribers[channel], callback)
return nil
}

func (g *InMemoryGossip) Start() error {
g.channel = make(chan []byte, 10)
g.eg = &errgroup.Group{}
g.subscribers = make(map[string][]func(data []byte))
g.done = make(chan struct{})

g.eg.Go(func() error {
for {
select {
case <-g.done:
return nil
case value := <-g.channel:
msg := newMessageFromBytes(value)
callbacks := g.subscribers[msg.key]

for _, cb := range callbacks {
cb(msg.data)
}
}
}
})

return nil
}
func (g *InMemoryGossip) Stop() error {
close(g.done)
close(g.channel)
return g.eg.Wait()
}

type message struct {
key string
data []byte
Expand Down
94 changes: 94 additions & 0 deletions internal/gossip/gossip_inmem.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package gossip

import (
"errors"
"sync"

"github.com/honeycombio/refinery/logger"
"golang.org/x/sync/errgroup"
)

// InMemoryGossip is a Gossiper that uses an in-memory channel
type InMemoryGossip struct {
Logger logger.Logger `inject:""`
gossipCh chan []byte
subscriptions map[string][]chan []byte

done chan struct{}
mut sync.RWMutex
eg *errgroup.Group
}

var _ Gossiper = &InMemoryGossip{}

func (g *InMemoryGossip) Publish(channel string, value []byte) error {
msg := message{
key: channel,
data: value,
}

select {
case <-g.done:
return errors.New("gossip has been stopped")
case g.gossipCh <- msg.ToBytes():
default:
g.Logger.Warn().WithFields(map[string]interface{}{
"channel": channel,
"msg": string(value),
}).Logf("Unable to publish message")
}
return nil
}

func (g *InMemoryGossip) Subscribe(channel string, depth int) chan []byte {
select {
case <-g.done:
return nil
default:
}

ch := make(chan []byte, depth)
g.mut.Lock()
g.subscriptions[channel] = append(g.subscriptions[channel], ch)
g.mut.Unlock()

return ch
}

func (g *InMemoryGossip) Start() error {
g.gossipCh = make(chan []byte, 10)
g.eg = &errgroup.Group{}
g.subscriptions = make(map[string][]chan []byte)
g.done = make(chan struct{})

g.eg.Go(func() error {
for {
select {
case <-g.done:
return nil
case value := <-g.gossipCh:
msg := newMessageFromBytes(value)
g.mut.RLock()
for _, ch := range g.subscriptions[msg.key] {
select {
case ch <- msg.data:
default:
g.Logger.Warn().WithFields(map[string]interface{}{
"channel": msg.key,
"msg": string(msg.data),
}).Logf("Unable to forward message")
}
}
g.mut.RUnlock()
}
}
})

return nil
}

func (g *InMemoryGossip) Stop() error {
close(g.done)
close(g.gossipCh)
return g.eg.Wait()
}
37 changes: 23 additions & 14 deletions internal/gossip/gossip_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ type GossipRedis struct {
Logger logger.Logger `inject:""`
eg *errgroup.Group

lock sync.RWMutex
subscribers map[string][]func(data []byte)
done chan struct{}
lock sync.RWMutex
subscriptions map[string][]chan []byte
done chan struct{}

startstop.Stopper
}
Expand All @@ -35,7 +35,7 @@ type GossipRedis struct {
func (g *GossipRedis) Start() error {
g.eg = &errgroup.Group{}
g.done = make(chan struct{})
g.subscribers = make(map[string][]func(data []byte))
g.subscriptions = make(map[string][]chan []byte)

g.eg.Go(func() error {
for {
Expand All @@ -46,14 +46,22 @@ func (g *GossipRedis) Start() error {
err := g.Redis.ListenPubSubChannels(nil, func(channel string, b []byte) {
msg := newMessageFromBytes(b)
g.lock.RLock()
callbacks := g.subscribers[msg.key]
chans := g.subscriptions[msg.key]
g.lock.RUnlock()
for _, cb := range callbacks {
cb(msg.data)
// we never block on sending to a subscriber; if it's full, we drop the message
for _, ch := range chans {
select {
case ch <- msg.data:
default:
g.Logger.Warn().WithFields(map[string]interface{}{
"channel": msg.key,
"msg": string(msg.data),
}).Logf("Unable to forward message")
}
}
}, g.done, "refinery-gossip")
if err != nil {
g.Logger.Debug().Logf("Error listening to refinery-gossip channel: %v", err)
g.Logger.Warn().Logf("Error listening to refinery-gossip channel: %v", err)
}
}
}
Expand All @@ -68,20 +76,21 @@ func (g *GossipRedis) Stop() error {
return g.eg.Wait()
}

// Subscribe registers a callback for a given channel.
func (g *GossipRedis) Subscribe(channel string, cb func(data []byte)) error {
// Subscribe returns a channel that will receive messages from the Gossip channel.
// The channel has a buffer of depth; if the buffer is full, messages will be dropped.
func (g *GossipRedis) Subscribe(channel string, depth int) chan []byte {
select {
case <-g.done:
return errors.New("gossip has been stopped")
return nil
default:
}

ch := make(chan []byte, depth)
g.lock.Lock()
defer g.lock.Unlock()
g.subscribers[channel] = append(g.subscribers[channel], cb)

return nil
g.subscriptions[channel] = append(g.subscriptions[channel], ch)

return ch
}

// Publish sends a message to all subscribers of a given channel.
Expand Down
Loading

0 comments on commit e782204

Please sign in to comment.