From 80c7cfc8587454b53cb435b1fa41455766df1e96 Mon Sep 17 00:00:00 2001 From: charithabandi Date: Wed, 19 Feb 2025 14:41:57 -0600 Subject: [PATCH] Give CE access to release block fetcher lock upon commit --- app/node/build.go | 2 +- node/block.go | 72 +++++++++++++++++++++------------- node/consensus/block.go | 21 +++++++--- node/consensus/blocksync.go | 14 +++---- node/consensus/engine.go | 8 ++-- node/consensus/engine_test.go | 18 ++++----- node/consensus/follower.go | 74 ++++++++++++----------------------- node/consensus/messages.go | 18 ++++++--- node/interfaces.go | 2 +- node/mempool/mempool.go | 3 -- node/node_test.go | 18 +++++---- node/nogossip.go | 4 +- node/types/interfaces.go | 2 + 13 files changed, 135 insertions(+), 121 deletions(-) diff --git a/app/node/build.go b/app/node/build.go index 194968b23..8fcec2dc1 100644 --- a/app/node/build.go +++ b/app/node/build.go @@ -533,7 +533,7 @@ func buildNode(d *coreDependencies, mp *mempool.Mempool, bs *store.BlockStore, failBuild(err, "failed to create node") } - logger.Infof("This node is %s @ %s", node.ID(), node.Addrs()) + logger.Infof("This node is %s @ %s, peerID: %s", node.ID(), node.Addrs(), p2p.Host().ID()) return node } diff --git a/node/block.go b/node/block.go index 528a4c396..f4fc8feed 100644 --- a/node/block.go +++ b/node/block.go @@ -115,6 +115,9 @@ func (n *Node) blkAnnStreamHandler(s network.Stream) { // If we are a validator and this is the commit ann for a proposed block // that we already started executing, consensus engine will handle it. if !n.ce.AcceptCommit(height, blkHash, hdr, ci, sig) { + // this either means that the ce already has the block or it is not + // ready to accept it yet. In either case, we don't need to do anything + // here. return } @@ -122,7 +125,6 @@ func (n *Node) blkAnnStreamHandler(s network.Stream) { // code like a sentry node might do. need, done := n.bki.PreFetch(blkHash) - defer done() if !need { n.log.Debug("ALREADY HAVE OR FETCHING BLOCK") return // we have or are currently fetching it, do nothing, assuming we have already re-announced @@ -131,28 +133,38 @@ func (n *Node) blkAnnStreamHandler(s network.Stream) { n.log.Debug("retrieving new block", "blockID", blkid) t0 := time.Now() + peerID := s.Conn().RemotePeer() + // First try to get from this stream. rawBlk, err := request(s, []byte(getMsg), blkReadLimit) if err != nil { - n.log.Warnf("announcer failed to provide %v, trying other peers", blkid) + n.log.Warnf("announcer failed to provide %v due to error: %v, trying other peers", blkid, err) // Since we are aware, ask other peers. we could also put this in a goroutine s.Close() // close the announcers stream first var gotHeight int64 var gotCI *types.CommitInfo - - gotHeight, rawBlk, gotCI, err = n.getBlkWithRetry(ctx, blkHash, 500*time.Millisecond, 10) + var id peer.ID + gotHeight, rawBlk, gotCI, id, err = n.getBlkWithRetry(ctx, blkHash, 500*time.Millisecond, 10) if err != nil { n.log.Errorf("unable to retrieve tx %v: %v", blkid, err) + done() return } if gotHeight != height { n.log.Errorf("getblk response had unexpected height: wanted %d, got %d", height, gotHeight) + done() return } if gotCI != nil && gotCI.AppHash != ci.AppHash { n.log.Errorf("getblk response had unexpected appHash: wanted %v, got %v", ci.AppHash, gotCI.AppHash) + done() return } + // Ensure that the peerID from which the block was downloaded is a valid one. + if id != "" { + n.log.Errorf("getblk response had unexpected peerID: %v", id) + } + peerID = id } n.log.Debugf("obtained content for block %q in %v", blkid, time.Since(t0)) @@ -160,23 +172,26 @@ func (n *Node) blkAnnStreamHandler(s network.Stream) { blk, err := ktypes.DecodeBlock(rawBlk) if err != nil { n.log.Infof("decodeBlock failed for %v: %v", blkid, err) + done() return } if blk.Header.Height != height { n.log.Infof("getblk response had unexpected height: wanted %d, got %d", height, blk.Header.Height) + done() return } gotBlkHash := blk.Header.Hash() if gotBlkHash != blkHash { n.log.Infof("invalid block hash: wanted %v, got %x", blkHash, gotBlkHash) + done() return } // re-announce - n.log.Infof("downloaded block %v of height %d from %v, notifying ce of the block", blkid, height, s.Conn().RemotePeer()) - n.ce.NotifyBlockCommit(blk, ci, blkHash) + n.log.Infof("downloaded block %v of height %d from %v, notifying ce of the block", blkid, height, peerID) + n.ce.NotifyBlockCommit(blk, ci, blkHash, done) go func() { - n.announceRawBlk(context.Background(), blkHash, height, rawBlk, blk.Header, ci, s.Conn().RemotePeer(), reqMsg.LeaderSig) // re-announce with the leader's signature + n.announceRawBlk(context.Background(), blkHash, height, rawBlk, blk.Header, ci, peerID, reqMsg.LeaderSig) // re-announce with the leader's signature }() } @@ -200,6 +215,7 @@ func (n *Node) announceRawBlk(ctx context.Context, blkHash types.Hash, height in if peerID == from { continue } + n.log.Debugf("advertising block %s (height %d / sz %d / updates %v) to peer %v", blkHash, height, len(rawBlk), ci.ParamUpdates, peerID) resID, err := blockAnnMsg{ @@ -224,12 +240,12 @@ func (n *Node) announceRawBlk(ctx context.Context, blkHash types.Hash, height in } func (n *Node) getBlkWithRetry(ctx context.Context, blkHash types.Hash, baseDelay time.Duration, - maxAttempts int) (int64, []byte, *types.CommitInfo, error) { + maxAttempts int) (int64, []byte, *types.CommitInfo, peer.ID, error) { var attempts int for { - height, raw, ci, err := n.getBlk(ctx, blkHash) + height, raw, ci, peer, err := n.getBlk(ctx, blkHash) if err == nil { - return height, raw, ci, nil + return height, raw, ci, peer, nil } n.log.Warnf("unable to retrieve block %v (%v), waiting to retry", blkHash, err) @@ -241,14 +257,13 @@ func (n *Node) getBlkWithRetry(ctx context.Context, blkHash types.Hash, baseDela baseDelay *= 2 attempts++ if attempts >= maxAttempts { - return 0, nil, nil, ErrBlkNotFound + return 0, nil, nil, "", ErrBlkNotFound } } } -func (n *Node) getBlk(ctx context.Context, blkHash types.Hash) (int64, []byte, *types.CommitInfo, error) { +func (n *Node) getBlk(ctx context.Context, blkHash types.Hash) (int64, []byte, *types.CommitInfo, peer.ID, error) { for _, peer := range n.peers() { - n.log.Infof("requesting block %v from %v", blkHash, peer) t0 := time.Now() resID, _ := blockHashReq{Hash: blkHash}.MarshalBinary() resp, err := requestFrom(ctx, n.host, peer, resID, ProtocolIDBlock, blkReadLimit) @@ -269,7 +284,6 @@ func (n *Node) getBlk(ctx context.Context, blkHash types.Hash) (int64, []byte, * n.log.Info("block response too short", "peer", peer, "hash", blkHash) continue } - n.log.Debug("Obtained content for block", "block", blkHash, "elapsed", time.Since(t0)) rd := bytes.NewReader(resp) @@ -297,9 +311,9 @@ func (n *Node) getBlk(ctx context.Context, blkHash types.Hash) (int64, []byte, * continue } - return height, rawBlk, &ci, nil + return height, rawBlk, &ci, peer, nil } - return 0, nil, nil, ErrBlkNotFound + return 0, nil, nil, "", ErrBlkNotFound } func requestBlockHeight(ctx context.Context, host host.Host, peer peer.ID, @@ -397,23 +411,27 @@ func (n *Node) getBlkHeight(ctx context.Context, height int64) (types.Hash, []by } func getBlkHeight(ctx context.Context, height int64, host host.Host, log log.Logger) (types.Hash, []byte, *types.CommitInfo, int64, error) { - peers := peerHosts(host) - if len(peers) == 0 { - return types.Hash{}, nil, nil, 0, errors.New("no peers to request block from") + availablePeers := peerHosts(host) + if len(availablePeers) == 0 { + return types.Hash{}, nil, nil, 0, types.ErrPeersNotFound } - // min: 1, max: 10 - cnt := max(len(peers)/5, 1) // 20% of peers // TODO: Not sure what the correct percentage is here + cnt := max(len(availablePeers)/5, 1) // 20% of peers + availablePeers = availablePeers[:cnt] + // incremented when a peer's best height is one less than the requested height + // to help determine if the block has not been committed yet and stop + // requesting the block from other peers if enough peers indicate that the + // block is not available. bestHCnt := 0 - peers = peers[:cnt] var bestHeight int64 - for _, peer := range peers { - if bestHCnt == 5 { // TODO: Not sure what the correct break condition is here - // essentially, we are trying to break from the loop if the blk is not made yet. + for _, peer := range availablePeers { + if bestHCnt == 5 { + // stop requesting the block if there is an indication that + // the block is not made yet. break } - log.Infof("requesting block number %d from %v", height, peer) + t0 := time.Now() resp, err := requestBlockHeight(ctx, host, peer, height, blkReadLimit) if errors.Is(err, ErrNotFound) || errors.Is(err, ErrBlkNotFound) { @@ -426,7 +444,7 @@ func getBlkHeight(ctx context.Context, height int64, host host.Host, log log.Log if theirBest == height-1 { bestHCnt++ } - log.Infof("block %d not found on peer; their best height is %d", height, theirBest) + log.Infof("block %d not found on peer %s; their best height is %d", height, peer, theirBest) } else { log.Warnf("block not available on %v", peer) } diff --git a/node/consensus/block.go b/node/consensus/block.go index 13c4c5321..b14f7a5af 100644 --- a/node/consensus/block.go +++ b/node/consensus/block.go @@ -63,7 +63,8 @@ func (ce *ConsensusEngine) validateBlock(blk *ktypes.Block) error { return fmt.Errorf("block size %d exceeds max block size %d", blockTxnsSize, maxBlockSize) } - // maxVotesPerTx + // Ensure that the number of event and resolution IDs within validator vote transactions votes + // per transaction does not exceed the max consensus limit. maxVotesPerTx := ce.ConsensusParams().MaxVotesPerTx for _, txn := range blk.Txns { if txn.Body.PayloadType == ktypes.PayloadTypeValidatorVoteBodies { @@ -133,11 +134,15 @@ func (ce *ConsensusEngine) BroadcastTx(ctx context.Context, tx *ktypes.Transacti txHash := types.HashBytes(rawTx) // add the transaction to the mempool - ce.mempool.Store(txHash, tx) + have, rejected := ce.mempool.Store(txHash, tx) + if rejected { + return &ktypes.ResultBroadcastTx{ + Hash: txHash, + }, types.ErrMempoolFull + } - // Announce the transaction to the network - if ce.txAnnouncer != nil { - ce.log.Debugf("broadcasting new tx %v", txHash) + // Announce the transaction to the network only if not previously announced + if ce.txAnnouncer != nil && !have { // We can't use parent context 'cause it's canceled in the caller, which // could be the RPC request. handler. This shouldn't be CE's problem... go ce.txAnnouncer(context.Background(), txHash, rawTx) @@ -260,7 +265,7 @@ func (ce *ConsensusEngine) commit(ctx context.Context) error { ce.mempool.RecheckTxs(ctx, ce.recheckTx) ce.log.Info("Committed Block", "height", height, "hash", blkProp.blkHash.String(), - "appHash", appHash.String(), "updates", ce.state.blockRes.paramUpdates) + "appHash", appHash.String(), "numTxs", blkProp.blk.Header.NumTxns) // update and reset the state fields ce.nextState() @@ -297,6 +302,8 @@ func (ce *ConsensusEngine) rollbackState(ctx context.Context) error { } ce.resetState() + ce.stateInfo.hasBlock.Store(ce.state.lc.height) + return nil } @@ -318,6 +325,8 @@ func (ce *ConsensusEngine) resetState() { ce.stateInfo.lastCommit = *ce.state.lc ce.stateInfo.mtx.Unlock() + ce.stateInfo.hasBlock.Store(ce.state.lc.height) + ce.cancelFnMtx.Lock() ce.blkExecCancelFn = nil ce.longRunningTxs = make([]ktypes.Hash, 0) diff --git a/node/consensus/blocksync.go b/node/consensus/blocksync.go index 55aab63e3..2f47e9ec3 100644 --- a/node/consensus/blocksync.go +++ b/node/consensus/blocksync.go @@ -96,7 +96,7 @@ func (ce *ConsensusEngine) replayBlockFromNetwork(ctx context.Context, requester if errors.Is(err, context.Canceled) { return err } - if errors.Is(err, types.ErrBlkNotFound) || errors.Is(err, types.ErrNotFound) { + if errors.Is(err, types.ErrBlkNotFound) || errors.Is(err, types.ErrNotFound) || errors.Is(err, types.ErrPeersNotFound) { break // no peers have this block, assume block sync is complete, continue with consensus } ce.log.Warn("unexpected error requesting block from the network", "height", height, "error", err) @@ -130,25 +130,25 @@ func (ce *ConsensusEngine) syncBlocksUntilHeight(ctx context.Context, startHeigh // syncBlockWithRetry fetches the specified block from the network and keeps retrying until // the block is successfully retrieved from the network. func (ce *ConsensusEngine) syncBlockWithRetry(ctx context.Context, height int64) error { - _, rawBlk, ci, err := ce.getBlock(ctx, height) + blkID, rawBlk, ci, err := ce.getBlock(ctx, height) if err != nil { return fmt.Errorf("failed to get block from the network: %w", err) } - return ce.applyBlock(ctx, rawBlk, ci) + return ce.applyBlock(ctx, rawBlk, ci, blkID) } // syncBlock fetches the specified block from the network func (ce *ConsensusEngine) syncBlock(ctx context.Context, height int64) error { - _, rawblk, ci, _, err := ce.blkRequester(ctx, height) + blkID, rawblk, ci, _, err := ce.blkRequester(ctx, height) if err != nil { return fmt.Errorf("failed to get block from the network: %w", err) } - return ce.applyBlock(ctx, rawblk, ci) + return ce.applyBlock(ctx, rawblk, ci, blkID) } -func (ce *ConsensusEngine) applyBlock(ctx context.Context, rawBlk []byte, ci *types.CommitInfo) error { +func (ce *ConsensusEngine) applyBlock(ctx context.Context, rawBlk []byte, ci *types.CommitInfo, blkID types.Hash) error { ce.state.mtx.Lock() defer ce.state.mtx.Unlock() @@ -157,7 +157,7 @@ func (ce *ConsensusEngine) applyBlock(ctx context.Context, rawBlk []byte, ci *ty return fmt.Errorf("failed to decode block: %w", err) } - if err := ce.processAndCommit(ctx, blk, ci); err != nil { + if err := ce.processAndCommit(ctx, blk, ci, blkID); err != nil { return fmt.Errorf("failed to apply block: %w", err) } diff --git a/node/consensus/engine.go b/node/consensus/engine.go index be1c0a3eb..16bc2316f 100644 --- a/node/consensus/engine.go +++ b/node/consensus/engine.go @@ -577,7 +577,7 @@ func (ce *ConsensusEngine) handleConsensusMessages(ctx context.Context, msg cons case *blockAnnounce: preRole := ce.role.Load() - if err := ce.commitBlock(ctx, v.blk, v.ci); err != nil { + if err := ce.commitBlock(ctx, v.blk, v.ci, v.blkID, v.done); err != nil { ce.log.Error("Error processing committed block announcement", "error", err) return } @@ -809,6 +809,8 @@ func (ce *ConsensusEngine) setLastCommitInfo(height int64, appHash []byte, blk * blkHash: blkHash, } copy(ce.stateInfo.lastCommit.appHash[:], appHash) + + ce.stateInfo.hasBlock.Store(height) } // replayBlocks replays all the blocks from the blockstore if the app hasn't played all the blocks yet. @@ -832,7 +834,7 @@ func (ce *ConsensusEngine) replayFromBlockStore(ctx context.Context, startHeight return nil // no more blocks to replay } - err = ce.processAndCommit(ctx, blk, ci) + err = ce.processAndCommit(ctx, blk, ci, blk.Hash()) if err != nil { return fmt.Errorf("failed replaying block: %w", err) } @@ -955,7 +957,7 @@ func (ce *ConsensusEngine) processCurrentBlock(ctx context.Context) error { return fmt.Errorf("failed to decode the block, blkHeight: %d, blockID: %v, error: %w", ce.state.blkProp.height, blkHash, err) } - if err := ce.processAndCommit(ctx, blk, ci); err != nil { + if err := ce.processAndCommit(ctx, blk, ci, blkHash); err != nil { return fmt.Errorf("failed to replay the block: blkHeight: %d, blockID: %v, error: %w", ce.state.blkProp.height, blkHash, err) } // recovered to the correct block -> continue to replay blocks from network diff --git a/node/consensus/engine_test.go b/node/consensus/engine_test.go index 1978d1af0..f5fd3c802 100644 --- a/node/consensus/engine_test.go +++ b/node/consensus/engine_test.go @@ -289,7 +289,7 @@ func TestValidatorStateMachine(t *testing.T) { name: "commit", trigger: func(t *testing.T, leader, val *ConsensusEngine) { ci := addVotes(t, blkProp1.blkHash, blockAppHash, leader, val) - val.NotifyBlockCommit(blkProp1.blk, ci, blkProp1.blkHash) + val.NotifyBlockCommit(blkProp1.blk, ci, blkProp1.blkHash, nil) }, verify: func(t *testing.T, leader, val *ConsensusEngine) error { return verifyStatus(t, val, Committed, 1, zeroHash) @@ -315,7 +315,7 @@ func TestValidatorStateMachine(t *testing.T) { { name: "commit(InvalidAppHash)", trigger: func(t *testing.T, leader, val *ConsensusEngine) { - val.NotifyBlockCommit(blkProp1.blk, &types.CommitInfo{AppHash: ktypes.Hash{}}, blkProp1.blkHash) + val.NotifyBlockCommit(blkProp1.blk, &types.CommitInfo{AppHash: ktypes.Hash{}}, blkProp1.blkHash, nil) }, verify: func(t *testing.T, leader, val *ConsensusEngine) error { if val.lastCommitHeight() != 0 { @@ -354,7 +354,7 @@ func TestValidatorStateMachine(t *testing.T) { name: "commitNew", trigger: func(t *testing.T, leader, val *ConsensusEngine) { ci := addVotes(t, blkProp2.blkHash, blockAppHash, leader, val) - val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash) + val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash, nil) }, verify: func(t *testing.T, leader, val *ConsensusEngine) error { return verifyStatus(t, val, Committed, 1, zeroHash) @@ -390,7 +390,7 @@ func TestValidatorStateMachine(t *testing.T) { name: "commitNew", trigger: func(t *testing.T, leader, val *ConsensusEngine) { ci := addVotes(t, blkProp2.blkHash, blockAppHash, leader, val) - val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash) + val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash, nil) }, verify: func(t *testing.T, leader, val *ConsensusEngine) error { return verifyStatus(t, val, Committed, 1, zeroHash) @@ -417,7 +417,7 @@ func TestValidatorStateMachine(t *testing.T) { name: "commitNew", trigger: func(t *testing.T, leader, val *ConsensusEngine) { ci := addVotes(t, blkProp2.blkHash, blockAppHash, leader, val) - val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash) + val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash, nil) }, verify: func(t *testing.T, leader, val *ConsensusEngine) error { return verifyStatus(t, val, Committed, 1, zeroHash) @@ -471,7 +471,7 @@ func TestValidatorStateMachine(t *testing.T) { name: "commitNew", trigger: func(t *testing.T, leader, val *ConsensusEngine) { ci := addVotes(t, blkProp2.blkHash, blockAppHash, leader, val) - val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash) + val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash, nil) }, verify: func(t *testing.T, leader, val *ConsensusEngine) error { return verifyStatus(t, val, Committed, 1, zeroHash) @@ -498,7 +498,7 @@ func TestValidatorStateMachine(t *testing.T) { name: "commitNew", trigger: func(t *testing.T, leader, val *ConsensusEngine) { ci := addVotes(t, blkProp2.blkHash, blockAppHash, leader, val) - val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash) + val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash, nil) }, verify: func(t *testing.T, leader, val *ConsensusEngine) error { return verifyStatus(t, val, Committed, 1, zeroHash) @@ -552,7 +552,7 @@ func TestValidatorStateMachine(t *testing.T) { name: "commitNew", trigger: func(t *testing.T, leader, val *ConsensusEngine) { ci := addVotes(t, blkProp2.blkHash, blockAppHash, leader, val) - val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash) + val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash, nil) }, verify: func(t *testing.T, leader, val *ConsensusEngine) error { return verifyStatus(t, val, Committed, 1, zeroHash) @@ -597,7 +597,7 @@ func TestValidatorStateMachine(t *testing.T) { name: "commitNew", trigger: func(t *testing.T, leader, val *ConsensusEngine) { ci := addVotes(t, blkProp2.blkHash, blockAppHash, leader, val) - val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash) + val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash, nil) }, verify: func(t *testing.T, leader, val *ConsensusEngine) error { return verifyStatus(t, val, Committed, 1, zeroHash) diff --git a/node/consensus/follower.go b/node/consensus/follower.go index e4ddd55fa..77406ea0e 100644 --- a/node/consensus/follower.go +++ b/node/consensus/follower.go @@ -132,12 +132,7 @@ func (ce *ConsensusEngine) AcceptProposal(height int64, blkID, prevBlockID types // 1. If the node is a sentry node and doesn't have the block. // 2. If the node is a validator and missed the block proposal message. func (ce *ConsensusEngine) AcceptCommit(height int64, blkID types.Hash, hdr *ktypes.BlockHeader, ci *types.CommitInfo, leaderSig []byte) bool { - // NOTE: If not enough validators have agreed to promote the node as a leader, - // the node will stop receiving any committed blocks from the network and - // is stuck waiting for the votes. The only way to recover the node is to - // restart it, so that it can start if ce.stateInfo.hasBlock.Load() == height { - ce.log.Infof("CE already been notified about the block, height: %d, blockID: %s", height, blkID) return false } @@ -173,41 +168,14 @@ func (ce *ConsensusEngine) AcceptCommit(height int64, blkID types.Hash, hdr *kty ce.log.Infof("Received block with leader update, new leader: %s old leader: %s", hex.EncodeToString(leader.Bytes()), hex.EncodeToString(ce.leader.Bytes())) } - return true - // Leader signature verification is not required as long as the commitInfo includes the signatures // from majority of the validators. There can also scenarios where the node tried to promote a new // leader candidate, but the candidate did not receive enough votes to be promoted as a leader. - // In such cases, the old leader prodces the block, but this node will not accept the blkAnn message + // In such cases, the old leader produces the block, but this node will not accept the blkAnn message // from the old leader, as the node has a different leader now. So accept the committed block as - // long as the block is valid, without worrying about who the leader is. - - // // check if this is the first time we are hearing about this block and not already downloaded it. - // blk, _, err := ce.blockStore.Get(blkID) - // if err != nil { - // if errors.Is(err, types.ErrNotFound) || errors.Is(err, types.ErrBlkNotFound) { - // ce.log.Debugf("Block not found in the store, request it from the network", "height", height, "blockID", blkID) - // return true // we want it - // } - // ce.log.Error("Unexpected error getting block from blockstore", "error", err) - // return false - // } - - // // no need to worry if we are currently processing a different block, commitBlock will take care of it. - // // just ensure that you are processing the block which is for the height after the last committed block. - // blkCommit := &blockAnnounce{ - // ci: ci, - // blk: blk, - // } - - // ce.log.Infof("Notifying the CE of the blkAnn msg as we already have the block", "height", height, "blockID", blkID) - // // Notify only if CE hasn't already acknowledged the block - // go ce.sendConsensusMessage(&consensusMessage{ - // MsgType: blkCommit.Type(), - // Msg: blkCommit, - // Sender: leader.Bytes(), - // }) + // long as the block is accepted by the majority of the validators. + return true } // ProcessBlockProposal is used by the validator's consensus engine to process the new block proposal message. @@ -333,11 +301,16 @@ func (ce *ConsensusEngine) processBlockProposal(ctx context.Context, blkPropMsg // If the validator node processed a different block, it should rollback and reprocess the block. // Validator nodes can skip the block execution and directly commit the block if they have already processed the block. // The nodes should only commit the block if the appHash is valid, else halt the node. -func (ce *ConsensusEngine) commitBlock(ctx context.Context, blk *ktypes.Block, ci *types.CommitInfo) error { +func (ce *ConsensusEngine) commitBlock(ctx context.Context, blk *ktypes.Block, ci *types.CommitInfo, blkID types.Hash, doneFn func()) error { ce.state.mtx.Lock() defer ce.state.mtx.Unlock() - ce.log.Info("In CommitBlock", "height", blk.Header.Height, "blockID", blk.Header.Hash(), "ceLCHeight", ce.state.lc.height, "ceprop", ce.state.blkProp, "ceBlkRes", ce.state.blockRes) + defer func() { + if doneFn != nil && ce.state.lc.height == blk.Header.Height { + // Block has been committed, release the prefetch lock on the block + doneFn() + } + }() if ce.state.lc.height+1 != blk.Header.Height { // only accept/process the block if it is for the next height return nil @@ -351,28 +324,31 @@ func (ce *ConsensusEngine) commitBlock(ctx context.Context, blk *ktypes.Block, c // - Incorrect AppHash: Halt the node. if ce.role.Load() == types.RoleSentry { - return ce.processAndCommit(ctx, blk, ci) + return ce.processAndCommit(ctx, blk, ci, blkID) } // You are a validator if ce.state.blkProp == nil { - return ce.processAndCommit(ctx, blk, ci) + return ce.processAndCommit(ctx, blk, ci, blkID) } - if ce.state.blkProp.blkHash != blk.Header.Hash() { - ce.log.Info("Received committed block is different from the block processed, rollback and process the committed block", "height", blk.Header.Height, "blockID", blk.Header.Hash(), "processedBlockID", ce.state.blkProp.blkHash) + if ce.state.blkProp.blkHash != blkID { + ce.log.Info("Received committed block is different from the block processed, rollback and process the committed block", "height", blk.Header.Height, "blockID", blkID, "processedBlockID", ce.state.blkProp.blkHash) if err := ce.rollbackState(ctx); err != nil { - ce.log.Error("error aborting execution of incorrect block proposal", "height", blk.Header.Height, "blockID", blk.Header.Hash(), "error", err) + ce.log.Error("error aborting execution of incorrect block proposal", "height", blk.Header.Height, "blockID", blkID, "error", err) // that's ok??? return fmt.Errorf("error aborting execution of incorrect block proposal: %w", err) } - return ce.processAndCommit(ctx, blk, ci) + return ce.processAndCommit(ctx, blk, ci, blkID) } // The block is already processed, just validate the appHash and commit the block if valid. - ce.stateInfo.hasBlock.CompareAndSwap(ce.state.lc.height, blk.Header.Height) + oldH := ce.stateInfo.hasBlock.Swap(blk.Header.Height) + if oldH != ce.state.lc.height { + return fmt.Errorf("block %d already processed, duplicate commitBlock %s", oldH, blkID) + } if !ce.state.blockRes.paramUpdates.Equals(ci.ParamUpdates) { // this is absorbed in apphash anyway, but helps diagnostics haltR := fmt.Sprintf("Incorrect ParamUpdates, halting the node. received: %s, computed: %s", ci.ParamUpdates, ce.state.blockRes.paramUpdates) @@ -401,8 +377,7 @@ func (ce *ConsensusEngine) commitBlock(ctx context.Context, blk *ktypes.Block, c // processAndCommit: used by the sentry nodes and slow validators to process and commit the block. // This is used when the acks are not required to be sent back to the leader, essentially in catchup mode. -func (ce *ConsensusEngine) processAndCommit(ctx context.Context, blk *ktypes.Block, ci *types.CommitInfo) error { - blkID := blk.Header.Hash() +func (ce *ConsensusEngine) processAndCommit(ctx context.Context, blk *ktypes.Block, ci *types.CommitInfo, blkID types.Hash) error { if ci == nil { return fmt.Errorf("commitInfo is nil") } @@ -410,7 +385,10 @@ func (ce *ConsensusEngine) processAndCommit(ctx context.Context, blk *ktypes.Blo ce.log.Info("Processing committed block", "height", blk.Header.Height, "blockID", blkID, "appHash", ci.AppHash) // set the hasBlock to the height of the block - ce.stateInfo.hasBlock.CompareAndSwap(ce.state.lc.height, blk.Header.Height) + oldH := ce.stateInfo.hasBlock.Swap(blk.Header.Height) + if oldH != ce.state.lc.height { + return fmt.Errorf("block %d already processed, duplicate block announcement received %s", oldH, blkID) + } if err := ce.validateBlock(blk); err != nil { return err @@ -429,7 +407,7 @@ func (ce *ConsensusEngine) processAndCommit(ctx context.Context, blk *ktypes.Blo ce.state.blkProp = &blockProposal{ height: blk.Header.Height, - blkHash: blk.Header.Hash(), + blkHash: blkID, blk: blk, } diff --git a/node/consensus/messages.go b/node/consensus/messages.go index 7d19c94bf..678779ddd 100644 --- a/node/consensus/messages.go +++ b/node/consensus/messages.go @@ -73,8 +73,12 @@ func (vm *vote) OutOfSync() (*types.OutOfSyncProof, bool) { // that a new block has been committed to the blockchain. // Ensure that the source of the block announce is the leader. type blockAnnounce struct { - blk *ktypes.Block - ci *types.CommitInfo + blk *ktypes.Block + blkID types.Hash + ci *types.CommitInfo + // doneFn is a callback function that is called after the block has been processed + // and committed. This notifies the node to release the prefetch lock on this block. + done func() } func (bam *blockAnnounce) Type() consensusMsgType { @@ -82,7 +86,7 @@ func (bam *blockAnnounce) Type() consensusMsgType { } func (bam *blockAnnounce) String() string { - return fmt.Sprintf("BlockAnnounce {height: %d, blkHash: %s, appHash: %s}", bam.blk.Header.Height, bam.blk.Hash().String(), bam.ci.AppHash.String()) + return fmt.Sprintf("BlockAnnounce {height: %d, blkHash: %s, appHash: %s}", bam.blk.Header.Height, bam.blkID, bam.ci.AppHash.String()) } // resetState is a message that is sent to the consensus engine to @@ -119,7 +123,7 @@ func (ce *ConsensusEngine) NotifyBlockProposal(blk *ktypes.Block) { } // NotifyBlockCommit is used by the p2p stream handler to notify the consensus engine of a committed block. -func (ce *ConsensusEngine) NotifyBlockCommit(blk *ktypes.Block, ci *types.CommitInfo, blkID types.Hash) { +func (ce *ConsensusEngine) NotifyBlockCommit(blk *ktypes.Block, ci *types.CommitInfo, blkID types.Hash, doneFn func()) { leaderU, ok := ci.ParamUpdates[ktypes.ParamNameLeader] leader := ce.leader @@ -133,8 +137,10 @@ func (ce *ConsensusEngine) NotifyBlockCommit(blk *ktypes.Block, ci *types.Commit // AcceptCommit() already verified the correctness of the votes, no need to // re-verify here. blkCommit := &blockAnnounce{ - blk: blk, - ci: ci, + blk: blk, + blkID: blkID, + ci: ci, + done: doneFn, } // only notify if the leader doesn't already know about the block diff --git a/node/interfaces.go b/node/interfaces.go index 796de3e08..31a044a66 100644 --- a/node/interfaces.go +++ b/node/interfaces.go @@ -21,7 +21,7 @@ type ConsensusEngine interface { NotifyBlockProposal(blk *ktypes.Block) AcceptCommit(height int64, blkID types.Hash, hdr *ktypes.BlockHeader, ci *types.CommitInfo, leaderSig []byte) bool - NotifyBlockCommit(blk *ktypes.Block, ci *types.CommitInfo, blkID types.Hash) + NotifyBlockCommit(blk *ktypes.Block, ci *types.CommitInfo, blkID types.Hash, doneFn func()) NotifyACK(validatorPK []byte, ack types.AckRes) diff --git a/node/mempool/mempool.go b/node/mempool/mempool.go index 087f48a09..adc442140 100644 --- a/node/mempool/mempool.go +++ b/node/mempool/mempool.go @@ -2,7 +2,6 @@ package mempool import ( "context" - "fmt" "io" "slices" "sync" @@ -185,8 +184,6 @@ func (mp *Mempool) PeekN(n, szLimit int) []types.NamedTx { } txns = append(txns, tx) } - - fmt.Printf("PeekN: proposedTxs: %d, remainingTxs: %d", len(txns), len(mp.txQ)-len(txns)) return txns } diff --git a/node/node_test.go b/node/node_test.go index 161abb49c..7cabb8ea7 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -308,7 +308,7 @@ func (ce *dummyCE) AcceptCommit(height int64, blkID types.Hash, hdr *ktypes.Bloc return !ce.rejectCommit } -func (ce *dummyCE) NotifyBlockCommit(blk *ktypes.Block, ci *types.CommitInfo, blkID types.Hash) { +func (ce *dummyCE) NotifyBlockCommit(blk *ktypes.Block, ci *types.CommitInfo, blkID types.Hash, _ func()) { if ce.blockCommitHandler != nil { ce.blockCommitHandler(blk, ci, blkID) return @@ -456,6 +456,10 @@ func TestStreamsBlockFetch(t *testing.T) { time.Sleep(100 * time.Millisecond) + resp := make([]byte, 9) + copy(resp[:], noData) + binary.LittleEndian.PutUint64(resp[1:], 1) + // Link and connect the hosts (was here) // time.Sleep(100 * time.Millisecond) @@ -550,19 +554,18 @@ func TestStreamsBlockFetch(t *testing.T) { b, err := io.ReadAll(s) if err != nil { t.Errorf("ReadAll: %v", err) - } else if !bytes.Equal(b, noData) { - t.Error("expected a no-data response, got", b) + } else if !bytes.Equal(b, resp) { // expect noData + bestHeight (1) + t.Errorf("expected %v, got %v", resp, b) } }) t.Run("request by height using requestFrom, unknown", func(t *testing.T) { // t.Parallel() var height int64 - req, _ := blockHeightReq{height}.MarshalBinary() - _, err := requestFrom(ctx, h2, h1.ID(), req, ProtocolIDBlockHeight, 1e4) + _, err := requestBlockHeight(ctx, h2, h1.ID(), height, 1e4) if err == nil { t.Errorf("expected error but got none") - } else if !errors.Is(err, ErrNotFound) { + } else if !errors.Is(err, ErrNotFound) && !errors.Is(err, ErrBlkNotFound) { t.Errorf("unexpected error: %v", err) } }) @@ -592,8 +595,7 @@ func TestStreamsBlockFetch(t *testing.T) { t.Run("request by height using requestFrom, known", func(t *testing.T) { // t.Parallel() var height int64 = 1 - req, _ := blockHeightReq{height}.MarshalBinary() - resp, err := requestFrom(ctx, h2, h1.ID(), req, ProtocolIDBlockHeight, 1e4) + resp, err := requestBlockHeight(ctx, h2, h1.ID(), height, 1e4) if err != nil { t.Errorf("ReadAll: %v", err) } else if bytes.Equal(resp, noData) { diff --git a/node/nogossip.go b/node/nogossip.go index 189ece8c7..0a3f359ef 100644 --- a/node/nogossip.go +++ b/node/nogossip.go @@ -50,7 +50,7 @@ func (n *Node) txAnnStreamHandler(s network.Stream) { // First try to get from this stream. rawTx, err := requestTx(s, []byte(getMsg)) if err != nil { - n.log.Warnf("announcer failed to provide %v, trying other peers", txHash) + n.log.Warnf("announcer failed to provide %v due to error %v, trying other peers", txHash, err) // Since we are aware, ask other peers. we could also put this in a goroutine s.Close() // close the announcers stream first rawTx, err = n.getTxWithRetry(context.TODO(), txHash, 500*time.Millisecond, 10) @@ -121,7 +121,7 @@ func (n *Node) advertiseTxToPeer(ctx context.Context, peerID peer.ID, txHash typ // Send a lightweight advertisement with the object ID _, err = newTxHashAnn(txHash).WriteTo(s) if err != nil { - return fmt.Errorf("txann failed: %w", err) + return fmt.Errorf("txann failed to peer %s: %w", peerID, err) } // n.log.Infof("advertised tx content %s to peer %s", txid, peerID) diff --git a/node/types/interfaces.go b/node/types/interfaces.go index 5f57676b3..599654c0c 100644 --- a/node/types/interfaces.go +++ b/node/types/interfaces.go @@ -16,6 +16,8 @@ var ( ErrBlkNotFound = errors.New("block not available") ErrStillProcessing = errors.New("block still being executed") ErrNoResponse = errors.New("stream closed without response") + ErrPeersNotFound = errors.New("no peers available") + ErrMempoolFull = errors.New("mempool is full") ) const HashLen = types.HashLen