From 3cb5fc8c0f12c60b29d205dd77a294c169caeef3 Mon Sep 17 00:00:00 2001 From: charithabandi Date: Wed, 19 Feb 2025 14:41:57 -0600 Subject: [PATCH] Give CE access to release block fetcher lock upon commit --- app/node/build.go | 2 +- node/block.go | 72 ++++++++++++++++++++++------------- node/consensus/block.go | 18 ++++++--- node/consensus/blocksync.go | 2 +- node/consensus/engine.go | 4 +- node/consensus/engine_test.go | 18 ++++----- node/consensus/follower.go | 57 +++++++++------------------ node/consensus/messages.go | 10 +++-- node/interfaces.go | 2 +- node/mempool/mempool.go | 3 -- node/node_test.go | 18 +++++---- node/nogossip.go | 4 +- node/types/interfaces.go | 2 + 13 files changed, 112 insertions(+), 100 deletions(-) diff --git a/app/node/build.go b/app/node/build.go index e1c37cf1e..450daeace 100644 --- a/app/node/build.go +++ b/app/node/build.go @@ -530,7 +530,7 @@ func buildNode(d *coreDependencies, mp *mempool.Mempool, bs *store.BlockStore, failBuild(err, "failed to create node") } - logger.Infof("This node is %s @ %s", node.ID(), node.Addrs()) + logger.Infof("This node is %s @ %s, peerID: %s", node.ID(), node.Addrs(), p2p.Host().ID()) return node } diff --git a/node/block.go b/node/block.go index 528a4c396..3de520cfa 100644 --- a/node/block.go +++ b/node/block.go @@ -115,6 +115,9 @@ func (n *Node) blkAnnStreamHandler(s network.Stream) { // If we are a validator and this is the commit ann for a proposed block // that we already started executing, consensus engine will handle it. if !n.ce.AcceptCommit(height, blkHash, hdr, ci, sig) { + // this either means that the ce already has the block or it is not + // ready to accept it yet. In either case, we don't need to do anything + // here. return } @@ -122,7 +125,6 @@ func (n *Node) blkAnnStreamHandler(s network.Stream) { // code like a sentry node might do. need, done := n.bki.PreFetch(blkHash) - defer done() if !need { n.log.Debug("ALREADY HAVE OR FETCHING BLOCK") return // we have or are currently fetching it, do nothing, assuming we have already re-announced @@ -131,28 +133,38 @@ func (n *Node) blkAnnStreamHandler(s network.Stream) { n.log.Debug("retrieving new block", "blockID", blkid) t0 := time.Now() + peerID := s.Conn().RemotePeer() + // First try to get from this stream. rawBlk, err := request(s, []byte(getMsg), blkReadLimit) if err != nil { - n.log.Warnf("announcer failed to provide %v, trying other peers", blkid) + n.log.Warnf("announcer failed to provide %v due to error: %w, trying other peers", blkid, err) // Since we are aware, ask other peers. we could also put this in a goroutine s.Close() // close the announcers stream first var gotHeight int64 var gotCI *types.CommitInfo - - gotHeight, rawBlk, gotCI, err = n.getBlkWithRetry(ctx, blkHash, 500*time.Millisecond, 10) + var id peer.ID + gotHeight, rawBlk, gotCI, id, err = n.getBlkWithRetry(ctx, blkHash, 500*time.Millisecond, 10) if err != nil { n.log.Errorf("unable to retrieve tx %v: %v", blkid, err) + done() return } if gotHeight != height { n.log.Errorf("getblk response had unexpected height: wanted %d, got %d", height, gotHeight) + done() return } if gotCI != nil && gotCI.AppHash != ci.AppHash { n.log.Errorf("getblk response had unexpected appHash: wanted %v, got %v", ci.AppHash, gotCI.AppHash) + done() return } + // Ensure that the peerID from which the block was downloaded is a valid one. + if id != "" { + n.log.Errorf("getblk response had unexpected peerID: %v", id) + } + peerID = id } n.log.Debugf("obtained content for block %q in %v", blkid, time.Since(t0)) @@ -160,23 +172,26 @@ func (n *Node) blkAnnStreamHandler(s network.Stream) { blk, err := ktypes.DecodeBlock(rawBlk) if err != nil { n.log.Infof("decodeBlock failed for %v: %v", blkid, err) + done() return } if blk.Header.Height != height { n.log.Infof("getblk response had unexpected height: wanted %d, got %d", height, blk.Header.Height) + done() return } gotBlkHash := blk.Header.Hash() if gotBlkHash != blkHash { n.log.Infof("invalid block hash: wanted %v, got %x", blkHash, gotBlkHash) + done() return } // re-announce - n.log.Infof("downloaded block %v of height %d from %v, notifying ce of the block", blkid, height, s.Conn().RemotePeer()) - n.ce.NotifyBlockCommit(blk, ci, blkHash) + n.log.Infof("downloaded block %v of height %d from %v, notifying ce of the block", blkid, height, peerID) + n.ce.NotifyBlockCommit(blk, ci, blkHash, done) go func() { - n.announceRawBlk(context.Background(), blkHash, height, rawBlk, blk.Header, ci, s.Conn().RemotePeer(), reqMsg.LeaderSig) // re-announce with the leader's signature + n.announceRawBlk(context.Background(), blkHash, height, rawBlk, blk.Header, ci, peerID, reqMsg.LeaderSig) // re-announce with the leader's signature }() } @@ -200,6 +215,7 @@ func (n *Node) announceRawBlk(ctx context.Context, blkHash types.Hash, height in if peerID == from { continue } + n.log.Debugf("advertising block %s (height %d / sz %d / updates %v) to peer %v", blkHash, height, len(rawBlk), ci.ParamUpdates, peerID) resID, err := blockAnnMsg{ @@ -224,12 +240,12 @@ func (n *Node) announceRawBlk(ctx context.Context, blkHash types.Hash, height in } func (n *Node) getBlkWithRetry(ctx context.Context, blkHash types.Hash, baseDelay time.Duration, - maxAttempts int) (int64, []byte, *types.CommitInfo, error) { + maxAttempts int) (int64, []byte, *types.CommitInfo, peer.ID, error) { var attempts int for { - height, raw, ci, err := n.getBlk(ctx, blkHash) + height, raw, ci, peer, err := n.getBlk(ctx, blkHash) if err == nil { - return height, raw, ci, nil + return height, raw, ci, peer, nil } n.log.Warnf("unable to retrieve block %v (%v), waiting to retry", blkHash, err) @@ -241,14 +257,13 @@ func (n *Node) getBlkWithRetry(ctx context.Context, blkHash types.Hash, baseDela baseDelay *= 2 attempts++ if attempts >= maxAttempts { - return 0, nil, nil, ErrBlkNotFound + return 0, nil, nil, "", ErrBlkNotFound } } } -func (n *Node) getBlk(ctx context.Context, blkHash types.Hash) (int64, []byte, *types.CommitInfo, error) { +func (n *Node) getBlk(ctx context.Context, blkHash types.Hash) (int64, []byte, *types.CommitInfo, peer.ID, error) { for _, peer := range n.peers() { - n.log.Infof("requesting block %v from %v", blkHash, peer) t0 := time.Now() resID, _ := blockHashReq{Hash: blkHash}.MarshalBinary() resp, err := requestFrom(ctx, n.host, peer, resID, ProtocolIDBlock, blkReadLimit) @@ -269,7 +284,6 @@ func (n *Node) getBlk(ctx context.Context, blkHash types.Hash) (int64, []byte, * n.log.Info("block response too short", "peer", peer, "hash", blkHash) continue } - n.log.Debug("Obtained content for block", "block", blkHash, "elapsed", time.Since(t0)) rd := bytes.NewReader(resp) @@ -297,9 +311,9 @@ func (n *Node) getBlk(ctx context.Context, blkHash types.Hash) (int64, []byte, * continue } - return height, rawBlk, &ci, nil + return height, rawBlk, &ci, peer, nil } - return 0, nil, nil, ErrBlkNotFound + return 0, nil, nil, "", ErrBlkNotFound } func requestBlockHeight(ctx context.Context, host host.Host, peer peer.ID, @@ -397,23 +411,27 @@ func (n *Node) getBlkHeight(ctx context.Context, height int64) (types.Hash, []by } func getBlkHeight(ctx context.Context, height int64, host host.Host, log log.Logger) (types.Hash, []byte, *types.CommitInfo, int64, error) { - peers := peerHosts(host) - if len(peers) == 0 { - return types.Hash{}, nil, nil, 0, errors.New("no peers to request block from") + availablePeers := peerHosts(host) + if len(availablePeers) == 0 { + return types.Hash{}, nil, nil, 0, types.ErrPeersNotFound } - // min: 1, max: 10 - cnt := max(len(peers)/5, 1) // 20% of peers // TODO: Not sure what the correct percentage is here + cnt := max(len(availablePeers)/5, 1) // 20% of peers + availablePeers = availablePeers[:cnt] + // incremented when a peer's best height is one less than the requested height + // to help determine if the block has not been committed yet and stop + // requesting the block from other peers if enough peers indicate that the + // block is not available. bestHCnt := 0 - peers = peers[:cnt] var bestHeight int64 - for _, peer := range peers { - if bestHCnt == 5 { // TODO: Not sure what the correct break condition is here - // essentially, we are trying to break from the loop if the blk is not made yet. + for _, peer := range availablePeers { + if bestHCnt == 5 { + // stop requesting the block if there is an indication that + // the block is not made yet. break } - log.Infof("requesting block number %d from %v", height, peer) + t0 := time.Now() resp, err := requestBlockHeight(ctx, host, peer, height, blkReadLimit) if errors.Is(err, ErrNotFound) || errors.Is(err, ErrBlkNotFound) { @@ -426,7 +444,7 @@ func getBlkHeight(ctx context.Context, height int64, host host.Host, log log.Log if theirBest == height-1 { bestHCnt++ } - log.Infof("block %d not found on peer; their best height is %d", height, theirBest) + log.Infof("block %d not found on peer %s; their best height is %d", height, peer, theirBest) } else { log.Warnf("block not available on %v", peer) } diff --git a/node/consensus/block.go b/node/consensus/block.go index 13c4c5321..7b6010c63 100644 --- a/node/consensus/block.go +++ b/node/consensus/block.go @@ -133,11 +133,15 @@ func (ce *ConsensusEngine) BroadcastTx(ctx context.Context, tx *ktypes.Transacti txHash := types.HashBytes(rawTx) // add the transaction to the mempool - ce.mempool.Store(txHash, tx) + have, rejected := ce.mempool.Store(txHash, tx) + if rejected { + return &ktypes.ResultBroadcastTx{ + Hash: txHash, + }, types.ErrMempoolFull + } - // Announce the transaction to the network - if ce.txAnnouncer != nil { - ce.log.Debugf("broadcasting new tx %v", txHash) + // Announce the transaction to the network only if not previously announced + if ce.txAnnouncer != nil && !have { // We can't use parent context 'cause it's canceled in the caller, which // could be the RPC request. handler. This shouldn't be CE's problem... go ce.txAnnouncer(context.Background(), txHash, rawTx) @@ -260,7 +264,7 @@ func (ce *ConsensusEngine) commit(ctx context.Context) error { ce.mempool.RecheckTxs(ctx, ce.recheckTx) ce.log.Info("Committed Block", "height", height, "hash", blkProp.blkHash.String(), - "appHash", appHash.String(), "updates", ce.state.blockRes.paramUpdates) + "appHash", appHash.String(), "numTxs", blkProp.blk.Header.NumTxns, "updates", ce.state.blockRes.paramUpdates) // update and reset the state fields ce.nextState() @@ -297,6 +301,8 @@ func (ce *ConsensusEngine) rollbackState(ctx context.Context) error { } ce.resetState() + ce.stateInfo.hasBlock.Store(ce.state.lc.height) + return nil } @@ -318,6 +324,8 @@ func (ce *ConsensusEngine) resetState() { ce.stateInfo.lastCommit = *ce.state.lc ce.stateInfo.mtx.Unlock() + ce.stateInfo.hasBlock.Store(ce.state.lc.height) + ce.cancelFnMtx.Lock() ce.blkExecCancelFn = nil ce.longRunningTxs = make([]ktypes.Hash, 0) diff --git a/node/consensus/blocksync.go b/node/consensus/blocksync.go index 55aab63e3..9d29686e8 100644 --- a/node/consensus/blocksync.go +++ b/node/consensus/blocksync.go @@ -96,7 +96,7 @@ func (ce *ConsensusEngine) replayBlockFromNetwork(ctx context.Context, requester if errors.Is(err, context.Canceled) { return err } - if errors.Is(err, types.ErrBlkNotFound) || errors.Is(err, types.ErrNotFound) { + if errors.Is(err, types.ErrBlkNotFound) || errors.Is(err, types.ErrNotFound) || errors.Is(err, types.ErrPeersNotFound) { break // no peers have this block, assume block sync is complete, continue with consensus } ce.log.Warn("unexpected error requesting block from the network", "height", height, "error", err) diff --git a/node/consensus/engine.go b/node/consensus/engine.go index be1c0a3eb..2ec2c171d 100644 --- a/node/consensus/engine.go +++ b/node/consensus/engine.go @@ -577,7 +577,7 @@ func (ce *ConsensusEngine) handleConsensusMessages(ctx context.Context, msg cons case *blockAnnounce: preRole := ce.role.Load() - if err := ce.commitBlock(ctx, v.blk, v.ci); err != nil { + if err := ce.commitBlock(ctx, v.blk, v.ci, v.done); err != nil { ce.log.Error("Error processing committed block announcement", "error", err) return } @@ -809,6 +809,8 @@ func (ce *ConsensusEngine) setLastCommitInfo(height int64, appHash []byte, blk * blkHash: blkHash, } copy(ce.stateInfo.lastCommit.appHash[:], appHash) + + ce.stateInfo.hasBlock.Store(height) } // replayBlocks replays all the blocks from the blockstore if the app hasn't played all the blocks yet. diff --git a/node/consensus/engine_test.go b/node/consensus/engine_test.go index d61ef7fa0..c1ed689a2 100644 --- a/node/consensus/engine_test.go +++ b/node/consensus/engine_test.go @@ -285,7 +285,7 @@ func TestValidatorStateMachine(t *testing.T) { name: "commit", trigger: func(t *testing.T, leader, val *ConsensusEngine) { ci := addVotes(t, blkProp1.blkHash, blockAppHash, leader, val) - val.NotifyBlockCommit(blkProp1.blk, ci, blkProp1.blkHash) + val.NotifyBlockCommit(blkProp1.blk, ci, blkProp1.blkHash, nil) }, verify: func(t *testing.T, leader, val *ConsensusEngine) error { return verifyStatus(t, val, Committed, 1, zeroHash) @@ -311,7 +311,7 @@ func TestValidatorStateMachine(t *testing.T) { { name: "commit(InvalidAppHash)", trigger: func(t *testing.T, leader, val *ConsensusEngine) { - val.NotifyBlockCommit(blkProp1.blk, &types.CommitInfo{AppHash: ktypes.Hash{}}, blkProp1.blkHash) + val.NotifyBlockCommit(blkProp1.blk, &types.CommitInfo{AppHash: ktypes.Hash{}}, blkProp1.blkHash, nil) }, verify: func(t *testing.T, leader, val *ConsensusEngine) error { if val.lastCommitHeight() != 0 { @@ -350,7 +350,7 @@ func TestValidatorStateMachine(t *testing.T) { name: "commitNew", trigger: func(t *testing.T, leader, val *ConsensusEngine) { ci := addVotes(t, blkProp2.blkHash, blockAppHash, leader, val) - val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash) + val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash, nil) }, verify: func(t *testing.T, leader, val *ConsensusEngine) error { return verifyStatus(t, val, Committed, 1, zeroHash) @@ -386,7 +386,7 @@ func TestValidatorStateMachine(t *testing.T) { name: "commitNew", trigger: func(t *testing.T, leader, val *ConsensusEngine) { ci := addVotes(t, blkProp2.blkHash, blockAppHash, leader, val) - val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash) + val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash, nil) }, verify: func(t *testing.T, leader, val *ConsensusEngine) error { return verifyStatus(t, val, Committed, 1, zeroHash) @@ -413,7 +413,7 @@ func TestValidatorStateMachine(t *testing.T) { name: "commitNew", trigger: func(t *testing.T, leader, val *ConsensusEngine) { ci := addVotes(t, blkProp2.blkHash, blockAppHash, leader, val) - val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash) + val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash, nil) }, verify: func(t *testing.T, leader, val *ConsensusEngine) error { return verifyStatus(t, val, Committed, 1, zeroHash) @@ -467,7 +467,7 @@ func TestValidatorStateMachine(t *testing.T) { name: "commitNew", trigger: func(t *testing.T, leader, val *ConsensusEngine) { ci := addVotes(t, blkProp2.blkHash, blockAppHash, leader, val) - val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash) + val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash, nil) }, verify: func(t *testing.T, leader, val *ConsensusEngine) error { return verifyStatus(t, val, Committed, 1, zeroHash) @@ -494,7 +494,7 @@ func TestValidatorStateMachine(t *testing.T) { name: "commitNew", trigger: func(t *testing.T, leader, val *ConsensusEngine) { ci := addVotes(t, blkProp2.blkHash, blockAppHash, leader, val) - val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash) + val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash, nil) }, verify: func(t *testing.T, leader, val *ConsensusEngine) error { return verifyStatus(t, val, Committed, 1, zeroHash) @@ -548,7 +548,7 @@ func TestValidatorStateMachine(t *testing.T) { name: "commitNew", trigger: func(t *testing.T, leader, val *ConsensusEngine) { ci := addVotes(t, blkProp2.blkHash, blockAppHash, leader, val) - val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash) + val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash, nil) }, verify: func(t *testing.T, leader, val *ConsensusEngine) error { return verifyStatus(t, val, Committed, 1, zeroHash) @@ -593,7 +593,7 @@ func TestValidatorStateMachine(t *testing.T) { name: "commitNew", trigger: func(t *testing.T, leader, val *ConsensusEngine) { ci := addVotes(t, blkProp2.blkHash, blockAppHash, leader, val) - val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash) + val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash, nil) }, verify: func(t *testing.T, leader, val *ConsensusEngine) error { return verifyStatus(t, val, Committed, 1, zeroHash) diff --git a/node/consensus/follower.go b/node/consensus/follower.go index e4ddd55fa..821906ce6 100644 --- a/node/consensus/follower.go +++ b/node/consensus/follower.go @@ -132,12 +132,7 @@ func (ce *ConsensusEngine) AcceptProposal(height int64, blkID, prevBlockID types // 1. If the node is a sentry node and doesn't have the block. // 2. If the node is a validator and missed the block proposal message. func (ce *ConsensusEngine) AcceptCommit(height int64, blkID types.Hash, hdr *ktypes.BlockHeader, ci *types.CommitInfo, leaderSig []byte) bool { - // NOTE: If not enough validators have agreed to promote the node as a leader, - // the node will stop receiving any committed blocks from the network and - // is stuck waiting for the votes. The only way to recover the node is to - // restart it, so that it can start if ce.stateInfo.hasBlock.Load() == height { - ce.log.Infof("CE already been notified about the block, height: %d, blockID: %s", height, blkID) return false } @@ -173,41 +168,14 @@ func (ce *ConsensusEngine) AcceptCommit(height int64, blkID types.Hash, hdr *kty ce.log.Infof("Received block with leader update, new leader: %s old leader: %s", hex.EncodeToString(leader.Bytes()), hex.EncodeToString(ce.leader.Bytes())) } - return true - // Leader signature verification is not required as long as the commitInfo includes the signatures // from majority of the validators. There can also scenarios where the node tried to promote a new // leader candidate, but the candidate did not receive enough votes to be promoted as a leader. - // In such cases, the old leader prodces the block, but this node will not accept the blkAnn message + // In such cases, the old leader produces the block, but this node will not accept the blkAnn message // from the old leader, as the node has a different leader now. So accept the committed block as - // long as the block is valid, without worrying about who the leader is. - - // // check if this is the first time we are hearing about this block and not already downloaded it. - // blk, _, err := ce.blockStore.Get(blkID) - // if err != nil { - // if errors.Is(err, types.ErrNotFound) || errors.Is(err, types.ErrBlkNotFound) { - // ce.log.Debugf("Block not found in the store, request it from the network", "height", height, "blockID", blkID) - // return true // we want it - // } - // ce.log.Error("Unexpected error getting block from blockstore", "error", err) - // return false - // } - - // // no need to worry if we are currently processing a different block, commitBlock will take care of it. - // // just ensure that you are processing the block which is for the height after the last committed block. - // blkCommit := &blockAnnounce{ - // ci: ci, - // blk: blk, - // } - - // ce.log.Infof("Notifying the CE of the blkAnn msg as we already have the block", "height", height, "blockID", blkID) - // // Notify only if CE hasn't already acknowledged the block - // go ce.sendConsensusMessage(&consensusMessage{ - // MsgType: blkCommit.Type(), - // Msg: blkCommit, - // Sender: leader.Bytes(), - // }) + // long as the block is accepted by the majority of the validators. + return true } // ProcessBlockProposal is used by the validator's consensus engine to process the new block proposal message. @@ -333,11 +301,16 @@ func (ce *ConsensusEngine) processBlockProposal(ctx context.Context, blkPropMsg // If the validator node processed a different block, it should rollback and reprocess the block. // Validator nodes can skip the block execution and directly commit the block if they have already processed the block. // The nodes should only commit the block if the appHash is valid, else halt the node. -func (ce *ConsensusEngine) commitBlock(ctx context.Context, blk *ktypes.Block, ci *types.CommitInfo) error { +func (ce *ConsensusEngine) commitBlock(ctx context.Context, blk *ktypes.Block, ci *types.CommitInfo, doneFn func()) error { ce.state.mtx.Lock() defer ce.state.mtx.Unlock() - ce.log.Info("In CommitBlock", "height", blk.Header.Height, "blockID", blk.Header.Hash(), "ceLCHeight", ce.state.lc.height, "ceprop", ce.state.blkProp, "ceBlkRes", ce.state.blockRes) + defer func() { + if doneFn != nil && ce.state.lc.height == blk.Header.Height { + // Block has been committed, release the prefetch lock on the block + doneFn() + } + }() if ce.state.lc.height+1 != blk.Header.Height { // only accept/process the block if it is for the next height return nil @@ -372,7 +345,10 @@ func (ce *ConsensusEngine) commitBlock(ctx context.Context, blk *ktypes.Block, c } // The block is already processed, just validate the appHash and commit the block if valid. - ce.stateInfo.hasBlock.CompareAndSwap(ce.state.lc.height, blk.Header.Height) + oldH := ce.stateInfo.hasBlock.Swap(blk.Header.Height) + if oldH != ce.state.lc.height { + return fmt.Errorf("block %d already processed, duplicate commitBlock %s", oldH, blk.Header.Hash()) + } if !ce.state.blockRes.paramUpdates.Equals(ci.ParamUpdates) { // this is absorbed in apphash anyway, but helps diagnostics haltR := fmt.Sprintf("Incorrect ParamUpdates, halting the node. received: %s, computed: %s", ci.ParamUpdates, ce.state.blockRes.paramUpdates) @@ -410,7 +386,10 @@ func (ce *ConsensusEngine) processAndCommit(ctx context.Context, blk *ktypes.Blo ce.log.Info("Processing committed block", "height", blk.Header.Height, "blockID", blkID, "appHash", ci.AppHash) // set the hasBlock to the height of the block - ce.stateInfo.hasBlock.CompareAndSwap(ce.state.lc.height, blk.Header.Height) + oldH := ce.stateInfo.hasBlock.Swap(blk.Header.Height) + if oldH != ce.state.lc.height { + return fmt.Errorf("block %d already processed, duplicate block announcement received %s", oldH, blkID) + } if err := ce.validateBlock(blk); err != nil { return err diff --git a/node/consensus/messages.go b/node/consensus/messages.go index 7d19c94bf..127462c80 100644 --- a/node/consensus/messages.go +++ b/node/consensus/messages.go @@ -75,6 +75,9 @@ func (vm *vote) OutOfSync() (*types.OutOfSyncProof, bool) { type blockAnnounce struct { blk *ktypes.Block ci *types.CommitInfo + // doneFn is a callback function that is called after the block has been processed + // and committed. This notifies the node to release the prefetch lock on this block. + done func() } func (bam *blockAnnounce) Type() consensusMsgType { @@ -119,7 +122,7 @@ func (ce *ConsensusEngine) NotifyBlockProposal(blk *ktypes.Block) { } // NotifyBlockCommit is used by the p2p stream handler to notify the consensus engine of a committed block. -func (ce *ConsensusEngine) NotifyBlockCommit(blk *ktypes.Block, ci *types.CommitInfo, blkID types.Hash) { +func (ce *ConsensusEngine) NotifyBlockCommit(blk *ktypes.Block, ci *types.CommitInfo, blkID types.Hash, doneFn func()) { leaderU, ok := ci.ParamUpdates[ktypes.ParamNameLeader] leader := ce.leader @@ -133,8 +136,9 @@ func (ce *ConsensusEngine) NotifyBlockCommit(blk *ktypes.Block, ci *types.Commit // AcceptCommit() already verified the correctness of the votes, no need to // re-verify here. blkCommit := &blockAnnounce{ - blk: blk, - ci: ci, + blk: blk, + ci: ci, + done: doneFn, } // only notify if the leader doesn't already know about the block diff --git a/node/interfaces.go b/node/interfaces.go index 796de3e08..31a044a66 100644 --- a/node/interfaces.go +++ b/node/interfaces.go @@ -21,7 +21,7 @@ type ConsensusEngine interface { NotifyBlockProposal(blk *ktypes.Block) AcceptCommit(height int64, blkID types.Hash, hdr *ktypes.BlockHeader, ci *types.CommitInfo, leaderSig []byte) bool - NotifyBlockCommit(blk *ktypes.Block, ci *types.CommitInfo, blkID types.Hash) + NotifyBlockCommit(blk *ktypes.Block, ci *types.CommitInfo, blkID types.Hash, doneFn func()) NotifyACK(validatorPK []byte, ack types.AckRes) diff --git a/node/mempool/mempool.go b/node/mempool/mempool.go index 4d062ed91..b188f5615 100644 --- a/node/mempool/mempool.go +++ b/node/mempool/mempool.go @@ -2,7 +2,6 @@ package mempool import ( "context" - "fmt" "io" "slices" "sync" @@ -185,8 +184,6 @@ func (mp *Mempool) PeekN(n, szLimit int) []types.NamedTx { } txns = append(txns, tx) } - - fmt.Printf("PeekN: proposedTxs: %d, remainingTxs: %d", len(txns), len(mp.txQ)-len(txns)) return txns } diff --git a/node/node_test.go b/node/node_test.go index e8351d90e..6480d5d8e 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -304,7 +304,7 @@ func (ce *dummyCE) AcceptCommit(height int64, blkID types.Hash, hdr *ktypes.Bloc return !ce.rejectCommit } -func (ce *dummyCE) NotifyBlockCommit(blk *ktypes.Block, ci *types.CommitInfo, blkID types.Hash) { +func (ce *dummyCE) NotifyBlockCommit(blk *ktypes.Block, ci *types.CommitInfo, blkID types.Hash, _ func()) { if ce.blockCommitHandler != nil { ce.blockCommitHandler(blk, ci, blkID) return @@ -452,6 +452,10 @@ func TestStreamsBlockFetch(t *testing.T) { time.Sleep(100 * time.Millisecond) + resp := make([]byte, 9) + copy(resp[:], noData) + binary.LittleEndian.PutUint64(resp[1:], 1) + // Link and connect the hosts (was here) // time.Sleep(100 * time.Millisecond) @@ -546,19 +550,18 @@ func TestStreamsBlockFetch(t *testing.T) { b, err := io.ReadAll(s) if err != nil { t.Errorf("ReadAll: %v", err) - } else if !bytes.Equal(b, noData) { - t.Error("expected a no-data response, got", b) + } else if !bytes.Equal(b, resp) { // expect noData + bestHeight (1) + t.Errorf("expected %v, got %v", resp, b) } }) t.Run("request by height using requestFrom, unknown", func(t *testing.T) { // t.Parallel() var height int64 - req, _ := blockHeightReq{height}.MarshalBinary() - _, err := requestFrom(ctx, h2, h1.ID(), req, ProtocolIDBlockHeight, 1e4) + _, err := requestBlockHeight(ctx, h2, h1.ID(), height, 1e4) if err == nil { t.Errorf("expected error but got none") - } else if !errors.Is(err, ErrNotFound) { + } else if !errors.Is(err, ErrNotFound) && !errors.Is(err, ErrBlkNotFound) { t.Errorf("unexpected error: %v", err) } }) @@ -588,8 +591,7 @@ func TestStreamsBlockFetch(t *testing.T) { t.Run("request by height using requestFrom, known", func(t *testing.T) { // t.Parallel() var height int64 = 1 - req, _ := blockHeightReq{height}.MarshalBinary() - resp, err := requestFrom(ctx, h2, h1.ID(), req, ProtocolIDBlockHeight, 1e4) + resp, err := requestBlockHeight(ctx, h2, h1.ID(), height, 1e4) if err != nil { t.Errorf("ReadAll: %v", err) } else if bytes.Equal(resp, noData) { diff --git a/node/nogossip.go b/node/nogossip.go index 189ece8c7..0d1e1fac9 100644 --- a/node/nogossip.go +++ b/node/nogossip.go @@ -50,7 +50,7 @@ func (n *Node) txAnnStreamHandler(s network.Stream) { // First try to get from this stream. rawTx, err := requestTx(s, []byte(getMsg)) if err != nil { - n.log.Warnf("announcer failed to provide %v, trying other peers", txHash) + n.log.Warnf("announcer failed to provide %v due to error %w, trying other peers", txHash, err) // Since we are aware, ask other peers. we could also put this in a goroutine s.Close() // close the announcers stream first rawTx, err = n.getTxWithRetry(context.TODO(), txHash, 500*time.Millisecond, 10) @@ -121,7 +121,7 @@ func (n *Node) advertiseTxToPeer(ctx context.Context, peerID peer.ID, txHash typ // Send a lightweight advertisement with the object ID _, err = newTxHashAnn(txHash).WriteTo(s) if err != nil { - return fmt.Errorf("txann failed: %w", err) + return fmt.Errorf("txann failed to peer %s: %w", peerID, err) } // n.log.Infof("advertised tx content %s to peer %s", txid, peerID) diff --git a/node/types/interfaces.go b/node/types/interfaces.go index 5f57676b3..599654c0c 100644 --- a/node/types/interfaces.go +++ b/node/types/interfaces.go @@ -16,6 +16,8 @@ var ( ErrBlkNotFound = errors.New("block not available") ErrStillProcessing = errors.New("block still being executed") ErrNoResponse = errors.New("stream closed without response") + ErrPeersNotFound = errors.New("no peers available") + ErrMempoolFull = errors.New("mempool is full") ) const HashLen = types.HashLen