Skip to content

Commit

Permalink
mempool store before checktx, remove if fail
Browse files Browse the repository at this point in the history
  • Loading branch information
jchappelow committed Feb 24, 2025
1 parent 0c84efb commit 9dfcbe2
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 15 deletions.
12 changes: 7 additions & 5 deletions node/consensus/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,19 +128,21 @@ func (ce *ConsensusEngine) recheckTx(ctx context.Context, tx *ktypes.Transaction
// sync is set to 1, in which case the BroadcastTx returns only after it is
// successfully executed in a committed block.
func (ce *ConsensusEngine) BroadcastTx(ctx context.Context, tx *ktypes.Transaction, sync uint8) (types.Hash, *ktypes.TxResult, error) {
if err := ce.CheckTx(ctx, tx); err != nil {
return types.Hash{}, nil, err
}

rawTx := tx.Bytes()
txHash := types.HashBytes(rawTx)

// add the transaction to the mempool
// Add the transaction to the mempool. This must be done before CheckTx
// because it modifies pending account state in the case that it passes.
have, rejected := ce.mempool.Store(txHash, tx)
if rejected {
return txHash, nil, ktypes.ErrMempoolFull
}

if err := ce.CheckTx(ctx, tx); err != nil {
ce.mempool.Remove(txHash)
return types.Hash{}, nil, err
}

// Announce the transaction to the network only if not previously announced
if ce.txAnnouncer != nil && !have {
// We can't use parent context 'cause it's canceled in the caller, which
Expand Down
1 change: 1 addition & 0 deletions node/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func (mp *Mempool) Remove(txid types.Hash) {
}

func (mp *Mempool) remove(txid types.Hash) {
delete(mp.fetching, txid)
tx, have := mp.txns[txid]
if !have {
return
Expand Down
23 changes: 13 additions & 10 deletions node/nogossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (n *Node) txAnnStreamHandler(s network.Stream) {
var fetched bool
defer func() {
if !fetched { // release prefetch
n.mp.Store(txHash, nil)
n.mp.Remove(txHash)
}
}()

Expand Down Expand Up @@ -72,19 +72,22 @@ func (n *Node) txAnnStreamHandler(s network.Stream) {
// while we were fetching it

// store in mempool since it was not in tx index and thus not confirmed
_, rejected := n.mp.Store(txHash, &tx)
if rejected { // our mempool is full
return
}

ctx := context.Background()
if err := n.ce.CheckTx(ctx, &tx); err != nil {
n.log.Warnf("tx %v failed check: %v", txHash, err)
} else {
fetched = true
_, rejected := n.mp.Store(txHash, &tx)
if rejected { // our mempool is full
return
}

// re-announce
go n.announceTx(context.Background(), txHash, rawTx, s.Conn().RemotePeer())
return // must mempool.Remove (fetched must still be false here)
}

fetched = true // otherwise it will be removed on return

// re-announce
go n.announceTx(context.Background(), txHash, rawTx, s.Conn().RemotePeer())

}

func (n *Node) announceTx(ctx context.Context, txHash types.Hash, rawTx []byte, from peer.ID) {
Expand Down

0 comments on commit 9dfcbe2

Please sign in to comment.