Skip to content

Commit d6cf26b

Browse files
committed
adjust locking for replacement
1 parent 124468c commit d6cf26b

File tree

4 files changed

+42
-26
lines changed

4 files changed

+42
-26
lines changed

internal/mempool/mempool.go

+12-15
Original file line numberDiff line numberDiff line change
@@ -665,19 +665,11 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, res *abci.ResponseCheck
665665
txInfo.SenderID: {},
666666
}
667667

668-
replaced, shouldDrop := txmp.priorityIndex.TryReplacement(wtx)
669-
if shouldDrop {
668+
if txmp.isInMempool(wtx.tx) {
670669
return nil
671670
}
672671

673-
txmp.metrics.TxSizeBytes.Observe(float64(wtx.Size()))
674-
txmp.metrics.Size.Set(float64(txmp.SizeWithoutPending()))
675-
txmp.metrics.PendingSize.Set(float64(txmp.PendingSize()))
676-
677-
if replaced != nil {
678-
txmp.removeTx(replaced, true, false, false)
679-
}
680-
if txmp.insertTx(wtx, replaced == nil) {
672+
if txmp.insertTx(wtx) {
681673
txmp.logger.Debug(
682674
"inserted good transaction",
683675
"priority", wtx.priority,
@@ -688,6 +680,10 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, res *abci.ResponseCheck
688680
txmp.notifyTxsAvailable()
689681
}
690682

683+
txmp.metrics.TxSizeBytes.Observe(float64(wtx.Size()))
684+
txmp.metrics.Size.Set(float64(txmp.SizeWithoutPending()))
685+
txmp.metrics.PendingSize.Set(float64(txmp.PendingSize()))
686+
691687
return nil
692688
}
693689

@@ -871,15 +867,16 @@ func (txmp *TxMempool) canAddPendingTx(wtx *WrappedTx) error {
871867
return nil
872868
}
873869

874-
func (txmp *TxMempool) insertTx(wtx *WrappedTx, updatePriorityIndex bool) bool {
875-
if txmp.isInMempool(wtx.tx) {
870+
func (txmp *TxMempool) insertTx(wtx *WrappedTx) bool {
871+
replacedTx, inserted := txmp.priorityIndex.PushTx(wtx)
872+
if !inserted {
876873
return false
877874
}
875+
if replacedTx != nil {
876+
txmp.removeTx(replacedTx, true, false, false)
877+
}
878878

879879
txmp.txStore.SetTx(wtx)
880-
if updatePriorityIndex {
881-
txmp.priorityIndex.PushTx(wtx)
882-
}
883880
txmp.heightIndex.Insert(wtx)
884881
txmp.timestampIndex.Insert(wtx)
885882

internal/mempool/priority_queue.go

+16-4
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,10 @@ func (pq *TxPriorityQueue) getTxWithSameNonceUnsafe(tx *WrappedTx) (*WrappedTx,
7171
return nil, -1
7272
}
7373

74-
func (pq *TxPriorityQueue) TryReplacement(tx *WrappedTx) (replaced *WrappedTx, shouldDrop bool) {
74+
func (pq *TxPriorityQueue) tryReplacementUnsafe(tx *WrappedTx) (replaced *WrappedTx, shouldDrop bool) {
7575
if !tx.isEVM {
7676
return nil, false
7777
}
78-
pq.mtx.Lock()
79-
defer pq.mtx.Unlock()
8078
queue, ok := pq.evmQueue[tx.evmAddress]
8179
if ok && len(queue) > 0 {
8280
existing, idx := pq.getTxWithSameNonceUnsafe(tx)
@@ -338,11 +336,25 @@ func (pq *TxPriorityQueue) pushTxUnsafe(tx *WrappedTx) {
338336
//}
339337

340338
// PushTx adds a valid transaction to the priority queue. It is thread safe.
341-
func (pq *TxPriorityQueue) PushTx(tx *WrappedTx) {
339+
func (pq *TxPriorityQueue) PushTx(tx *WrappedTx) (*WrappedTx, bool) {
342340
pq.mtx.Lock()
343341
defer pq.mtx.Unlock()
344342

343+
replacedTx, shouldDrop := pq.tryReplacementUnsafe(tx)
344+
345+
// tx was not inserted, and nothing was replaced
346+
if shouldDrop {
347+
return nil, false
348+
}
349+
350+
// tx replaced an existing transaction
351+
if replacedTx != nil {
352+
return replacedTx, true
353+
}
354+
355+
// tx was not inserted yet, so insert it
345356
pq.pushTxUnsafe(tx)
357+
return nil, true
346358
}
347359

348360
func (pq *TxPriorityQueue) popTxUnsafe() *WrappedTx {

internal/mempool/priority_queue_test.go

+12-5
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func TestTxPriorityQueue_PriorityAndNonceOrdering(t *testing.T) {
7070
{sender: "3", isEVM: true, evmAddress: "0xabc", evmNonce: 3, priority: 9},
7171
{sender: "2", isEVM: true, evmAddress: "0xabc", evmNonce: 1, priority: 7},
7272
},
73-
expectedOutput: []int64{1, 2, 3},
73+
expectedOutput: []int64{1, 3},
7474
},
7575
{
7676
name: "PriorityWithEVMAndNonEVMDuplicateNonce",
@@ -380,17 +380,23 @@ func TestTxPriorityQueue_TryReplacement(t *testing.T) {
380380
expectedQueue []*WrappedTx
381381
expectedHeap []*WrappedTx
382382
}{
383-
{&WrappedTx{isEVM: false}, []*WrappedTx{}, false, false, []*WrappedTx{}, []*WrappedTx{}},
384-
{&WrappedTx{isEVM: true, evmAddress: "addr1"}, []*WrappedTx{}, false, false, []*WrappedTx{}, []*WrappedTx{}},
383+
// non-evm transaction is inserted into empty queue
384+
{&WrappedTx{isEVM: false}, []*WrappedTx{}, false, false, []*WrappedTx{{isEVM: false}}, []*WrappedTx{{isEVM: false}}},
385+
// evm transaction is inserted into empty queue
386+
{&WrappedTx{isEVM: true, evmAddress: "addr1"}, []*WrappedTx{}, false, false, []*WrappedTx{{isEVM: true, evmAddress: "addr1"}}, []*WrappedTx{{isEVM: true, evmAddress: "addr1"}}},
387+
// evm transaction (new nonce) is inserted into queue with existing tx (lower nonce)
385388
{
386389
&WrappedTx{isEVM: true, evmAddress: "addr1", evmNonce: 1, priority: 100, tx: []byte("abc")}, []*WrappedTx{
387390
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
388391
}, false, false, []*WrappedTx{
389392
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
393+
{isEVM: true, evmAddress: "addr1", evmNonce: 1, priority: 100, tx: []byte("abc")},
390394
}, []*WrappedTx{
391395
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
396+
{isEVM: true, evmAddress: "addr1", evmNonce: 1, priority: 100, tx: []byte("abc")},
392397
},
393398
},
399+
// evm transaction (new nonce) is not inserted because it's a duplicate nonce and same priority
394400
{
395401
&WrappedTx{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("abc")}, []*WrappedTx{
396402
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
@@ -400,6 +406,7 @@ func TestTxPriorityQueue_TryReplacement(t *testing.T) {
400406
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
401407
},
402408
},
409+
// evm transaction (new nonce) replaces the existing nonce transaction because its priority is higher
403410
{
404411
&WrappedTx{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 101, tx: []byte("abc")}, []*WrappedTx{
405412
{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")},
@@ -425,13 +432,13 @@ func TestTxPriorityQueue_TryReplacement(t *testing.T) {
425432
for _, e := range test.existing {
426433
pq.PushTx(e)
427434
}
428-
replaced, dropped := pq.TryReplacement(test.tx)
435+
replaced, inserted := pq.PushTx(test.tx)
429436
if test.expectedReplaced {
430437
require.NotNil(t, replaced)
431438
} else {
432439
require.Nil(t, replaced)
433440
}
434-
require.Equal(t, test.expectedDropped, dropped)
441+
require.Equal(t, test.expectedDropped, !inserted)
435442
for i, q := range pq.evmQueue[test.tx.evmAddress] {
436443
require.Equal(t, test.expectedQueue[i].tx.Key(), q.tx.Key())
437444
require.Equal(t, test.expectedQueue[i].priority, q.priority)

internal/mempool/reactor_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ func TestReactorBroadcastDoesNotPanic(t *testing.T) {
167167
secondaryReactor.observePanic = observePanic
168168

169169
firstTx := &WrappedTx{}
170-
primaryMempool.insertTx(firstTx, true)
170+
primaryMempool.insertTx(firstTx)
171171

172172
// run the router
173173
rts.start(ctx, t)
@@ -180,7 +180,7 @@ func TestReactorBroadcastDoesNotPanic(t *testing.T) {
180180
wg.Add(1)
181181
go func() {
182182
defer wg.Done()
183-
primaryMempool.insertTx(next, true)
183+
primaryMempool.insertTx(next)
184184
}()
185185
}
186186

0 commit comments

Comments
 (0)