From 91bfebdf9d754c10743c0f59c303b1b62779d169 Mon Sep 17 00:00:00 2001 From: charithabandi Date: Fri, 21 Feb 2025 12:38:34 -0600 Subject: [PATCH] address comments --- node/block.go | 2 +- node/consensus/block.go | 5 +++-- node/consensus/blocksync.go | 12 ++++++------ node/consensus/engine.go | 6 +++--- node/consensus/follower.go | 21 ++++++++++----------- node/consensus/messages.go | 14 ++++++++------ node/nogossip.go | 2 +- 7 files changed, 32 insertions(+), 30 deletions(-) diff --git a/node/block.go b/node/block.go index 3de520cfa..f4fc8feed 100644 --- a/node/block.go +++ b/node/block.go @@ -138,7 +138,7 @@ func (n *Node) blkAnnStreamHandler(s network.Stream) { // First try to get from this stream. rawBlk, err := request(s, []byte(getMsg), blkReadLimit) if err != nil { - n.log.Warnf("announcer failed to provide %v due to error: %w, trying other peers", blkid, err) + n.log.Warnf("announcer failed to provide %v due to error: %v, trying other peers", blkid, err) // Since we are aware, ask other peers. we could also put this in a goroutine s.Close() // close the announcers stream first var gotHeight int64 diff --git a/node/consensus/block.go b/node/consensus/block.go index 7b6010c63..b14f7a5af 100644 --- a/node/consensus/block.go +++ b/node/consensus/block.go @@ -63,7 +63,8 @@ func (ce *ConsensusEngine) validateBlock(blk *ktypes.Block) error { return fmt.Errorf("block size %d exceeds max block size %d", blockTxnsSize, maxBlockSize) } - // maxVotesPerTx + // Ensure that the number of event and resolution IDs in each validator vote + // transaction does not exceed the consensus limit on votes per transaction. 
maxVotesPerTx := ce.ConsensusParams().MaxVotesPerTx for _, txn := range blk.Txns { if txn.Body.PayloadType == ktypes.PayloadTypeValidatorVoteBodies { @@ -264,7 +265,7 @@ func (ce *ConsensusEngine) commit(ctx context.Context) error { ce.mempool.RecheckTxs(ctx, ce.recheckTx) ce.log.Info("Committed Block", "height", height, "hash", blkProp.blkHash.String(), - "appHash", appHash.String(), "numTxs", blkProp.blk.Header.NumTxns, "updates", ce.state.blockRes.paramUpdates) + "appHash", appHash.String(), "numTxs", blkProp.blk.Header.NumTxns) // update and reset the state fields ce.nextState() diff --git a/node/consensus/blocksync.go b/node/consensus/blocksync.go index 9d29686e8..2f47e9ec3 100644 --- a/node/consensus/blocksync.go +++ b/node/consensus/blocksync.go @@ -130,25 +130,25 @@ func (ce *ConsensusEngine) syncBlocksUntilHeight(ctx context.Context, startHeigh // syncBlockWithRetry fetches the specified block from the network and keeps retrying until // the block is successfully retrieved from the network. 
func (ce *ConsensusEngine) syncBlockWithRetry(ctx context.Context, height int64) error { - _, rawBlk, ci, err := ce.getBlock(ctx, height) + blkID, rawBlk, ci, err := ce.getBlock(ctx, height) if err != nil { return fmt.Errorf("failed to get block from the network: %w", err) } - return ce.applyBlock(ctx, rawBlk, ci) + return ce.applyBlock(ctx, rawBlk, ci, blkID) } // syncBlock fetches the specified block from the network func (ce *ConsensusEngine) syncBlock(ctx context.Context, height int64) error { - _, rawblk, ci, _, err := ce.blkRequester(ctx, height) + blkID, rawblk, ci, _, err := ce.blkRequester(ctx, height) if err != nil { return fmt.Errorf("failed to get block from the network: %w", err) } - return ce.applyBlock(ctx, rawblk, ci) + return ce.applyBlock(ctx, rawblk, ci, blkID) } -func (ce *ConsensusEngine) applyBlock(ctx context.Context, rawBlk []byte, ci *types.CommitInfo) error { +func (ce *ConsensusEngine) applyBlock(ctx context.Context, rawBlk []byte, ci *types.CommitInfo, blkID types.Hash) error { ce.state.mtx.Lock() defer ce.state.mtx.Unlock() @@ -157,7 +157,7 @@ func (ce *ConsensusEngine) applyBlock(ctx context.Context, rawBlk []byte, ci *ty return fmt.Errorf("failed to decode block: %w", err) } - if err := ce.processAndCommit(ctx, blk, ci); err != nil { + if err := ce.processAndCommit(ctx, blk, ci, blkID); err != nil { return fmt.Errorf("failed to apply block: %w", err) } diff --git a/node/consensus/engine.go b/node/consensus/engine.go index 2ec2c171d..16bc2316f 100644 --- a/node/consensus/engine.go +++ b/node/consensus/engine.go @@ -577,7 +577,7 @@ func (ce *ConsensusEngine) handleConsensusMessages(ctx context.Context, msg cons case *blockAnnounce: preRole := ce.role.Load() - if err := ce.commitBlock(ctx, v.blk, v.ci, v.done); err != nil { + if err := ce.commitBlock(ctx, v.blk, v.ci, v.blkID, v.done); err != nil { ce.log.Error("Error processing committed block announcement", "error", err) return } @@ -834,7 +834,7 @@ func (ce *ConsensusEngine) 
replayFromBlockStore(ctx context.Context, startHeight return nil // no more blocks to replay } - err = ce.processAndCommit(ctx, blk, ci) + err = ce.processAndCommit(ctx, blk, ci, blk.Hash()) if err != nil { return fmt.Errorf("failed replaying block: %w", err) } @@ -957,7 +957,7 @@ func (ce *ConsensusEngine) processCurrentBlock(ctx context.Context) error { return fmt.Errorf("failed to decode the block, blkHeight: %d, blockID: %v, error: %w", ce.state.blkProp.height, blkHash, err) } - if err := ce.processAndCommit(ctx, blk, ci); err != nil { + if err := ce.processAndCommit(ctx, blk, ci, blkHash); err != nil { return fmt.Errorf("failed to replay the block: blkHeight: %d, blockID: %v, error: %w", ce.state.blkProp.height, blkHash, err) } // recovered to the correct block -> continue to replay blocks from network diff --git a/node/consensus/follower.go b/node/consensus/follower.go index 821906ce6..77406ea0e 100644 --- a/node/consensus/follower.go +++ b/node/consensus/follower.go @@ -301,7 +301,7 @@ func (ce *ConsensusEngine) processBlockProposal(ctx context.Context, blkPropMsg // If the validator node processed a different block, it should rollback and reprocess the block. // Validator nodes can skip the block execution and directly commit the block if they have already processed the block. // The nodes should only commit the block if the appHash is valid, else halt the node. -func (ce *ConsensusEngine) commitBlock(ctx context.Context, blk *ktypes.Block, ci *types.CommitInfo, doneFn func()) error { +func (ce *ConsensusEngine) commitBlock(ctx context.Context, blk *ktypes.Block, ci *types.CommitInfo, blkID types.Hash, doneFn func()) error { ce.state.mtx.Lock() defer ce.state.mtx.Unlock() @@ -324,30 +324,30 @@ func (ce *ConsensusEngine) commitBlock(ctx context.Context, blk *ktypes.Block, c // - Incorrect AppHash: Halt the node. 
if ce.role.Load() == types.RoleSentry { - return ce.processAndCommit(ctx, blk, ci) + return ce.processAndCommit(ctx, blk, ci, blkID) } // You are a validator if ce.state.blkProp == nil { - return ce.processAndCommit(ctx, blk, ci) + return ce.processAndCommit(ctx, blk, ci, blkID) } - if ce.state.blkProp.blkHash != blk.Header.Hash() { - ce.log.Info("Received committed block is different from the block processed, rollback and process the committed block", "height", blk.Header.Height, "blockID", blk.Header.Hash(), "processedBlockID", ce.state.blkProp.blkHash) + if ce.state.blkProp.blkHash != blkID { + ce.log.Info("Received committed block is different from the block processed, rollback and process the committed block", "height", blk.Header.Height, "blockID", blkID, "processedBlockID", ce.state.blkProp.blkHash) if err := ce.rollbackState(ctx); err != nil { - ce.log.Error("error aborting execution of incorrect block proposal", "height", blk.Header.Height, "blockID", blk.Header.Hash(), "error", err) + ce.log.Error("error aborting execution of incorrect block proposal", "height", blk.Header.Height, "blockID", blkID, "error", err) // that's ok??? return fmt.Errorf("error aborting execution of incorrect block proposal: %w", err) } - return ce.processAndCommit(ctx, blk, ci) + return ce.processAndCommit(ctx, blk, ci, blkID) } // The block is already processed, just validate the appHash and commit the block if valid. 
oldH := ce.stateInfo.hasBlock.Swap(blk.Header.Height) if oldH != ce.state.lc.height { - return fmt.Errorf("block %d already processed, duplicate commitBlock %s", oldH, blk.Header.Hash()) + return fmt.Errorf("block %d already processed, duplicate commitBlock %s", oldH, blkID) } if !ce.state.blockRes.paramUpdates.Equals(ci.ParamUpdates) { // this is absorbed in apphash anyway, but helps diagnostics @@ -377,8 +377,7 @@ func (ce *ConsensusEngine) commitBlock(ctx context.Context, blk *ktypes.Block, c // processAndCommit: used by the sentry nodes and slow validators to process and commit the block. // This is used when the acks are not required to be sent back to the leader, essentially in catchup mode. -func (ce *ConsensusEngine) processAndCommit(ctx context.Context, blk *ktypes.Block, ci *types.CommitInfo) error { - blkID := blk.Header.Hash() +func (ce *ConsensusEngine) processAndCommit(ctx context.Context, blk *ktypes.Block, ci *types.CommitInfo, blkID types.Hash) error { if ci == nil { return fmt.Errorf("commitInfo is nil") } @@ -408,7 +407,7 @@ func (ce *ConsensusEngine) processAndCommit(ctx context.Context, blk *ktypes.Blo ce.state.blkProp = &blockProposal{ height: blk.Header.Height, - blkHash: blk.Header.Hash(), + blkHash: blkID, blk: blk, } diff --git a/node/consensus/messages.go b/node/consensus/messages.go index 127462c80..678779ddd 100644 --- a/node/consensus/messages.go +++ b/node/consensus/messages.go @@ -73,8 +73,9 @@ func (vm *vote) OutOfSync() (*types.OutOfSyncProof, bool) { // that a new block has been committed to the blockchain. // Ensure that the source of the block announce is the leader. type blockAnnounce struct { - blk *ktypes.Block - ci *types.CommitInfo + blk *ktypes.Block + blkID types.Hash + ci *types.CommitInfo // doneFn is a callback function that is called after the block has been processed // and committed. This notifies the node to release the prefetch lock on this block. 
done func() @@ -85,7 +86,7 @@ func (bam *blockAnnounce) Type() consensusMsgType { } func (bam *blockAnnounce) String() string { - return fmt.Sprintf("BlockAnnounce {height: %d, blkHash: %s, appHash: %s}", bam.blk.Header.Height, bam.blk.Hash().String(), bam.ci.AppHash.String()) + return fmt.Sprintf("BlockAnnounce {height: %d, blkHash: %s, appHash: %s}", bam.blk.Header.Height, bam.blkID, bam.ci.AppHash.String()) } // resetState is a message that is sent to the consensus engine to @@ -136,9 +137,10 @@ func (ce *ConsensusEngine) NotifyBlockCommit(blk *ktypes.Block, ci *types.Commit // AcceptCommit() already verified the correctness of the votes, no need to // re-verify here. blkCommit := &blockAnnounce{ - blk: blk, - ci: ci, - done: doneFn, + blk: blk, + blkID: blkID, + ci: ci, + done: doneFn, } // only notify if the leader doesn't already know about the block diff --git a/node/nogossip.go b/node/nogossip.go index 0d1e1fac9..0a3f359ef 100644 --- a/node/nogossip.go +++ b/node/nogossip.go @@ -50,7 +50,7 @@ func (n *Node) txAnnStreamHandler(s network.Stream) { // First try to get from this stream. rawTx, err := requestTx(s, []byte(getMsg)) if err != nil { - n.log.Warnf("announcer failed to provide %v due to error %w, trying other peers", txHash, err) + n.log.Warnf("announcer failed to provide %v due to error %v, trying other peers", txHash, err) // Since we are aware, ask other peers. we could also put this in a goroutine s.Close() // close the announcers stream first rawTx, err = n.getTxWithRetry(context.TODO(), txHash, 500*time.Millisecond, 10)