Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CE clean shutdown fixes #1130

Merged
merged 2 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions node/consensus/block_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,13 @@ func (ce *ConsensusEngine) validateBlock(blk *types.Block) error {
// executeBlock executes all the transactions in the block under a single pg consensus transaction,
// enforcing the atomicity of the block execution. It also calculates the appHash for the block and
// precommits the changeset to the pg database.
func (ce *ConsensusEngine) executeBlock() (err error) {
func (ce *ConsensusEngine) executeBlock(ctx context.Context) (err error) {
defer func() {
ce.stateInfo.mtx.Lock()
ce.stateInfo.status = Executed
ce.stateInfo.mtx.Unlock()
}()

ctx := context.Background() // TODO: Use block context with the chain params and stuff.

blkProp := ce.state.blkProp

// Begin the block execution session
Expand Down Expand Up @@ -258,11 +256,10 @@ func validatorUpdatesHash(updates map[string]*ktypes.Validator) types.Hash {

// commit commits the block to the blockstore and postgres database.
// It also updates the txIndexer and mempool with the transactions in the block.
func (ce *ConsensusEngine) commit() error {
func (ce *ConsensusEngine) commit(ctx context.Context) error {
// TODO: Lock mempool and update the mempool to remove the transactions in the block
// Mempool should not receive any new transactions until this Commit is done as
// we are updating the state and the tx checks should be done against the new state.
ctx := context.Background()
blkProp := ce.state.blkProp
height, appHash := ce.state.blkProp.height, ce.state.blockRes.appHash

Expand All @@ -280,13 +277,17 @@ func (ce *ConsensusEngine) commit() error {
}

// Update the chain meta store with the new height and the dirty flag.
// We need to re-open a new transaction just to write the apphash.
// TODO: it would be great to have a way to commit the apphash without
// opening a new transaction. This could leave us in a state where data is
// committed but the apphash is not, which would essentially nuke the chain.
ctxS := context.Background()
tx, err := ce.db.BeginTx(ctxS)
tx, err := ce.db.BeginTx(ctxS) // badly timed shutdown MUST NOT cancel now, we need consistency with consensus tx commit
if err != nil {
return err
}

if err := meta.SetChainState(ctx, tx, height, appHash[:], false); err != nil {
if err := meta.SetChainState(ctxS, tx, height, appHash[:], false); err != nil {
err2 := tx.Rollback(ctxS)
if err2 != nil {
ce.log.Error("Failed to rollback the transaction", "err", err2)
Expand Down
4 changes: 2 additions & 2 deletions node/consensus/blocksync.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (ce *ConsensusEngine) replayBlockFromNetwork(ctx context.Context) error {
return fmt.Errorf("failed to decode block: %w", err)
}

if err := ce.processAndCommit(blk, appHash); err != nil {
if err := ce.processAndCommit(ctx, blk, appHash); err != nil {
return err
}
}
Expand Down Expand Up @@ -154,7 +154,7 @@ func (ce *ConsensusEngine) syncBlocksUntilHeight(ctx context.Context, startHeigh
return fmt.Errorf("failed to decode block: %w", err)
}

if err := ce.processAndCommit(blk, appHash); err != nil {
if err := ce.processAndCommit(ctx, blk, appHash); err != nil {
return err
}

Expand Down
73 changes: 60 additions & 13 deletions node/consensus/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type ConsensusEngine struct {
chainCtx *common.ChainContext

// Channels
newRound chan struct{}
msgChan chan consensusMessage
haltChan chan struct{} // can take a msg or reason for halting the network
resetChan chan int64 // to reset the state of the consensus engine
Expand All @@ -85,6 +86,9 @@ type ConsensusEngine struct {
blkRequester BlkRequester
rstStateBroadcaster ResetStateBroadcaster
discoveryReqBroadcaster DiscoveryReqBroadcaster

// waitgroup to track all the consensus goroutines
wg sync.WaitGroup
}

// ProposalBroadcaster broadcasts the new block proposal message to the network
Expand Down Expand Up @@ -299,6 +303,7 @@ func New(cfg *Config) *ConsensusEngine {
haltChan: make(chan struct{}, 1),
resetChan: make(chan int64, 1),
bestHeightCh: make(chan *discoveryMsg, 1),
newRound: make(chan struct{}, 1),
// interfaces
mempool: cfg.Mempool,
blockStore: cfg.BlockStore,
Expand Down Expand Up @@ -329,17 +334,50 @@ func (ce *ConsensusEngine) Start(ctx context.Context, proposerBroadcaster Propos
ce.discoveryReqBroadcaster = discoveryReqBroadcaster

ce.log.Info("Starting the consensus engine")
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Fast catchup the node with the network height
if err := ce.catchup(ctx); err != nil {
return fmt.Errorf("error catching up: %w", err)
}

// start mining
ce.startMining(ctx)
ce.wg.Add(1)
go func() {
defer ce.wg.Done()

ce.startMining(ctx)
}()

// start the event loop
return ce.runConsensusEventLoop(ctx)
ce.wg.Add(1)
go func() {
defer ce.wg.Done()
defer cancel() // stop CE in case event loop terminated early e.g. halt

ce.runConsensusEventLoop(ctx)
ce.log.Info("Consensus event loop stopped...")
}()

ce.wg.Wait()

ce.close()
ce.log.Info("Consensus engine stopped")
return nil
}

// close rolls back the consensus transaction if one is still open.
// It holds the state mutex for the whole call and uses a fresh background
// context for the rollback, so the rollback does not depend on any
// caller-supplied context. A rollback failure is logged, not returned.
func (ce *ConsensusEngine) close() {
	ce.state.mtx.Lock()
	defer ce.state.mtx.Unlock()

	pending := ce.state.consensusTx
	if pending == nil {
		return // no consensus tx in flight, nothing to undo
	}

	ce.log.Info("Rolling back the consensus tx")
	if err := pending.Rollback(context.Background()); err != nil {
		ce.log.Error("Error rolling back the consensus tx", "error", err)
	}
}

// GenesisInit initializes the node with the genesis state. This includes initializing the
Expand Down Expand Up @@ -404,6 +442,7 @@ func (ce *ConsensusEngine) GenesisInit(ctx context.Context) error {
// catchup with the network and reannounce the messages.
func (ce *ConsensusEngine) runConsensusEventLoop(ctx context.Context) error {
// TODO: make these configurable?
ce.log.Info("Starting the consensus event loop...")
catchUpTicker := time.NewTicker(5 * time.Second)
reannounceTicker := time.NewTicker(3 * time.Second)
blkPropTicker := time.NewTicker(1 * time.Second)
Expand All @@ -420,6 +459,12 @@ func (ce *ConsensusEngine) runConsensusEventLoop(ctx context.Context) error {
ce.log.Error("Received halt signal, stopping the consensus engine")
return nil

case <-ce.newRound:
if err := ce.startNewRound(ctx); err != nil {
ce.log.Error("Error starting a new round", "error", err)
return err
}

case <-catchUpTicker.C:
err := ce.doCatchup(ctx) // better name??
if err != nil {
Expand All @@ -430,7 +475,7 @@ func (ce *ConsensusEngine) runConsensusEventLoop(ctx context.Context) error {
ce.reannounceMsgs(ctx)

case height := <-ce.resetChan:
ce.resetBlockProp(height)
ce.resetBlockProp(ctx, height)

case m := <-ce.msgChan:
ce.handleConsensusMessages(ctx, m)
Expand All @@ -442,15 +487,17 @@ func (ce *ConsensusEngine) runConsensusEventLoop(ctx context.Context) error {
}

// startMining starts the mining process based on the role of the node.
func (ce *ConsensusEngine) startMining(ctx context.Context) {
func (ce *ConsensusEngine) startMining(_ context.Context) error {
// Start the mining process if the node is a leader
// validators and sentry nodes get activated when they receive a block proposal or block announce msgs.
if ce.role.Load() == types.RoleLeader {
ce.log.Infof("Starting the leader node")
go ce.startNewRound(ctx)
ce.newRound <- struct{}{}
} else {
ce.log.Infof("Starting the validator/sentry node")
}

return nil
}

// handleConsensusMessages handles the consensus messages based on the message type.
Expand All @@ -471,7 +518,7 @@ func (ce *ConsensusEngine) handleConsensusMessages(ctx context.Context, msg cons
}

case *blockAnnounce:
if err := ce.commitBlock(v.blk, v.appHash); err != nil {
if err := ce.commitBlock(ctx, v.blk, v.appHash); err != nil {
ce.log.Error("Error processing committing block", "error", err)
return
}
Expand All @@ -483,7 +530,7 @@ func (ce *ConsensusEngine) handleConsensusMessages(ctx context.Context, msg cons
}

// resetBlockProp aborts the block execution and resets the state to the last committed block.
func (ce *ConsensusEngine) resetBlockProp(height int64) {
func (ce *ConsensusEngine) resetBlockProp(ctx context.Context, height int64) {
ce.state.mtx.Lock()
defer ce.state.mtx.Unlock()

Expand All @@ -497,7 +544,7 @@ func (ce *ConsensusEngine) resetBlockProp(height int64) {
if ce.state.lc.height == height {
if ce.state.blkProp != nil {
ce.log.Info("Resetting the block proposal", "height", height)
if err := ce.resetState(context.Background()); err != nil {
if err := ce.resetState(ctx); err != nil {
ce.log.Error("Error resetting the state", "error", err) // panic? or consensus error?
}
}
Expand Down Expand Up @@ -558,7 +605,7 @@ func (ce *ConsensusEngine) catchup(ctx context.Context) error {

// Replay the blocks from the blockstore if the app hasn't played all the blocks yet.
if appHeight < storeHeight {
if err := ce.replayFromBlockStore(appHeight+1, storeHeight); err != nil {
if err := ce.replayFromBlockStore(ctx, appHeight+1, storeHeight); err != nil {
return err
}
}
Expand Down Expand Up @@ -587,7 +634,7 @@ func (ce *ConsensusEngine) setLastCommitInfo(height int64, blkHash types.Hash, a
}

// replayFromBlockStore replays all the blocks from the blockstore if the app hasn't played all the blocks yet.
func (ce *ConsensusEngine) replayFromBlockStore(startHeight, bestHeight int64) error {
func (ce *ConsensusEngine) replayFromBlockStore(ctx context.Context, startHeight, bestHeight int64) error {
height := startHeight
t0 := time.Now()

Expand All @@ -604,7 +651,7 @@ func (ce *ConsensusEngine) replayFromBlockStore(startHeight, bestHeight int64) e
return nil // no more blocks to replay
}

err = ce.processAndCommit(blk, appHash)
err = ce.processAndCommit(ctx, blk, appHash)
if err != nil {
return fmt.Errorf("failed replaying block: %w", err)
}
Expand Down Expand Up @@ -685,13 +732,13 @@ func (ce *ConsensusEngine) doCatchup(ctx context.Context) error {
return fmt.Errorf("failed to decode the block, blkHeight: %d, blkID: %x, error: %w", ce.state.blkProp.height, blkHash, err)
}

if err := ce.processAndCommit(blk, appHash); err != nil {
if err := ce.processAndCommit(ctx, blk, appHash); err != nil {
return fmt.Errorf("failed to replay the block: blkHeight: %d, blkID: %x, error: %w", ce.state.blkProp.height, blkHash, err)
}
} else {
if appHash == ce.state.blockRes.appHash {
// commit the block
if err := ce.commit(); err != nil {
if err := ce.commit(ctx); err != nil {
return fmt.Errorf("failed to commit the block: height: %d, error: %w", ce.state.blkProp.height, err)
}

Expand Down
20 changes: 10 additions & 10 deletions node/consensus/follower.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (ce *ConsensusEngine) processBlockProposal(ctx context.Context, blkPropMsg
ce.stateInfo.blkProp = blkPropMsg
ce.stateInfo.mtx.Unlock()

if err := ce.executeBlock(); err != nil {
if err := ce.executeBlock(ctx); err != nil {
ce.log.Error("Error executing block, sending NACK", "error", err)
go ce.ackBroadcaster(false, blkPropMsg.height, blkPropMsg.blkHash, nil)
return err
Expand All @@ -201,7 +201,7 @@ func (ce *ConsensusEngine) processBlockProposal(ctx context.Context, blkPropMsg
// If the validator node processed a different block, it should rollback and reprocess the block.
// Validator nodes can skip the block execution and directly commit the block if they have already processed the block.
// The nodes should only commit the block if the appHash is valid, else halt the node.
func (ce *ConsensusEngine) commitBlock(blk *types.Block, appHash types.Hash) error {
func (ce *ConsensusEngine) commitBlock(ctx context.Context, blk *types.Block, appHash types.Hash) error {
if ce.role.Load() == types.RoleLeader {
return nil
}
Expand All @@ -217,20 +217,20 @@ func (ce *ConsensusEngine) commitBlock(blk *types.Block, appHash types.Hash) err
// - Incorrect AppHash: Halt the node.

if ce.role.Load() == types.RoleSentry {
return ce.processAndCommit(blk, appHash)
return ce.processAndCommit(ctx, blk, appHash)
}

// You are a validator
if ce.state.blkProp == nil {
return ce.processAndCommit(blk, appHash)
return ce.processAndCommit(ctx, blk, appHash)
}

if ce.state.blkProp.blkHash != blk.Header.Hash() {
// ce.state.cancelFunc() // abort the current block execution ??
if err := ce.resetState(context.Background()); err != nil {
if err := ce.resetState(ctx); err != nil {
ce.log.Error("error aborting execution of incorrect block proposal", "height", blk.Header.Height, "blkID", blk.Header.Hash(), "error", err)
}
return ce.processAndCommit(blk, appHash)
return ce.processAndCommit(ctx, blk, appHash)
}

if ce.state.blockRes == nil {
Expand All @@ -245,7 +245,7 @@ func (ce *ConsensusEngine) commitBlock(blk *types.Block, appHash types.Hash) err
}

// Commit the block
if err := ce.commit(); err != nil {
if err := ce.commit(ctx); err != nil {
ce.log.Errorf("Error committing block: height: %d, error: %v", blk.Header.Height, err)
return err
}
Expand All @@ -257,7 +257,7 @@ func (ce *ConsensusEngine) commitBlock(blk *types.Block, appHash types.Hash) err

// processAndCommit: used by the sentry nodes and slow validators to process and commit the block.
// This is used when the acks are not required to be sent back to the leader, essentially in catchup mode.
func (ce *ConsensusEngine) processAndCommit(blk *types.Block, appHash types.Hash) error {
func (ce *ConsensusEngine) processAndCommit(ctx context.Context, blk *types.Block, appHash types.Hash) error {
ce.log.Info("Processing committed block", "height", blk.Header.Height, "hash", blk.Header.Hash(), "appHash", appHash)
if err := ce.validateBlock(blk); err != nil {
ce.log.Errorf("Error validating block: %v", err)
Expand All @@ -276,7 +276,7 @@ func (ce *ConsensusEngine) processAndCommit(blk *types.Block, appHash types.Hash
ce.stateInfo.blkProp = ce.state.blkProp
ce.stateInfo.mtx.Unlock()

if err := ce.executeBlock(); err != nil {
if err := ce.executeBlock(ctx); err != nil {
ce.log.Errorf("Error executing block: %v", err)
return err
}
Expand All @@ -288,7 +288,7 @@ func (ce *ConsensusEngine) processAndCommit(blk *types.Block, appHash types.Hash
}

// Commit the block if the appHash is valid
if err := ce.commit(); err != nil {
if err := ce.commit(ctx); err != nil {
ce.log.Errorf("Error committing block: %v", err)
return err
}
Expand Down
Loading
Loading