Commit

Give CE access to release block fetcher lock upon commit

charithabandi committed Feb 21, 2025
1 parent 2bc23cf commit 3cb5fc8

Showing 13 changed files with 112 additions and 100 deletions.
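
The core of the change, before the per-file diffs: the block index's PreFetch hands back a release function for a per-block fetch lock, and instead of deferring that release, the announcement handler now passes it to the consensus engine (CE), which releases the lock only after the block commits. Below is a minimal runnable sketch of the handoff; fetchGuard, consensusEngine, and the string hashes are illustrative stand-ins, not the node's real types.

    package main

    import (
        "fmt"
        "sync"
    )

    // fetchGuard mimics the block index's PreFetch: at most one fetch per
    // block hash is allowed in flight, and the winner receives a release
    // function that must be called exactly once when the block is handled.
    type fetchGuard struct {
        mu       sync.Mutex
        inFlight map[string]bool
    }

    func (g *fetchGuard) PreFetch(hash string) (need bool, done func()) {
        g.mu.Lock()
        defer g.mu.Unlock()
        if g.inFlight[hash] {
            return false, func() {} // someone else is already fetching it
        }
        g.inFlight[hash] = true
        return true, func() {
            g.mu.Lock()
            delete(g.inFlight, hash)
            g.mu.Unlock()
        }
    }

    // consensusEngine stands in for the CE: NotifyBlockCommit now receives
    // the release function and invokes it once the commit completes.
    type consensusEngine struct{}

    func (ce *consensusEngine) NotifyBlockCommit(blk string, done func()) {
        // ... commit the block ...
        if done != nil { // the unit tests pass nil, so the callee guards
            done()
        }
    }

    func main() {
        g := &fetchGuard{inFlight: make(map[string]bool)}
        ce := &consensusEngine{}
        need, done := g.PreFetch("blk1")
        if !need {
            return
        }
        // On an early error path the fetcher must call done() itself; on
        // success, ownership of the release transfers to the CE.
        ce.NotifyBlockCommit("blk1", done)
        fmt.Println("fetcher lock released after commit")
    }
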
2 changes: 1 addition & 1 deletion app/node/build.go
@@ -530,7 +530,7 @@ func buildNode(d *coreDependencies, mp *mempool.Mempool, bs *store.BlockStore,
failBuild(err, "failed to create node")
}

logger.Infof("This node is %s @ %s", node.ID(), node.Addrs())
logger.Infof("This node is %s @ %s, peerID: %s", node.ID(), node.Addrs(), p2p.Host().ID())
return node
}

72 changes: 45 additions & 27 deletions node/block.go
@@ -115,14 +115,16 @@ func (n *Node) blkAnnStreamHandler(s network.Stream) {
// If we are a validator and this is the commit ann for a proposed block
// that we already started executing, consensus engine will handle it.
if !n.ce.AcceptCommit(height, blkHash, hdr, ci, sig) {
// This either means that the CE already has the block or is not yet
// ready to accept it. In either case, there is nothing more to do
// here.
return
}

// Possibly the CE will handle it regardless. For now, the block store
// code below does what a sentry node would do.

need, done := n.bki.PreFetch(blkHash)
defer done()
if !need {
n.log.Debug("ALREADY HAVE OR FETCHING BLOCK")
return // we have or are currently fetching it, do nothing, assuming we have already re-announced
@@ -131,52 +133,65 @@ func (n *Node) blkAnnStreamHandler(s network.Stream) {
n.log.Debug("retrieving new block", "blockID", blkid)
t0 := time.Now()

peerID := s.Conn().RemotePeer()

// First try to get from this stream.
rawBlk, err := request(s, []byte(getMsg), blkReadLimit)
if err != nil {
n.log.Warnf("announcer failed to provide %v, trying other peers", blkid)
n.log.Warnf("announcer failed to provide %v due to error: %w, trying other peers", blkid, err)
// The announcer failed to serve the block, so ask other peers. This
// could also be done in a goroutine.
s.Close() // close the announcer's stream first
var gotHeight int64
var gotCI *types.CommitInfo

gotHeight, rawBlk, gotCI, err = n.getBlkWithRetry(ctx, blkHash, 500*time.Millisecond, 10)
var id peer.ID
gotHeight, rawBlk, gotCI, id, err = n.getBlkWithRetry(ctx, blkHash, 500*time.Millisecond, 10)
if err != nil {
n.log.Errorf("unable to retrieve tx %v: %v", blkid, err)
done()
return
}
if gotHeight != height {
n.log.Errorf("getblk response had unexpected height: wanted %d, got %d", height, gotHeight)
done()
return
}
if gotCI != nil && gotCI.AppHash != ci.AppHash {
n.log.Errorf("getblk response had unexpected appHash: wanted %v, got %v", ci.AppHash, gotCI.AppHash)
done()
return
}
// Record the peer that actually served the block so that the
// re-announcement below credits the correct origin.
if id != "" {
peerID = id
}
}

n.log.Debugf("obtained content for block %q in %v", blkid, time.Since(t0))

blk, err := ktypes.DecodeBlock(rawBlk)
if err != nil {
n.log.Infof("decodeBlock failed for %v: %v", blkid, err)
done()
return
}
if blk.Header.Height != height {
n.log.Infof("getblk response had unexpected height: wanted %d, got %d", height, blk.Header.Height)
done()
return
}
gotBlkHash := blk.Header.Hash()
if gotBlkHash != blkHash {
n.log.Infof("invalid block hash: wanted %v, got %x", blkHash, gotBlkHash)
done()
return
}

// re-announce
n.log.Infof("downloaded block %v of height %d from %v, notifying ce of the block", blkid, height, s.Conn().RemotePeer())
n.ce.NotifyBlockCommit(blk, ci, blkHash)
n.log.Infof("downloaded block %v of height %d from %v, notifying ce of the block", blkid, height, peerID)
n.ce.NotifyBlockCommit(blk, ci, blkHash, done)
go func() {
n.announceRawBlk(context.Background(), blkHash, height, rawBlk, blk.Header, ci, s.Conn().RemotePeer(), reqMsg.LeaderSig) // re-announce with the leader's signature
n.announceRawBlk(context.Background(), blkHash, height, rawBlk, blk.Header, ci, peerID, reqMsg.LeaderSig) // re-announce with the leader's signature
}()
}
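
With defer done() gone, the handler owns the release on every early return (request failure, height mismatch, bad hash) and hands ownership to the CE only on success. The diff calls done() explicitly on each path; a defensive alternative, sketched below as an assumption rather than what the commit does, is to wrap the release in sync.Once so a stray second call is a no-op.

    package main

    import (
        "errors"
        "fmt"
        "sync"
    )

    // once wraps a release function so that calling it from several error
    // paths (or from both the fetcher and the CE) cannot double-release.
    func once(done func()) func() {
        var o sync.Once
        return func() { o.Do(done) }
    }

    func main() {
        release := once(func() { fmt.Println("released") })
        if err := errors.New("decode failed"); err != nil {
            release() // error path releases the lock
        }
        release() // a later call is now a harmless no-op
    }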

@@ -200,6 +215,7 @@ func (n *Node) announceRawBlk(ctx context.Context, blkHash types.Hash, height in
if peerID == from {
continue
}

n.log.Debugf("advertising block %s (height %d / sz %d / updates %v) to peer %v",
blkHash, height, len(rawBlk), ci.ParamUpdates, peerID)
resID, err := blockAnnMsg{
@@ -224,12 +240,12 @@ func (n *Node) announceRawBlk(ctx context.Context, blkHash types.Hash, height in
}

func (n *Node) getBlkWithRetry(ctx context.Context, blkHash types.Hash, baseDelay time.Duration,
maxAttempts int) (int64, []byte, *types.CommitInfo, error) {
maxAttempts int) (int64, []byte, *types.CommitInfo, peer.ID, error) {
var attempts int
for {
height, raw, ci, err := n.getBlk(ctx, blkHash)
height, raw, ci, peer, err := n.getBlk(ctx, blkHash)
if err == nil {
return height, raw, ci, nil
return height, raw, ci, peer, nil
}

n.log.Warnf("unable to retrieve block %v (%v), waiting to retry", blkHash, err)
@@ -241,14 +257,13 @@ func (n *Node) getBlkWithRetry(ctx context.Context, blkHash types.Hash, baseDela
baseDelay *= 2
attempts++
if attempts >= maxAttempts {
return 0, nil, nil, ErrBlkNotFound
return 0, nil, nil, "", ErrBlkNotFound
}
}
}
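
getBlkWithRetry keeps its try, wait, double-the-delay loop, now also returning the peer that served the block. A generic sketch of the same retry shape follows; the context-aware wait is an assumption, since the middle of the loop is elided above.

    package main

    import (
        "context"
        "errors"
        "fmt"
        "time"
    )

    var errBlkNotFound = errors.New("block not found")

    // retry mirrors the shape of getBlkWithRetry: try, wait, double the
    // delay, and give up after maxAttempts.
    func retry[T any](ctx context.Context, baseDelay time.Duration,
        maxAttempts int, fetch func() (T, error)) (T, error) {
        var zero T
        for attempts := 0; ; attempts++ {
            v, err := fetch()
            if err == nil {
                return v, nil
            }
            if attempts+1 >= maxAttempts {
                return zero, errBlkNotFound
            }
            select {
            case <-ctx.Done():
                return zero, ctx.Err()
            case <-time.After(baseDelay):
            }
            baseDelay *= 2 // exponential backoff, as in the diff
        }
    }

    func main() {
        n := 0
        v, err := retry(context.Background(), 10*time.Millisecond, 10,
            func() (int, error) {
                if n++; n < 3 {
                    return 0, errBlkNotFound // fail the first two attempts
                }
                return 42, nil
            })
        fmt.Println(v, err) // 42 <nil>
    }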

func (n *Node) getBlk(ctx context.Context, blkHash types.Hash) (int64, []byte, *types.CommitInfo, error) {
func (n *Node) getBlk(ctx context.Context, blkHash types.Hash) (int64, []byte, *types.CommitInfo, peer.ID, error) {
for _, peer := range n.peers() {
n.log.Infof("requesting block %v from %v", blkHash, peer)
t0 := time.Now()
resID, _ := blockHashReq{Hash: blkHash}.MarshalBinary()
resp, err := requestFrom(ctx, n.host, peer, resID, ProtocolIDBlock, blkReadLimit)
@@ -269,7 +284,6 @@ func (n *Node) getBlk(ctx context.Context, blkHash types.Hash) (int64, []byte, *
n.log.Info("block response too short", "peer", peer, "hash", blkHash)
continue
}

n.log.Debug("Obtained content for block", "block", blkHash, "elapsed", time.Since(t0))

rd := bytes.NewReader(resp)
@@ -297,9 +311,9 @@ func (n *Node) getBlk(ctx context.Context, blkHash types.Hash) (int64, []byte, *
continue
}

return height, rawBlk, &ci, nil
return height, rawBlk, &ci, peer, nil
}
return 0, nil, nil, ErrBlkNotFound
return 0, nil, nil, "", ErrBlkNotFound
}

func requestBlockHeight(ctx context.Context, host host.Host, peer peer.ID,
@@ -397,23 +411,27 @@ func (n *Node) getBlkHeight(ctx context.Context, height int64) (types.Hash, []by
}

func getBlkHeight(ctx context.Context, height int64, host host.Host, log log.Logger) (types.Hash, []byte, *types.CommitInfo, int64, error) {
peers := peerHosts(host)
if len(peers) == 0 {
return types.Hash{}, nil, nil, 0, errors.New("no peers to request block from")
availablePeers := peerHosts(host)
if len(availablePeers) == 0 {
return types.Hash{}, nil, nil, 0, types.ErrPeersNotFound
}

// min: 1, max: 10
cnt := max(len(peers)/5, 1) // 20% of peers // TODO: Not sure what the correct percentage is here
cnt := max(len(availablePeers)/5, 1) // 20% of peers
availablePeers = availablePeers[:cnt]
// bestHCnt counts peers whose best height is one less than the
// requested height. Once enough peers report this, the block most
// likely has not been committed yet, and we stop asking other peers
// for it.
bestHCnt := 0
peers = peers[:cnt]
var bestHeight int64

for _, peer := range peers {
if bestHCnt == 5 { // TODO: Not sure what the correct break condition is here
// essentially, we are trying to break from the loop if the blk is not made yet.
for _, peer := range availablePeers {
if bestHCnt == 5 {
// stop asking other peers once enough of them indicate that the
// block has not been produced yet.
break
}
log.Infof("requesting block number %d from %v", height, peer)

t0 := time.Now()
resp, err := requestBlockHeight(ctx, host, peer, height, blkReadLimit)
if errors.Is(err, ErrNotFound) || errors.Is(err, ErrBlkNotFound) {
@@ -426,7 +444,7 @@ func getBlkHeight(ctx context.Context, height int64, host host.Host, log log.Log
if theirBest == height-1 {
bestHCnt++
}
log.Infof("block %d not found on peer; their best height is %d", height, theirBest)
log.Infof("block %d not found on peer %s; their best height is %d", height, peer, theirBest)
} else {
log.Warnf("block not available on %v", peer)
}
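
The rewritten getBlkHeight samples roughly 20% of the available peers (at least one) and stops early once five of them report a best height of height-1, taking that as a sign the block has not been produced yet. A compact sketch of the heuristic; the bestOf callback is hypothetical and stands in for the real block-height request and response.

    package main

    import "fmt"

    // findBlock sketches the sampling and early-stop heuristic. bestOf is
    // a hypothetical callback reporting a peer's best height; in the diff
    // this information comes back in the block-height response.
    func findBlock(peers []string, height int64, bestOf func(string) int64) (string, bool) {
        cnt := max(len(peers)/5, 1) // ask ~20% of peers, but at least one
        peers = peers[:cnt]
        bestHCnt := 0
        for _, p := range peers {
            if bestHCnt == 5 {
                break // enough peers say the block does not exist yet
            }
            best := bestOf(p)
            if best >= height {
                return p, true // this peer should have the block
            }
            if best == height-1 {
                bestHCnt++
            }
        }
        return "", false
    }

    func main() {
        peers := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}
        p, ok := findBlock(peers, 100, func(string) int64 { return 99 })
        fmt.Println(p, ok) // "" false: the block is likely not made yet
    }
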
18 changes: 13 additions & 5 deletions node/consensus/block.go
@@ -133,11 +133,15 @@ func (ce *ConsensusEngine) BroadcastTx(ctx context.Context, tx *ktypes.Transacti
txHash := types.HashBytes(rawTx)

// add the transaction to the mempool
ce.mempool.Store(txHash, tx)
have, rejected := ce.mempool.Store(txHash, tx)
if rejected {
return &ktypes.ResultBroadcastTx{
Hash: txHash,
}, types.ErrMempoolFull
}

// Announce the transaction to the network
if ce.txAnnouncer != nil {
ce.log.Debugf("broadcasting new tx %v", txHash)
// Announce the transaction to the network only if not previously announced
if ce.txAnnouncer != nil && !have {
// We can't use the parent context because it gets canceled in the
// caller, which could be the RPC request handler. This shouldn't be
// the CE's problem...
go ce.txAnnouncer(context.Background(), txHash, rawTx)
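
BroadcastTx now distinguishes a transaction the mempool already holds (skip re-announcing) from one it rejects for capacity (return ErrMempoolFull to the caller). A toy mempool with the same two-result Store shape; the names and the capacity of one are illustrative.

    package main

    import (
        "errors"
        "fmt"
    )

    var errMempoolFull = errors.New("mempool full")

    // mempool is a toy stand-in: Store reports whether the tx was already
    // present and whether it was rejected for capacity, matching the
    // two-result shape the diff gives ce.mempool.Store.
    type mempool struct {
        txs map[string]bool
        cap int
    }

    func (m *mempool) Store(hash string) (have, rejected bool) {
        if m.txs[hash] {
            return true, false
        }
        if len(m.txs) >= m.cap {
            return false, true
        }
        m.txs[hash] = true
        return false, false
    }

    func broadcast(m *mempool, hash string, announce func(string)) error {
        have, rejected := m.Store(hash)
        if rejected {
            return errMempoolFull // the real code still returns the tx hash
        }
        if !have {
            announce(hash) // only announce txs not previously announced
        }
        return nil
    }

    func main() {
        m := &mempool{txs: make(map[string]bool), cap: 1}
        _ = broadcast(m, "tx1", func(h string) { fmt.Println("announced", h) })
        _ = broadcast(m, "tx1", func(h string) { fmt.Println("announced", h) }) // duplicate: no announce
        fmt.Println(broadcast(m, "tx2", nil)) // mempool full
    }
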
@@ -260,7 +264,7 @@ func (ce *ConsensusEngine) commit(ctx context.Context) error {
ce.mempool.RecheckTxs(ctx, ce.recheckTx)

ce.log.Info("Committed Block", "height", height, "hash", blkProp.blkHash.String(),
"appHash", appHash.String(), "updates", ce.state.blockRes.paramUpdates)
"appHash", appHash.String(), "numTxs", blkProp.blk.Header.NumTxns, "updates", ce.state.blockRes.paramUpdates)

// update and reset the state fields
ce.nextState()
@@ -297,6 +301,8 @@ func (ce *ConsensusEngine) rollbackState(ctx context.Context) error {
}

ce.resetState()
ce.stateInfo.hasBlock.Store(ce.state.lc.height)

return nil
}

@@ -318,6 +324,8 @@ func (ce *ConsensusEngine) resetState() {
ce.stateInfo.lastCommit = *ce.state.lc
ce.stateInfo.mtx.Unlock()

ce.stateInfo.hasBlock.Store(ce.state.lc.height)

ce.cancelFnMtx.Lock()
ce.blkExecCancelFn = nil
ce.longRunningTxs = make([]ktypes.Hash, 0)
2 changes: 1 addition & 1 deletion node/consensus/blocksync.go
@@ -96,7 +96,7 @@ func (ce *ConsensusEngine) replayBlockFromNetwork(ctx context.Context, requester
if errors.Is(err, context.Canceled) {
return err
}
if errors.Is(err, types.ErrBlkNotFound) || errors.Is(err, types.ErrNotFound) {
if errors.Is(err, types.ErrBlkNotFound) || errors.Is(err, types.ErrNotFound) || errors.Is(err, types.ErrPeersNotFound) {
break // no peers have this block, assume block sync is complete, continue with consensus
}
ce.log.Warn("unexpected error requesting block from the network", "height", height, "error", err)
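
Block sync now also ends when there are no peers at all (types.ErrPeersNotFound), not only when peers lack the block. Since such sentinels often come back wrapped, errors.Is does the matching; a minimal sketch with stand-in sentinel values:

    package main

    import (
        "errors"
        "fmt"
    )

    var (
        errBlkNotFound   = errors.New("block not found")
        errPeersNotFound = errors.New("no peers found")
    )

    // syncDone mirrors the loop's break condition: any of these sentinels
    // means block sync has caught up and consensus can take over.
    func syncDone(err error) bool {
        return errors.Is(err, errBlkNotFound) || errors.Is(err, errPeersNotFound)
    }

    func main() {
        err := fmt.Errorf("fetch height 42: %w", errPeersNotFound)
        fmt.Println(syncDone(err)) // true, even through wrapping
    }
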
4 changes: 3 additions & 1 deletion node/consensus/engine.go
@@ -577,7 +577,7 @@ func (ce *ConsensusEngine) handleConsensusMessages(ctx context.Context, msg cons

case *blockAnnounce:
preRole := ce.role.Load()
if err := ce.commitBlock(ctx, v.blk, v.ci); err != nil {
if err := ce.commitBlock(ctx, v.blk, v.ci, v.done); err != nil {
ce.log.Error("Error processing committed block announcement", "error", err)
return
}
@@ -809,6 +809,8 @@ func (ce *ConsensusEngine) setLastCommitInfo(height int64, appHash []byte, blk *
blkHash: blkHash,
}
copy(ce.stateInfo.lastCommit.appHash[:], appHash)

ce.stateInfo.hasBlock.Store(height)
}

// replayBlocks replays all the blocks from the blockstore if the app hasn't played all the blocks yet.
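
The recurring ce.stateInfo.hasBlock.Store(...) calls added in resetState, rollbackState, and setLastCommitInfo keep an atomic record of the last committed height. A plausible reading, assuming hasBlock is an atomic.Int64: other goroutines, such as the block fetcher deciding whether an announced block is still needed, can read the committed height without taking the state mutex. A sketch:

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    // stateInfo sketches the pattern the diff adds: an atomic height
    // marker updated at every commit, reset, and rollback, readable
    // without holding the state mutex.
    type stateInfo struct {
        hasBlock atomic.Int64
    }

    func main() {
        var si stateInfo
        si.hasBlock.Store(41) // after committing block 41
        if ann := int64(41); ann <= si.hasBlock.Load() {
            fmt.Println("already have block", ann, "- skip fetch")
        }
    }
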
18 changes: 9 additions & 9 deletions node/consensus/engine_test.go
@@ -285,7 +285,7 @@ func TestValidatorStateMachine(t *testing.T) {
name: "commit",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
ci := addVotes(t, blkProp1.blkHash, blockAppHash, leader, val)
val.NotifyBlockCommit(blkProp1.blk, ci, blkProp1.blkHash)
val.NotifyBlockCommit(blkProp1.blk, ci, blkProp1.blkHash, nil)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Committed, 1, zeroHash)
@@ -311,7 +311,7 @@ func TestValidatorStateMachine(t *testing.T) {
{
name: "commit(InvalidAppHash)",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
val.NotifyBlockCommit(blkProp1.blk, &types.CommitInfo{AppHash: ktypes.Hash{}}, blkProp1.blkHash)
val.NotifyBlockCommit(blkProp1.blk, &types.CommitInfo{AppHash: ktypes.Hash{}}, blkProp1.blkHash, nil)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
if val.lastCommitHeight() != 0 {
@@ -350,7 +350,7 @@ func TestValidatorStateMachine(t *testing.T) {
name: "commitNew",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
ci := addVotes(t, blkProp2.blkHash, blockAppHash, leader, val)
val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash)
val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash, nil)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Committed, 1, zeroHash)
@@ -386,7 +386,7 @@ func TestValidatorStateMachine(t *testing.T) {
name: "commitNew",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
ci := addVotes(t, blkProp2.blkHash, blockAppHash, leader, val)
val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash)
val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash, nil)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Committed, 1, zeroHash)
@@ -413,7 +413,7 @@ func TestValidatorStateMachine(t *testing.T) {
name: "commitNew",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
ci := addVotes(t, blkProp2.blkHash, blockAppHash, leader, val)
val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash)
val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash, nil)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Committed, 1, zeroHash)
@@ -467,7 +467,7 @@ func TestValidatorStateMachine(t *testing.T) {
name: "commitNew",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
ci := addVotes(t, blkProp2.blkHash, blockAppHash, leader, val)
val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash)
val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash, nil)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Committed, 1, zeroHash)
@@ -494,7 +494,7 @@ func TestValidatorStateMachine(t *testing.T) {
name: "commitNew",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
ci := addVotes(t, blkProp2.blkHash, blockAppHash, leader, val)
val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash)
val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash, nil)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Committed, 1, zeroHash)
@@ -548,7 +548,7 @@ func TestValidatorStateMachine(t *testing.T) {
name: "commitNew",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
ci := addVotes(t, blkProp2.blkHash, blockAppHash, leader, val)
val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash)
val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash, nil)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Committed, 1, zeroHash)
@@ -593,7 +593,7 @@ func TestValidatorStateMachine(t *testing.T) {
name: "commitNew",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
ci := addVotes(t, blkProp2.blkHash, blockAppHash, leader, val)
val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash)
val.NotifyBlockCommit(blkProp2.blk, ci, blkProp2.blkHash, nil)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Committed, 1, zeroHash)
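
Every updated call site in these tests passes nil for the new done argument, since no fetcher lock is held in the test harness; the implied contract is that NotifyBlockCommit guards the callback before invoking it. A sketch of that contract; notifyBlockCommit here is a stand-in, not the engine's method:

    package main

    import "fmt"

    // notifyBlockCommit sketches the nil-tolerant contract implied by the
    // tests: done may be nil when no block fetcher lock is held.
    func notifyBlockCommit(blk string, done func()) {
        fmt.Println("committing", blk)
        if done != nil {
            done() // release the block fetcher lock, if any
        }
    }

    func main() {
        notifyBlockCommit("blkProp1", nil) // test path: nothing to release
        notifyBlockCommit("blkProp2", func() { fmt.Println("released") })
    }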