Skip to content

Commit bd9d427

Browse files
committed
EVM transaction replacement
1 parent 0c51651 commit bd9d427

File tree

12 files changed

+778
-302
lines changed

12 files changed

+778
-302
lines changed

abci/types/types.pb.go

+608-279
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/mempool/mempool.go

+34-11
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ func (txmp *TxMempool) RemoveTxByKey(txKey types.TxKey) error {
364364

365365
// remove the committed transaction from the transaction store and indexes
366366
if wtx := txmp.txStore.GetTxByHash(txKey); wtx != nil {
367-
txmp.removeTx(wtx, false, true)
367+
txmp.removeTx(wtx, false, true, true)
368368
return nil
369369
}
370370

@@ -403,7 +403,7 @@ func (txmp *TxMempool) Flush() {
403403
txmp.timestampIndex.Reset()
404404

405405
for _, wtx := range txmp.txStore.GetAllTxs() {
406-
txmp.removeTx(wtx, false, false)
406+
txmp.removeTx(wtx, false, false, true)
407407
}
408408

409409
atomic.SwapInt64(&txmp.sizeBytes, 0)
@@ -515,7 +515,17 @@ func (txmp *TxMempool) Update(
515515

516516
// remove the committed transaction from the transaction store and indexes
517517
if wtx := txmp.txStore.GetTxByHash(tx.Key()); wtx != nil {
518-
txmp.removeTx(wtx, false, false)
518+
txmp.removeTx(wtx, false, false, true)
519+
}
520+
if execTxResult[i].EvmTxInfo != nil {
521+
// remove any tx that has the same nonce (because the committed tx
522+
// may be from block proposal and is never in the local mempool)
523+
if wtx, _ := txmp.priorityIndex.GetTxWithSameNonce(&WrappedTx{
524+
evmAddress: execTxResult[i].EvmTxInfo.SenderAddress,
525+
evmNonce: execTxResult[i].EvmTxInfo.Nonce,
526+
}); wtx != nil {
527+
txmp.removeTx(wtx, false, false, true)
528+
}
519529
}
520530
}
521531

@@ -636,7 +646,7 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, res *abci.ResponseCheck
636646
// - The transaction, toEvict, can be removed while a concurrent
637647
// reCheckTx callback is being executed for the same transaction.
638648
for _, toEvict := range evictTxs {
639-
txmp.removeTx(toEvict, true, true)
649+
txmp.removeTx(toEvict, true, true, true)
640650
txmp.logger.Debug(
641651
"evicted existing good transaction; mempool full",
642652
"old_tx", fmt.Sprintf("%X", toEvict.tx.Hash()),
@@ -655,11 +665,19 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, res *abci.ResponseCheck
655665
txInfo.SenderID: {},
656666
}
657667

668+
replaced, shouldDrop := txmp.priorityIndex.TryReplacement(wtx)
669+
if shouldDrop {
670+
return nil
671+
}
672+
658673
txmp.metrics.TxSizeBytes.Observe(float64(wtx.Size()))
659674
txmp.metrics.Size.Set(float64(txmp.SizeWithoutPending()))
660675
txmp.metrics.PendingSize.Set(float64(txmp.PendingSize()))
661676

662-
if txmp.insertTx(wtx) {
677+
if replaced != nil {
678+
txmp.removeTx(replaced, true, false, false)
679+
}
680+
if txmp.insertTx(wtx, replaced == nil) {
663681
txmp.logger.Debug(
664682
"inserted good transaction",
665683
"priority", wtx.priority,
@@ -747,7 +765,7 @@ func (txmp *TxMempool) handleRecheckResult(tx types.Tx, res *abci.ResponseCheckT
747765
panic("corrupted reCheckTx cursor")
748766
}
749767

750-
txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache, true)
768+
txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache, true, true)
751769
}
752770
}
753771

@@ -853,13 +871,15 @@ func (txmp *TxMempool) canAddPendingTx(wtx *WrappedTx) error {
853871
return nil
854872
}
855873

856-
func (txmp *TxMempool) insertTx(wtx *WrappedTx) bool {
874+
func (txmp *TxMempool) insertTx(wtx *WrappedTx, updatePriorityIndex bool) bool {
857875
if txmp.isInMempool(wtx.tx) {
858876
return false
859877
}
860878

861879
txmp.txStore.SetTx(wtx)
862-
txmp.priorityIndex.PushTx(wtx)
880+
if updatePriorityIndex {
881+
txmp.priorityIndex.PushTx(wtx)
882+
}
863883
txmp.heightIndex.Insert(wtx)
864884
txmp.timestampIndex.Insert(wtx)
865885

@@ -873,13 +893,16 @@ func (txmp *TxMempool) insertTx(wtx *WrappedTx) bool {
873893
return true
874894
}
875895

876-
func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool, shouldReenqueue bool) {
896+
func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool, shouldReenqueue bool, updatePriorityIndex bool) {
877897
if txmp.txStore.IsTxRemoved(wtx.hash) {
878898
return
879899
}
880900

881901
txmp.txStore.RemoveTx(wtx)
882-
toBeReenqueued := txmp.priorityIndex.RemoveTx(wtx, shouldReenqueue)
902+
toBeReenqueued := []*WrappedTx{}
903+
if updatePriorityIndex {
904+
toBeReenqueued = txmp.priorityIndex.RemoveTx(wtx, shouldReenqueue)
905+
}
883906
txmp.heightIndex.Remove(wtx)
884907
txmp.timestampIndex.Remove(wtx)
885908

@@ -894,7 +917,7 @@ func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool, shouldReen
894917

895918
if shouldReenqueue {
896919
for _, reenqueue := range toBeReenqueued {
897-
txmp.removeTx(reenqueue, removeFromCache, false)
920+
txmp.removeTx(reenqueue, removeFromCache, false, true)
898921
}
899922
for _, reenqueue := range toBeReenqueued {
900923
rtx := reenqueue.tx

internal/mempool/mempool_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -612,7 +612,7 @@ func TestTxMempool_EVMEviction(t *testing.T) {
612612

613613
require.NoError(t, txmp.CheckTx(ctx, []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address2, 5, 1)), nil, TxInfo{SenderID: peerID}))
614614
require.Equal(t, 2, txmp.priorityIndex.NumTxs())
615-
txmp.removeTx(tx, true, false)
615+
txmp.removeTx(tx, true, false, true)
616616
// should not reenqueue
617617
require.Equal(t, 1, txmp.priorityIndex.NumTxs())
618618
time.Sleep(1 * time.Second) // pendingTxs should still be one even after sleeping for a sec

internal/mempool/priority_queue.go

+50-2
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,11 @@ var _ heap.Interface = (*TxPriorityQueue)(nil)
1212

1313
// TxPriorityQueue defines a thread-safe priority queue for valid transactions.
1414
type TxPriorityQueue struct {
15-
mtx sync.RWMutex
16-
txs []*WrappedTx // priority heap
15+
mtx sync.RWMutex
16+
txs []*WrappedTx // priority heap
17+
// invariant 1: no duplicate nonce in the same queue
18+
// invariant 2: no nonce gap in the same queue
19+
// invariant 3: head of the queue must be in heap
1720
evmQueue map[string][]*WrappedTx // sorted by nonce
1821
}
1922

@@ -50,6 +53,51 @@ func NewTxPriorityQueue() *TxPriorityQueue {
5053
return pq
5154
}
5255

56+
func (pq *TxPriorityQueue) GetTxWithSameNonce(tx *WrappedTx) (*WrappedTx, int) {
57+
pq.mtx.RLock()
58+
defer pq.mtx.RUnlock()
59+
return pq.getTxWithSameNonceUnsafe(tx)
60+
}
61+
62+
func (pq *TxPriorityQueue) getTxWithSameNonceUnsafe(tx *WrappedTx) (*WrappedTx, int) {
63+
queue, ok := pq.evmQueue[tx.evmAddress]
64+
if !ok {
65+
return nil, -1
66+
}
67+
idx := binarySearch(queue, tx)
68+
if idx < len(queue) && queue[idx].evmNonce == tx.evmNonce {
69+
return queue[idx], idx
70+
}
71+
return nil, -1
72+
}
73+
74+
func (pq *TxPriorityQueue) TryReplacement(tx *WrappedTx) (replaced *WrappedTx, shouldDrop bool) {
75+
if !tx.isEVM {
76+
return nil, false
77+
}
78+
pq.mtx.Lock()
79+
defer pq.mtx.Unlock()
80+
queue, ok := pq.evmQueue[tx.evmAddress]
81+
if ok && len(queue) > 0 {
82+
existing, idx := pq.getTxWithSameNonceUnsafe(tx)
83+
if existing != nil {
84+
if tx.priority > existing.priority {
85+
// should replace
86+
// replace heap if applicable
87+
if hi, ok := pq.findTxIndexUnsafe(existing); ok {
88+
heap.Remove(pq, hi)
89+
heap.Push(pq, tx) // need to be in the heap since it has the same nonce
90+
}
91+
pq.evmQueue[tx.evmAddress][idx] = tx // replace queue item in-place
92+
return existing, false
93+
}
94+
// tx should be dropped since it's dominated by an existing tx
95+
return nil, true
96+
}
97+
}
98+
return nil, false
99+
}
100+
53101
// GetEvictableTxs attempts to find and return a list of *WrappedTx than can be
54102
// evicted to make room for another *WrappedTx with higher priority. If no such
55103
// list of *WrappedTx exists, nil will be returned. The returned list of *WrappedTx

internal/mempool/priority_queue_test.go

+75-2
Original file line numberDiff line numberDiff line change
@@ -79,12 +79,11 @@ func TestTxPriorityQueue_PriorityAndNonceOrdering(t *testing.T) {
7979
{sender: "2", isEVM: false, priority: 9},
8080
{sender: "4", isEVM: true, evmAddress: "0xabc", evmNonce: 0, priority: 9}, // Same EVM address as first, lower nonce
8181
{sender: "5", isEVM: true, evmAddress: "0xdef", evmNonce: 1, priority: 7},
82-
{sender: "5", isEVM: true, evmAddress: "0xdef", evmNonce: 1, priority: 7},
8382
{sender: "3", isEVM: true, evmAddress: "0xdef", evmNonce: 0, priority: 8},
8483
{sender: "6", isEVM: false, priority: 6},
8584
{sender: "7", isEVM: true, evmAddress: "0xghi", evmNonce: 2, priority: 5},
8685
},
87-
expectedOutput: []int64{2, 4, 1, 3, 5, 5, 6, 7},
86+
expectedOutput: []int64{2, 4, 1, 3, 5, 6, 7},
8887
},
8988
{
9089
name: "PriorityWithEVMAndNonEVM",
@@ -371,3 +370,77 @@ func TestTxPriorityQueue_RemoveTx(t *testing.T) {
371370
})
372371
require.Equal(t, numTxs-2, pq.NumTxs())
373372
}
373+
374+
func TestTxPriorityQueue_TryReplacement(t *testing.T) {
375+
for _, test := range []struct {
376+
tx *WrappedTx
377+
existing []*WrappedTx
378+
expectedReplaced bool
379+
expectedDropped bool
380+
expectedQueue []*WrappedTx
381+
expectedHeap []*WrappedTx
382+
}{
383+
{&WrappedTx{isEVM: false}, []*WrappedTx{}, false, false, []*WrappedTx{}, []*WrappedTx{}},
384+
{&WrappedTx{isEVM: true, evmAddress: "addr1"}, []*WrappedTx{}, false, false, []*WrappedTx{}, []*WrappedTx{}},
385+
{
386+
&WrappedTx{isEVM: true, evmAddress: "addr1", evmNonce: 1, priority: 100, tx: []byte("abc")}, []*WrappedTx{
387+
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
388+
}, false, false, []*WrappedTx{
389+
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
390+
}, []*WrappedTx{
391+
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
392+
},
393+
},
394+
{
395+
&WrappedTx{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("abc")}, []*WrappedTx{
396+
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
397+
}, false, true, []*WrappedTx{
398+
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
399+
}, []*WrappedTx{
400+
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
401+
},
402+
},
403+
{
404+
&WrappedTx{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 101, tx: []byte("abc")}, []*WrappedTx{
405+
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
406+
}, true, false, []*WrappedTx{
407+
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 101, tx: []byte("abc")},
408+
}, []*WrappedTx{
409+
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 101, tx: []byte("abc")},
410+
},
411+
},
412+
{
413+
&WrappedTx{isEVM: true, evmAddress: "addr1", evmNonce: 1, priority: 100, tx: []byte("abc")}, []*WrappedTx{
414+
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
415+
{isEVM: true, evmAddress: "addr1", evmNonce: 1, priority: 99, tx: []byte("ghi")},
416+
}, true, false, []*WrappedTx{
417+
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
418+
{isEVM: true, evmAddress: "addr1", evmNonce: 1, priority: 100, tx: []byte("abc")},
419+
}, []*WrappedTx{
420+
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
421+
},
422+
},
423+
} {
424+
pq := NewTxPriorityQueue()
425+
for _, e := range test.existing {
426+
pq.PushTx(e)
427+
}
428+
replaced, dropped := pq.TryReplacement(test.tx)
429+
if test.expectedReplaced {
430+
require.NotNil(t, replaced)
431+
} else {
432+
require.Nil(t, replaced)
433+
}
434+
require.Equal(t, test.expectedDropped, dropped)
435+
for i, q := range pq.evmQueue[test.tx.evmAddress] {
436+
require.Equal(t, test.expectedQueue[i].tx.Key(), q.tx.Key())
437+
require.Equal(t, test.expectedQueue[i].priority, q.priority)
438+
require.Equal(t, test.expectedQueue[i].evmNonce, q.evmNonce)
439+
}
440+
for i, q := range pq.txs {
441+
require.Equal(t, test.expectedHeap[i].tx.Key(), q.tx.Key())
442+
require.Equal(t, test.expectedHeap[i].priority, q.priority)
443+
require.Equal(t, test.expectedHeap[i].evmNonce, q.evmNonce)
444+
}
445+
}
446+
}

internal/mempool/reactor_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ func TestReactorBroadcastDoesNotPanic(t *testing.T) {
167167
secondaryReactor.observePanic = observePanic
168168

169169
firstTx := &WrappedTx{}
170-
primaryMempool.insertTx(firstTx)
170+
primaryMempool.insertTx(firstTx, true)
171171

172172
// run the router
173173
rts.start(ctx, t)
@@ -180,7 +180,7 @@ func TestReactorBroadcastDoesNotPanic(t *testing.T) {
180180
wg.Add(1)
181181
go func() {
182182
defer wg.Done()
183-
primaryMempool.insertTx(next)
183+
primaryMempool.insertTx(next, true)
184184
}()
185185
}
186186

internal/mempool/tx.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ type WrappedTx struct {
7979
// IsBefore returns true if the WrappedTx is before the given WrappedTx
8080
// this applies to EVM transactions only
8181
func (wtx *WrappedTx) IsBefore(tx *WrappedTx) bool {
82-
return wtx.evmNonce < tx.evmNonce || (wtx.evmNonce == tx.evmNonce && wtx.timestamp.Before(tx.timestamp))
82+
return wtx.evmNonce < tx.evmNonce
8383
}
8484

8585
func (wtx *WrappedTx) Size() int {

proto/tendermint/abci/types.proto

+7
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,7 @@ message ResponseDeliverTx {
308308
repeated Event events = 7
309309
[(gogoproto.nullable) = false, (gogoproto.jsontag) = "events,omitempty"]; // nondeterministic
310310
string codespace = 8;
311+
EvmTxInfo evm_tx_info = 9;
311312
}
312313

313314
message ResponseEndBlock {
@@ -458,6 +459,7 @@ message ExecTxResult {
458459
repeated Event events = 7
459460
[(gogoproto.nullable) = false, (gogoproto.jsontag) = "events,omitempty"]; // nondeterministic
460461
string codespace = 8;
462+
EvmTxInfo evm_tx_info = 9;
461463
}
462464

463465
// TxResult contains results of executing the transaction.
@@ -564,6 +566,11 @@ message Evidence {
564566
int64 total_voting_power = 5;
565567
}
566568

569+
message EvmTxInfo {
570+
string senderAddress = 1;
571+
uint64 nonce = 2;
572+
}
573+
567574
//----------------------------------------
568575
// State Sync Types
569576

proto/tendermint/crypto/keys.pb.go

-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

proto/tendermint/mempool/types.pb.go

-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

proto/tendermint/p2p/conn.pb.go

-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

proto/tendermint/types/types.pb.go

-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)