From 168b58d89009f5dcbc5fd6c675089ceb2e52c217 Mon Sep 17 00:00:00 2001 From: charithabandi Date: Wed, 19 Feb 2025 11:51:52 -0600 Subject: [PATCH] Avoid notifying consensus engine of the block commit announcements if consensus engine has already processed them once --- node/block.go | 3 +- node/consensus.go | 3 -- node/consensus/engine.go | 6 +++ node/consensus/engine_test.go | 18 ++++----- node/consensus/follower.go | 70 +++++++++++++++++++---------------- node/consensus/messages.go | 15 +++++--- node/interfaces.go | 2 +- node/mempool/mempool.go | 3 ++ node/node_test.go | 8 ++-- 9 files changed, 72 insertions(+), 56 deletions(-) diff --git a/node/block.go b/node/block.go index 80505881a..528a4c396 100644 --- a/node/block.go +++ b/node/block.go @@ -173,7 +173,8 @@ func (n *Node) blkAnnStreamHandler(s network.Stream) { } // re-announce - n.ce.NotifyBlockCommit(blk, ci) + n.log.Infof("downloaded block %v of height %d from %v, notifying ce of the block", blkid, height, s.Conn().RemotePeer()) + n.ce.NotifyBlockCommit(blk, ci, blkHash) go func() { n.announceRawBlk(context.Background(), blkHash, height, rawBlk, blk.Header, ci, s.Conn().RemotePeer(), reqMsg.LeaderSig) // re-announce with the leader's signature }() diff --git a/node/consensus.go b/node/consensus.go index 735c84028..92907b1b7 100644 --- a/node/consensus.go +++ b/node/consensus.go @@ -357,9 +357,6 @@ func (n *Node) startAckGossip(ctx context.Context, ps *pubsub.PubSub) error { continue } - n.log.Infof("notifying CE of the ACK msg from %s (rcvd from %s), data = %x", - peers.PeerIDStringer(fromPeerID), peers.PeerIDStringer(ackMsg.ReceivedFrom), ack) - go n.ce.NotifyACK(pubkeyBytes, ack) } }() diff --git a/node/consensus/engine.go b/node/consensus/engine.go index f8416888d..be1c0a3eb 100644 --- a/node/consensus/engine.go +++ b/node/consensus/engine.go @@ -241,6 +241,11 @@ type StateInfo struct { // proposed block for the current height blkProp *blockProposal + // hasBlock indicates that the CE has been notified about block corresponding to the height. + // can be used by the p2p layer to avoid re-sending the same block multiple times + // to the consensus engine. + hasBlock atomic.Int64 + lastCommit lastCommit } @@ -346,6 +351,7 @@ func New(cfg *Config) (*ConsensusEngine, error) { // Not initializing the role here will panic the node, as a lot of RPC calls such as HealthCheck, // Status, etc. tries to access the role. ce.role.Store(types.RoleSentry) + ce.stateInfo.hasBlock.Store(0) // set the node to be in the catchup mode ce.inSync.Store(true) diff --git a/node/consensus/engine_test.go b/node/consensus/engine_test.go index 001d68522..1978d1af0 100644 --- a/node/consensus/engine_test.go +++ b/node/consensus/engine_test.go @@ -289,7 +289,7 @@ func TestValidatorStateMachine(t *testing.T) { name: "commit", trigger: func(t *testing.T, leader, val *ConsensusEngine) { ci := addVotes(t, blkProp1.blkHash, blockAppHash, leader, val) - val.NotifyBlockCommit(blkProp1.blk, ci) + val.NotifyBlockCommit(blkProp1.blk, ci, blkProp1.blkHash) }, verify: func(t *testing.T, leader, val *ConsensusEngine) error { return verifyStatus(t, val, Committed, 1, zeroHash) @@ -315,7 +315,7 @@ func TestValidatorStateMachine(t *testing.T) { { name: "commit(InvalidAppHash)", trigger: func(t *testing.T, leader, val *ConsensusEngine) { - val.NotifyBlockCommit(blkProp1.blk, &types.CommitInfo{AppHash: ktypes.Hash{}}) + val.NotifyBlockCommit(blkProp1.blk, &types.CommitInfo{AppHash: ktypes.Hash{}}, blkProp1.blkHash) }, verify: func(t *testing.T, leader, val *ConsensusEngine) error { if val.lastCommitHeight() != 0 { @@ -354,7 +354,7 @@ func TestValidatorStateMachine(t *testing.T) { name: "commitNew", trigger: func(t *testing.T, leader, val *ConsensusEngine) { ci := addVotes(t, blkProp2.blkHash, blockAppHash, leader, val) - val.NotifyBlockCommit(blkProp2.blk, ci) + val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash) }, verify: func(t *testing.T, leader, val *ConsensusEngine) error { return verifyStatus(t, val, Committed, 1, zeroHash) @@ -390,7 +390,7 @@ func TestValidatorStateMachine(t *testing.T) { name: "commitNew", trigger: func(t *testing.T, leader, val *ConsensusEngine) { ci := addVotes(t, blkProp2.blkHash, blockAppHash, leader, val) - val.NotifyBlockCommit(blkProp2.blk, ci) + val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash) }, verify: func(t *testing.T, leader, val *ConsensusEngine) error { return verifyStatus(t, val, Committed, 1, zeroHash) @@ -417,7 +417,7 @@ func TestValidatorStateMachine(t *testing.T) { name: "commitNew", trigger: func(t *testing.T, leader, val *ConsensusEngine) { ci := addVotes(t, blkProp2.blkHash, blockAppHash, leader, val) - val.NotifyBlockCommit(blkProp2.blk, ci) + val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash) }, verify: func(t *testing.T, leader, val *ConsensusEngine) error { return verifyStatus(t, val, Committed, 1, zeroHash) @@ -471,7 +471,7 @@ func TestValidatorStateMachine(t *testing.T) { name: "commitNew", trigger: func(t *testing.T, leader, val *ConsensusEngine) { ci := addVotes(t, blkProp2.blkHash, blockAppHash, leader, val) - val.NotifyBlockCommit(blkProp2.blk, ci) + val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash) }, verify: func(t *testing.T, leader, val *ConsensusEngine) error { return verifyStatus(t, val, Committed, 1, zeroHash) @@ -498,7 +498,7 @@ func TestValidatorStateMachine(t *testing.T) { name: "commitNew", trigger: func(t *testing.T, leader, val *ConsensusEngine) { ci := addVotes(t, blkProp2.blkHash, blockAppHash, leader, val) - val.NotifyBlockCommit(blkProp2.blk, ci) + val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash) }, verify: func(t *testing.T, leader, val *ConsensusEngine) error { return verifyStatus(t, val, Committed, 1, zeroHash) @@ -552,7 +552,7 @@ func TestValidatorStateMachine(t *testing.T) { name: "commitNew", trigger: func(t *testing.T, leader, val *ConsensusEngine) { ci := addVotes(t, blkProp2.blkHash, blockAppHash, leader, val) - val.NotifyBlockCommit(blkProp2.blk, ci) + val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash) }, verify: func(t *testing.T, leader, val *ConsensusEngine) error { return verifyStatus(t, val, Committed, 1, zeroHash) @@ -597,7 +597,7 @@ func TestValidatorStateMachine(t *testing.T) { name: "commitNew", trigger: func(t *testing.T, leader, val *ConsensusEngine) { ci := addVotes(t, blkProp2.blkHash, blockAppHash, leader, val) - val.NotifyBlockCommit(blkProp2.blk, ci) + val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash) }, verify: func(t *testing.T, leader, val *ConsensusEngine) error { return verifyStatus(t, val, Committed, 1, zeroHash) diff --git a/node/consensus/follower.go b/node/consensus/follower.go index f9dcee14d..e4ddd55fa 100644 --- a/node/consensus/follower.go +++ b/node/consensus/follower.go @@ -136,6 +136,10 @@ func (ce *ConsensusEngine) AcceptCommit(height int64, blkID types.Hash, hdr *kty // the node will stop receiving any committed blocks from the network and // is stuck waiting for the votes. The only way to recover the node is to // restart it, so that it can start + if ce.stateInfo.hasBlock.Load() == height { + ce.log.Infof("CE already been notified about the block, height: %d, blockID: %s", height, blkID) + return false + } ce.stateInfo.mtx.RLock() defer ce.stateInfo.mtx.RUnlock() @@ -157,7 +161,6 @@ func (ce *ConsensusEngine) AcceptCommit(height int64, blkID types.Hash, hdr *kty } // check if there are any leader updates in the block header - leader := ce.leader updatedLeader, leaderUpdated := ci.ParamUpdates[ktypes.ParamNameLeader] if leaderUpdated { // accept this new leader only if the commitInfo votes are correctly validated @@ -165,12 +168,13 @@ func (ce *ConsensusEngine) AcceptCommit(height int64, blkID types.Hash, hdr *kty ce.log.Error("Error verifying votes", "error", err) return false } - - leader = (updatedLeader.(ktypes.PublicKey)).PublicKey + leader := (updatedLeader.(ktypes.PublicKey)).PublicKey ce.log.Infof("Received block with leader update, new leader: %s old leader: %s", hex.EncodeToString(leader.Bytes()), hex.EncodeToString(ce.leader.Bytes())) } + return true + // Leader signature verification is not required as long as the commitInfo includes the signatures // from majority of the validators. There can also scenarios where the node tried to promote a new // leader candidate, but the candidate did not receive enough votes to be promoted as a leader. @@ -178,32 +182,32 @@ func (ce *ConsensusEngine) AcceptCommit(height int64, blkID types.Hash, hdr *kty // from the old leader, as the node has a different leader now. So accept the committed block as // long as the block is valid, without worrying about who the leader is. - // check if this is the first time we are hearing about this block and not already downloaded it. - blk, _, err := ce.blockStore.Get(blkID) - if err != nil { - if errors.Is(err, types.ErrNotFound) || errors.Is(err, types.ErrBlkNotFound) { - ce.log.Debugf("Block not found in the store, request it from the network", "height", height, "blockID", blkID) - return true // we want it - } - ce.log.Error("Unexpected error getting block from blockstore", "error", err) - return false - } - - // no need to worry if we are currently processing a different block, commitBlock will take care of it. - // just ensure that you are processing the block which is for the height after the last committed block. - blkCommit := &blockAnnounce{ - ci: ci, - blk: blk, - } - - ce.log.Infof("Notifying the CE of the blkAnn msg as we already have the block", "height", height, "blockID", blkID) - go ce.sendConsensusMessage(&consensusMessage{ - MsgType: blkCommit.Type(), - Msg: blkCommit, - Sender: leader.Bytes(), - }) + // // check if this is the first time we are hearing about this block and not already downloaded it. + // blk, _, err := ce.blockStore.Get(blkID) + // if err != nil { + // if errors.Is(err, types.ErrNotFound) || errors.Is(err, types.ErrBlkNotFound) { + // ce.log.Debugf("Block not found in the store, request it from the network", "height", height, "blockID", blkID) + // return true // we want it + // } + // ce.log.Error("Unexpected error getting block from blockstore", "error", err) + // return false + // } + + // // no need to worry if we are currently processing a different block, commitBlock will take care of it. + // // just ensure that you are processing the block which is for the height after the last committed block. + // blkCommit := &blockAnnounce{ + // ci: ci, + // blk: blk, + // } + + // ce.log.Infof("Notifying the CE of the blkAnn msg as we already have the block", "height", height, "blockID", blkID) + // // Notify only if CE hasn't already acknowledged the block + // go ce.sendConsensusMessage(&consensusMessage{ + // MsgType: blkCommit.Type(), + // Msg: blkCommit, + // Sender: leader.Bytes(), + // }) - return false } // ProcessBlockProposal is used by the validator's consensus engine to process the new block proposal message. @@ -367,10 +371,8 @@ func (ce *ConsensusEngine) commitBlock(ctx context.Context, blk *ktypes.Block, c return ce.processAndCommit(ctx, blk, ci) } - if ce.state.blockRes == nil { - // Still processing the block, return and commit when ready. - return nil - } + // The block is already processed, just validate the appHash and commit the block if valid. + ce.stateInfo.hasBlock.CompareAndSwap(ce.state.lc.height, blk.Header.Height) if !ce.state.blockRes.paramUpdates.Equals(ci.ParamUpdates) { // this is absorbed in apphash anyway, but helps diagnostics haltR := fmt.Sprintf("Incorrect ParamUpdates, halting the node. received: %s, computed: %s", ci.ParamUpdates, ce.state.blockRes.paramUpdates) @@ -406,6 +408,10 @@ func (ce *ConsensusEngine) processAndCommit(ctx context.Context, blk *ktypes.Blo } ce.log.Info("Processing committed block", "height", blk.Header.Height, "blockID", blkID, "appHash", ci.AppHash) + + // set the hasBlock to the height of the block + ce.stateInfo.hasBlock.CompareAndSwap(ce.state.lc.height, blk.Header.Height) + if err := ce.validateBlock(blk); err != nil { return err } diff --git a/node/consensus/messages.go b/node/consensus/messages.go index e63e200fb..7d19c94bf 100644 --- a/node/consensus/messages.go +++ b/node/consensus/messages.go @@ -119,7 +119,7 @@ func (ce *ConsensusEngine) NotifyBlockProposal(blk *ktypes.Block) { } // NotifyBlockCommit is used by the p2p stream handler to notify the consensus engine of a committed block. -func (ce *ConsensusEngine) NotifyBlockCommit(blk *ktypes.Block, ci *types.CommitInfo) { +func (ce *ConsensusEngine) NotifyBlockCommit(blk *ktypes.Block, ci *types.CommitInfo, blkID types.Hash) { leaderU, ok := ci.ParamUpdates[ktypes.ParamNameLeader] leader := ce.leader @@ -137,11 +137,14 @@ func (ce *ConsensusEngine) NotifyBlockCommit(blk *ktypes.Block, ci *types.Commit ci: ci, } - go ce.sendConsensusMessage(&consensusMessage{ - MsgType: blkCommit.Type(), - Msg: blkCommit, - Sender: leader.Bytes(), - }) + // only notify if the leader doesn't already know about the block + if ce.stateInfo.hasBlock.Load() != blk.Header.Height { + go ce.sendConsensusMessage(&consensusMessage{ + MsgType: blkCommit.Type(), + Msg: blkCommit, + Sender: leader.Bytes(), + }) + } } // NotifyACK notifies the consensus engine about the ACK received from the validator. diff --git a/node/interfaces.go b/node/interfaces.go index 97b65319a..796de3e08 100644 --- a/node/interfaces.go +++ b/node/interfaces.go @@ -21,7 +21,7 @@ type ConsensusEngine interface { NotifyBlockProposal(blk *ktypes.Block) AcceptCommit(height int64, blkID types.Hash, hdr *ktypes.BlockHeader, ci *types.CommitInfo, leaderSig []byte) bool - NotifyBlockCommit(blk *ktypes.Block, ci *types.CommitInfo) + NotifyBlockCommit(blk *ktypes.Block, ci *types.CommitInfo, blkID types.Hash) NotifyACK(validatorPK []byte, ack types.AckRes) diff --git a/node/mempool/mempool.go b/node/mempool/mempool.go index adc442140..087f48a09 100644 --- a/node/mempool/mempool.go +++ b/node/mempool/mempool.go @@ -2,6 +2,7 @@ package mempool import ( "context" + "fmt" "io" "slices" "sync" @@ -184,6 +185,8 @@ func (mp *Mempool) PeekN(n, szLimit int) []types.NamedTx { } txns = append(txns, tx) } + + fmt.Printf("PeekN: proposedTxs: %d, remainingTxs: %d", len(txns), len(mp.txQ)-len(txns)) return txns } diff --git a/node/node_test.go b/node/node_test.go index beebc6a4a..161abb49c 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -285,7 +285,7 @@ type dummyCE struct { rejectACK bool ackHandler func(validatorPK []byte, ack types.AckRes) - blockCommitHandler func(blk *ktypes.Block, ci *types.CommitInfo) + blockCommitHandler func(blk *ktypes.Block, ci *types.CommitInfo, blkID types.Hash) blockPropHandler func(blk *ktypes.Block) resetStateHandler func(height int64, txIDs []types.Hash) @@ -308,9 +308,9 @@ func (ce *dummyCE) AcceptCommit(height int64, blkID types.Hash, hdr *ktypes.Bloc return !ce.rejectCommit } -func (ce *dummyCE) NotifyBlockCommit(blk *ktypes.Block, ci *types.CommitInfo) { +func (ce *dummyCE) NotifyBlockCommit(blk *ktypes.Block, ci *types.CommitInfo, blkID types.Hash) { if ce.blockCommitHandler != nil { - ce.blockCommitHandler(blk, ci) + ce.blockCommitHandler(blk, ci, blkID) return } } @@ -427,7 +427,7 @@ func (f *faker) SetACKHandler(ackHandler func(validatorPK []byte, ack types.AckR f.ackHandler = ackHandler } -func (f *faker) SetBlockCommitHandler(blockCommitHandler func(blk *ktypes.Block, ci *types.CommitInfo)) { +func (f *faker) SetBlockCommitHandler(blockCommitHandler func(blk *ktypes.Block, ci *types.CommitInfo, blkID types.Hash)) { f.blockCommitHandler = blockCommitHandler }