Skip to content

Commit

Permalink
Do not block indefinitely on the semaphore (backport cometbft#1654) (c…
Browse files Browse the repository at this point in the history
…ometbft#1689)

* Do not block indefinitely on the semaphore (cometbft#1654)

* Do not block indefinitely on the semaphore

* Cancel the context, irrespective of the flow followed

* Makes the code more readable

* Improving comment

* make linter happy

* Updating comments to match

* Commenting out `select` and leaving it as TODO for when Contexts are more widely used

* Cleaned up comments

(cherry picked from commit 2679498)

# Conflicts:
#	config/config.go
#	config/toml.go
#	test/e2e/pkg/manifest.go

* Backports cometbft#1558 and cometbft#1584 to 0.38.x (cometbft#1592)

* Experimental - Reduce # of connections effectively used to gossip transactions out (cometbft#1558)

* maxpeers for mempool

* mempool: fix max_peers bcast routine active flag

* Use semaphore to limit concurrency

* Rename MaxPeers to MaxOutboundPeers

* Add max_outbound_peers to config toml template

* Rename in error message

* Renams the parameter to highlight its experimental nature. Extend the AddPeer method to return an error. Moves the semaphone to outside the broadcast routine

* reverting the addition of error to AddPeer. It fails if the context is done and handling this case will be done some other time, when an actual context is passed into acquire.

* reverting the addition of error to AddPeer. It fails if the context is done and handling this case will be done some other time, when an actual context is passed into acquire.

* Fixing lint issue

* renaming semaphore to something more meaningful

* make default value 0, which is the same as the current behavior. 10 is the recommended value.

* adding new flag to manifest.go

* Adding changelog

* Improve the description of the parameter in the generated config file.

* Add metric to track the current number of active connections.

* Change metric to gauge type and rename it.

* e2e: Allow disabling the PEX reactor on all nodes in the testnet

* Apply suggestions from code review

Co-authored-by: Sergio Mena <sergio@informal.systems>

* Update config/config.go comment

* fix lint error

* Improve config description

* Rename metric (remove experimental prefix)

* Add unit test

* Improve unit test

* Update mempool/reactor.go comment

---------

Co-authored-by: Ethan Buchman <ethan@coinculture.info>
Co-authored-by: Daniel Cason <daniel.cason@informal.systems>
Co-authored-by: lasarojc <lasaro@informal.systems>
Co-authored-by: hvanz <hernan.vanzetto@gmail.com>
Co-authored-by: Andy Nogueira <me@andynogueira.dev>
Co-authored-by: Sergio Mena <sergio@informal.systems>

* Updating test file, leaving it broken for now

* mempool: Limit gossip connections to persistent and non-persistent peers (experimental) (cometbft#1584)

* Ignore persistent peers from limiting of outbound connections

* Update 1558-experimental-gossip-limiting.md

Update changeling

* Fix typo in mempool/metrics.go

* Use two independent configs and semaphores for persistent and non-persistent peers

* Forgot to rename in test

* Update metric description

* Rename semaphores

* Add comment to unit test

---------

Co-authored-by: hvanz <hernan.vanzetto@gmail.com>

* Reverting to old way of reporting errors

* Reverting change that shouldn't have been included in cherry-pick

* Reverting tests to use older functions

* fix rebase merge

---------

Co-authored-by: Adi Seredinschi <adizere@gmail.com>
Co-authored-by: Ethan Buchman <ethan@coinculture.info>
Co-authored-by: Daniel Cason <daniel.cason@informal.systems>
Co-authored-by: hvanz <hernan.vanzetto@gmail.com>
Co-authored-by: Andy Nogueira <me@andynogueira.dev>
Co-authored-by: Sergio Mena <sergio@informal.systems>

* Fixes conflict.

---------

Co-authored-by: lasaro <lasaro@informal.systems>
Co-authored-by: Adi Seredinschi <adizere@gmail.com>
Co-authored-by: Ethan Buchman <ethan@coinculture.info>
Co-authored-by: Daniel Cason <daniel.cason@informal.systems>
Co-authored-by: hvanz <hernan.vanzetto@gmail.com>
Co-authored-by: Andy Nogueira <me@andynogueira.dev>
Co-authored-by: Sergio Mena <sergio@informal.systems>
  • Loading branch information
8 people authored Nov 24, 2023
1 parent 656c1e4 commit 7dae514
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 27 deletions.
9 changes: 5 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -780,12 +780,13 @@ type MempoolConfig struct {
MaxBatchBytes int `mapstructure:"max_batch_bytes"`
// Experimental parameters to limit gossiping txs to up to the specified number of peers.
// This feature is only available for the default mempool (version config set to "v0").
// We use two independent upper values for persistent peers and for non-persistent peers.
// We use two independent upper values for persistent and non-persistent peers.
// Unconditional peers are not affected by this feature.
// If we are connected to more than the specified number of persistent peers, only send txs to
// the first ExperimentalMaxGossipConnectionsToPersistentPeers of them. If one of those
// persistent peers disconnects, activate another persistent peer. Similarly for non-persistent
// peers, with an upper limit of ExperimentalMaxGossipConnectionsToNonPersistentPeers.
// ExperimentalMaxGossipConnectionsToPersistentPeers of them. If one of those
// persistent peers disconnects, activate another persistent peer.
// Similarly for non-persistent peers, with an upper limit of
// ExperimentalMaxGossipConnectionsToNonPersistentPeers.
// If set to 0, the feature is disabled for the corresponding group of peers, that is, the
// number of active connections to that group of peers is not bounded.
// For non-persistent peers, if enabled, a value of 10 is recommended based on experimental
Expand Down
9 changes: 5 additions & 4 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,12 +402,13 @@ ttl-num-blocks = {{ .Mempool.TTLNumBlocks }}
# Experimental parameters to limit gossiping txs to up to the specified number of peers.
# This feature is only available for the default mempool (version config set to "v0").
# We use two independent upper values for persistent peers and for non-persistent peers.
# We use two independent upper values for persistent and non-persistent peers.
# Unconditional peers are not affected by this feature.
# If we are connected to more than the specified number of persistent peers, only send txs to
# the first experimental_max_gossip_connections_to_persistent_peers of them. If one of those
# persistent peers disconnects, activate another persistent peer. Similarly for non-persistent
# peers, with an upper limit of experimental_max_gossip_connections_to_non_persistent_peers.
# ExperimentalMaxGossipConnectionsToPersistentPeers of them. If one of those
# persistent peers disconnects, activate another persistent peer.
# Similarly for non-persistent peers, with an upper limit of
# ExperimentalMaxGossipConnectionsToNonPersistentPeers.
# If set to 0, the feature is disabled for the corresponding group of peers, that is, the
# number of active connections to that group of peers is not bounded.
# For non-persistent peers, if enabled, a value of 10 is recommended based on experimental
Expand Down
41 changes: 23 additions & 18 deletions mempool/v0/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,32 +157,37 @@ func (memR *Reactor) AddPeer(peer p2p.Peer) {
go func() {
// Always forward transactions to unconditional peers.
if !memR.Switch.IsPeerUnconditional(peer.ID()) {
// Depending on the type of peer, we choose a semaphore to limit the gossiping peers.
var peerSemaphore *semaphore.Weighted
if peer.IsPersistent() && memR.config.ExperimentalMaxGossipConnectionsToPersistentPeers > 0 {
// Block sending transactions to peer until one of the connections become
// available in the semaphore.
if err := memR.activePersistentPeersSemaphore.Acquire(context.TODO(), 1); err != nil {
memR.Logger.Error("Failed to acquire semaphore: %v", err)
return
}
// Release semaphore to allow other peer to start sending transactions.
defer memR.activePersistentPeersSemaphore.Release(1)
defer memR.mempool.metrics.ActiveOutboundConnections.Add(-1)
peerSemaphore = memR.activePersistentPeersSemaphore
} else if !peer.IsPersistent() && memR.config.ExperimentalMaxGossipConnectionsToNonPersistentPeers > 0 {
peerSemaphore = memR.activeNonPersistentPeersSemaphore
}

if !peer.IsPersistent() && memR.config.ExperimentalMaxGossipConnectionsToNonPersistentPeers > 0 {
// Block sending transactions to peer until one of the connections become
// available in the semaphore.
if err := memR.activeNonPersistentPeersSemaphore.Acquire(context.TODO(), 1); err != nil {
memR.Logger.Error("Failed to acquire semaphore: %v", err)
return
if peerSemaphore != nil {
for peer.IsRunning() {
// Block on the semaphore until a slot is available to start gossiping with this peer.
// Do not block indefinitely, in case the peer is disconnected before gossiping starts.
ctxTimeout, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
// Block sending transactions to peer until one of the connections become
// available in the semaphore.
err := peerSemaphore.Acquire(ctxTimeout, 1)
cancel()

if err != nil {
continue
}

// Release semaphore to allow other peer to start sending transactions.
defer peerSemaphore.Release(1)
break
}
// Release semaphore to allow other peer to start sending transactions.
defer memR.activeNonPersistentPeersSemaphore.Release(1)
defer memR.mempool.metrics.ActiveOutboundConnections.Add(-1)
}
}

memR.mempool.metrics.ActiveOutboundConnections.Add(1)
defer memR.mempool.metrics.ActiveOutboundConnections.Add(-1)
memR.broadcastTxRoutine(peer)
}()
}
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/pkg/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type Manifest struct {
// Defaults to false (disabled).
Prometheus bool `toml:"prometheus"`

// Maximum number of peers to which the node gossip transactions
// Maximum number of peers to which the node gossips transactions
ExperimentalMaxGossipConnectionsToPersistentPeers uint `toml:"experimental_max_gossip_connections_to_persistent_peers"`
ExperimentalMaxGossipConnectionsToNonPersistentPeers uint `toml:"experimental_max_gossip_connections_to_non_persistent_peers"`
}
Expand Down

0 comments on commit 7dae514

Please sign in to comment.