Skip to content

Commit

Permalink
Rollback support to preemptively cancel and rollback current block ex…
Browse files Browse the repository at this point in the history
…ecution
  • Loading branch information
charithabandi committed Dec 12, 2024
1 parent 684c2ef commit c2a4406
Show file tree
Hide file tree
Showing 17 changed files with 335 additions and 67 deletions.
10 changes: 10 additions & 0 deletions core/types/messages.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package types

import "time"

// This file contains the messages exchanged between the consensus engine and the block processor.

type BlockExecRequest struct {
Expand Down Expand Up @@ -40,3 +42,11 @@ type ConsensusParams struct {
// single transaction.
MaxVotesPerTx int64
}

// BlockExecutionStatus is a snapshot of the execution progress of a block.
// NOTE(review): field semantics below are taken from how the block processor
// populates this type; confirm if other producers exist.
type BlockExecutionStatus struct {
// StartTime is the time at which tracking of the block execution started.
StartTime time.Time
// EndTime is the time at which the block's transactions finished executing.
// It is left as the zero time while the block is still being processed.
EndTime time.Time
// Height is the height of the block being executed.
Height int64
// TxIDs lists the hashes of the block's transactions, in block order.
TxIDs []Hash
// TxStatus maps a transaction ID (its string form) to whether that
// transaction has been executed yet.
TxStatus map[string]bool
}
7 changes: 7 additions & 0 deletions node/accounts/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,3 +303,10 @@ func (a *Accounts) updateAccount(ctx context.Context, tx sql.Executor, account [
}
return nil
}

// Rollback discards every pending account update by replacing the updates
// map with a fresh, empty one. The swap is performed while holding a.mtx.
func (a *Accounts) Rollback() {
	fresh := make(map[string]*types.Account)

	a.mtx.Lock()
	a.updates = fresh
	a.mtx.Unlock()
}
1 change: 1 addition & 0 deletions node/block_processor/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type TxApp interface {
Execute(ctx *common.TxContext, db sql.DB, tx *ktypes.Transaction) *txapp.TxResponse
Finalize(ctx context.Context, db sql.DB, block *common.BlockContext) (finalValidators []*ktypes.Validator, err error)
Commit() error
Rollback()
GenesisInit(ctx context.Context, db sql.DB, validators []*ktypes.Validator, genesisAccounts []*ktypes.Account, initialHeight int64, chain *common.ChainContext) error
ApplyMempool(ctx *common.TxContext, db sql.DB, tx *types.Transaction) error

Expand Down
48 changes: 41 additions & 7 deletions node/block_processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ type BlockProcessor struct {
height int64
chainCtx *common.ChainContext

status *blockExecStatus
statusMu sync.RWMutex // very granular mutex to protect access to the block execution status

// consensus TX
consensusTx sql.PreparedTx

Expand Down Expand Up @@ -130,7 +133,23 @@ func (bp *BlockProcessor) Rollback(ctx context.Context, height int64, appHash kt
// set the block proposer back to its previous state
bp.height = height
bp.appHash = appHash
// TODO: how about validatorset and consensus params? rethink rollback

readTx, err := bp.db.BeginReadTx(ctx)
if err != nil {
return fmt.Errorf("failed to begin read transaction: %w", err)
}
defer readTx.Rollback(ctx)

networkParams, err := meta.LoadParams(ctx, readTx)
if err != nil {
return fmt.Errorf("failed to load the network parameters: %w", err)
}

bp.chainCtx.NetworkParameters = networkParams
// how about updates to the migration params? do we need to rollback them as well?

// Rollback internal state updates to the validators, accounts and mempool.
bp.txapp.Rollback()

return nil
}
Expand Down Expand Up @@ -261,6 +280,7 @@ func (bp *BlockProcessor) ExecuteBlock(ctx context.Context, req *ktypes.BlockExe

bp.consensusTx, err = bp.db.BeginPreparedTx(ctx)
if err != nil {
bp.consensusTx = nil // safety measure
return nil, fmt.Errorf("failed to begin the consensus transaction: %w", err)
}

Expand All @@ -272,6 +292,9 @@ func (bp *BlockProcessor) ExecuteBlock(ctx context.Context, req *ktypes.BlockExe
}

txResults := make([]ktypes.TxResult, len(req.Block.Txns))

bp.initBlockExecutionStatus(req.Block)

for i, tx := range req.Block.Txns {
decodedTx := &ktypes.Transaction{}
if err := decodedTx.UnmarshalBinary(tx); err != nil {
Expand All @@ -297,15 +320,18 @@ func (bp *BlockProcessor) ExecuteBlock(ctx context.Context, req *ktypes.BlockExe
}

select {
case <-ctx.Done(): // TODO: is this the best way to abort the block execution?
bp.log.Info("Block execution cancelled", "height", req.Height)
return nil, nil // TODO: or error? or trigger resetState?
case <-ctx.Done():
return nil, ctx.Err() // notify the caller about the context cancellation or deadline exceeded error
default:
res := bp.txapp.Execute(txCtx, bp.consensusTx, decodedTx)
txResult := ktypes.TxResult{
Code: uint32(res.ResponseCode),
Gas: res.Spend,
}

// bookkeeping for the block execution status
bp.updateBlockExecutionStatus(txHash)

if res.Error != nil {
if sql.IsFatalDBError(res.Error) {
return nil, fmt.Errorf("fatal db error during block execution: %w", res.Error)
Expand All @@ -321,6 +347,9 @@ func (bp *BlockProcessor) ExecuteBlock(ctx context.Context, req *ktypes.BlockExe
}
}

// record the end time of the block execution
bp.recordBlockExecEndTime()

_, err = bp.txapp.Finalize(ctx, bp.consensusTx, blockCtx)
if err != nil {
return nil, fmt.Errorf("failed to finalize the block execution: %w", err)
Expand All @@ -330,11 +359,14 @@ func (bp *BlockProcessor) ExecuteBlock(ctx context.Context, req *ktypes.BlockExe
return nil, fmt.Errorf("failed to set the chain state: %w", err)
}

// TODO: update the consensus params in the meta store

// Create a new changeset processor
csp := newChangesetProcessor()
// "migrator" module subscribes to the changeset processor to store changesets during the migration
csErrChan := make(chan error, 1)
defer close(csErrChan)

// TODO: Subscribe to the changesets
go csp.BroadcastChangesets(ctx)

Expand All @@ -355,9 +387,6 @@ func (bp *BlockProcessor) ExecuteBlock(ctx context.Context, req *ktypes.BlockExe

nextHash := bp.nextAppHash(bp.appHash, types.Hash(appHash), valUpdatesHash, accountsHash, txResultsHash)

bp.height = req.Height
bp.appHash = nextHash

bp.log.Info("Executed Block", "height", req.Height, "blkHash", req.BlockID, "appHash", nextHash)

return &ktypes.BlockExecResult{
Expand Down Expand Up @@ -413,6 +442,11 @@ func (bp *BlockProcessor) Commit(ctx context.Context, req *ktypes.CommitRequest)
bp.log.Warn("Failed to create snapshot of the database", "err", err)
}

bp.clearBlockExecutionStatus() // TODO: not very sure where to clear this

bp.height = req.Height
copy(bp.appHash[:], req.AppHash[:])

bp.log.Info("Committed Block", "height", req.Height, "appHash", req.AppHash.String())
return nil
}
Expand Down
81 changes: 81 additions & 0 deletions node/block_processor/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package blockprocessor

import (
"slices"
"time"

ktypes "github.com/kwilteam/kwil-db/core/types"
)

// blockExecStatus is the block processor's internal bookkeeping for the block
// that is currently being executed.
type blockExecStatus struct {
// startTime is set when the status is initialized; endTime is set once the
// block's transactions have been executed and is the zero time until then.
startTime, endTime time.Time
// height is the height taken from the header of the block being executed.
height int64
// txIDs holds the hash of each transaction in the block, in block order.
txIDs []ktypes.Hash
// txStatus is keyed by the string form of a transaction ID; the value is
// false until the transaction has been executed and true afterwards.
txStatus map[string]bool
}

// BlockExecutionStatus returns a copy of the execution status of the block
// currently being processed; it is used by the rpc server.
//
// When no block status is being tracked, the zero value is returned. EndTime
// is not set while the block is still being processed. The returned TxIDs
// slice and TxStatus map are copies, so callers may retain or modify them
// without affecting the processor's internal state.
func (bp *BlockProcessor) BlockExecutionStatus() ktypes.BlockExecutionStatus {
	bp.statusMu.RLock()
	defer bp.statusMu.RUnlock()

	var snapshot ktypes.BlockExecutionStatus

	cur := bp.status
	if cur == nil {
		return snapshot
	}

	snapshot.StartTime = cur.startTime
	snapshot.EndTime = cur.endTime
	snapshot.Height = cur.height
	snapshot.TxIDs = slices.Clone(cur.txIDs)

	// Copy the per-transaction flags entry by entry so the caller never
	// shares the internal map.
	snapshot.TxStatus = make(map[string]bool)
	for id, executed := range cur.txStatus {
		snapshot.TxStatus[id] = executed
	}

	return snapshot
}

// initBlockExecutionStatus starts tracking the execution of blk: it records
// the current time as the start time, the block height, and the ID of every
// transaction in the block, each initially marked as not executed.
func (bp *BlockProcessor) initBlockExecutionStatus(blk *ktypes.Block) {
	bp.statusMu.Lock()
	defer bp.statusMu.Unlock()

	began := time.Now()
	blkHeight := blk.Header.Height
	executed := make(map[string]bool)
	ids := make([]ktypes.Hash, len(blk.Txns))

	for idx := range blk.Txns {
		ids[idx] = ktypes.HashBytes(blk.Txns[idx])
		// The explicit false is redundant with the map's zero value; it is
		// kept so every transaction has an entry from the start.
		executed[ids[idx].String()] = false
	}

	bp.status = &blockExecStatus{
		startTime: began,
		height:    blkHeight,
		txIDs:     ids,
		txStatus:  executed,
	}
}

// clearBlockExecutionStatus stops tracking the current block by dropping the
// stored status. Afterwards BlockExecutionStatus reports the zero value.
func (bp *BlockProcessor) clearBlockExecutionStatus() {
	bp.statusMu.Lock()
	bp.status = nil
	bp.statusMu.Unlock()
}

// updateBlockExecutionStatus marks the transaction identified by txID as
// executed in the status of the block currently being processed.
//
// It is a no-op when no status is being tracked (bp.status is nil, which is
// the state left by clearBlockExecutionStatus), so this bookkeeping can never
// panic on a nil status.
func (bp *BlockProcessor) updateBlockExecutionStatus(txID ktypes.Hash) {
	bp.statusMu.Lock()
	defer bp.statusMu.Unlock()

	if bp.status == nil {
		return
	}

	bp.status.txStatus[txID.String()] = true
}

// recordBlockExecEndTime stamps the current time as the end time of the block
// execution being tracked.
//
// It is a no-op when no status is being tracked (bp.status is nil, which is
// the state left by clearBlockExecutionStatus), so this bookkeeping can never
// panic on a nil status.
func (bp *BlockProcessor) recordBlockExecEndTime() {
	bp.statusMu.Lock()
	defer bp.statusMu.Unlock()

	if bp.status == nil {
		return
	}

	bp.status.endTime = time.Now()
}
32 changes: 14 additions & 18 deletions node/consensus/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,8 @@ func (ce *ConsensusEngine) commit(ctx context.Context) error {
ce.mempool.Remove(txHash)
}

// TODO: reapply existing transaction (checkTX)
// get all the transactions from mempool and recheck them, the transactions should be checked
// in the order of nonce (stable sort to maintain relative order)
// ce.blockProcessor.CheckTx(ctx, tx, true)
// recheck the transactions in the mempool
ce.mempool.RecheckTxs(ctx, ce.blockProcessor.CheckTx)

// update the role of the node based on the final validator set at the end of the commit.
ce.updateValidatorSetAndRole()
Expand All @@ -130,29 +128,24 @@ func (ce *ConsensusEngine) nextState() {
blk: ce.state.blkProp.blk,
}

ce.state.blkProp = nil
ce.state.blockRes = nil
ce.state.votes = make(map[string]*vote)
ce.state.consensusTx = nil

// update the stateInfo
ce.stateInfo.mtx.Lock()
ce.stateInfo.status = Committed
ce.stateInfo.blkProp = nil
ce.stateInfo.height = ce.state.lc.height
ce.stateInfo.mtx.Unlock()
ce.resetState()
}

func (ce *ConsensusEngine) resetState(ctx context.Context) error {
// rollbackState undoes the state changes made by the current block. It asks
// the block processor to roll back to the height and app hash of the last
// commit and, only if that succeeds, resets the engine's in-memory state via
// resetState. An error from the block processor is returned unchanged.
func (ce *ConsensusEngine) rollbackState(ctx context.Context) error {
	last := ce.state.lc

	err := ce.blockProcessor.Rollback(ctx, last.height, last.appHash)
	if err != nil {
		return err
	}

	ce.resetState()
	return nil
}

func (ce *ConsensusEngine) resetState() {
ce.state.blkProp = nil
ce.state.blockRes = nil
ce.state.votes = make(map[string]*vote)
ce.state.consensusTx = nil

// update the stateInfo
ce.stateInfo.mtx.Lock()
Expand All @@ -161,5 +154,8 @@ func (ce *ConsensusEngine) resetState(ctx context.Context) error {
ce.stateInfo.height = ce.state.lc.height
ce.stateInfo.mtx.Unlock()

return nil
ce.cancelFnMtx.Lock()
ce.blkExecCancelFn = nil
ce.longRunningTxs = make([]ktypes.Hash, 0)
ce.cancelFnMtx.Unlock()
}
37 changes: 25 additions & 12 deletions node/consensus/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/kwilteam/kwil-db/node/meta"
"github.com/kwilteam/kwil-db/node/pg"
"github.com/kwilteam/kwil-db/node/types"
"github.com/kwilteam/kwil-db/node/types/sql"
)

const (
Expand Down Expand Up @@ -60,6 +59,13 @@ type ConsensusEngine struct {
// copy of the minimal state info for the p2p layer usage.
stateInfo StateInfo

cancelFnMtx sync.Mutex // protects blkExecCancelFn, longRunningTxs and numResets
blkExecCancelFn context.CancelFunc
// list of txs to be removed from the mempool
// only used by the leader and protected by the cancelFnMtx
longRunningTxs []ktypes.Hash
numResets int64

// Channels
newRound chan struct{}
msgChan chan consensusMessage
Expand Down Expand Up @@ -161,8 +167,6 @@ type StateInfo struct {
type state struct {
mtx sync.RWMutex

consensusTx sql.PreparedTx

blkProp *blockProposal
blockRes *blockResult
lc *lastCommit
Expand Down Expand Up @@ -626,7 +630,7 @@ func (ce *ConsensusEngine) doCatchup(ctx context.Context) error {
}

if blkHash != ce.state.blkProp.blkHash { // processed incorrect block
if err := ce.resetState(ctx); err != nil {
if err := ce.rollbackState(ctx); err != nil {
return fmt.Errorf("error aborting incorrect block execution: height: %d, blkID: %v, error: %w", ce.state.blkProp.height, blkHash, err)
}

Expand Down Expand Up @@ -672,20 +676,29 @@ func (ce *ConsensusEngine) resetBlockProp(ctx context.Context, height int64) {
ce.state.mtx.Lock()
defer ce.state.mtx.Unlock()

// If we are currently executing any transactions corresponding to the blk at height +1
// 1. Cancel the execution context -> so that the transactions stop
// 2. Rollback the consensus tx
// 3. Reset the blkProp and blockRes
// 4. This should never happen after the commit phase, (blk should have never made it to the blockstore)

// We will only honor the reset request if it's from the leader (already verified by now)
// and the height is same as the last committed block height and the
// block is still executing or waiting for the block commit message.
ce.log.Info("Reset msg: ", "height", height)
if ce.state.lc.height == height {
if ce.state.blkProp != nil {
ce.log.Info("Resetting the block proposal", "height", height)
if err := ce.resetState(ctx); err != nil {
ce.log.Error("Error resetting the state", "error", err) // panic? or consensus error?
// cancel the context
ce.cancelFnMtx.Lock()
if ce.blkExecCancelFn != nil {
ce.blkExecCancelFn()
}
ce.cancelFnMtx.Unlock()

if err := ce.rollbackState(ctx); err != nil {
ce.log.Error("Error rolling back the state", "error", err)
}

} else {
ce.log.Info("Block already committed or executed, nothing to reset", "height", height)
}
} else {
ce.log.Warn("Invalid reset request", "height", height, "lastCommittedHeight", ce.state.lc.height)
}
}

Expand Down
3 changes: 3 additions & 0 deletions node/consensus/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,9 @@ func (d *dummyTxApp) Price(ctx context.Context, dbTx sql.DB, tx *ktypes.Transact
// Commit is a no-op on the dummy tx app; it always returns nil.
func (d *dummyTxApp) Commit() error {
return nil
}

// Rollback is a no-op on the dummy tx app.
func (d *dummyTxApp) Rollback() {}

// GenesisInit ignores all of its arguments on the dummy tx app and always
// returns nil.
func (d *dummyTxApp) GenesisInit(ctx context.Context, db sql.DB, validators []*ktypes.Validator, genesisAccounts []*ktypes.Account, initialHeight int64, chain *common.ChainContext) error {
return nil
}
Expand Down
Loading

0 comments on commit c2a4406

Please sign in to comment.