Skip to content

Commit 2e3b5dc

Browse files
committed
Replay events during restart to avoid tx missing
1 parent 4269298 commit 2e3b5dc

File tree

2 files changed

+24
-4
lines changed

2 files changed

+24
-4
lines changed

internal/consensus/replay.go

+21-1
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,11 @@ func (h *Handshaker) ReplayBlocks(
395395
return h.replayBlocks(ctx, state, appClient, appBlockHeight, storeBlockHeight, false)
396396

397397
} else if appBlockHeight == storeBlockHeight {
398-
// We're good!
398+
// We're good! But we need to reindex events
399+
err := h.replayEvents(appBlockHeight)
400+
if err != nil {
401+
return nil, err
402+
}
399403
if err := checkAppHashEqualsOneFromState(appHash, state); err != nil {
400404
return nil, err
401405
}
@@ -550,6 +554,22 @@ func (h *Handshaker) replayBlock(
550554
return state, nil
551555
}
552556

557+
// replayEvents will be called during restart to avoid tx missing to be indexed
558+
func (h *Handshaker) replayEvents(height int64) error {
559+
block := h.store.LoadBlock(height)
560+
meta := h.store.LoadBlockMeta(height)
561+
res, err := h.stateStore.LoadFinalizeBlockResponses(height)
562+
if err != nil {
563+
return err
564+
}
565+
validatorUpdates, err := types.PB2TM.ValidatorUpdates(res.ValidatorUpdates)
566+
if err != nil {
567+
return err
568+
}
569+
sm.FireEvents(h.logger, h.eventBus, block, meta.BlockID, res, validatorUpdates)
570+
return nil
571+
}
572+
553573
func checkAppHashEqualsOneFromBlock(appHash []byte, block *types.Block) error {
554574
if !bytes.Equal(appHash, block.AppHash) {
555575
return fmt.Errorf(`block.AppHash does not match AppHash after replay. Got '%X', expected '%X'.

internal/state/execution.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,7 @@ func (blockExec *BlockExecutor) ApplyBlock(
360360

361361
// Events are fired after everything else.
362362
// NOTE: if we crash between Commit and Save, events wont be fired during replay
363-
fireEvents(blockExec.logger, blockExec.eventBus, block, blockID, fBlockRes, validatorUpdates)
363+
FireEvents(blockExec.logger, blockExec.eventBus, block, blockID, fBlockRes, validatorUpdates)
364364

365365
return state, nil
366366
}
@@ -687,7 +687,7 @@ func (state State) Update(
687687
// Fire NewBlock, NewBlockHeader.
688688
// Fire TxEvent for every tx.
689689
// NOTE: if Tendermint crashes before commit, some or all of these events may be published again.
690-
func fireEvents(
690+
func FireEvents(
691691
logger log.Logger,
692692
eventBus types.BlockEventPublisher,
693693
block *types.Block,
@@ -811,7 +811,7 @@ func ExecCommitBlock(
811811
}
812812

813813
blockID := types.BlockID{Hash: block.Hash(), PartSetHeader: bps.Header()}
814-
fireEvents(be.logger, be.eventBus, block, blockID, finalizeBlockResponse, validatorUpdates)
814+
FireEvents(be.logger, be.eventBus, block, blockID, finalizeBlockResponse, validatorUpdates)
815815
}
816816

817817
// Commit block

0 commit comments

Comments
 (0)