diff --git a/core/types/messages.go b/core/types/messages.go
index 364b38223..7013aa791 100644
--- a/core/types/messages.go
+++ b/core/types/messages.go
@@ -1,5 +1,7 @@
 package types

+import "time"
+
 // This file contains the messages exchanged between the consensus engine and the block processor.

 type BlockExecRequest struct {
@@ -40,3 +42,11 @@ type ConsensusParams struct {
 	// single transaction.
 	MaxVotesPerTx int64
 }
+
+type BlockExecutionStatus struct {
+	StartTime time.Time
+	EndTime   time.Time
+	Height    int64
+	TxIDs     []Hash
+	TxStatus  map[string]bool
+}
diff --git a/node/accounts/accounts.go b/node/accounts/accounts.go
index 50da09cd5..75cf6866a 100644
--- a/node/accounts/accounts.go
+++ b/node/accounts/accounts.go
@@ -303,3 +303,10 @@ func (a *Accounts) updateAccount(ctx context.Context, tx sql.Executor, account [
 	}
 	return nil
 }
+
+func (a *Accounts) Rollback() {
+	a.mtx.Lock()
+	defer a.mtx.Unlock()
+
+	a.updates = make(map[string]*types.Account)
+}
diff --git a/node/block_processor/interfaces.go b/node/block_processor/interfaces.go
index 330b1b1b6..7c2894bab 100644
--- a/node/block_processor/interfaces.go
+++ b/node/block_processor/interfaces.go
@@ -36,6 +36,7 @@ type TxApp interface {
 	Execute(ctx *common.TxContext, db sql.DB, tx *ktypes.Transaction) *txapp.TxResponse
 	Finalize(ctx context.Context, db sql.DB, block *common.BlockContext) (finalValidators []*ktypes.Validator, err error)
 	Commit() error
+	Rollback()

 	GenesisInit(ctx context.Context, db sql.DB, validators []*ktypes.Validator, genesisAccounts []*ktypes.Account, initialHeight int64, chain *common.ChainContext) error
 	ApplyMempool(ctx *common.TxContext, db sql.DB, tx *types.Transaction) error
diff --git a/node/block_processor/processor.go b/node/block_processor/processor.go
index c3486d0f6..3fdeb0c27 100644
--- a/node/block_processor/processor.go
+++ b/node/block_processor/processor.go
@@ -42,6 +42,9 @@ type BlockProcessor struct {
 	height   int64
 	chainCtx *common.ChainContext

+	status   *blockExecStatus
+	statusMu sync.RWMutex // very granular mutex to protect access to the block execution status
+
 	// consensus TX
 	consensusTx sql.PreparedTx

@@ -130,7 +133,23 @@ func (bp *BlockProcessor) Rollback(ctx context.Context, height int64, appHash kt
 	// set the block proposer back to it's previous state
 	bp.height = height
 	bp.appHash = appHash
-	// TODO: how about validatorset and consensus params? rethink rollback
+
+	readTx, err := bp.db.BeginReadTx(ctx)
+	if err != nil {
+		return fmt.Errorf("failed to begin read transaction: %w", err)
+	}
+	defer readTx.Rollback(ctx)
+
+	networkParams, err := meta.LoadParams(ctx, readTx)
+	if err != nil {
+		return fmt.Errorf("failed to load the network parameters: %w", err)
+	}
+
+	bp.chainCtx.NetworkParameters = networkParams
+	// how about updates to the migration params? do we need to rollback them as well?
+
+	// Rollback internal state updates to the validators, accounts and mempool.
+	bp.txapp.Rollback()

 	return nil
 }
@@ -261,6 +280,7 @@ func (bp *BlockProcessor) ExecuteBlock(ctx context.Context, req *ktypes.BlockExe

 	bp.consensusTx, err = bp.db.BeginPreparedTx(ctx)
 	if err != nil {
+		bp.consensusTx = nil // safety measure
 		return nil, fmt.Errorf("failed to begin the consensus transaction: %w", err)
 	}

@@ -272,6 +292,9 @@ func (bp *BlockProcessor) ExecuteBlock(ctx context.Context, req *ktypes.BlockExe
 	}

 	txResults := make([]ktypes.TxResult, len(req.Block.Txns))
+
+	bp.initBlockExecutionStatus(req.Block)
+
 	for i, tx := range req.Block.Txns {
 		decodedTx := &ktypes.Transaction{}
 		if err := decodedTx.UnmarshalBinary(tx); err != nil {
@@ -297,15 +320,18 @@ func (bp *BlockProcessor) ExecuteBlock(ctx context.Context, req *ktypes.BlockExe
 		}

 		select {
-		case <-ctx.Done(): // TODO: is this the best way to abort the block execution?
-			bp.log.Info("Block execution cancelled", "height", req.Height)
-			return nil, nil // TODO: or error? or trigger resetState?
+		case <-ctx.Done():
+			return nil, ctx.Err() // notify the caller about the context cancellation or deadline exceeded error
 		default:
 			res := bp.txapp.Execute(txCtx, bp.consensusTx, decodedTx)
 			txResult := ktypes.TxResult{
 				Code: uint32(res.ResponseCode),
 				Gas:  res.Spend,
 			}
+
+			// bookkeeping for the block execution status
+			bp.updateBlockExecutionStatus(txHash)
+
 			if res.Error != nil {
 				if sql.IsFatalDBError(res.Error) {
 					return nil, fmt.Errorf("fatal db error during block execution: %w", res.Error)
@@ -321,6 +347,9 @@ func (bp *BlockProcessor) ExecuteBlock(ctx context.Context, req *ktypes.BlockExe
 		}
 	}
+	// record the end time of the block execution
+	bp.recordBlockExecEndTime()
+
 	_, err = bp.txapp.Finalize(ctx, bp.consensusTx, blockCtx)
 	if err != nil {
 		return nil, fmt.Errorf("failed to finalize the block execution: %w", err)
 	}
@@ -330,11 +359,14 @@ func (bp *BlockProcessor) ExecuteBlock(ctx context.Context, req *ktypes.BlockExe
 		return nil, fmt.Errorf("failed to set the chain state: %w", err)
 	}

+	// TODO: update the consensus params in the meta store
+
 	// Create a new changeset processor
 	csp := newChangesetProcessor()
 	// "migrator" module subscribes to the changeset processor to store changesets during the migration
 	csErrChan := make(chan error, 1)
 	defer close(csErrChan)
+	// TODO: Subscribe to the changesets

 	go csp.BroadcastChangesets(ctx)

@@ -355,9 +387,6 @@ func (bp *BlockProcessor) ExecuteBlock(ctx context.Context, req *ktypes.BlockExe

 	nextHash := bp.nextAppHash(bp.appHash, types.Hash(appHash), valUpdatesHash, accountsHash, txResultsHash)

-	bp.height = req.Height
-	bp.appHash = nextHash
-
 	bp.log.Info("Executed Block", "height", req.Height, "blkHash", req.BlockID, "appHash", nextHash)

 	return &ktypes.BlockExecResult{
@@ -413,6 +442,11 @@ func (bp *BlockProcessor) Commit(ctx context.Context, req *ktypes.CommitRequest)
 		bp.log.Warn("Failed to create snapshot of the database", "err", err)
 	}

+	bp.clearBlockExecutionStatus() // TODO: not very sure where to clear this
+
+	bp.height = req.Height
+	copy(bp.appHash[:], req.AppHash[:])
+
 	bp.log.Info("Committed Block", "height", req.Height, "appHash", req.AppHash.String())
 	return nil
 }
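The cancellation path above is ordinary context plumbing: whoever owns the execution context cancels it, the per-transaction select in ExecuteBlock observes ctx.Done() and returns ctx.Err(), and the caller can distinguish cancellation from other failures. A minimal, self-contained sketch of that control flow, with an illustrative executeBlock stub, transaction list and timings:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// executeBlock mimics the per-transaction cancellation check used above:
// before each transaction it polls the context and bails out with ctx.Err().
func executeBlock(ctx context.Context, txs []string) error {
	for _, tx := range txs {
		select {
		case <-ctx.Done():
			return ctx.Err() // context.Canceled or context.DeadlineExceeded
		default:
			time.Sleep(10 * time.Millisecond) // stand-in for executing the tx
			fmt.Println("executed", tx)
		}
	}
	return nil
}

func main() {
	execCtx, cancel := context.WithCancel(context.Background())
	go func() {
		time.Sleep(15 * time.Millisecond) // the leader decides to abort mid-block
		cancel()
	}()

	err := executeBlock(execCtx, []string{"tx1", "tx2", "tx3", "tx4"})
	if errors.Is(err, context.Canceled) {
		fmt.Println("block execution cancelled; roll back and re-propose")
	}
}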
diff --git a/node/block_processor/status.go b/node/block_processor/status.go
new file mode 100644
index 000000000..0ca7f15d1
--- /dev/null
+++ b/node/block_processor/status.go
@@ -0,0 +1,81 @@
+package blockprocessor
+
+import (
+	"slices"
+	"time"
+
+	ktypes "github.com/kwilteam/kwil-db/core/types"
+)
+
+type blockExecStatus struct {
+	startTime, endTime time.Time
+	height             int64
+	txIDs              []ktypes.Hash
+	txStatus           map[string]bool
+}
+
+// Used by the rpc server to get the execution status of the block being processed.
+// end_time is not set if the block is still being processed.
+func (bp *BlockProcessor) BlockExecutionStatus() ktypes.BlockExecutionStatus {
+	bp.statusMu.RLock()
+	defer bp.statusMu.RUnlock()
+
+	if bp.status == nil {
+		return ktypes.BlockExecutionStatus{}
+	}
+
+	status := &ktypes.BlockExecutionStatus{
+		StartTime: bp.status.startTime,
+		EndTime:   bp.status.endTime,
+		Height:    bp.status.height,
+		TxIDs:     slices.Clone(bp.status.txIDs),
+		TxStatus:  make(map[string]bool),
+	}
+
+	for k, v := range bp.status.txStatus {
+		status.TxStatus[k] = v
+	}
+
+	return *status
+}
+
+func (bp *BlockProcessor) initBlockExecutionStatus(blk *ktypes.Block) {
+	bp.statusMu.Lock()
+	defer bp.statusMu.Unlock()
+
+	status := &blockExecStatus{
+		startTime: time.Now(),
+		height:    blk.Header.Height,
+		txStatus:  make(map[string]bool),
+		txIDs:     make([]ktypes.Hash, len(blk.Txns)),
+	}
+
+	for i, tx := range blk.Txns {
+		txID := ktypes.HashBytes(tx)
+		status.txIDs[i] = txID
+		status.txStatus[txID.String()] = false // not needed, just for clarity
+	}
+
+	bp.status = status
+}
+
+func (bp *BlockProcessor) clearBlockExecutionStatus() {
+	bp.statusMu.Lock()
+	defer bp.statusMu.Unlock()
+
+	bp.status = nil
+}
+
+func (bp *BlockProcessor) updateBlockExecutionStatus(txID ktypes.Hash) {
+	bp.statusMu.Lock()
+	defer bp.statusMu.Unlock()
+
+	bp.status.txStatus[txID.String()] = true
+}
+
+func (bp *BlockProcessor) recordBlockExecEndTime() {
+	bp.statusMu.Lock()
+	defer bp.statusMu.Unlock()
+
+	bp.status.endTime = time.Now()
+}
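status.go only records progress; a consumer has to interpret it. A rough sketch of how an RPC or admin handler might summarize BlockExecutionStatus follows; the statusSource interface and describeExecution helper are hypothetical names, not part of this change:

package admin // hypothetical consumer of the block processor status

import (
	"fmt"
	"time"

	ktypes "github.com/kwilteam/kwil-db/core/types"
)

// statusSource is a narrow, hypothetical view of the BlockProcessor.
type statusSource interface {
	BlockExecutionStatus() ktypes.BlockExecutionStatus
}

// describeExecution renders a one-line progress summary for the block that is
// currently executing, or for the last executed block if EndTime is set.
func describeExecution(src statusSource) string {
	st := src.BlockExecutionStatus()
	if st.StartTime.IsZero() {
		return "no block execution recorded"
	}

	executed := 0
	for _, done := range st.TxStatus {
		if done {
			executed++
		}
	}

	elapsed := time.Since(st.StartTime)
	if !st.EndTime.IsZero() {
		elapsed = st.EndTime.Sub(st.StartTime)
	}
	return fmt.Sprintf("height %d: %d/%d txs executed in %s",
		st.Height, executed, len(st.TxIDs), elapsed)
}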
diff --git a/node/consensus/block.go b/node/consensus/block.go
index fc4ba0a7b..5aafc9231 100644
--- a/node/consensus/block.go
+++ b/node/consensus/block.go
@@ -110,10 +110,8 @@ func (ce *ConsensusEngine) commit(ctx context.Context) error {
 		ce.mempool.Remove(txHash)
 	}

-	// TODO: reapply existing transaction (checkTX)
-	// get all the transactions from mempool and recheck them, the transactions should be checked
-	// in the order of nonce (stable sort to maintain relative order)
-	// ce.blockProcessor.CheckTx(ctx, tx, true)
+	// recheck the transactions in the mempool
+	ce.mempool.RecheckTxs(ctx, ce.blockProcessor.CheckTx)

 	// update the role of the node based on the final validator set at the end of the commit.
 	ce.updateValidatorSetAndRole()
@@ -130,29 +128,24 @@ func (ce *ConsensusEngine) nextState() {
 		blk: ce.state.blkProp.blk,
 	}

-	ce.state.blkProp = nil
-	ce.state.blockRes = nil
-	ce.state.votes = make(map[string]*vote)
-	ce.state.consensusTx = nil
-
-	// update the stateInfo
-	ce.stateInfo.mtx.Lock()
-	ce.stateInfo.status = Committed
-	ce.stateInfo.blkProp = nil
-	ce.stateInfo.height = ce.state.lc.height
-	ce.stateInfo.mtx.Unlock()
+	ce.resetState()
 }

-func (ce *ConsensusEngine) resetState(ctx context.Context) error {
+func (ce *ConsensusEngine) rollbackState(ctx context.Context) error {
 	// Revert back any state changes occurred due to the current block
 	if err := ce.blockProcessor.Rollback(ctx, ce.state.lc.height, ce.state.lc.appHash); err != nil {
 		return err
 	}

+	ce.resetState()
+
+	return nil
+}
+
+func (ce *ConsensusEngine) resetState() {
 	ce.state.blkProp = nil
 	ce.state.blockRes = nil
 	ce.state.votes = make(map[string]*vote)
-	ce.state.consensusTx = nil

 	// update the stateInfo
 	ce.stateInfo.mtx.Lock()
@@ -161,5 +154,8 @@ func (ce *ConsensusEngine) resetState(ctx context.Context) error {
 	ce.stateInfo.height = ce.state.lc.height
 	ce.stateInfo.mtx.Unlock()

-	return nil
+	ce.cancelFnMtx.Lock()
+	ce.blkExecCancelFn = nil
+	ce.longRunningTxs = make([]ktypes.Hash, 0)
+	ce.cancelFnMtx.Unlock()
 }
ce.log.Info("Reset msg: ", "height", height) if ce.state.lc.height == height { if ce.state.blkProp != nil { ce.log.Info("Resetting the block proposal", "height", height) - if err := ce.resetState(ctx); err != nil { - ce.log.Error("Error resetting the state", "error", err) // panic? or consensus error? + // cancel the context + ce.cancelFnMtx.Lock() + if ce.blkExecCancelFn != nil { + ce.blkExecCancelFn() + } + ce.cancelFnMtx.Unlock() + + if err := ce.rollbackState(ctx); err != nil { + ce.log.Error("Error rolling back the state", "error", err) } + + } else { + ce.log.Info("Block already committed or executed, nothing to reset", "height", height) } + } else { + ce.log.Warn("Invalid reset request", "height", height, "lastCommittedHeight", ce.state.lc.height) } } diff --git a/node/consensus/engine_test.go b/node/consensus/engine_test.go index 431ff9ede..2499bfee2 100644 --- a/node/consensus/engine_test.go +++ b/node/consensus/engine_test.go @@ -275,7 +275,7 @@ func TestValidatorStateMachine(t *testing.T) { { name: "blkPropNew", trigger: func(t *testing.T, leader, val *ConsensusEngine) { - val.NotifyBlockProposal(blkProp1.blk) + val.NotifyBlockProposal(blkProp2.blk) }, verify: func(t *testing.T, leader, val *ConsensusEngine) error { return verifyStatus(t, val, Executed, 0, blkProp2.blkHash) @@ -553,8 +553,10 @@ func TestValidatorStateMachine(t *testing.T) { val := New(ceConfigs[1]) blkProp1, err = leader.createBlockProposal() assert.NoError(t, err) + time.Sleep(300 * time.Millisecond) // just to ensure that the block hashes are different due to start time blkProp2, err = leader.createBlockProposal() assert.NoError(t, err) + t.Logf("blkProp1: %s, blkProp2: %s", blkProp1.blkHash.String(), blkProp2.blkHash.String()) ctx, cancel := context.WithCancel(context.Background()) var wg sync.WaitGroup @@ -779,6 +781,9 @@ func (d *dummyTxApp) Price(ctx context.Context, dbTx sql.DB, tx *ktypes.Transact func (d *dummyTxApp) Commit() error { return nil } + +func (d *dummyTxApp) Rollback() {} + func (d *dummyTxApp) GenesisInit(ctx context.Context, db sql.DB, validators []*ktypes.Validator, genesisAccounts []*ktypes.Account, initialHeight int64, chain *common.ChainContext) error { return nil } diff --git a/node/consensus/follower.go b/node/consensus/follower.go index bd3b65bb4..f80679a34 100644 --- a/node/consensus/follower.go +++ b/node/consensus/follower.go @@ -162,7 +162,7 @@ func (ce *ConsensusEngine) processBlockProposal(ctx context.Context, blkPropMsg } ce.log.Info("Aborting execution of stale block proposal", "height", blkPropMsg.height, "blkHash", ce.state.blkProp.blkHash) - if err := ce.resetState(ctx); err != nil { + if err := ce.rollbackState(ctx); err != nil { ce.log.Error("Error aborting execution of block", "height", blkPropMsg.height, "blkID", ce.state.blkProp.blkHash, "error", err) return err } @@ -183,7 +183,17 @@ func (ce *ConsensusEngine) processBlockProposal(ctx context.Context, blkPropMsg ce.stateInfo.blkProp = blkPropMsg ce.stateInfo.mtx.Unlock() - if err := ce.executeBlock(ctx, blkPropMsg); err != nil { + // execCtx is applicable only for the duration of the block execution + // This is used to react to the leader's reset message by cancelling the block execution. 
diff --git a/node/consensus/follower.go b/node/consensus/follower.go
index bd3b65bb4..f80679a34 100644
--- a/node/consensus/follower.go
+++ b/node/consensus/follower.go
@@ -162,7 +162,7 @@ func (ce *ConsensusEngine) processBlockProposal(ctx context.Context, blkPropMsg
 		}

 		ce.log.Info("Aborting execution of stale block proposal", "height", blkPropMsg.height, "blkHash", ce.state.blkProp.blkHash)
-		if err := ce.resetState(ctx); err != nil {
+		if err := ce.rollbackState(ctx); err != nil {
 			ce.log.Error("Error aborting execution of block", "height", blkPropMsg.height, "blkID", ce.state.blkProp.blkHash, "error", err)
 			return err
 		}
@@ -183,7 +183,17 @@ func (ce *ConsensusEngine) processBlockProposal(ctx context.Context, blkPropMsg
 	ce.stateInfo.blkProp = blkPropMsg
 	ce.stateInfo.mtx.Unlock()

-	if err := ce.executeBlock(ctx, blkPropMsg); err != nil {
+	// execCtx is applicable only for the duration of the block execution
+	// This is used to react to the leader's reset message by cancelling the block execution.
+	execCtx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	// Set the cancel function for the block execution
+	ce.cancelFnMtx.Lock()
+	ce.blkExecCancelFn = cancel
+	ce.cancelFnMtx.Unlock()
+
+	if err := ce.executeBlock(execCtx, blkPropMsg); err != nil {
 		ce.log.Error("Error executing block, sending NACK", "error", err)
 		go ce.ackBroadcaster(false, blkPropMsg.height, blkPropMsg.blkHash, nil)
 		return err
@@ -227,8 +237,7 @@ func (ce *ConsensusEngine) commitBlock(ctx context.Context, blk *ktypes.Block, a
 	}

 	if ce.state.blkProp.blkHash != blk.Header.Hash() {
-		// ce.state.cancelFunc() // abort the current block execution ??
-		if err := ce.resetState(ctx); err != nil {
+		if err := ce.rollbackState(ctx); err != nil {
 			ce.log.Error("error aborting execution of incorrect block proposal", "height", blk.Header.Height, "blkID", blk.Header.Hash(), "error", err)
 		}
 		return ce.processAndCommit(ctx, blk, appHash)
diff --git a/node/consensus/interfaces.go b/node/consensus/interfaces.go
index aa711f6cc..0b84d7d30 100644
--- a/node/consensus/interfaces.go
+++ b/node/consensus/interfaces.go
@@ -4,6 +4,7 @@ import (
 	"context"

 	ktypes "github.com/kwilteam/kwil-db/core/types"
+	"github.com/kwilteam/kwil-db/node/mempool"
 	"github.com/kwilteam/kwil-db/node/types"
 	"github.com/kwilteam/kwil-db/node/types/sql"
 )
@@ -21,6 +22,7 @@ type DB interface {
 type Mempool interface {
 	PeekN(maxSize int) []types.NamedTx
 	Remove(txid types.Hash)
+	RecheckTxs(ctx context.Context, checkFn mempool.CheckFn)
 }

 // BlockStore includes both txns and blocks
@@ -33,6 +35,7 @@ type BlockStore interface {
 	GetByHeight(height int64) (types.Hash, *ktypes.Block, types.Hash, error)
 	StoreResults(hash types.Hash, results []ktypes.TxResult) error
 	// Results(hash types.Hash) ([]types.TxResult, error)
+
 }

 type BlockProcessor interface {
@@ -45,4 +48,6 @@ type BlockProcessor interface {

 	CheckTx(ctx context.Context, tx *ktypes.Transaction, recheck bool) error
 	GetValidators() []*ktypes.Validator
+
+	BlockExecutionStatus() ktypes.BlockExecutionStatus
 }
diff --git a/node/consensus/leader.go b/node/consensus/leader.go
index ab82e2723..95b09b4ea 100644
--- a/node/consensus/leader.go
+++ b/node/consensus/leader.go
@@ -12,22 +12,26 @@ import (
 	"github.com/kwilteam/kwil-db/node/types"
 )

-var lastReset int64 = 0
-
-// Leader is the node that proposes the block and drives the consensus process:
+// The Leader is responsible for proposing blocks and managing the consensus process:
 // 1. Prepare Phase:
 //   - Create a block proposal
 //   - Broadcast the block proposal
 //   - Process the block and generate the appHash
-//   - Wait for the votes from the validators
-//   - Enter the commit phase if majority of the validators execute the block correctly
+//   - Wait for votes from validators
+//   - Enter the commit phase if the majority of validators approve the block
 //
 // 2. Commit Phase:
-//   - Commit the block and start next prepare phase
-//   - This phase includes committing the block to the block store, flushing out the mempool
-//     updating the chain state, creating snapshots, committing the pg db state etc.
+//   - Commit the block and initiate the next prepare phase
+//   - This phase includes committing the block to the block store, clearing the mempool,
+//     updating the chain state, creating snapshots, committing the pg db state, etc.
+//
+// The Leader can also issue ResetState messages using "kwil-admin reset "
+// When a leader receives a ResetState request, it will broadcast the ResetState message to the network to
+// halt the current block execution if the current block height equals reset-block-height + 1. The leader will stop processing
+// the current block, revert any state changes made, and remove the problematic transactions from the mempool before
+// reproposing the block.

-// startNewRound starts a new round of consensus process (Prepare Phase).
+// startNewRound initiates a new round of the consensus process (Prepare Phase).
 func (ce *ConsensusEngine) startNewRound(ctx context.Context) error {
 	ce.state.mtx.Lock()
 	defer ce.state.mtx.Unlock()
@@ -58,8 +62,50 @@ func (ce *ConsensusEngine) startNewRound(ctx context.Context) error {
 	ce.stateInfo.blkProp = blkProp
 	ce.stateInfo.mtx.Unlock()

+	// execCtx is applicable only for the duration of the block execution
+	// This is used to give the leader the ability to cancel the block execution.
+	execCtx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	// Set the cancel function for the block execution
+	ce.cancelFnMtx.Lock()
+	ce.blkExecCancelFn = cancel
+	ce.cancelFnMtx.Unlock()
+
 	// Execute the block and generate the appHash
-	if err := ce.executeBlock(ctx, blkProp); err != nil {
+	if err := ce.executeBlock(execCtx, blkProp); err != nil {
+		// check if the error is due to context cancellation
+		ce.log.Errorf("Error executing the block: %v", err)
+		if execCtx.Err() != nil && errors.Is(err, context.Canceled) {
+			ce.log.Warn("Block execution cancelled by the leader", "height", blkProp.height, "hash", blkProp.blkHash)
+
+			// trigger a reset state message
+			go ce.rstStateBroadcaster(ce.state.lc.height)
+
+			ce.cancelFnMtx.Lock()
+			// Remove the long running transactions from the mempool
+			ce.log.Info("Removing long running transactions from the mempool as per leader's request", "txIDs", ce.longRunningTxs)
+			for _, txID := range ce.longRunningTxs {
+				ce.mempool.Remove(txID)
+			}
+			ce.numResets++
+			ce.cancelFnMtx.Unlock()
+
+			if err := ce.rollbackState(ctx); err != nil {
+				ce.log.Errorf("Error resetting the state: %v", err)
+				return fmt.Errorf("error resetting the state: %v", err)
+			}
+
+			// Recheck the transactions in the mempool
+			ce.mempoolMtx.Lock()
+			ce.mempool.RecheckTxs(ctx, ce.blockProcessor.CheckTx)
+			ce.mempoolMtx.Unlock()
+
+			// signal ce to start a new round
+			ce.newRound <- struct{}{}
+			return nil
+		}
+		ce.log.Errorf("Error executing the block: %v", err)
 		return err
 	}

@@ -72,22 +118,6 @@ func (ce *ConsensusEngine) startNewRound(ctx context.Context) error {
 		height: blkProp.height,
 	}

-	// TODO: test resetState
-	if ce.state.blkProp.height%10 == 0 && lastReset != ce.state.blkProp.height {
-		lastReset = ce.state.blkProp.height
-		ce.log.Info("Resetting the state (for testing purposes)", "height", lastReset, " blkHash", ce.state.blkProp.blkHash)
-		if err := ce.resetState(ctx); err != nil {
-			ce.log.Errorf("Error resetting the state: %v", err)
-			return err
-		}
-		go ce.rstStateBroadcaster(ce.state.lc.height)
-
-		// signal ce to start a new round
-		ce.newRound <- struct{}{}
-
-		return nil
-	}
-
 	ce.processVotes(ctx)
 	return nil
 }
@@ -244,3 +274,49 @@ func (ce *ConsensusEngine) ValidatorSetHash() types.Hash {

 	return hasher.Sum(nil)
 }
+
+// CancelBlockExecution is used by the leader to manually cancel the block execution
+// if it is taking too long to execute. This method takes the height of the block to
+// be cancelled and the list of long-running transaction IDs to be evicted from the mempool.
+// One concern is: what if the block finishes execution and the leader tries to cancel it,
+// and the resolutions update some internal state that cannot be reverted?
+func (ce *ConsensusEngine) CancelBlockExecution(height int64, txIDs []types.Hash) error {
+	ce.log.Info("Block execution cancel request received", "height", height)
+	// Ensure we are cancelling the block execution for the current block
+	ce.stateInfo.mtx.RLock()
+	defer ce.stateInfo.mtx.RUnlock()
+
+	// Check if the height is the same as the current block height
+	if height != ce.stateInfo.height+1 {
+		ce.log.Warn("Cannot cancel block execution, block height does not match", "height", height, "current", ce.stateInfo.height+1)
+		return fmt.Errorf("cannot cancel block execution for block height %d, currently executing %d", height, ce.stateInfo.height+1)
+	}
+
+	// Check if a block is proposed
+	if ce.stateInfo.blkProp == nil {
+		ce.log.Warn("Cannot cancel block execution, no block is proposed yet", "height", height)
+		return fmt.Errorf("cannot cancel block execution, no block is proposed yet")
+	}
+
+	// Cannot cancel if the block is already finished executing or committed
+	if ce.stateInfo.status != Proposed {
+		ce.log.Warn("Cannot cancel block execution, block is already executed or committed", "height", height)
+		return fmt.Errorf("cannot cancel block execution, block is already executed or committed")
+	}
+
+	// Cancel the block execution
+	ce.cancelFnMtx.Lock()
+	defer ce.cancelFnMtx.Unlock()
+
+	ce.longRunningTxs = append([]ktypes.Hash{}, txIDs...)
+
+	if ce.blkExecCancelFn != nil {
+		ce.log.Info("Cancelling block execution", "height", height, "txIDs", txIDs)
+		ce.blkExecCancelFn()
+	} else {
+		ce.log.Error("Block execution cancel function not set")
+		return errors.New("block execution cancel function not set")
+	}
+
+	return nil
+}
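A rough sketch of how an operator-facing caller on the leader might drive this: read BlockExecutionStatus, collect the transactions that are still pending, and hand them to CancelBlockExecution. The statusProvider and execCanceler interfaces, the cancelStuckBlock helper and the maxBlockExecTime threshold are hypothetical, and node/types.Hash is assumed to be the same type as the core types Hash, as the code above implies:

package admin // hypothetical admin-side helper on the leader

import (
	"time"

	ktypes "github.com/kwilteam/kwil-db/core/types"
)

type statusProvider interface {
	BlockExecutionStatus() ktypes.BlockExecutionStatus
}

type execCanceler interface {
	CancelBlockExecution(height int64, txIDs []ktypes.Hash) error
}

const maxBlockExecTime = 30 * time.Second // illustrative threshold

// cancelStuckBlock asks the consensus engine to abort the in-flight block and
// evict the transactions that are still pending once execution has run longer
// than maxBlockExecTime.
func cancelStuckBlock(bp statusProvider, ce execCanceler) error {
	st := bp.BlockExecutionStatus()
	stillRunning := !st.StartTime.IsZero() && st.EndTime.IsZero()
	if !stillRunning || time.Since(st.StartTime) < maxBlockExecTime {
		return nil // nothing to cancel
	}

	var pending []ktypes.Hash
	for _, id := range st.TxIDs {
		if !st.TxStatus[id.String()] { // false = not yet executed
			pending = append(pending, id)
		}
	}
	return ce.CancelBlockExecution(st.Height, pending)
}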
diff --git a/node/mempool/mempool.go b/node/mempool/mempool.go
index c8d3ad7a8..24be092f9 100644
--- a/node/mempool/mempool.go
+++ b/node/mempool/mempool.go
@@ -1,6 +1,7 @@
 package mempool

 import (
+	"context"
 	"slices"
 	"sync"

@@ -114,3 +115,16 @@ func (mp *Mempool) PeekN(n int) []types.NamedTx {
 	copy(txns, mp.txQ)
 	return txns
 }
+
+type CheckFn func(ctx context.Context, tx *ktypes.Transaction, recheck bool) error
+
+func (mp *Mempool) RecheckTxs(ctx context.Context, fn CheckFn) {
+	mp.mtx.RLock()
+	defer mp.mtx.RUnlock()
+
+	for _, tx := range mp.txQ {
+		if err := fn(ctx, tx.Tx, true); err != nil {
+			mp.remove(tx.Hash)
+		}
+	}
+}
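RecheckTxs walks the queue in order and evicts any transaction the supplied CheckFn rejects; in the consensus engine the function passed in is BlockProcessor.CheckTx with recheck set to true. A small sketch of the wiring with a stand-in check, where recheckAfterRollback and checkNonce are illustrative names:

package mempoolutil // hypothetical caller package

import (
	"context"

	ktypes "github.com/kwilteam/kwil-db/core/types"
	"github.com/kwilteam/kwil-db/node/mempool"
)

// recheckAfterRollback drops every queued transaction that checkNonce rejects.
// checkNonce stands in for a real validity check such as BlockProcessor.CheckTx.
func recheckAfterRollback(ctx context.Context, mp *mempool.Mempool, checkNonce func(*ktypes.Transaction) error) {
	var check mempool.CheckFn = func(ctx context.Context, tx *ktypes.Transaction, recheck bool) error {
		// recheck is true for every call made from RecheckTxs: the tx was
		// already admitted once and is being re-validated after a rollback.
		return checkNonce(tx)
	}
	mp.RecheckTxs(ctx, check)
}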
diff --git a/node/node_live_test.go b/node/node_live_test.go
index 836f9276a..08eabdb63 100644
--- a/node/node_live_test.go
+++ b/node/node_live_test.go
@@ -365,6 +365,8 @@ func (d *dummyTxApp) Commit() error {
 	return nil
 }

+func (d *dummyTxApp) Rollback() {}
+
 func (d *dummyTxApp) GenesisInit(ctx context.Context, db sql.DB, validators []*ktypes.Validator, genesisAccounts []*ktypes.Account, initialHeight int64, chain *common.ChainContext) error {
 	return nil
 }
diff --git a/node/txapp/interfaces.go b/node/txapp/interfaces.go
index 9c2ce8338..7077f4ecc 100644
--- a/node/txapp/interfaces.go
+++ b/node/txapp/interfaces.go
@@ -17,6 +17,7 @@ type Accounts interface {
 	GetAccount(ctx context.Context, tx sql.Executor, acctID []byte) (*types.Account, error)
 	ApplySpend(ctx context.Context, tx sql.Executor, acctID []byte, amount *big.Int, nonce int64) error
 	Commit() error
+	Rollback()
 }

 type Validators interface {
@@ -24,6 +25,7 @@ type Validators interface {
 	GetValidatorPower(ctx context.Context, tx sql.Executor, pubKey []byte) (int64, error)
 	GetValidators() []*types.Validator
 	Commit() error
+	Rollback()
 }

 // Rebroadcaster is a service that marks events for rebroadcasting.
diff --git a/node/txapp/routes_test.go b/node/txapp/routes_test.go
index 242084ce3..b71271cb7 100644
--- a/node/txapp/routes_test.go
+++ b/node/txapp/routes_test.go
@@ -327,6 +327,8 @@ func (a *mockAccount) Commit() error {
 	return nil
 }

+func (a *mockAccount) Rollback() {}
+
 type mockValidator struct {
 	getVoterFn getVoterPowerFunc
 }
@@ -347,6 +349,8 @@ func (v *mockValidator) Commit() error {
 	return nil
 }

+func (v *mockValidator) Rollback() {}
+
 func getSigner(hexPrivKey string) auth.Signer {
 	bts, err := hex.DecodeString(hexPrivKey)
 	if err != nil {
diff --git a/node/txapp/txapp.go b/node/txapp/txapp.go
index c10ecae43..578e371ef 100644
--- a/node/txapp/txapp.go
+++ b/node/txapp/txapp.go
@@ -169,6 +169,13 @@ func (r *TxApp) Commit() error {
 	return nil
 }

+func (r *TxApp) Rollback() {
+	r.Accounts.Rollback()
+	r.Validators.Rollback()
+
+	r.mempool.reset() // will issue recheck before next block
+}
+
 // processVotes confirms resolutions that have been approved by the network,
 // expires resolutions that have expired, and properly credits proposers and voters.
 func (r *TxApp) processVotes(ctx context.Context, db sql.DB, block *common.BlockContext) error {
diff --git a/node/voting/voting.go b/node/voting/voting.go
index d6ba56bed..8f57da132 100644
--- a/node/voting/voting.go
+++ b/node/voting/voting.go
@@ -672,3 +672,10 @@ func (v *VoteStore) ValidatorUpdates() map[string]*types.Validator {

 	return v.valUpdates
 }
+
+func (v *VoteStore) Rollback() {
+	v.mtx.Lock()
+	defer v.mtx.Unlock()
+
+	v.valUpdates = make(map[string]*types.Validator)
+}
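The new Rollback methods above (Accounts, TxApp, VoteStore and the test mocks) all follow the same pattern: they clear only the uncommitted state the module keeps in memory, while the database side of the block is discarded separately when the transaction wrapping block execution is rolled back. A minimal sketch of that pattern for a hypothetical module, where rewardStore and its pending map are illustrative:

package rewards // hypothetical module illustrating the Rollback contract

import "sync"

// rewardStore buffers uncommitted per-account deltas in memory, the same
// shape as Accounts.updates or VoteStore.valUpdates above.
type rewardStore struct {
	mtx     sync.Mutex
	pending map[string]int64
}

// Rollback discards every uncommitted in-memory update, returning the module
// to the state it had at the last committed block.
func (r *rewardStore) Rollback() {
	r.mtx.Lock()
	defer r.mtx.Unlock()

	r.pending = make(map[string]int64)
}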