Skip to content

Commit

Permalink
Avoid notifying consensus engine of the block commit announcements if…
Browse files Browse the repository at this point in the history
… consensus engine has already processed them once
  • Loading branch information
charithabandi committed Feb 21, 2025
1 parent 8cbf9b3 commit f711f1a
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 56 deletions.
3 changes: 2 additions & 1 deletion node/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ func (n *Node) blkAnnStreamHandler(s network.Stream) {
}

// re-announce
n.ce.NotifyBlockCommit(blk, ci)
n.log.Infof("downloaded block %v of height %d from %v, notifying ce of the block", blkid, height, s.Conn().RemotePeer())
n.ce.NotifyBlockCommit(blk, ci, blkHash)
go func() {
n.announceRawBlk(context.Background(), blkHash, height, rawBlk, blk.Header, ci, s.Conn().RemotePeer(), reqMsg.LeaderSig) // re-announce with the leader's signature
}()
Expand Down
3 changes: 0 additions & 3 deletions node/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,9 +357,6 @@ func (n *Node) startAckGossip(ctx context.Context, ps *pubsub.PubSub) error {
continue
}

n.log.Infof("notifying CE of the ACK msg from %s (rcvd from %s), data = %x",
peers.PeerIDStringer(fromPeerID), peers.PeerIDStringer(ackMsg.ReceivedFrom), ack)

go n.ce.NotifyACK(pubkeyBytes, ack)
}
}()
Expand Down
6 changes: 6 additions & 0 deletions node/consensus/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,11 @@ type StateInfo struct {
// proposed block for the current height
blkProp *blockProposal

// hasBlock records the height of the block that the CE has already been notified about.
// It can be used by the p2p layer to avoid re-sending the same block multiple times
// to the consensus engine.
hasBlock atomic.Int64

lastCommit lastCommit
}

Expand Down Expand Up @@ -346,6 +351,7 @@ func New(cfg *Config) (*ConsensusEngine, error) {
// Not initializing the role here will panic the node, as a lot of RPC calls such as HealthCheck,
// Status, etc. try to access the role.
ce.role.Store(types.RoleSentry)
ce.stateInfo.hasBlock.Store(0)

// set the node to be in the catchup mode
ce.inSync.Store(true)
Expand Down
18 changes: 9 additions & 9 deletions node/consensus/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func TestValidatorStateMachine(t *testing.T) {
name: "commit",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
ci := addVotes(t, blkProp1.blkHash, blockAppHash, leader, val)
val.NotifyBlockCommit(blkProp1.blk, ci)
val.NotifyBlockCommit(blkProp1.blk, ci, blkProp1.blkHash)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Committed, 1, zeroHash)
Expand All @@ -315,7 +315,7 @@ func TestValidatorStateMachine(t *testing.T) {
{
name: "commit(InvalidAppHash)",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
val.NotifyBlockCommit(blkProp1.blk, &types.CommitInfo{AppHash: ktypes.Hash{}})
val.NotifyBlockCommit(blkProp1.blk, &types.CommitInfo{AppHash: ktypes.Hash{}}, blkProp1.blkHash)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
if val.lastCommitHeight() != 0 {
Expand Down Expand Up @@ -354,7 +354,7 @@ func TestValidatorStateMachine(t *testing.T) {
name: "commitNew",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
ci := addVotes(t, blkProp2.blkHash, blockAppHash, leader, val)
val.NotifyBlockCommit(blkProp2.blk, ci)
val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Committed, 1, zeroHash)
Expand Down Expand Up @@ -390,7 +390,7 @@ func TestValidatorStateMachine(t *testing.T) {
name: "commitNew",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
ci := addVotes(t, blkProp2.blkHash, blockAppHash, leader, val)
val.NotifyBlockCommit(blkProp2.blk, ci)
val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Committed, 1, zeroHash)
Expand All @@ -417,7 +417,7 @@ func TestValidatorStateMachine(t *testing.T) {
name: "commitNew",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
ci := addVotes(t, blkProp2.blkHash, blockAppHash, leader, val)
val.NotifyBlockCommit(blkProp2.blk, ci)
val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Committed, 1, zeroHash)
Expand Down Expand Up @@ -471,7 +471,7 @@ func TestValidatorStateMachine(t *testing.T) {
name: "commitNew",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
ci := addVotes(t, blkProp2.blkHash, blockAppHash, leader, val)
val.NotifyBlockCommit(blkProp2.blk, ci)
val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Committed, 1, zeroHash)
Expand All @@ -498,7 +498,7 @@ func TestValidatorStateMachine(t *testing.T) {
name: "commitNew",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
ci := addVotes(t, blkProp2.blkHash, blockAppHash, leader, val)
val.NotifyBlockCommit(blkProp2.blk, ci)
val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Committed, 1, zeroHash)
Expand Down Expand Up @@ -552,7 +552,7 @@ func TestValidatorStateMachine(t *testing.T) {
name: "commitNew",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
ci := addVotes(t, blkProp2.blkHash, blockAppHash, leader, val)
val.NotifyBlockCommit(blkProp2.blk, ci)
val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Committed, 1, zeroHash)
Expand Down Expand Up @@ -597,7 +597,7 @@ func TestValidatorStateMachine(t *testing.T) {
name: "commitNew",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
ci := addVotes(t, blkProp2.blkHash, blockAppHash, leader, val)
val.NotifyBlockCommit(blkProp2.blk, ci)
val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Committed, 1, zeroHash)
Expand Down
70 changes: 38 additions & 32 deletions node/consensus/follower.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ func (ce *ConsensusEngine) AcceptCommit(height int64, blkID types.Hash, hdr *kty
// the node will stop receiving any committed blocks from the network and
// is stuck waiting for the votes. The only way to recover the node is to
// restart it, so that it can start
if ce.stateInfo.hasBlock.Load() == height {
ce.log.Infof("CE already been notified about the block, height: %d, blockID: %s", height, blkID)
return false
}

ce.stateInfo.mtx.RLock()
defer ce.stateInfo.mtx.RUnlock()
Expand All @@ -157,53 +161,53 @@ func (ce *ConsensusEngine) AcceptCommit(height int64, blkID types.Hash, hdr *kty
}

// check if there are any leader updates in the block header
leader := ce.leader
updatedLeader, leaderUpdated := ci.ParamUpdates[ktypes.ParamNameLeader]
if leaderUpdated {
// accept this new leader only if the commitInfo votes are correctly validated
if err := ce.verifyVotes(ci, blkID); err != nil {
ce.log.Error("Error verifying votes", "error", err)
return false
}

leader = (updatedLeader.(ktypes.PublicKey)).PublicKey
leader := (updatedLeader.(ktypes.PublicKey)).PublicKey

ce.log.Infof("Received block with leader update, new leader: %s old leader: %s", hex.EncodeToString(leader.Bytes()), hex.EncodeToString(ce.leader.Bytes()))
}

return true

// Leader signature verification is not required as long as the commitInfo includes the signatures
// from a majority of the validators. There can also be scenarios where the node tried to promote a new
// leader candidate, but the candidate did not receive enough votes to be promoted as a leader.
// In such cases, the old leader produces the block, but this node will not accept the blkAnn message
// from the old leader, as the node has a different leader now. So accept the committed block as
// long as the block is valid, without worrying about who the leader is.

// check if this is the first time we are hearing about this block and not already downloaded it.
blk, _, err := ce.blockStore.Get(blkID)
if err != nil {
if errors.Is(err, types.ErrNotFound) || errors.Is(err, types.ErrBlkNotFound) {
ce.log.Debugf("Block not found in the store, request it from the network", "height", height, "blockID", blkID)
return true // we want it
}
ce.log.Error("Unexpected error getting block from blockstore", "error", err)
return false
}

// no need to worry if we are currently processing a different block, commitBlock will take care of it.
// just ensure that you are processing the block which is for the height after the last committed block.
blkCommit := &blockAnnounce{
ci: ci,
blk: blk,
}

ce.log.Infof("Notifying the CE of the blkAnn msg as we already have the block", "height", height, "blockID", blkID)
go ce.sendConsensusMessage(&consensusMessage{
MsgType: blkCommit.Type(),
Msg: blkCommit,
Sender: leader.Bytes(),
})
// // check if this is the first time we are hearing about this block and not already downloaded it.
// blk, _, err := ce.blockStore.Get(blkID)
// if err != nil {
// if errors.Is(err, types.ErrNotFound) || errors.Is(err, types.ErrBlkNotFound) {
// ce.log.Debugf("Block not found in the store, request it from the network", "height", height, "blockID", blkID)
// return true // we want it
// }
// ce.log.Error("Unexpected error getting block from blockstore", "error", err)
// return false
// }

// // no need to worry if we are currently processing a different block, commitBlock will take care of it.
// // just ensure that you are processing the block which is for the height after the last committed block.
// blkCommit := &blockAnnounce{
// ci: ci,
// blk: blk,
// }

// ce.log.Infof("Notifying the CE of the blkAnn msg as we already have the block", "height", height, "blockID", blkID)
// // Notify only if CE hasn't already acknowledged the block
// go ce.sendConsensusMessage(&consensusMessage{
// MsgType: blkCommit.Type(),
// Msg: blkCommit,
// Sender: leader.Bytes(),
// })

return false
}

// ProcessBlockProposal is used by the validator's consensus engine to process the new block proposal message.
Expand Down Expand Up @@ -367,10 +371,8 @@ func (ce *ConsensusEngine) commitBlock(ctx context.Context, blk *ktypes.Block, c
return ce.processAndCommit(ctx, blk, ci)
}

if ce.state.blockRes == nil {
// Still processing the block, return and commit when ready.
return nil
}
// The block is already processed, just validate the appHash and commit the block if valid.
ce.stateInfo.hasBlock.CompareAndSwap(ce.state.lc.height, blk.Header.Height)

if !ce.state.blockRes.paramUpdates.Equals(ci.ParamUpdates) { // this is absorbed in apphash anyway, but helps diagnostics
haltR := fmt.Sprintf("Incorrect ParamUpdates, halting the node. received: %s, computed: %s", ci.ParamUpdates, ce.state.blockRes.paramUpdates)
Expand Down Expand Up @@ -406,6 +408,10 @@ func (ce *ConsensusEngine) processAndCommit(ctx context.Context, blk *ktypes.Blo
}

ce.log.Info("Processing committed block", "height", blk.Header.Height, "blockID", blkID, "appHash", ci.AppHash)

// set the hasBlock to the height of the block
ce.stateInfo.hasBlock.CompareAndSwap(ce.state.lc.height, blk.Header.Height)

if err := ce.validateBlock(blk); err != nil {
return err
}
Expand Down
15 changes: 9 additions & 6 deletions node/consensus/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (ce *ConsensusEngine) NotifyBlockProposal(blk *ktypes.Block) {
}

// NotifyBlockCommit is used by the p2p stream handler to notify the consensus engine of a committed block.
func (ce *ConsensusEngine) NotifyBlockCommit(blk *ktypes.Block, ci *types.CommitInfo) {
func (ce *ConsensusEngine) NotifyBlockCommit(blk *ktypes.Block, ci *types.CommitInfo, blkID types.Hash) {
leaderU, ok := ci.ParamUpdates[ktypes.ParamNameLeader]
leader := ce.leader

Expand All @@ -137,11 +137,14 @@ func (ce *ConsensusEngine) NotifyBlockCommit(blk *ktypes.Block, ci *types.Commit
ci: ci,
}

go ce.sendConsensusMessage(&consensusMessage{
MsgType: blkCommit.Type(),
Msg: blkCommit,
Sender: leader.Bytes(),
})
// only notify if the consensus engine has not already been notified about the block at this height
if ce.stateInfo.hasBlock.Load() != blk.Header.Height {
go ce.sendConsensusMessage(&consensusMessage{
MsgType: blkCommit.Type(),
Msg: blkCommit,
Sender: leader.Bytes(),
})
}
}

// NotifyACK notifies the consensus engine about the ACK received from the validator.
Expand Down
2 changes: 1 addition & 1 deletion node/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type ConsensusEngine interface {
NotifyBlockProposal(blk *ktypes.Block)

AcceptCommit(height int64, blkID types.Hash, hdr *ktypes.BlockHeader, ci *types.CommitInfo, leaderSig []byte) bool
NotifyBlockCommit(blk *ktypes.Block, ci *types.CommitInfo)
NotifyBlockCommit(blk *ktypes.Block, ci *types.CommitInfo, blkID types.Hash)

NotifyACK(validatorPK []byte, ack types.AckRes)

Expand Down
3 changes: 3 additions & 0 deletions node/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mempool

import (
"context"
"fmt"
"io"
"slices"
"sync"
Expand Down Expand Up @@ -184,6 +185,8 @@ func (mp *Mempool) PeekN(n, szLimit int) []types.NamedTx {
}
txns = append(txns, tx)
}

fmt.Printf("PeekN: proposedTxs: %d, remainingTxs: %d", len(txns), len(mp.txQ)-len(txns))
return txns
}

Expand Down
8 changes: 4 additions & 4 deletions node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ type dummyCE struct {
rejectACK bool

ackHandler func(validatorPK []byte, ack types.AckRes)
blockCommitHandler func(blk *ktypes.Block, ci *types.CommitInfo)
blockCommitHandler func(blk *ktypes.Block, ci *types.CommitInfo, blkID types.Hash)
blockPropHandler func(blk *ktypes.Block)
resetStateHandler func(height int64, txIDs []types.Hash)

Expand All @@ -308,9 +308,9 @@ func (ce *dummyCE) AcceptCommit(height int64, blkID types.Hash, hdr *ktypes.Bloc
return !ce.rejectCommit
}

func (ce *dummyCE) NotifyBlockCommit(blk *ktypes.Block, ci *types.CommitInfo) {
func (ce *dummyCE) NotifyBlockCommit(blk *ktypes.Block, ci *types.CommitInfo, blkID types.Hash) {
if ce.blockCommitHandler != nil {
ce.blockCommitHandler(blk, ci)
ce.blockCommitHandler(blk, ci, blkID)
return
}
}
Expand Down Expand Up @@ -427,7 +427,7 @@ func (f *faker) SetACKHandler(ackHandler func(validatorPK []byte, ack types.AckR
f.ackHandler = ackHandler
}

func (f *faker) SetBlockCommitHandler(blockCommitHandler func(blk *ktypes.Block, ci *types.CommitInfo)) {
func (f *faker) SetBlockCommitHandler(blockCommitHandler func(blk *ktypes.Block, ci *types.CommitInfo, blkID types.Hash)) {
f.blockCommitHandler = blockCommitHandler
}

Expand Down

0 comments on commit f711f1a

Please sign in to comment.