Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
charithabandi committed Feb 21, 2025
1 parent 3cb5fc8 commit 91bfebd
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 30 deletions.
2 changes: 1 addition & 1 deletion node/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (n *Node) blkAnnStreamHandler(s network.Stream) {
// First try to get from this stream.
rawBlk, err := request(s, []byte(getMsg), blkReadLimit)
if err != nil {
n.log.Warnf("announcer failed to provide %v due to error: %w, trying other peers", blkid, err)
n.log.Warnf("announcer failed to provide %v due to error: %v, trying other peers", blkid, err)
// Since we are aware, ask other peers. we could also put this in a goroutine
s.Close() // close the announcers stream first
var gotHeight int64
Expand Down
5 changes: 3 additions & 2 deletions node/consensus/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ func (ce *ConsensusEngine) validateBlock(blk *ktypes.Block) error {
return fmt.Errorf("block size %d exceeds max block size %d", blockTxnsSize, maxBlockSize)
}

// maxVotesPerTx
// Ensure that the number of event and resolution IDs within validator vote transactions votes
// per transaction does not exceed the max consensus limit.
maxVotesPerTx := ce.ConsensusParams().MaxVotesPerTx
for _, txn := range blk.Txns {
if txn.Body.PayloadType == ktypes.PayloadTypeValidatorVoteBodies {
Expand Down Expand Up @@ -264,7 +265,7 @@ func (ce *ConsensusEngine) commit(ctx context.Context) error {
ce.mempool.RecheckTxs(ctx, ce.recheckTx)

ce.log.Info("Committed Block", "height", height, "hash", blkProp.blkHash.String(),
"appHash", appHash.String(), "numTxs", blkProp.blk.Header.NumTxns, "updates", ce.state.blockRes.paramUpdates)
"appHash", appHash.String(), "numTxs", blkProp.blk.Header.NumTxns)

// update and reset the state fields
ce.nextState()
Expand Down
12 changes: 6 additions & 6 deletions node/consensus/blocksync.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,25 +130,25 @@ func (ce *ConsensusEngine) syncBlocksUntilHeight(ctx context.Context, startHeigh
// syncBlockWithRetry fetches the specified block from the network and keeps retrying until
// the block is successfully retrieved from the network.
func (ce *ConsensusEngine) syncBlockWithRetry(ctx context.Context, height int64) error {
_, rawBlk, ci, err := ce.getBlock(ctx, height)
blkID, rawBlk, ci, err := ce.getBlock(ctx, height)
if err != nil {
return fmt.Errorf("failed to get block from the network: %w", err)
}

return ce.applyBlock(ctx, rawBlk, ci)
return ce.applyBlock(ctx, rawBlk, ci, blkID)
}

// syncBlock fetches the specified block from the network
func (ce *ConsensusEngine) syncBlock(ctx context.Context, height int64) error {
_, rawblk, ci, _, err := ce.blkRequester(ctx, height)
blkID, rawblk, ci, _, err := ce.blkRequester(ctx, height)
if err != nil {
return fmt.Errorf("failed to get block from the network: %w", err)
}

return ce.applyBlock(ctx, rawblk, ci)
return ce.applyBlock(ctx, rawblk, ci, blkID)
}

func (ce *ConsensusEngine) applyBlock(ctx context.Context, rawBlk []byte, ci *types.CommitInfo) error {
func (ce *ConsensusEngine) applyBlock(ctx context.Context, rawBlk []byte, ci *types.CommitInfo, blkID types.Hash) error {
ce.state.mtx.Lock()
defer ce.state.mtx.Unlock()

Expand All @@ -157,7 +157,7 @@ func (ce *ConsensusEngine) applyBlock(ctx context.Context, rawBlk []byte, ci *ty
return fmt.Errorf("failed to decode block: %w", err)
}

if err := ce.processAndCommit(ctx, blk, ci); err != nil {
if err := ce.processAndCommit(ctx, blk, ci, blkID); err != nil {
return fmt.Errorf("failed to apply block: %w", err)
}

Expand Down
6 changes: 3 additions & 3 deletions node/consensus/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ func (ce *ConsensusEngine) handleConsensusMessages(ctx context.Context, msg cons

case *blockAnnounce:
preRole := ce.role.Load()
if err := ce.commitBlock(ctx, v.blk, v.ci, v.done); err != nil {
if err := ce.commitBlock(ctx, v.blk, v.ci, v.blkID, v.done); err != nil {
ce.log.Error("Error processing committed block announcement", "error", err)
return
}
Expand Down Expand Up @@ -834,7 +834,7 @@ func (ce *ConsensusEngine) replayFromBlockStore(ctx context.Context, startHeight
return nil // no more blocks to replay
}

err = ce.processAndCommit(ctx, blk, ci)
err = ce.processAndCommit(ctx, blk, ci, blk.Hash())
if err != nil {
return fmt.Errorf("failed replaying block: %w", err)
}
Expand Down Expand Up @@ -957,7 +957,7 @@ func (ce *ConsensusEngine) processCurrentBlock(ctx context.Context) error {
return fmt.Errorf("failed to decode the block, blkHeight: %d, blockID: %v, error: %w", ce.state.blkProp.height, blkHash, err)
}

if err := ce.processAndCommit(ctx, blk, ci); err != nil {
if err := ce.processAndCommit(ctx, blk, ci, blkHash); err != nil {
return fmt.Errorf("failed to replay the block: blkHeight: %d, blockID: %v, error: %w", ce.state.blkProp.height, blkHash, err)
}
// recovered to the correct block -> continue to replay blocks from network
Expand Down
21 changes: 10 additions & 11 deletions node/consensus/follower.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func (ce *ConsensusEngine) processBlockProposal(ctx context.Context, blkPropMsg
// If the validator node processed a different block, it should rollback and reprocess the block.
// Validator nodes can skip the block execution and directly commit the block if they have already processed the block.
// The nodes should only commit the block if the appHash is valid, else halt the node.
func (ce *ConsensusEngine) commitBlock(ctx context.Context, blk *ktypes.Block, ci *types.CommitInfo, doneFn func()) error {
func (ce *ConsensusEngine) commitBlock(ctx context.Context, blk *ktypes.Block, ci *types.CommitInfo, blkID types.Hash, doneFn func()) error {
ce.state.mtx.Lock()
defer ce.state.mtx.Unlock()

Expand All @@ -324,30 +324,30 @@ func (ce *ConsensusEngine) commitBlock(ctx context.Context, blk *ktypes.Block, c
// - Incorrect AppHash: Halt the node.

if ce.role.Load() == types.RoleSentry {
return ce.processAndCommit(ctx, blk, ci)
return ce.processAndCommit(ctx, blk, ci, blkID)
}

// You are a validator
if ce.state.blkProp == nil {
return ce.processAndCommit(ctx, blk, ci)
return ce.processAndCommit(ctx, blk, ci, blkID)
}

if ce.state.blkProp.blkHash != blk.Header.Hash() {
ce.log.Info("Received committed block is different from the block processed, rollback and process the committed block", "height", blk.Header.Height, "blockID", blk.Header.Hash(), "processedBlockID", ce.state.blkProp.blkHash)
if ce.state.blkProp.blkHash != blkID {
ce.log.Info("Received committed block is different from the block processed, rollback and process the committed block", "height", blk.Header.Height, "blockID", blkID, "processedBlockID", ce.state.blkProp.blkHash)

if err := ce.rollbackState(ctx); err != nil {
ce.log.Error("error aborting execution of incorrect block proposal", "height", blk.Header.Height, "blockID", blk.Header.Hash(), "error", err)
ce.log.Error("error aborting execution of incorrect block proposal", "height", blk.Header.Height, "blockID", blkID, "error", err)
// that's ok???
return fmt.Errorf("error aborting execution of incorrect block proposal: %w", err)
}

return ce.processAndCommit(ctx, blk, ci)
return ce.processAndCommit(ctx, blk, ci, blkID)
}

// The block is already processed, just validate the appHash and commit the block if valid.
oldH := ce.stateInfo.hasBlock.Swap(blk.Header.Height)
if oldH != ce.state.lc.height {
return fmt.Errorf("block %d already processed, duplicate commitBlock %s", oldH, blk.Header.Hash())
return fmt.Errorf("block %d already processed, duplicate commitBlock %s", oldH, blkID)
}

if !ce.state.blockRes.paramUpdates.Equals(ci.ParamUpdates) { // this is absorbed in apphash anyway, but helps diagnostics
Expand Down Expand Up @@ -377,8 +377,7 @@ func (ce *ConsensusEngine) commitBlock(ctx context.Context, blk *ktypes.Block, c

// processAndCommit: used by the sentry nodes and slow validators to process and commit the block.
// This is used when the acks are not required to be sent back to the leader, essentially in catchup mode.
func (ce *ConsensusEngine) processAndCommit(ctx context.Context, blk *ktypes.Block, ci *types.CommitInfo) error {
blkID := blk.Header.Hash()
func (ce *ConsensusEngine) processAndCommit(ctx context.Context, blk *ktypes.Block, ci *types.CommitInfo, blkID types.Hash) error {
if ci == nil {
return fmt.Errorf("commitInfo is nil")
}
Expand Down Expand Up @@ -408,7 +407,7 @@ func (ce *ConsensusEngine) processAndCommit(ctx context.Context, blk *ktypes.Blo

ce.state.blkProp = &blockProposal{
height: blk.Header.Height,
blkHash: blk.Header.Hash(),
blkHash: blkID,
blk: blk,
}

Expand Down
14 changes: 8 additions & 6 deletions node/consensus/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ func (vm *vote) OutOfSync() (*types.OutOfSyncProof, bool) {
// that a new block has been committed to the blockchain.
// Ensure that the source of the block announce is the leader.
type blockAnnounce struct {
blk *ktypes.Block
ci *types.CommitInfo
blk *ktypes.Block
blkID types.Hash
ci *types.CommitInfo
// doneFn is a callback function that is called after the block has been processed
// and committed. This notifies the node to release the prefetch lock on this block.
done func()
Expand All @@ -85,7 +86,7 @@ func (bam *blockAnnounce) Type() consensusMsgType {
}

func (bam *blockAnnounce) String() string {
return fmt.Sprintf("BlockAnnounce {height: %d, blkHash: %s, appHash: %s}", bam.blk.Header.Height, bam.blk.Hash().String(), bam.ci.AppHash.String())
return fmt.Sprintf("BlockAnnounce {height: %d, blkHash: %s, appHash: %s}", bam.blk.Header.Height, bam.blkID, bam.ci.AppHash.String())
}

// resetState is a message that is sent to the consensus engine to
Expand Down Expand Up @@ -136,9 +137,10 @@ func (ce *ConsensusEngine) NotifyBlockCommit(blk *ktypes.Block, ci *types.Commit
// AcceptCommit() already verified the correctness of the votes, no need to
// re-verify here.
blkCommit := &blockAnnounce{
blk: blk,
ci: ci,
done: doneFn,
blk: blk,
blkID: blkID,
ci: ci,
done: doneFn,
}

// only notify if the leader doesn't already know about the block
Expand Down
2 changes: 1 addition & 1 deletion node/nogossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (n *Node) txAnnStreamHandler(s network.Stream) {
// First try to get from this stream.
rawTx, err := requestTx(s, []byte(getMsg))
if err != nil {
n.log.Warnf("announcer failed to provide %v due to error %w, trying other peers", txHash, err)
n.log.Warnf("announcer failed to provide %v due to error %v, trying other peers", txHash, err)
// Since we are aware, ask other peers. we could also put this in a goroutine
s.Close() // close the announcers stream first
rawTx, err = n.getTxWithRetry(context.TODO(), txHash, 500*time.Millisecond, 10)
Expand Down

0 comments on commit 91bfebd

Please sign in to comment.