Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rollback support to preemptively cancel and rollback current block ex… #1150

Merged
merged 2 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions core/types/messages.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package types

import "time"

// This file contains the messages exchanged between the consensus engine and the block processor.

type BlockExecRequest struct {
Expand Down Expand Up @@ -40,3 +42,11 @@ type ConsensusParams struct {
// single transaction.
MaxVotesPerTx int64
}

// BlockExecutionStatus is a point-in-time snapshot of the execution progress of
// a block, as reported by the block processor.
type BlockExecutionStatus struct {
	// StartTime is when execution of the block began. EndTime is when the
	// block's transactions finished executing; it is the zero time while the
	// block is still being processed.
	StartTime, EndTime time.Time
	// Height is the height of the block being executed.
	Height int64
	// TxIDs lists the hashes of the block's transactions, in block order.
	TxIDs []Hash
	// TxStatus maps the string form of each transaction hash to whether the
	// block processor has already run that transaction. It is set to true once
	// the transaction has been executed, irrespective of its result code.
	TxStatus map[string]bool
}
7 changes: 7 additions & 0 deletions node/accounts/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,3 +303,10 @@ func (a *Accounts) updateAccount(ctx context.Context, tx sql.Executor, account [
}
return nil
}

// Rollback drops every entry currently held in the in-memory updates map by
// swapping in a fresh, empty map. The swap is performed while holding a.mtx.
func (a *Accounts) Rollback() {
	a.mtx.Lock()
	a.updates = map[string]*types.Account{}
	a.mtx.Unlock()
}
1 change: 1 addition & 0 deletions node/block_processor/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type TxApp interface {
Execute(ctx *common.TxContext, db sql.DB, tx *ktypes.Transaction) *txapp.TxResponse
Finalize(ctx context.Context, db sql.DB, block *common.BlockContext) (finalValidators []*ktypes.Validator, err error)
Commit() error
Rollback()
GenesisInit(ctx context.Context, db sql.DB, validators []*ktypes.Validator, genesisAccounts []*ktypes.Account, initialHeight int64, chain *common.ChainContext) error
ApplyMempool(ctx *common.TxContext, db sql.DB, tx *types.Transaction) error

Expand Down
48 changes: 41 additions & 7 deletions node/block_processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ type BlockProcessor struct {
height int64
chainCtx *common.ChainContext

status *blockExecStatus
statusMu sync.RWMutex // very granular mutex to protect access to the block execution status

// consensus TX
consensusTx sql.PreparedTx

Expand Down Expand Up @@ -130,7 +133,23 @@ func (bp *BlockProcessor) Rollback(ctx context.Context, height int64, appHash kt
// set the block proposer back to its previous state
bp.height = height
bp.appHash = appHash
// TODO: how about validatorset and consensus params? rethink rollback

readTx, err := bp.db.BeginReadTx(ctx)
if err != nil {
return fmt.Errorf("failed to begin read transaction: %w", err)
}
defer readTx.Rollback(ctx)

networkParams, err := meta.LoadParams(ctx, readTx)
if err != nil {
return fmt.Errorf("failed to load the network parameters: %w", err)
}

bp.chainCtx.NetworkParameters = networkParams
// how about updates to the migration params? do we need to rollback them as well?

// Rollback internal state updates to the validators, accounts and mempool.
bp.txapp.Rollback()

return nil
}
Expand Down Expand Up @@ -261,6 +280,7 @@ func (bp *BlockProcessor) ExecuteBlock(ctx context.Context, req *ktypes.BlockExe

bp.consensusTx, err = bp.db.BeginPreparedTx(ctx)
if err != nil {
bp.consensusTx = nil // safety measure
return nil, fmt.Errorf("failed to begin the consensus transaction: %w", err)
}

Expand All @@ -272,6 +292,9 @@ func (bp *BlockProcessor) ExecuteBlock(ctx context.Context, req *ktypes.BlockExe
}

txResults := make([]ktypes.TxResult, len(req.Block.Txns))

bp.initBlockExecutionStatus(req.Block)

for i, tx := range req.Block.Txns {
decodedTx := &ktypes.Transaction{}
if err := decodedTx.UnmarshalBinary(tx); err != nil {
Expand All @@ -297,15 +320,18 @@ func (bp *BlockProcessor) ExecuteBlock(ctx context.Context, req *ktypes.BlockExe
}

select {
case <-ctx.Done(): // TODO: is this the best way to abort the block execution?
bp.log.Info("Block execution cancelled", "height", req.Height)
return nil, nil // TODO: or error? or trigger resetState?
case <-ctx.Done():
return nil, ctx.Err() // notify the caller about the context cancellation or deadline exceeded error
default:
res := bp.txapp.Execute(txCtx, bp.consensusTx, decodedTx)
txResult := ktypes.TxResult{
Code: uint32(res.ResponseCode),
Gas: res.Spend,
}

// bookkeeping for the block execution status
bp.updateBlockExecutionStatus(txHash)

if res.Error != nil {
if sql.IsFatalDBError(res.Error) {
return nil, fmt.Errorf("fatal db error during block execution: %w", res.Error)
Expand All @@ -321,6 +347,9 @@ func (bp *BlockProcessor) ExecuteBlock(ctx context.Context, req *ktypes.BlockExe
}
}

// record the end time of the block execution
bp.recordBlockExecEndTime()

_, err = bp.txapp.Finalize(ctx, bp.consensusTx, blockCtx)
if err != nil {
return nil, fmt.Errorf("failed to finalize the block execution: %w", err)
Expand All @@ -330,11 +359,14 @@ func (bp *BlockProcessor) ExecuteBlock(ctx context.Context, req *ktypes.BlockExe
return nil, fmt.Errorf("failed to set the chain state: %w", err)
}

// TODO: update the consensus params in the meta store

// Create a new changeset processor
csp := newChangesetProcessor()
// "migrator" module subscribes to the changeset processor to store changesets during the migration
csErrChan := make(chan error, 1)
defer close(csErrChan)

// TODO: Subscribe to the changesets
go csp.BroadcastChangesets(ctx)

Expand All @@ -355,9 +387,6 @@ func (bp *BlockProcessor) ExecuteBlock(ctx context.Context, req *ktypes.BlockExe

nextHash := bp.nextAppHash(bp.appHash, types.Hash(appHash), valUpdatesHash, accountsHash, txResultsHash)

bp.height = req.Height
bp.appHash = nextHash

bp.log.Info("Executed Block", "height", req.Height, "blkHash", req.BlockID, "appHash", nextHash)

return &ktypes.BlockExecResult{
Expand Down Expand Up @@ -413,6 +442,11 @@ func (bp *BlockProcessor) Commit(ctx context.Context, req *ktypes.CommitRequest)
bp.log.Warn("Failed to create snapshot of the database", "err", err)
}

bp.clearBlockExecutionStatus() // TODO: not very sure where to clear this

bp.height = req.Height
copy(bp.appHash[:], req.AppHash[:])

bp.log.Info("Committed Block", "height", req.Height, "appHash", req.AppHash.String())
return nil
}
Expand Down
81 changes: 81 additions & 0 deletions node/block_processor/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package blockprocessor

import (
"slices"
"time"

ktypes "github.com/kwilteam/kwil-db/core/types"
)

// blockExecStatus is the block processor's internal bookkeeping for the block
// that is currently being executed. Access is guarded by BlockProcessor.statusMu.
type blockExecStatus struct {
	// startTime is captured when the status is initialised for a block.
	startTime time.Time
	// endTime is recorded after the block's transactions have been executed;
	// it stays at the zero value until then.
	endTime time.Time
	// height is the height taken from the block header.
	height int64
	// txIDs holds the hash of every transaction in the block, in block order.
	txIDs []ktypes.Hash
	// txStatus is keyed by the string form of a transaction hash; the value
	// flips from false to true once that transaction has been executed.
	txStatus map[string]bool
}

// BlockExecutionStatus returns a snapshot of the execution status of the block
// that is currently being processed. It is meant to be used by the rpc server.
//
// When no status is being tracked, the zero value is returned. EndTime is left
// unset while the block is still being processed. TxIDs and TxStatus in the
// result are copies, so the caller may keep or modify them without affecting
// the processor's own bookkeeping.
func (bp *BlockProcessor) BlockExecutionStatus() ktypes.BlockExecutionStatus {
	bp.statusMu.RLock()
	defer bp.statusMu.RUnlock()

	cur := bp.status
	if cur == nil {
		return ktypes.BlockExecutionStatus{}
	}

	// Copy the map entry by entry so the snapshot does not alias internal state.
	txStatus := make(map[string]bool, len(cur.txStatus))
	for id, done := range cur.txStatus {
		txStatus[id] = done
	}

	return ktypes.BlockExecutionStatus{
		StartTime: cur.startTime,
		EndTime:   cur.endTime,
		Height:    cur.height,
		TxIDs:     slices.Clone(cur.txIDs),
		TxStatus:  txStatus,
	}
}

// initBlockExecutionStatus replaces the tracked execution status with a fresh
// one for blk: the start time is set to now, the height is taken from the
// block header, and every transaction in the block is registered by hash with
// its status set to false.
func (bp *BlockProcessor) initBlockExecutionStatus(blk *ktypes.Block) {
	bp.statusMu.Lock()
	defer bp.statusMu.Unlock()

	fresh := &blockExecStatus{
		startTime: time.Now(),
		height:    blk.Header.Height,
		txStatus:  make(map[string]bool),
		txIDs:     make([]ktypes.Hash, 0, len(blk.Txns)),
	}

	for _, rawTx := range blk.Txns {
		id := ktypes.HashBytes(rawTx)
		fresh.txIDs = append(fresh.txIDs, id)
		// The explicit false is redundant with the map's zero value, but it
		// makes every transaction of the block show up as a key.
		fresh.txStatus[id.String()] = false
	}

	bp.status = fresh
}

// clearBlockExecutionStatus forgets the tracked block execution status by
// setting it to nil under the status write lock.
func (bp *BlockProcessor) clearBlockExecutionStatus() {
	bp.statusMu.Lock()
	bp.status = nil
	bp.statusMu.Unlock()
}

// updateBlockExecutionStatus marks the transaction identified by txID as
// executed in the tracked block execution status.
//
// It is a no-op when no status is currently tracked (bp.status is nil, i.e. it
// was never initialised or has already been cleared). This mirrors the nil
// handling in BlockExecutionStatus and avoids a nil-pointer panic in the
// bookkeeping path.
func (bp *BlockProcessor) updateBlockExecutionStatus(txID ktypes.Hash) {
	bp.statusMu.Lock()
	defer bp.statusMu.Unlock()

	if bp.status == nil {
		return
	}

	bp.status.txStatus[txID.String()] = true
}

// recordBlockExecEndTime stamps the tracked block execution status with the
// current time as its end time.
//
// It is a no-op when no status is currently tracked (bp.status is nil, i.e. it
// was never initialised or has already been cleared). This mirrors the nil
// handling in BlockExecutionStatus and avoids a nil-pointer panic in the
// bookkeeping path.
func (bp *BlockProcessor) recordBlockExecEndTime() {
	bp.statusMu.Lock()
	defer bp.statusMu.Unlock()

	if bp.status == nil {
		return
	}

	bp.status.endTime = time.Now()
}
32 changes: 14 additions & 18 deletions node/consensus/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,8 @@ func (ce *ConsensusEngine) commit(ctx context.Context) error {
ce.mempool.Remove(txHash)
}

// TODO: reapply existing transaction (checkTX)
// get all the transactions from mempool and recheck them, the transactions should be checked
// in the order of nonce (stable sort to maintain relative order)
// ce.blockProcessor.CheckTx(ctx, tx, true)
// recheck the transactions in the mempool
ce.mempool.RecheckTxs(ctx, ce.blockProcessor.CheckTx)

// update the role of the node based on the final validator set at the end of the commit.
ce.updateValidatorSetAndRole()
Expand All @@ -130,29 +128,24 @@ func (ce *ConsensusEngine) nextState() {
blk: ce.state.blkProp.blk,
}

ce.state.blkProp = nil
ce.state.blockRes = nil
ce.state.votes = make(map[string]*vote)
ce.state.consensusTx = nil

// update the stateInfo
ce.stateInfo.mtx.Lock()
ce.stateInfo.status = Committed
ce.stateInfo.blkProp = nil
ce.stateInfo.height = ce.state.lc.height
ce.stateInfo.mtx.Unlock()
ce.resetState()
}

func (ce *ConsensusEngine) resetState(ctx context.Context) error {
func (ce *ConsensusEngine) rollbackState(ctx context.Context) error {
// Revert back any state changes occurred due to the current block
if err := ce.blockProcessor.Rollback(ctx, ce.state.lc.height, ce.state.lc.appHash); err != nil {
return err
}

ce.resetState()

return nil
}

func (ce *ConsensusEngine) resetState() {
ce.state.blkProp = nil
ce.state.blockRes = nil
ce.state.votes = make(map[string]*vote)
ce.state.consensusTx = nil

// update the stateInfo
ce.stateInfo.mtx.Lock()
Expand All @@ -161,5 +154,8 @@ func (ce *ConsensusEngine) resetState(ctx context.Context) error {
ce.stateInfo.height = ce.state.lc.height
ce.stateInfo.mtx.Unlock()

return nil
ce.cancelFnMtx.Lock()
ce.blkExecCancelFn = nil
ce.longRunningTxs = make([]ktypes.Hash, 0)
ce.cancelFnMtx.Unlock()
}
37 changes: 25 additions & 12 deletions node/consensus/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/kwilteam/kwil-db/node/meta"
"github.com/kwilteam/kwil-db/node/pg"
"github.com/kwilteam/kwil-db/node/types"
"github.com/kwilteam/kwil-db/node/types/sql"
)

const (
Expand Down Expand Up @@ -60,6 +59,13 @@ type ConsensusEngine struct {
// copy of the minimal state info for the p2p layer usage.
stateInfo StateInfo

cancelFnMtx sync.Mutex // protects blkExecCancelFn, longRunningTxs and numResets
blkExecCancelFn context.CancelFunc
// list of txs to be removed from the mempool
// only used by the leader and protected by the cancelFnMtx
longRunningTxs []ktypes.Hash
numResets int64

// Channels
newRound chan struct{}
msgChan chan consensusMessage
Expand Down Expand Up @@ -161,8 +167,6 @@ type StateInfo struct {
type state struct {
mtx sync.RWMutex

consensusTx sql.PreparedTx

blkProp *blockProposal
blockRes *blockResult
lc *lastCommit
Expand Down Expand Up @@ -626,7 +630,7 @@ func (ce *ConsensusEngine) doCatchup(ctx context.Context) error {
}

if blkHash != ce.state.blkProp.blkHash { // processed incorrect block
if err := ce.resetState(ctx); err != nil {
if err := ce.rollbackState(ctx); err != nil {
return fmt.Errorf("error aborting incorrect block execution: height: %d, blkID: %v, error: %w", ce.state.blkProp.height, blkHash, err)
}

Expand Down Expand Up @@ -672,20 +676,29 @@ func (ce *ConsensusEngine) resetBlockProp(ctx context.Context, height int64) {
ce.state.mtx.Lock()
defer ce.state.mtx.Unlock()

// If we are currently executing any transactions corresponding to the blk at height +1
// 1. Cancel the execution context -> so that the transactions stop
// 2. Rollback the consensus tx
// 3. Reset the blkProp and blockRes
// 4. This should never happen after the commit phase, (blk should have never made it to the blockstore)

// We will only honor the reset request if it's from the leader (already verified by now)
// and the height is same as the last committed block height and the
// block is still executing or waiting for the block commit message.
ce.log.Info("Reset msg: ", "height", height)
if ce.state.lc.height == height {
if ce.state.blkProp != nil {
ce.log.Info("Resetting the block proposal", "height", height)
if err := ce.resetState(ctx); err != nil {
ce.log.Error("Error resetting the state", "error", err) // panic? or consensus error?
// cancel the context
ce.cancelFnMtx.Lock()
if ce.blkExecCancelFn != nil {
ce.blkExecCancelFn()
}
ce.cancelFnMtx.Unlock()

if err := ce.rollbackState(ctx); err != nil {
ce.log.Error("Error rolling back the state", "error", err)
}

} else {
ce.log.Info("Block already committed or executed, nothing to reset", "height", height)
}
} else {
ce.log.Warn("Invalid reset request", "height", height, "lastCommittedHeight", ce.state.lc.height)
}
}

Expand Down
Loading
Loading