Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CE clean shutdown fixes #1130

Merged
merged 2 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions node/consensus/block_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,13 @@ func (ce *ConsensusEngine) validateBlock(blk *types.Block) error {
// executeBlock executes all the transactions in the block under a single pg consensus transaction,
// enforcing the atomicity of the block execution. It also calculates the appHash for the block and
// precommits the changeset to the pg database.
func (ce *ConsensusEngine) executeBlock() (err error) {
func (ce *ConsensusEngine) executeBlock(ctx context.Context) (err error) {
defer func() {
ce.stateInfo.mtx.Lock()
ce.stateInfo.status = Executed
ce.stateInfo.mtx.Unlock()
}()

ctx := context.Background() // TODO: Use block context with the chain params and stuff.

blkProp := ce.state.blkProp

// Begin the block execution session
Expand Down Expand Up @@ -258,11 +256,10 @@ func validatorUpdatesHash(updates map[string]*ktypes.Validator) types.Hash {

// Commit method commits the block to the blockstore and postgres database.
// It also updates the txIndexer and mempool with the transactions in the block.
func (ce *ConsensusEngine) commit() error {
func (ce *ConsensusEngine) commit(ctx context.Context) error {
// TODO: Lock mempool and update the mempool to remove the transactions in the block
// Mempool should not receive any new transactions until this Commit is done as
// we are updating the state and the tx checks should be done against the new state.
ctx := context.Background()
blkProp := ce.state.blkProp
height, appHash := ce.state.blkProp.height, ce.state.blockRes.appHash

Expand All @@ -280,13 +277,17 @@ func (ce *ConsensusEngine) commit() error {
}

// Update the chain meta store with the new height and the dirty
// we need to re-open a new transaction just to write the apphash
// TODO: it would be great to have a way to commit the apphash without
// opening a new transaction. This could leave us in a state where data is
// committed but the apphash is not, which would essentially nuke the chain.
ctxS := context.Background()
tx, err := ce.db.BeginTx(ctxS)
tx, err := ce.db.BeginTx(ctxS) // badly timed shutdown MUST NOT cancel now, we need consistency with consensus tx commit
if err != nil {
return err
}

if err := meta.SetChainState(ctx, tx, height, appHash[:], false); err != nil {
if err := meta.SetChainState(ctxS, tx, height, appHash[:], false); err != nil {
err2 := tx.Rollback(ctxS)
if err2 != nil {
ce.log.Error("Failed to rollback the transaction", "err", err2)
Expand Down
4 changes: 2 additions & 2 deletions node/consensus/blocksync.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (ce *ConsensusEngine) replayBlockFromNetwork(ctx context.Context) error {
return fmt.Errorf("failed to decode block: %w", err)
}

if err := ce.processAndCommit(blk, appHash); err != nil {
if err := ce.processAndCommit(ctx, blk, appHash); err != nil {
return err
}
}
Expand Down Expand Up @@ -154,7 +154,7 @@ func (ce *ConsensusEngine) syncBlocksUntilHeight(ctx context.Context, startHeigh
return fmt.Errorf("failed to decode block: %w", err)
}

if err := ce.processAndCommit(blk, appHash); err != nil {
if err := ce.processAndCommit(ctx, blk, appHash); err != nil {
return err
}

Expand Down
49 changes: 39 additions & 10 deletions node/consensus/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ type ConsensusEngine struct {
blkRequester BlkRequester
rstStateBroadcaster ResetStateBroadcaster
discoveryReqBroadcaster DiscoveryReqBroadcaster

// waitgroup to track all the consensus goroutines
wg sync.WaitGroup
}

// ProposalBroadcaster broadcasts the new block proposal message to the network
Expand Down Expand Up @@ -339,7 +342,26 @@ func (ce *ConsensusEngine) Start(ctx context.Context, proposerBroadcaster Propos
ce.startMining(ctx)

// start the event loop
return ce.runConsensusEventLoop(ctx)
ce.wg.Add(1)
go ce.runConsensusEventLoop(ctx)

ce.wg.Wait()
ce.close()
ce.log.Info("Consensus engine stopped")
return nil
}

func (ce *ConsensusEngine) close() {
ce.state.mtx.Lock()
defer ce.state.mtx.Unlock()

if ce.state.consensusTx != nil {
ce.log.Info("Rolling back the consensus tx")
err := ce.state.consensusTx.Rollback(context.Background())
if err != nil {
ce.log.Error("Error rolling back the consensus tx", "error", err)
}
}
}

// GenesisInit initializes the node with the genesis state. This included initializing the
Expand Down Expand Up @@ -403,7 +425,13 @@ func (ce *ConsensusEngine) GenesisInit(ctx context.Context) error {
// Apart from the above events, the node also periodically checks if it needs to
// catchup with the network and reannounce the messages.
func (ce *ConsensusEngine) runConsensusEventLoop(ctx context.Context) error {
defer func() {
ce.log.Info("Consensus event loop stopped...")
ce.wg.Done()
}()

// TODO: make these configurable?
ce.log.Info("Starting the consensus event loop...")
catchUpTicker := time.NewTicker(5 * time.Second)
reannounceTicker := time.NewTicker(3 * time.Second)
blkPropTicker := time.NewTicker(1 * time.Second)
Expand All @@ -430,7 +458,7 @@ func (ce *ConsensusEngine) runConsensusEventLoop(ctx context.Context) error {
ce.reannounceMsgs(ctx)

case height := <-ce.resetChan:
ce.resetBlockProp(height)
ce.resetBlockProp(ctx, height)

case m := <-ce.msgChan:
ce.handleConsensusMessages(ctx, m)
Expand All @@ -447,6 +475,7 @@ func (ce *ConsensusEngine) startMining(ctx context.Context) {
// validators and sentry nodes get activated when they receive a block proposal or block announce msgs.
if ce.role.Load() == types.RoleLeader {
ce.log.Infof("Starting the leader node")
ce.wg.Add(1)
go ce.startNewRound(ctx)
} else {
ce.log.Infof("Starting the validator/sentry node")
Expand All @@ -471,7 +500,7 @@ func (ce *ConsensusEngine) handleConsensusMessages(ctx context.Context, msg cons
}

case *blockAnnounce:
if err := ce.commitBlock(v.blk, v.appHash); err != nil {
if err := ce.commitBlock(ctx, v.blk, v.appHash); err != nil {
ce.log.Error("Error processing committing block", "error", err)
return
}
Expand All @@ -483,7 +512,7 @@ func (ce *ConsensusEngine) handleConsensusMessages(ctx context.Context, msg cons
}

// resetBlockProp aborts the block execution and resets the state to the last committed block.
func (ce *ConsensusEngine) resetBlockProp(height int64) {
func (ce *ConsensusEngine) resetBlockProp(ctx context.Context, height int64) {
ce.state.mtx.Lock()
defer ce.state.mtx.Unlock()

Expand All @@ -497,7 +526,7 @@ func (ce *ConsensusEngine) resetBlockProp(height int64) {
if ce.state.lc.height == height {
if ce.state.blkProp != nil {
ce.log.Info("Resetting the block proposal", "height", height)
if err := ce.resetState(context.Background()); err != nil {
if err := ce.resetState(ctx); err != nil {
ce.log.Error("Error resetting the state", "error", err) // panic? or consensus error?
}
}
Expand Down Expand Up @@ -558,7 +587,7 @@ func (ce *ConsensusEngine) catchup(ctx context.Context) error {

// Replay the blocks from the blockstore if the app hasn't played all the blocks yet.
if appHeight < storeHeight {
if err := ce.replayFromBlockStore(appHeight+1, storeHeight); err != nil {
if err := ce.replayFromBlockStore(ctx, appHeight+1, storeHeight); err != nil {
return err
}
}
Expand Down Expand Up @@ -587,7 +616,7 @@ func (ce *ConsensusEngine) setLastCommitInfo(height int64, blkHash types.Hash, a
}

// replayBlocks replays all the blocks from the blockstore if the app hasn't played all the blocks yet.
func (ce *ConsensusEngine) replayFromBlockStore(startHeight, bestHeight int64) error {
func (ce *ConsensusEngine) replayFromBlockStore(ctx context.Context, startHeight, bestHeight int64) error {
height := startHeight
t0 := time.Now()

Expand All @@ -604,7 +633,7 @@ func (ce *ConsensusEngine) replayFromBlockStore(startHeight, bestHeight int64) e
return nil // no more blocks to replay
}

err = ce.processAndCommit(blk, appHash)
err = ce.processAndCommit(ctx, blk, appHash)
if err != nil {
return fmt.Errorf("failed replaying block: %w", err)
}
Expand Down Expand Up @@ -685,13 +714,13 @@ func (ce *ConsensusEngine) doCatchup(ctx context.Context) error {
return fmt.Errorf("failed to decode the block, blkHeight: %d, blkID: %x, error: %w", ce.state.blkProp.height, blkHash, err)
}

if err := ce.processAndCommit(blk, appHash); err != nil {
if err := ce.processAndCommit(ctx, blk, appHash); err != nil {
return fmt.Errorf("failed to replay the block: blkHeight: %d, blkID: %x, error: %w", ce.state.blkProp.height, blkHash, err)
}
} else {
if appHash == ce.state.blockRes.appHash {
// commit the block
if err := ce.commit(); err != nil {
if err := ce.commit(ctx); err != nil {
return fmt.Errorf("failed to commit the block: height: %d, error: %w", ce.state.blkProp.height, err)
}

Expand Down
20 changes: 10 additions & 10 deletions node/consensus/follower.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (ce *ConsensusEngine) processBlockProposal(ctx context.Context, blkPropMsg
ce.stateInfo.blkProp = blkPropMsg
ce.stateInfo.mtx.Unlock()

if err := ce.executeBlock(); err != nil {
if err := ce.executeBlock(ctx); err != nil {
ce.log.Error("Error executing block, sending NACK", "error", err)
go ce.ackBroadcaster(false, blkPropMsg.height, blkPropMsg.blkHash, nil)
return err
Expand All @@ -201,7 +201,7 @@ func (ce *ConsensusEngine) processBlockProposal(ctx context.Context, blkPropMsg
// If the validator node processed a different block, it should rollback and reprocess the block.
// Validator nodes can skip the block execution and directly commit the block if they have already processed the block.
// The nodes should only commit the block if the appHash is valid, else halt the node.
func (ce *ConsensusEngine) commitBlock(blk *types.Block, appHash types.Hash) error {
func (ce *ConsensusEngine) commitBlock(ctx context.Context, blk *types.Block, appHash types.Hash) error {
if ce.role.Load() == types.RoleLeader {
return nil
}
Expand All @@ -217,20 +217,20 @@ func (ce *ConsensusEngine) commitBlock(blk *types.Block, appHash types.Hash) err
// - Incorrect AppHash: Halt the node.

if ce.role.Load() == types.RoleSentry {
return ce.processAndCommit(blk, appHash)
return ce.processAndCommit(ctx, blk, appHash)
}

// You are a validator
if ce.state.blkProp == nil {
return ce.processAndCommit(blk, appHash)
return ce.processAndCommit(ctx, blk, appHash)
}

if ce.state.blkProp.blkHash != blk.Header.Hash() {
// ce.state.cancelFunc() // abort the current block execution ??
if err := ce.resetState(context.Background()); err != nil {
if err := ce.resetState(ctx); err != nil {
ce.log.Error("error aborting execution of incorrect block proposal", "height", blk.Header.Height, "blkID", blk.Header.Hash(), "error", err)
}
return ce.processAndCommit(blk, appHash)
return ce.processAndCommit(ctx, blk, appHash)
}

if ce.state.blockRes == nil {
Expand All @@ -245,7 +245,7 @@ func (ce *ConsensusEngine) commitBlock(blk *types.Block, appHash types.Hash) err
}

// Commit the block
if err := ce.commit(); err != nil {
if err := ce.commit(ctx); err != nil {
ce.log.Errorf("Error committing block: height: %d, error: %v", blk.Header.Height, err)
return err
}
Expand All @@ -257,7 +257,7 @@ func (ce *ConsensusEngine) commitBlock(blk *types.Block, appHash types.Hash) err

// processAndCommit: used by the sentry nodes and slow validators to process and commit the block.
// This is used when the acks are not required to be sent back to the leader, essentially in catchup mode.
func (ce *ConsensusEngine) processAndCommit(blk *types.Block, appHash types.Hash) error {
func (ce *ConsensusEngine) processAndCommit(ctx context.Context, blk *types.Block, appHash types.Hash) error {
ce.log.Info("Processing committed block", "height", blk.Header.Height, "hash", blk.Header.Hash(), "appHash", appHash)
if err := ce.validateBlock(blk); err != nil {
ce.log.Errorf("Error validating block: %v", err)
Expand All @@ -276,7 +276,7 @@ func (ce *ConsensusEngine) processAndCommit(blk *types.Block, appHash types.Hash
ce.stateInfo.blkProp = ce.state.blkProp
ce.stateInfo.mtx.Unlock()

if err := ce.executeBlock(); err != nil {
if err := ce.executeBlock(ctx); err != nil {
ce.log.Errorf("Error executing block: %v", err)
return err
}
Expand All @@ -288,7 +288,7 @@ func (ce *ConsensusEngine) processAndCommit(blk *types.Block, appHash types.Hash
}

// Commit the block if the appHash is valid
if err := ce.commit(); err != nil {
if err := ce.commit(ctx); err != nil {
ce.log.Errorf("Error committing block: %v", err)
return err
}
Expand Down
9 changes: 7 additions & 2 deletions node/consensus/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ var lastReset int64 = 0

// startNewRound starts a new round of consensus process.
func (ce *ConsensusEngine) startNewRound(ctx context.Context) error {
defer ce.wg.Done()

ce.log.Info("Starting a new round", "height", ce.state.lc.height+1)
ce.state.mtx.Lock()
defer ce.state.mtx.Unlock()
Expand Down Expand Up @@ -55,7 +57,7 @@ func (ce *ConsensusEngine) startNewRound(ctx context.Context) error {
ce.stateInfo.mtx.Unlock()

// Execute the block and generate the appHash
if err := ce.executeBlock(); err != nil {
if err := ce.executeBlock(ctx); err != nil {
ce.log.Errorf("Error executing the block: %v", err)
return err
}
Expand All @@ -77,6 +79,8 @@ func (ce *ConsensusEngine) startNewRound(ctx context.Context) error {
return err
}
go ce.rstStateBroadcaster(ce.state.lc.height)

ce.wg.Add(1)
go ce.startNewRound(ctx)
return nil
}
Expand Down Expand Up @@ -171,7 +175,7 @@ func (ce *ConsensusEngine) processVotes(ctx context.Context) error {
ce.log.Info("Majority of the validators have accepted the block, proceeding to commit the block",
"height", ce.state.blkProp.blk.Header.Height, "hash", ce.state.blkProp.blkHash, "acks", acks, "nacks", nacks)
// Commit the block and broadcast the blockAnn message
if err := ce.commit(); err != nil {
if err := ce.commit(ctx); err != nil {
ce.log.Errorf("Error committing the block (process votes): %v", err)
return err
}
Expand All @@ -183,6 +187,7 @@ func (ce *ConsensusEngine) processVotes(ctx context.Context) error {
// start the next round
ce.nextState()

ce.wg.Add(1)
go func() { // must not sleep with ce.state mutex locked
// Wait for the timeout to start the next round
select {
Expand Down
Loading