
Commit d0a382b

Add EVM txs eviction logic
1 parent 530feb8 commit d0a382b

7 files changed: +203 −43 lines changed


config/toml.go

+1 −1

@@ -409,7 +409,7 @@ pending-size = {{ .Mempool.PendingSize }}
 
 max-pending-txs-bytes = {{ .Mempool.MaxPendingTxsBytes }}
 
-pending-ttl-duration = {{ .Mempool.PendingTTLDuration }}
+pending-ttl-duration = "{{ .Mempool.PendingTTLDuration }}"
 
 pending-ttl-num-blocks = {{ .Mempool.PendingTTLNumBlocks }}
 
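The only change here is quoting the rendered value: PendingTTLDuration is presumably a time.Duration, and its string form (for example 3s) is not a valid bare TOML value, so the template must emit it as a quoted string. A minimal sketch of the difference (the 3s value is illustrative only):

package main

import (
	"fmt"
	"time"
)

func main() {
	ttl := 3 * time.Second

	// Unquoted, the template would render `pending-ttl-duration = 3s`,
	// which a TOML parser rejects as a bare value.
	fmt.Printf("pending-ttl-duration = %s\n", ttl)

	// Quoted, it renders `pending-ttl-duration = "3s"`, a valid TOML
	// string that round-trips back into a time.Duration.
	fmt.Printf("pending-ttl-duration = %q\n", ttl)

	d, err := time.ParseDuration("3s")
	fmt.Println(d, err) // 3s <nil>
}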

internal/mempool/mempool.go

+26 −10

@@ -127,7 +127,7 @@ func NewTxMempool(
 		timestampIndex: NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool {
 			return wtx1.timestamp.After(wtx2.timestamp) || wtx1.timestamp.Equal(wtx2.timestamp)
 		}),
-		pendingTxs: NewPendingTxs(),
+		pendingTxs: NewPendingTxs(cfg),
 		failedCheckTxCounts: map[types.NodeID]uint64{},
 		peerManager: peerManager,
 	}
@@ -340,7 +340,9 @@ func (txmp *TxMempool) CheckTx(
 			return err
 		}
 		atomic.AddInt64(&txmp.pendingSizeBytes, int64(wtx.Size()))
-		txmp.pendingTxs.Insert(wtx, res, txInfo)
+		if err := txmp.pendingTxs.Insert(wtx, res, txInfo); err != nil {
+			return err
+		}
 	}
 }
 
@@ -362,7 +364,7 @@ func (txmp *TxMempool) RemoveTxByKey(txKey types.TxKey) error {
 
 	// remove the committed transaction from the transaction store and indexes
 	if wtx := txmp.txStore.GetTxByHash(txKey); wtx != nil {
-		txmp.removeTx(wtx, false)
+		txmp.removeTx(wtx, false, true)
 		return nil
 	}
 
@@ -401,7 +403,7 @@ func (txmp *TxMempool) Flush() {
 	txmp.timestampIndex.Reset()
 
 	for _, wtx := range txmp.txStore.GetAllTxs() {
-		txmp.removeTx(wtx, false)
+		txmp.removeTx(wtx, false, false)
 	}
 
 	atomic.SwapInt64(&txmp.sizeBytes, 0)
@@ -513,7 +515,7 @@ func (txmp *TxMempool) Update(
 
 		// remove the committed transaction from the transaction store and indexes
 		if wtx := txmp.txStore.GetTxByHash(tx.Key()); wtx != nil {
-			txmp.removeTx(wtx, false)
+			txmp.removeTx(wtx, false, false)
 		}
 	}
 
@@ -634,7 +636,7 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, res *abci.ResponseCheck
 	// - The transaction, toEvict, can be removed while a concurrent
 	//   reCheckTx callback is being executed for the same transaction.
 	for _, toEvict := range evictTxs {
-		txmp.removeTx(toEvict, true)
+		txmp.removeTx(toEvict, true, true)
 		txmp.logger.Debug(
 			"evicted existing good transaction; mempool full",
 			"old_tx", fmt.Sprintf("%X", toEvict.tx.Hash()),
@@ -745,7 +747,7 @@ func (txmp *TxMempool) handleRecheckResult(tx types.Tx, res *abci.ResponseCheckT
 			panic("corrupted reCheckTx cursor")
 		}
 
-		txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache)
+		txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache, true)
 	}
 }
 
@@ -871,13 +873,13 @@ func (txmp *TxMempool) insertTx(wtx *WrappedTx) bool {
 	return true
 }
 
-func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool) {
+func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool, shouldReenqueue bool) {
 	if txmp.txStore.IsTxRemoved(wtx.hash) {
 		return
 	}
 
 	txmp.txStore.RemoveTx(wtx)
-	txmp.priorityIndex.RemoveTx(wtx)
+	toBeReenqueued := txmp.priorityIndex.RemoveTx(wtx, shouldReenqueue)
 	txmp.heightIndex.Remove(wtx)
 	txmp.timestampIndex.Remove(wtx)
 
@@ -889,6 +891,20 @@ func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool) {
 	atomic.AddInt64(&txmp.sizeBytes, int64(-wtx.Size()))
 
 	wtx.removeHandler(removeFromCache)
+
+	if shouldReenqueue {
+		for _, reenqueue := range toBeReenqueued {
+			txmp.removeTx(reenqueue, removeFromCache, false)
+		}
+		for _, reenqueue := range toBeReenqueued {
+			rtx := reenqueue.tx
+			go func() {
+				if err := txmp.CheckTx(context.Background(), rtx, nil, TxInfo{}); err != nil {
+					txmp.logger.Error(fmt.Sprintf("failed to reenqueue transaction %X due to %s", rtx.Hash(), err))
+				}
+			}()
+		}
+	}
 }
 
 func (txmp *TxMempool) expire(blockHeight int64, wtx *WrappedTx) {
@@ -967,7 +983,7 @@ func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) {
 	}
 
 	// remove pending txs that have expired
-	txmp.pendingTxs.PurgeExpired(txmp.config.PendingTTLNumBlocks, blockHeight, txmp.config.PendingTTLDuration, now, func(wtx *WrappedTx) {
+	txmp.pendingTxs.PurgeExpired(blockHeight, now, func(wtx *WrappedTx) {
 		atomic.AddInt64(&txmp.pendingSizeBytes, int64(-wtx.Size()))
 		txmp.expire(blockHeight, wtx)
 	})

internal/mempool/mempool_test.go

+121 −6

@@ -30,6 +30,8 @@ import (
 // transaction priority based on the value in the key/value pair.
 type application struct {
 	*kvstore.Application
+
+	occupiedNonces map[string][]uint64
 }
 
 type testTx struct {
@@ -38,6 +40,7 @@ type testTx struct {
 }
 
 func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTxV2, error) {
+
 	var (
 		priority int64
 		sender   string
@@ -58,7 +61,7 @@ func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*a
 			GasWanted: 1,
 		}}, nil
 	}
-	nonce, err := strconv.ParseInt(string(parts[3]), 10, 64)
+	nonce, err := strconv.ParseUint(string(parts[3]), 10, 64)
 	if err != nil {
 		// could not parse
 		return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{
@@ -67,15 +70,50 @@ func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*a
 			GasWanted: 1,
 		}}, nil
 	}
+	if app.occupiedNonces == nil {
+		app.occupiedNonces = make(map[string][]uint64)
+	}
+	if _, exists := app.occupiedNonces[account]; !exists {
+		app.occupiedNonces[account] = []uint64{}
+	}
+	active := true
+	for i := uint64(0); i < nonce; i++ {
+		found := false
+		for _, occ := range app.occupiedNonces[account] {
+			if occ == i {
+				found = true
+				break
+			}
+		}
+		if !found {
+			active = false
+			break
+		}
+	}
+	app.occupiedNonces[account] = append(app.occupiedNonces[account], nonce)
 	return &abci.ResponseCheckTxV2{
 		ResponseCheckTx: &abci.ResponseCheckTx{
 			Priority:  v,
 			Code:      code.CodeTypeOK,
 			GasWanted: 1,
 		},
-		EVMNonce:         uint64(nonce),
-		EVMSenderAddress: account,
-		IsEVM:            true,
+		EVMNonce:             nonce,
+		EVMSenderAddress:     account,
+		IsEVM:                true,
+		IsPendingTransaction: !active,
+		Checker:              func() abci.PendingTxCheckerResponse { return abci.Pending },
+		ExpireTxHandler: func() {
+			idx := -1
+			for i, n := range app.occupiedNonces[account] {
+				if n == nonce {
+					idx = i
+					break
+				}
+			}
+			if idx >= 0 {
+				app.occupiedNonces[account] = append(app.occupiedNonces[account][:idx], app.occupiedNonces[account][idx+1:]...)
+			}
+		},
 	}, nil
 }
 
@@ -470,12 +508,14 @@ func TestTxMempool_Prioritization(t *testing.T) {
 	txs := [][]byte{
 		[]byte(fmt.Sprintf("sender-0-1=peer=%d", 9)),
 		[]byte(fmt.Sprintf("sender-1-1=peer=%d", 8)),
-		[]byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 7, 0)),
-		[]byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 9, 1)),
 		[]byte(fmt.Sprintf("evm-sender=%s=%d=%d", address2, 6, 0)),
 		[]byte(fmt.Sprintf("sender-2-1=peer=%d", 5)),
 		[]byte(fmt.Sprintf("sender-3-1=peer=%d", 4)),
 	}
+	evmTxs := [][]byte{
+		[]byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 7, 0)),
+		[]byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 9, 1)),
+	}
 
 	// copy the slice of txs and shuffle the order randomly
 	txsCopy := make([][]byte, len(txs))
@@ -484,6 +524,16 @@ func TestTxMempool_Prioritization(t *testing.T) {
 	rng.Shuffle(len(txsCopy), func(i, j int) {
 		txsCopy[i], txsCopy[j] = txsCopy[j], txsCopy[i]
 	})
+	txs = [][]byte{
+		[]byte(fmt.Sprintf("sender-0-1=peer=%d", 9)),
+		[]byte(fmt.Sprintf("sender-1-1=peer=%d", 8)),
+		[]byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 7, 0)),
+		[]byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 9, 1)),
+		[]byte(fmt.Sprintf("evm-sender=%s=%d=%d", address2, 6, 0)),
+		[]byte(fmt.Sprintf("sender-2-1=peer=%d", 5)),
+		[]byte(fmt.Sprintf("sender-3-1=peer=%d", 4)),
+	}
+	txsCopy = append(txsCopy, evmTxs...)
 
 	for i := range txsCopy {
 		require.NoError(t, txmp.CheckTx(ctx, txsCopy[i], nil, TxInfo{SenderID: peerID}))
@@ -504,6 +554,71 @@ func TestTxMempool_Prioritization(t *testing.T) {
 	}
 }
 
+func TestTxMempool_PendingStoreSize(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()})
+	if err := client.Start(ctx); err != nil {
+		t.Fatal(err)
+	}
+	t.Cleanup(client.Wait)
+
+	txmp := setup(t, client, 100)
+	txmp.config.PendingSize = 1
+	peerID := uint16(1)
+
+	address1 := "0xeD23B3A9DE15e92B9ef9540E587B3661E15A12fA"
+
+	require.NoError(t, txmp.CheckTx(ctx, []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 1, 1)), nil, TxInfo{SenderID: peerID}))
+	err := txmp.CheckTx(ctx, []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 1, 2)), nil, TxInfo{SenderID: peerID})
+	require.Error(t, err)
+	require.Contains(t, err.Error(), "mempool pending set is full")
+}
+
+func TestTxMempool_EVMEviction(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()})
+	if err := client.Start(ctx); err != nil {
+		t.Fatal(err)
+	}
+	t.Cleanup(client.Wait)
+
+	txmp := setup(t, client, 100)
+	txmp.config.Size = 1
+	peerID := uint16(1)
+
+	address1 := "0xeD23B3A9DE15e92B9ef9540E587B3661E15A12fA"
+	address2 := "0xfD23B3A9DE15e92B9ef9540E587B3661E15A12fA"
+
+	require.NoError(t, txmp.CheckTx(ctx, []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 1, 0)), nil, TxInfo{SenderID: peerID}))
+	// this should evict the previous tx
+	require.NoError(t, txmp.CheckTx(ctx, []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 2, 0)), nil, TxInfo{SenderID: peerID}))
+	require.Equal(t, 1, txmp.priorityIndex.NumTxs())
+	require.Equal(t, int64(2), txmp.priorityIndex.txs[0].priority)
+
+	txmp.config.Size = 2
+	require.NoError(t, txmp.CheckTx(ctx, []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 3, 1)), nil, TxInfo{SenderID: peerID}))
+	require.Equal(t, 0, txmp.pendingTxs.Size())
+	require.Equal(t, 2, txmp.priorityIndex.NumTxs())
+	// this would evict the tx with priority 2 and cause the tx with priority 3 to go pending
+	require.NoError(t, txmp.CheckTx(ctx, []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address2, 4, 0)), nil, TxInfo{SenderID: peerID}))
+	time.Sleep(1 * time.Second) // reenqueue is async
+	require.Equal(t, 1, txmp.priorityIndex.NumTxs())
+	tx := txmp.priorityIndex.txs[0]
+	require.Equal(t, 1, txmp.pendingTxs.Size())
+
+	require.NoError(t, txmp.CheckTx(ctx, []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address2, 5, 1)), nil, TxInfo{SenderID: peerID}))
+	require.Equal(t, 2, txmp.priorityIndex.NumTxs())
+	txmp.removeTx(tx, true, false)
+	// should not reenqueue
+	require.Equal(t, 1, txmp.priorityIndex.NumTxs())
+	time.Sleep(1 * time.Second) // pendingTxs should still be one even after sleeping for a sec
+	require.Equal(t, 1, txmp.pendingTxs.Size())
+}
+
 func TestTxMempool_CheckTxSamePeer(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
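The nonce bookkeeping in the test application's CheckTx reduces to a contiguity check: a tx with nonce n is treated as active only if every nonce below n is already occupied for that sender; otherwise it is flagged as a pending transaction, and its ExpireTxHandler later frees the nonce again. A self-contained restatement of that check (the helper name is illustrative, not from the diff):

package main

import "fmt"

// hasContiguousNonces mirrors the check the test application performs above:
// a tx with nonce n is "active" only when every nonce 0..n-1 has already been
// seen for the account; otherwise CheckTx reports it as pending.
func hasContiguousNonces(occupied []uint64, nonce uint64) bool {
	seen := make(map[uint64]bool, len(occupied))
	for _, n := range occupied {
		seen[n] = true
	}
	for i := uint64(0); i < nonce; i++ {
		if !seen[i] {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(hasContiguousNonces([]uint64{0, 1}, 2)) // true: nonces 0 and 1 are occupied
	fmt.Println(hasContiguousNonces([]uint64{0, 2}, 3)) // false: nonce 1 is missing, so the tx goes pending
}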

internal/mempool/priority_queue.go

+18 −8

@@ -60,8 +60,11 @@ func (pq *TxPriorityQueue) GetEvictableTxs(priority, txSize, totalSize, cap int6
 	pq.mtx.RLock()
 	defer pq.mtx.RUnlock()
 
-	txs := make([]*WrappedTx, len(pq.txs))
-	copy(txs, pq.txs)
+	txs := []*WrappedTx{}
+	txs = append(txs, pq.txs...)
+	for _, queue := range pq.evmQueue {
+		txs = append(txs, queue[1:]...)
+	}
 
 	sort.Slice(txs, func(i, j int) bool {
 		return txs[i].priority < txs[j].priority
@@ -111,18 +114,19 @@ func (pq *TxPriorityQueue) NumTxs() int {
 	return len(pq.txs) + pq.numQueuedUnsafe()
 }
 
-func (pq *TxPriorityQueue) removeQueuedEvmTxUnsafe(tx *WrappedTx) {
+func (pq *TxPriorityQueue) removeQueuedEvmTxUnsafe(tx *WrappedTx) (removedIdx int) {
 	if queue, ok := pq.evmQueue[tx.evmAddress]; ok {
 		for i, t := range queue {
 			if t.tx.Key() == tx.tx.Key() {
 				pq.evmQueue[tx.evmAddress] = append(queue[:i], queue[i+1:]...)
 				if len(pq.evmQueue[tx.evmAddress]) == 0 {
 					delete(pq.evmQueue, tx.evmAddress)
 				}
-				break
+				return i
 			}
 		}
 	}
+	return -1
 }
 
 func (pq *TxPriorityQueue) findTxIndexUnsafe(tx *WrappedTx) (int, bool) {
@@ -135,21 +139,27 @@ func (pq *TxPriorityQueue) findTxIndexUnsafe(tx *WrappedTx) (int, bool) {
 }
 
 // RemoveTx removes a specific transaction from the priority queue.
-func (pq *TxPriorityQueue) RemoveTx(tx *WrappedTx) {
+func (pq *TxPriorityQueue) RemoveTx(tx *WrappedTx, shouldReenqueue bool) (toBeReenqueued []*WrappedTx) {
 	pq.mtx.Lock()
 	defer pq.mtx.Unlock()
 
+	var removedIdx int
+
 	if idx, ok := pq.findTxIndexUnsafe(tx); ok {
 		heap.Remove(pq, idx)
 		if tx.isEVM {
-			pq.removeQueuedEvmTxUnsafe(tx)
-			if len(pq.evmQueue[tx.evmAddress]) > 0 {
+			removedIdx = pq.removeQueuedEvmTxUnsafe(tx)
+			if !shouldReenqueue && len(pq.evmQueue[tx.evmAddress]) > 0 {
 				heap.Push(pq, pq.evmQueue[tx.evmAddress][0])
 			}
 		}
 	} else if tx.isEVM {
-		pq.removeQueuedEvmTxUnsafe(tx)
+		removedIdx = pq.removeQueuedEvmTxUnsafe(tx)
+	}
+	if tx.isEVM && shouldReenqueue && len(pq.evmQueue[tx.evmAddress]) > 0 && removedIdx >= 0 {
+		toBeReenqueued = pq.evmQueue[tx.evmAddress][removedIdx:]
 	}
+	return
 }
 
 func (pq *TxPriorityQueue) pushTxUnsafe(tx *WrappedTx) {
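The queue keeps one nonce-ordered slice per EVM sender in evmQueue and lets only the head of each slice compete in the heap. With shouldReenqueue set, RemoveTx now returns the suffix of the sender's queue starting at the removed position, so the mempool can push those now nonce-gapped transactions back through CheckTx (where they typically land in the pending store). A simplified, self-contained model of that suffix semantics; the types are stand-ins, not the real WrappedTx:

package main

import "fmt"

// tx is a stand-in for a queued EVM transaction, ordered by nonce.
type tx struct {
	nonce uint64
}

// removeWithReenqueue removes the element at removedIdx from a sender's
// nonce-ordered queue and hands back everything behind it: those txs now sit
// above a nonce gap and must be re-checked by the mempool.
func removeWithReenqueue(queue []tx, removedIdx int) (remaining []tx, toBeReenqueued []tx) {
	remaining = append(remaining, queue[:removedIdx]...)
	toBeReenqueued = append(toBeReenqueued, queue[removedIdx+1:]...)
	return remaining, toBeReenqueued
}

func main() {
	queue := []tx{{nonce: 0}, {nonce: 1}, {nonce: 2}, {nonce: 3}}
	remaining, reenqueue := removeWithReenqueue(queue, 1) // evict the tx with nonce 1
	fmt.Println(remaining) // [{0}]
	fmt.Println(reenqueue) // [{2} {3}]: these go back through CheckTx and wait in the pending set
}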
