Give CE access to release block fetcher lock upon commit
charithabandi committed Feb 21, 2025
1 parent f711f1a commit a43523e
Showing 13 changed files with 135 additions and 121 deletions.
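
The thrust of this commit is a hand-off of the block fetcher lock: instead of the announcement handler releasing it with defer done(), the done callback returned by PreFetch is now passed into ce.NotifyBlockCommit, so the consensus engine (CE) releases it only after the commit is handled, with explicit done() calls on the handler's early-exit error paths. The sketch below illustrates that pattern only; the simplified types, names, and signatures (blockIndex, consensusEngine, string hashes) are assumptions for illustration, not the repository's actual API.

```go
package main

import (
	"fmt"
	"sync"
)

// blockIndex mimics a block store index where PreFetch reserves a block hash
// for download and returns a release callback that must be called exactly once.
type blockIndex struct {
	mtx      sync.Mutex
	fetching map[string]bool
	stored   map[string]bool
}

func (bi *blockIndex) PreFetch(hash string) (need bool, done func()) {
	bi.mtx.Lock()
	defer bi.mtx.Unlock()
	if bi.stored[hash] || bi.fetching[hash] {
		return false, func() {} // already stored or another fetch is in flight
	}
	bi.fetching[hash] = true
	var once sync.Once
	return true, func() {
		once.Do(func() {
			bi.mtx.Lock()
			delete(bi.fetching, hash)
			bi.mtx.Unlock()
		})
	}
}

// consensusEngine stands in for the CE: it now receives the release callback
// and calls it after the commit completes, instead of the network handler
// deferring it.
type consensusEngine struct{}

func (ce *consensusEngine) NotifyBlockCommit(hash string, done func()) {
	defer done() // release the block fetcher lock once the commit is handled
	fmt.Println("committed block", hash)
}

func main() {
	bi := &blockIndex{fetching: map[string]bool{}, stored: map[string]bool{}}
	ce := &consensusEngine{}

	need, done := bi.PreFetch("blk-abc")
	if !need {
		return // we already have it or are already fetching it
	}
	// ... download and validate the block; on any early return, call done() ...
	ce.NotifyBlockCommit("blk-abc", done)
}
```

The sync.Once guard in the sketch reflects that the release callback should be safe to invoke from whichever path finishes first, whether an error return in the handler or the CE after commit.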
2 changes: 1 addition & 1 deletion app/node/build.go
@@ -533,7 +533,7 @@ func buildNode(d *coreDependencies, mp *mempool.Mempool, bs *store.BlockStore,
failBuild(err, "failed to create node")
}

logger.Infof("This node is %s @ %s", node.ID(), node.Addrs())
logger.Infof("This node is %s @ %s, peerID: %s", node.ID(), node.Addrs(), p2p.Host().ID())
return node
}

72 changes: 45 additions & 27 deletions node/block.go
@@ -115,14 +115,16 @@ func (n *Node) blkAnnStreamHandler(s network.Stream) {
// If we are a validator and this is the commit ann for a proposed block
// that we already started executing, consensus engine will handle it.
if !n.ce.AcceptCommit(height, blkHash, hdr, ci, sig) {
// this either means that the ce already has the block or it is not
// ready to accept it yet. In either case, we don't need to do anything
// here.
return
}

// Possibly ce will handle it regardless. For now, below is block store
// code like a sentry node might do.

need, done := n.bki.PreFetch(blkHash)
defer done()
if !need {
n.log.Debug("ALREADY HAVE OR FETCHING BLOCK")
return // we have or are currently fetching it, do nothing, assuming we have already re-announced
@@ -131,52 +133,65 @@ func (n *Node) blkAnnStreamHandler(s network.Stream) {
n.log.Debug("retrieving new block", "blockID", blkid)
t0 := time.Now()

peerID := s.Conn().RemotePeer()

// First try to get from this stream.
rawBlk, err := request(s, []byte(getMsg), blkReadLimit)
if err != nil {
n.log.Warnf("announcer failed to provide %v, trying other peers", blkid)
n.log.Warnf("announcer failed to provide %v due to error: %v, trying other peers", blkid, err)
// Since we are aware, ask other peers. We could also put this in a goroutine.
s.Close() // close the announcer's stream first
var gotHeight int64
var gotCI *types.CommitInfo

gotHeight, rawBlk, gotCI, err = n.getBlkWithRetry(ctx, blkHash, 500*time.Millisecond, 10)
var id peer.ID
gotHeight, rawBlk, gotCI, id, err = n.getBlkWithRetry(ctx, blkHash, 500*time.Millisecond, 10)
if err != nil {
n.log.Errorf("unable to retrieve tx %v: %v", blkid, err)
done()
return
}
if gotHeight != height {
n.log.Errorf("getblk response had unexpected height: wanted %d, got %d", height, gotHeight)
done()
return
}
if gotCI != nil && gotCI.AppHash != ci.AppHash {
n.log.Errorf("getblk response had unexpected appHash: wanted %v, got %v", ci.AppHash, gotCI.AppHash)
done()
return
}
// Ensure that the peerID from which the block was downloaded is a valid one.
if id != "" {
n.log.Errorf("getblk response had unexpected peerID: %v", id)
}
peerID = id
}

n.log.Debugf("obtained content for block %q in %v", blkid, time.Since(t0))

blk, err := ktypes.DecodeBlock(rawBlk)
if err != nil {
n.log.Infof("decodeBlock failed for %v: %v", blkid, err)
done()
return
}
if blk.Header.Height != height {
n.log.Infof("getblk response had unexpected height: wanted %d, got %d", height, blk.Header.Height)
done()
return
}
gotBlkHash := blk.Header.Hash()
if gotBlkHash != blkHash {
n.log.Infof("invalid block hash: wanted %v, got %x", blkHash, gotBlkHash)
done()
return
}

// re-announce
n.log.Infof("downloaded block %v of height %d from %v, notifying ce of the block", blkid, height, s.Conn().RemotePeer())
n.ce.NotifyBlockCommit(blk, ci, blkHash)
n.log.Infof("downloaded block %v of height %d from %v, notifying ce of the block", blkid, height, peerID)
n.ce.NotifyBlockCommit(blk, ci, blkHash, done)
go func() {
n.announceRawBlk(context.Background(), blkHash, height, rawBlk, blk.Header, ci, s.Conn().RemotePeer(), reqMsg.LeaderSig) // re-announce with the leader's signature
n.announceRawBlk(context.Background(), blkHash, height, rawBlk, blk.Header, ci, peerID, reqMsg.LeaderSig) // re-announce with the leader's signature
}()
}

@@ -200,6 +215,7 @@ func (n *Node) announceRawBlk(ctx context.Context, blkHash types.Hash, height in
if peerID == from {
continue
}

n.log.Debugf("advertising block %s (height %d / sz %d / updates %v) to peer %v",
blkHash, height, len(rawBlk), ci.ParamUpdates, peerID)
resID, err := blockAnnMsg{
@@ -224,12 +240,12 @@ func (n *Node) announceRawBlk(ctx context.Context, blkHash types.Hash, height in
}

func (n *Node) getBlkWithRetry(ctx context.Context, blkHash types.Hash, baseDelay time.Duration,
maxAttempts int) (int64, []byte, *types.CommitInfo, error) {
maxAttempts int) (int64, []byte, *types.CommitInfo, peer.ID, error) {
var attempts int
for {
height, raw, ci, err := n.getBlk(ctx, blkHash)
height, raw, ci, peer, err := n.getBlk(ctx, blkHash)
if err == nil {
return height, raw, ci, nil
return height, raw, ci, peer, nil
}

n.log.Warnf("unable to retrieve block %v (%v), waiting to retry", blkHash, err)
@@ -241,14 +257,13 @@ func (n *Node) getBlkWithRetry(ctx context.Context, blkHash types.Hash, baseDela
baseDelay *= 2
attempts++
if attempts >= maxAttempts {
return 0, nil, nil, ErrBlkNotFound
return 0, nil, nil, "", ErrBlkNotFound
}
}
}

func (n *Node) getBlk(ctx context.Context, blkHash types.Hash) (int64, []byte, *types.CommitInfo, error) {
func (n *Node) getBlk(ctx context.Context, blkHash types.Hash) (int64, []byte, *types.CommitInfo, peer.ID, error) {
for _, peer := range n.peers() {
n.log.Infof("requesting block %v from %v", blkHash, peer)
t0 := time.Now()
resID, _ := blockHashReq{Hash: blkHash}.MarshalBinary()
resp, err := requestFrom(ctx, n.host, peer, resID, ProtocolIDBlock, blkReadLimit)
@@ -269,7 +284,6 @@ func (n *Node) getBlk(ctx context.Context, blkHash types.Hash) (int64, []byte, *
n.log.Info("block response too short", "peer", peer, "hash", blkHash)
continue
}

n.log.Debug("Obtained content for block", "block", blkHash, "elapsed", time.Since(t0))

rd := bytes.NewReader(resp)
@@ -297,9 +311,9 @@ func (n *Node) getBlk(ctx context.Context, blkHash types.Hash) (int64, []byte, *
continue
}

return height, rawBlk, &ci, nil
return height, rawBlk, &ci, peer, nil
}
return 0, nil, nil, ErrBlkNotFound
return 0, nil, nil, "", ErrBlkNotFound
}

func requestBlockHeight(ctx context.Context, host host.Host, peer peer.ID,
@@ -397,23 +411,27 @@ func (n *Node) getBlkHeight(ctx context.Context, height int64) (types.Hash, []by
}

func getBlkHeight(ctx context.Context, height int64, host host.Host, log log.Logger) (types.Hash, []byte, *types.CommitInfo, int64, error) {
peers := peerHosts(host)
if len(peers) == 0 {
return types.Hash{}, nil, nil, 0, errors.New("no peers to request block from")
availablePeers := peerHosts(host)
if len(availablePeers) == 0 {
return types.Hash{}, nil, nil, 0, types.ErrPeersNotFound
}

// min: 1, max: 10
cnt := max(len(peers)/5, 1) // 20% of peers // TODO: Not sure what the correct percentage is here
cnt := max(len(availablePeers)/5, 1) // 20% of peers
availablePeers = availablePeers[:cnt]
// bestHCnt counts peers whose best height is one less than the requested
// height. It helps determine that the block has not been committed yet, so
// we can stop requesting it from other peers once enough of them indicate
// that it is not available.
bestHCnt := 0
peers = peers[:cnt]
var bestHeight int64

for _, peer := range peers {
if bestHCnt == 5 { // TODO: Not sure what the correct break condition is here
// essentially, we are trying to break from the loop if the blk is not made yet.
for _, peer := range availablePeers {
if bestHCnt == 5 {
// stop requesting the block if there is an indication that
// the block is not made yet.
break
}
log.Infof("requesting block number %d from %v", height, peer)

t0 := time.Now()
resp, err := requestBlockHeight(ctx, host, peer, height, blkReadLimit)
if errors.Is(err, ErrNotFound) || errors.Is(err, ErrBlkNotFound) {
@@ -426,7 +444,7 @@ func getBlkHeight(ctx context.Context, height int64, host host.Host, log log.Log
if theirBest == height-1 {
bestHCnt++
}
log.Infof("block %d not found on peer; their best height is %d", height, theirBest)
log.Infof("block %d not found on peer %s; their best height is %d", height, peer, theirBest)
} else {
log.Warnf("block not available on %v", peer)
}
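
As the hunks above show, getBlk and getBlkWithRetry now also return the peer.ID of the peer that served the block, so the handler can log and re-announce with the correct source. Below is a minimal sketch of that retry loop with exponential back-off; the fetchFn type, errBlkNotFound value, and string peer IDs are illustrative assumptions standing in for the real libp2p types.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errBlkNotFound = errors.New("block not found")

// fetchFn asks one round of peers for a block and reports which peer served it.
type fetchFn func(ctx context.Context) (raw []byte, peerID string, err error)

// getBlkWithRetry retries fetch with exponential back-off, mirroring the shape
// of the updated getBlkWithRetry that now also returns the serving peer ID.
func getBlkWithRetry(ctx context.Context, fetch fetchFn, baseDelay time.Duration, maxAttempts int) ([]byte, string, error) {
	for attempts := 0; attempts < maxAttempts; attempts++ {
		raw, peerID, err := fetch(ctx)
		if err == nil {
			return raw, peerID, nil
		}
		select {
		case <-ctx.Done():
			return nil, "", ctx.Err()
		case <-time.After(baseDelay):
		}
		baseDelay *= 2 // back off before the next round of peers
	}
	return nil, "", errBlkNotFound
}

func main() {
	calls := 0
	fetch := func(_ context.Context) ([]byte, string, error) {
		calls++
		if calls < 3 {
			return nil, "", errBlkNotFound // first two rounds fail
		}
		return []byte("raw block bytes"), "peer-42", nil
	}
	raw, peer, err := getBlkWithRetry(context.Background(), fetch, 50*time.Millisecond, 10)
	fmt.Println(len(raw), peer, err)
}
```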
21 changes: 15 additions & 6 deletions node/consensus/block.go
@@ -63,7 +63,8 @@ func (ce *ConsensusEngine) validateBlock(blk *ktypes.Block) error {
return fmt.Errorf("block size %d exceeds max block size %d", blockTxnsSize, maxBlockSize)
}

// maxVotesPerTx
// Ensure that the number of event and resolution IDs within a validator vote
// transaction does not exceed the MaxVotesPerTx consensus limit.
maxVotesPerTx := ce.ConsensusParams().MaxVotesPerTx
for _, txn := range blk.Txns {
if txn.Body.PayloadType == ktypes.PayloadTypeValidatorVoteBodies {
@@ -133,11 +134,15 @@ func (ce *ConsensusEngine) BroadcastTx(ctx context.Context, tx *ktypes.Transacti
txHash := types.HashBytes(rawTx)

// add the transaction to the mempool
ce.mempool.Store(txHash, tx)
have, rejected := ce.mempool.Store(txHash, tx)
if rejected {
return &ktypes.ResultBroadcastTx{
Hash: txHash,
}, types.ErrMempoolFull
}

// Announce the transaction to the network
if ce.txAnnouncer != nil {
ce.log.Debugf("broadcasting new tx %v", txHash)
// Announce the transaction to the network only if not previously announced
if ce.txAnnouncer != nil && !have {
// We can't use the parent context 'cause it's canceled in the caller, which
// could be the RPC request handler. This shouldn't be CE's problem...
go ce.txAnnouncer(context.Background(), txHash, rawTx)
@@ -260,7 +265,7 @@ func (ce *ConsensusEngine) commit(ctx context.Context) error {
ce.mempool.RecheckTxs(ctx, ce.recheckTx)

ce.log.Info("Committed Block", "height", height, "hash", blkProp.blkHash.String(),
"appHash", appHash.String(), "updates", ce.state.blockRes.paramUpdates)
"appHash", appHash.String(), "numTxs", blkProp.blk.Header.NumTxns)

// update and reset the state fields
ce.nextState()
@@ -297,6 +302,8 @@ func (ce *ConsensusEngine) rollbackState(ctx context.Context) error {
}

ce.resetState()
ce.stateInfo.hasBlock.Store(ce.state.lc.height)

return nil
}

@@ -318,6 +325,8 @@ func (ce *ConsensusEngine) resetState() {
ce.stateInfo.lastCommit = *ce.state.lc
ce.stateInfo.mtx.Unlock()

ce.stateInfo.hasBlock.Store(ce.state.lc.height)

ce.cancelFnMtx.Lock()
ce.blkExecCancelFn = nil
ce.longRunningTxs = make([]ktypes.Hash, 0)
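
BroadcastTx above now relies on mempool.Store reporting two flags: have (the transaction was already in the pool, so it is not re-announced) and rejected (the pool is full, surfaced to the caller as ErrMempoolFull). A small sketch of that contract follows; the simplified types and the size-based limit are assumptions, not the actual mempool implementation.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errMempoolFull = errors.New("mempool full")

// mempool.Store here returns whether the tx was already known (have) and
// whether it was rejected because the pool is full, mirroring the updated
// Store(txHash, tx) (have, rejected) shape used by BroadcastTx.
type mempool struct {
	mtx   sync.Mutex
	txs   map[string][]byte
	limit int
}

func (mp *mempool) Store(hash string, tx []byte) (have, rejected bool) {
	mp.mtx.Lock()
	defer mp.mtx.Unlock()
	if _, ok := mp.txs[hash]; ok {
		return true, false // already in the pool; no need to re-announce
	}
	if len(mp.txs) >= mp.limit {
		return false, true // pool full; caller surfaces ErrMempoolFull
	}
	mp.txs[hash] = tx
	return false, false
}

func broadcastTx(mp *mempool, hash string, tx []byte, announce func(string)) error {
	have, rejected := mp.Store(hash, tx)
	if rejected {
		return errMempoolFull
	}
	if !have && announce != nil {
		go announce(hash) // announce only transactions we have not seen before
	}
	return nil
}

func main() {
	mp := &mempool{txs: map[string][]byte{}, limit: 2}
	fmt.Println(broadcastTx(mp, "tx1", []byte("a"), func(h string) {})) // <nil>
	fmt.Println(broadcastTx(mp, "tx1", []byte("a"), nil))              // <nil>, already known
	fmt.Println(broadcastTx(mp, "tx2", []byte("b"), nil))              // <nil>
	fmt.Println(broadcastTx(mp, "tx3", []byte("c"), nil))              // mempool full
}
```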
14 changes: 7 additions & 7 deletions node/consensus/blocksync.go
@@ -96,7 +96,7 @@ func (ce *ConsensusEngine) replayBlockFromNetwork(ctx context.Context, requester
if errors.Is(err, context.Canceled) {
return err
}
if errors.Is(err, types.ErrBlkNotFound) || errors.Is(err, types.ErrNotFound) {
if errors.Is(err, types.ErrBlkNotFound) || errors.Is(err, types.ErrNotFound) || errors.Is(err, types.ErrPeersNotFound) {
break // no peers have this block, assume block sync is complete, continue with consensus
}
ce.log.Warn("unexpected error requesting block from the network", "height", height, "error", err)
@@ -130,25 +130,25 @@ func (ce *ConsensusEngine) syncBlocksUntilHeight(ctx context.Context, startHeigh
// syncBlockWithRetry fetches the specified block from the network and keeps retrying until
// the block is successfully retrieved from the network.
func (ce *ConsensusEngine) syncBlockWithRetry(ctx context.Context, height int64) error {
_, rawBlk, ci, err := ce.getBlock(ctx, height)
blkID, rawBlk, ci, err := ce.getBlock(ctx, height)
if err != nil {
return fmt.Errorf("failed to get block from the network: %w", err)
}

return ce.applyBlock(ctx, rawBlk, ci)
return ce.applyBlock(ctx, rawBlk, ci, blkID)
}

// syncBlock fetches the specified block from the network
func (ce *ConsensusEngine) syncBlock(ctx context.Context, height int64) error {
_, rawblk, ci, _, err := ce.blkRequester(ctx, height)
blkID, rawblk, ci, _, err := ce.blkRequester(ctx, height)
if err != nil {
return fmt.Errorf("failed to get block from the network: %w", err)
}

return ce.applyBlock(ctx, rawblk, ci)
return ce.applyBlock(ctx, rawblk, ci, blkID)
}

func (ce *ConsensusEngine) applyBlock(ctx context.Context, rawBlk []byte, ci *types.CommitInfo) error {
func (ce *ConsensusEngine) applyBlock(ctx context.Context, rawBlk []byte, ci *types.CommitInfo, blkID types.Hash) error {
ce.state.mtx.Lock()
defer ce.state.mtx.Unlock()

@@ -157,7 +157,7 @@ func (ce *ConsensusEngine) applyBlock(ctx context.Context, rawBlk []byte, ci *ty
return fmt.Errorf("failed to decode block: %w", err)
}

if err := ce.processAndCommit(ctx, blk, ci); err != nil {
if err := ce.processAndCommit(ctx, blk, ci, blkID); err != nil {
return fmt.Errorf("failed to apply block: %w", err)
}

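
replayBlockFromNetwork now treats types.ErrPeersNotFound the same as ErrBlkNotFound and ErrNotFound: if no peer can serve the next block, block sync is assumed complete and consensus proceeds. A sketch of that error classification, using locally defined stand-ins for the error values and a simplified loop (the real engine logs unexpected errors and keeps going rather than returning):

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

var (
	errBlkNotFound   = errors.New("block not found")
	errNotFound      = errors.New("not found")
	errPeersNotFound = errors.New("no peers found")
)

// replayFromNetwork fetches blocks height by height. Context cancellation
// aborts; any of the "not found" errors (now including the peers-not-found
// case) means no peer can serve the next block, so sync is assumed complete.
func replayFromNetwork(ctx context.Context, start int64, fetch func(context.Context, int64) error) error {
	for height := start; ; height++ {
		err := fetch(ctx, height)
		if err == nil {
			continue
		}
		if errors.Is(err, context.Canceled) {
			return err
		}
		if errors.Is(err, errBlkNotFound) || errors.Is(err, errNotFound) || errors.Is(err, errPeersNotFound) {
			return nil // no peer has this block; assume block sync is complete
		}
		return fmt.Errorf("unexpected error at height %d: %w", height, err)
	}
}

func main() {
	fetch := func(_ context.Context, h int64) error {
		if h >= 5 {
			return errPeersNotFound // nothing beyond height 4 on the network
		}
		return nil
	}
	fmt.Println(replayFromNetwork(context.Background(), 1, fetch)) // <nil>
}
```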
8 changes: 5 additions & 3 deletions node/consensus/engine.go
@@ -577,7 +577,7 @@ func (ce *ConsensusEngine) handleConsensusMessages(ctx context.Context, msg cons

case *blockAnnounce:
preRole := ce.role.Load()
if err := ce.commitBlock(ctx, v.blk, v.ci); err != nil {
if err := ce.commitBlock(ctx, v.blk, v.ci, v.blkID, v.done); err != nil {
ce.log.Error("Error processing committed block announcement", "error", err)
return
}
@@ -809,6 +809,8 @@ func (ce *ConsensusEngine) setLastCommitInfo(height int64, appHash []byte, blk *
blkHash: blkHash,
}
copy(ce.stateInfo.lastCommit.appHash[:], appHash)

ce.stateInfo.hasBlock.Store(height)
}

// replayBlocks replays all the blocks from the blockstore if the app hasn't played all the blocks yet.
@@ -832,7 +834,7 @@ func (ce *ConsensusEngine) replayFromBlockStore(ctx context.Context, startHeight
return nil // no more blocks to replay
}

err = ce.processAndCommit(ctx, blk, ci)
err = ce.processAndCommit(ctx, blk, ci, blk.Hash())
if err != nil {
return fmt.Errorf("failed replaying block: %w", err)
}
@@ -955,7 +957,7 @@ func (ce *ConsensusEngine) processCurrentBlock(ctx context.Context) error {
return fmt.Errorf("failed to decode the block, blkHeight: %d, blockID: %v, error: %w", ce.state.blkProp.height, blkHash, err)
}

if err := ce.processAndCommit(ctx, blk, ci); err != nil {
if err := ce.processAndCommit(ctx, blk, ci, blkHash); err != nil {
return fmt.Errorf("failed to replay the block: blkHeight: %d, blockID: %v, error: %w", ce.state.blkProp.height, blkHash, err)
}
// recovered to the correct block -> continue to replay blocks from network
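
Several hunks above add stateInfo.hasBlock.Store(height) on commit, reset, and rollback. Assuming hasBlock is an atomic height marker (its declaration is not part of this diff), the idea is that other goroutines can check the last committed height without taking the consensus state mutex, as in this sketch:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// stateInfo keeps a lock-free marker of the best committed height, updated
// wherever the committed state changes (commit, reset, rollback).
type stateInfo struct {
	hasBlock atomic.Int64 // last height whose block has been committed
}

func (si *stateInfo) markCommitted(height int64) {
	si.hasBlock.Store(height)
}

func (si *stateInfo) hasCommitted(height int64) bool {
	return si.hasBlock.Load() >= height
}

func main() {
	si := &stateInfo{}
	si.markCommitted(42)
	fmt.Println(si.hasCommitted(42), si.hasCommitted(43)) // true false
}
```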