Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make missing txs check atomic #269

Merged
merged 2 commits into from
Mar 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions internal/consensus/replay_stubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ func (m emptyMempool) GetTxsForKeys(txKeys []types.TxKey) types.Txs {
return types.Txs{}
}

func (m emptyMempool) SafeGetTxsForKeys(txKeys []types.TxKey) (types.Txs, []types.TxKey) {
return types.Txs{}, []types.TxKey{}
}

var _ mempool.Mempool = emptyMempool{}

func (emptyMempool) TxStore() *mempool.TxStore { return nil }
Expand Down
3 changes: 1 addition & 2 deletions internal/consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -2502,13 +2502,12 @@ func (cs *State) tryCreateProposalBlock(ctx context.Context, height int64, round
// Build a proposal block from mempool txs. If cs.config.GossipTransactionKeyOnly=true
// proposals only contain txKeys so we rebuild the block using mempool txs
func (cs *State) buildProposalBlock(height int64, header types.Header, lastCommit *types.Commit, evidence []types.Evidence, proposerAddress types.Address, txKeys []types.TxKey) *types.Block {
missingTxs := cs.blockExec.GetMissingTxs(txKeys)
txs, missingTxs := cs.blockExec.SafeGetTxsByKeys(txKeys)
if len(missingTxs) > 0 {
cs.metrics.ProposalMissingTxs.Set(float64(len(missingTxs)))
cs.logger.Debug("Missing txs when trying to build block", "missing_txs", cs.blockExec.GetMissingTxs(txKeys))
return nil
}
txs := cs.blockExec.GetTxsForKeys(txKeys)
block := cs.state.MakeBlock(height, cs.blockExec.GetTxsForKeys(txKeys), lastCommit, evidence, proposerAddress)
block.Version = header.Version
block.Data.Txs = txs
Expand Down
17 changes: 17 additions & 0 deletions internal/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,23 @@ func (txmp *TxMempool) GetTxsForKeys(txKeys []types.TxKey) types.Txs {
return txs
}

func (txmp *TxMempool) SafeGetTxsForKeys(txKeys []types.TxKey) (types.Txs, []types.TxKey) {
txmp.mtx.RLock()
defer txmp.mtx.RUnlock()

txs := make([]types.Tx, 0, len(txKeys))
missing := []types.TxKey{}
for _, txKey := range txKeys {
wtx := txmp.txStore.GetTxByHash(txKey)
if wtx == nil {
missing = append(missing, txKey)
continue
}
txs = append(txs, wtx.tx)
}
return txs, missing
}

// Flush empties the mempool. It acquires a read-lock, fetches all the
// transactions currently in the transaction store and removes each transaction
// from the store and all indexes and finally resets the cache.
Expand Down
115 changes: 97 additions & 18 deletions internal/mempool/mocks/mempool.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions internal/mempool/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ type Mempool interface {

GetTxsForKeys(txKeys []types.TxKey) types.Txs

// Similar to GetTxsForKeys except that it would return a list
// indicating missing keys.
SafeGetTxsForKeys(txKeys []types.TxKey) (types.Txs, []types.TxKey)

// ReapMaxBytesMaxGas reaps transactions from the mempool up to maxBytes
// bytes total with the condition that the total gasWanted must be less than
// maxGas and that the total estimated gas used is less than maxGasEstimated.
Expand Down
4 changes: 4 additions & 0 deletions internal/state/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,10 @@ func (blockExec *BlockExecutor) GetMissingTxs(txKeys []types.TxKey) []types.TxKe
return missingTxKeys
}

func (blockExec *BlockExecutor) SafeGetTxsByKeys(txKeys []types.TxKey) (types.Txs, []types.TxKey) {
return blockExec.mempool.SafeGetTxsForKeys(txKeys)
}

func (blockExec *BlockExecutor) CheckTxFromPeerProposal(ctx context.Context, tx types.Tx) {
// Ignore errors from CheckTx because there could be benign errors due to the same tx being
// inserted into the mempool from gossiping. Since such simultaneous insertion could result in
Expand Down
14 changes: 7 additions & 7 deletions internal/state/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ func TestFinalizeBlockValidatorUpdates(t *testing.T) {
mock.Anything,
mock.Anything,
mock.Anything).Return(nil)
mp.On("ReapMaxBytesMaxGas", mock.Anything, mock.Anything).Return(types.Txs{})
mp.On("ReapMaxBytesMaxGas", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(types.Txs{})
mp.On("TxStore").Return(nil)

eventBus := eventbus.NewDefault(logger)
Expand Down Expand Up @@ -669,7 +669,7 @@ func TestEmptyPrepareProposal(t *testing.T) {
mock.Anything,
mock.Anything,
mock.Anything).Return(nil)
mp.On("ReapMaxBytesMaxGas", mock.Anything, mock.Anything).Return(types.Txs{})
mp.On("ReapMaxBytesMaxGas", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(types.Txs{})
mp.On("TxStore").Return(nil)

blockExec := sm.NewBlockExecutor(
Expand Down Expand Up @@ -708,7 +708,7 @@ func TestPrepareProposalErrorOnNonExistingRemoved(t *testing.T) {
evpool.On("PendingEvidence", mock.Anything).Return([]types.Evidence{}, int64(0))

mp := &mpmocks.Mempool{}
mp.On("ReapMaxBytesMaxGas", mock.Anything, mock.Anything).Return(types.Txs{})
mp.On("ReapMaxBytesMaxGas", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(types.Txs{})

app := abcimocks.NewApplication(t)

Expand Down Expand Up @@ -766,7 +766,7 @@ func TestPrepareProposalReorderTxs(t *testing.T) {

txs := factory.MakeNTxs(height, 10)
mp := &mpmocks.Mempool{}
mp.On("ReapMaxBytesMaxGas", mock.Anything, mock.Anything).Return(types.Txs(txs))
mp.On("ReapMaxBytesMaxGas", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(types.Txs(txs))

trs := txsToTxRecords(types.Txs(txs))
trs = trs[2:]
Expand Down Expand Up @@ -828,7 +828,7 @@ func TestPrepareProposalErrorOnTooManyTxs(t *testing.T) {
maxDataBytes := types.MaxDataBytes(state.ConsensusParams.Block.MaxBytes, 0, nValidators)
txs := factory.MakeNTxs(height, maxDataBytes/bytesPerTx+2) // +2 so that tx don't fit
mp := &mpmocks.Mempool{}
mp.On("ReapMaxBytesMaxGas", mock.Anything, mock.Anything).Return(types.Txs(txs))
mp.On("ReapMaxBytesMaxGas", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(types.Txs(txs))

trs := txsToTxRecords(types.Txs(txs))

Expand Down Expand Up @@ -880,7 +880,7 @@ func TestPrepareProposalErrorOnPrepareProposalError(t *testing.T) {

txs := factory.MakeNTxs(height, 10)
mp := &mpmocks.Mempool{}
mp.On("ReapMaxBytesMaxGas", mock.Anything, mock.Anything).Return(types.Txs(txs))
mp.On("ReapMaxBytesMaxGas", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(types.Txs(txs))

cm := &abciclientmocks.Client{}
cm.On("IsRunning").Return(true)
Expand Down Expand Up @@ -984,7 +984,7 @@ func TestCreateProposalAbsentVoteExtensions(t *testing.T) {
mock.Anything,
mock.Anything,
mock.Anything).Return(nil)
mp.On("ReapMaxBytesMaxGas", mock.Anything, mock.Anything).Return(types.Txs{})
mp.On("ReapMaxBytesMaxGas", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(types.Txs{})

blockExec := sm.NewBlockExecutor(
stateStore,
Expand Down
Loading