Skip to content

Commit 79fd60f

Browse files
stevenlanderssigv
andauthored
Add heapIndex with safety check (#213)
* add heapIndex with safety check * cleanup * comment out for perf test * add back perf improvement * fix nil test * Use write-lock in (*TxPriorityQueue).ReapMax funcs (#209) ReapMaxBytesMaxGas and ReapMaxTxs funcs in TxPriorityQueue claim > Transactions returned are not removed from the mempool transaction > store or indexes. However, they use a priority queue to accomplish the claim > Transaction are retrieved in priority order. This is accomplished by popping all items out of the whole heap, and then pushing then back in sequentially. A copy of the heap cannot be obtained otherwise. Both of the mentioned functions use a read-lock (RLock) when doing this. This results in a potential scenario where multiple executions of the ReapMax can be started in parallel, and both would be popping items out of the priority queue. In practice, this can be abused by executing the `unconfirmed_txs` RPC call repeatedly. Based on our observations, running it multiple times per millisecond results in multiple threads picking it up at the same time. Such a scenario can be obtained via the WebSocket interface, and spamming `unconfirmed_txs` calls there. The behavior that happens is a `Panic in WSJSONRPC handler` when a queue item unexpectedly disappears for `mempool.(*TxPriorityQueue).Swap`. (`runtime error: index out of range [0] with length 0`) This can additionally lead to a `CONSENSUS FAILURE!!!` if the race condition occurs for `internal/consensus.(*State).finalizeCommit` when it tries to do `mempool.(*TxPriorityQueue).RemoveTx`, but the ReapMax has already removed all elements from the underlying heap. (`runtime error: index out of range [-1]`) This commit switches the lock type to a write-lock (Lock) to ensure no parallel modifications take place. This commit additionally updates the tests to allow parallel execution of the func calls in testing, as to prevent regressions (in case someone wants to downgrade the locks without considering the implications from the underlying heap usage). --------- Co-authored-by: Valters Jansons <sigv@users.noreply.github.com>
1 parent 77ad5bd commit 79fd60f

File tree

3 files changed

+86
-35
lines changed

3 files changed

+86
-35
lines changed

internal/mempool/mempool.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -417,8 +417,8 @@ func (txmp *TxMempool) Flush() {
417417
// - Transactions returned are not removed from the mempool transaction
418418
// store or indexes.
419419
func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs {
420-
txmp.mtx.RLock()
421-
defer txmp.mtx.RUnlock()
420+
txmp.mtx.Lock()
421+
defer txmp.mtx.Unlock()
422422

423423
var (
424424
totalGas int64
@@ -458,8 +458,8 @@ func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs {
458458
// - Transactions returned are not removed from the mempool transaction
459459
// store or indexes.
460460
func (txmp *TxMempool) ReapMaxTxs(max int) types.Txs {
461-
txmp.mtx.RLock()
462-
defer txmp.mtx.RUnlock()
461+
txmp.mtx.Lock()
462+
defer txmp.mtx.Unlock()
463463

464464
wTxs := txmp.priorityIndex.PeekTxs(max)
465465
txs := make([]types.Tx, 0, len(wTxs))

internal/mempool/mempool_test.go

+62-30
Original file line numberDiff line numberDiff line change
@@ -379,27 +379,43 @@ func TestTxMempool_ReapMaxBytesMaxGas(t *testing.T) {
379379
require.Equal(t, priorities[:len(reapedPriorities)], reapedPriorities)
380380
}
381381

382+
var wg sync.WaitGroup
383+
382384
// reap by gas capacity only
383-
reapedTxs := txmp.ReapMaxBytesMaxGas(-1, 50)
384-
ensurePrioritized(reapedTxs)
385-
require.Equal(t, len(tTxs), txmp.Size())
386-
require.Equal(t, int64(5690), txmp.SizeBytes())
387-
require.Len(t, reapedTxs, 50)
385+
wg.Add(1)
386+
go func() {
387+
defer wg.Done()
388+
reapedTxs := txmp.ReapMaxBytesMaxGas(-1, 50)
389+
ensurePrioritized(reapedTxs)
390+
require.Equal(t, len(tTxs), txmp.Size())
391+
require.Equal(t, int64(5690), txmp.SizeBytes())
392+
require.Len(t, reapedTxs, 50)
393+
}()
388394

389395
// reap by transaction bytes only
390-
reapedTxs = txmp.ReapMaxBytesMaxGas(1000, -1)
391-
ensurePrioritized(reapedTxs)
392-
require.Equal(t, len(tTxs), txmp.Size())
393-
require.Equal(t, int64(5690), txmp.SizeBytes())
394-
require.GreaterOrEqual(t, len(reapedTxs), 16)
396+
wg.Add(1)
397+
go func() {
398+
defer wg.Done()
399+
reapedTxs := txmp.ReapMaxBytesMaxGas(1000, -1)
400+
ensurePrioritized(reapedTxs)
401+
require.Equal(t, len(tTxs), txmp.Size())
402+
require.Equal(t, int64(5690), txmp.SizeBytes())
403+
require.GreaterOrEqual(t, len(reapedTxs), 16)
404+
}()
395405

396406
// Reap by both transaction bytes and gas, where the size yields 31 reaped
397407
// transactions and the gas limit reaps 25 transactions.
398-
reapedTxs = txmp.ReapMaxBytesMaxGas(1500, 30)
399-
ensurePrioritized(reapedTxs)
400-
require.Equal(t, len(tTxs), txmp.Size())
401-
require.Equal(t, int64(5690), txmp.SizeBytes())
402-
require.Len(t, reapedTxs, 25)
408+
wg.Add(1)
409+
go func() {
410+
defer wg.Done()
411+
reapedTxs := txmp.ReapMaxBytesMaxGas(1500, 30)
412+
ensurePrioritized(reapedTxs)
413+
require.Equal(t, len(tTxs), txmp.Size())
414+
require.Equal(t, int64(5690), txmp.SizeBytes())
415+
require.Len(t, reapedTxs, 25)
416+
}()
417+
418+
wg.Wait()
403419
}
404420

405421
func TestTxMempool_ReapMaxTxs(t *testing.T) {
@@ -438,26 +454,42 @@ func TestTxMempool_ReapMaxTxs(t *testing.T) {
438454
require.Equal(t, priorities[:len(reapedPriorities)], reapedPriorities)
439455
}
440456

457+
var wg sync.WaitGroup
458+
441459
// reap all transactions
442-
reapedTxs := txmp.ReapMaxTxs(-1)
443-
ensurePrioritized(reapedTxs)
444-
require.Equal(t, len(tTxs), txmp.Size())
445-
require.Equal(t, int64(5690), txmp.SizeBytes())
446-
require.Len(t, reapedTxs, len(tTxs))
460+
wg.Add(1)
461+
go func() {
462+
defer wg.Done()
463+
reapedTxs := txmp.ReapMaxTxs(-1)
464+
ensurePrioritized(reapedTxs)
465+
require.Equal(t, len(tTxs), txmp.Size())
466+
require.Equal(t, int64(5690), txmp.SizeBytes())
467+
require.Len(t, reapedTxs, len(tTxs))
468+
}()
447469

448470
// reap a single transaction
449-
reapedTxs = txmp.ReapMaxTxs(1)
450-
ensurePrioritized(reapedTxs)
451-
require.Equal(t, len(tTxs), txmp.Size())
452-
require.Equal(t, int64(5690), txmp.SizeBytes())
453-
require.Len(t, reapedTxs, 1)
471+
wg.Add(1)
472+
go func() {
473+
defer wg.Done()
474+
reapedTxs := txmp.ReapMaxTxs(1)
475+
ensurePrioritized(reapedTxs)
476+
require.Equal(t, len(tTxs), txmp.Size())
477+
require.Equal(t, int64(5690), txmp.SizeBytes())
478+
require.Len(t, reapedTxs, 1)
479+
}()
454480

455481
// reap half of the transactions
456-
reapedTxs = txmp.ReapMaxTxs(len(tTxs) / 2)
457-
ensurePrioritized(reapedTxs)
458-
require.Equal(t, len(tTxs), txmp.Size())
459-
require.Equal(t, int64(5690), txmp.SizeBytes())
460-
require.Len(t, reapedTxs, len(tTxs)/2)
482+
wg.Add(1)
483+
go func() {
484+
defer wg.Done()
485+
reapedTxs := txmp.ReapMaxTxs(len(tTxs) / 2)
486+
ensurePrioritized(reapedTxs)
487+
require.Equal(t, len(tTxs), txmp.Size())
488+
require.Equal(t, int64(5690), txmp.SizeBytes())
489+
require.Len(t, reapedTxs, len(tTxs)/2)
490+
}()
491+
492+
wg.Wait()
461493
}
462494

463495
func TestTxMempool_CheckTxExceedsMaxSize(t *testing.T) {

internal/mempool/priority_queue.go

+20-1
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,12 @@ func (pq *TxPriorityQueue) removeQueuedEvmTxUnsafe(tx *WrappedTx) (removedIdx in
178178
}
179179

180180
func (pq *TxPriorityQueue) findTxIndexUnsafe(tx *WrappedTx) (int, bool) {
181+
// safety check for race situation where heapIndex is out of range of txs
182+
if tx.heapIndex >= 0 && tx.heapIndex < len(pq.txs) && pq.txs[tx.heapIndex].tx.Key() == tx.tx.Key() {
183+
return tx.heapIndex, true
184+
}
185+
186+
// heap index isn't trustable here, so attempt to find it
181187
for i, t := range pq.txs {
182188
if t.tx.Key() == tx.tx.Key() {
183189
return i, true
@@ -443,7 +449,9 @@ func (pq *TxPriorityQueue) PeekTxs(max int) []*WrappedTx {
443449
//
444450
// NOTE: A caller should never call Push. Use PushTx instead.
445451
func (pq *TxPriorityQueue) Push(x interface{}) {
452+
n := len(pq.txs)
446453
item := x.(*WrappedTx)
454+
item.heapIndex = n
447455
pq.txs = append(pq.txs, item)
448456
}
449457

@@ -454,7 +462,8 @@ func (pq *TxPriorityQueue) Pop() interface{} {
454462
old := pq.txs
455463
n := len(old)
456464
item := old[n-1]
457-
old[n-1] = nil // avoid memory leak
465+
old[n-1] = nil // avoid memory leak
466+
setHeapIndex(item, -1) // for safety
458467
pq.txs = old[0 : n-1]
459468
return item
460469
}
@@ -483,4 +492,14 @@ func (pq *TxPriorityQueue) Less(i, j int) bool {
483492
// Swap implements the Heap interface. It swaps two transactions in the queue.
484493
func (pq *TxPriorityQueue) Swap(i, j int) {
485494
pq.txs[i], pq.txs[j] = pq.txs[j], pq.txs[i]
495+
setHeapIndex(pq.txs[i], i)
496+
setHeapIndex(pq.txs[j], j)
497+
}
498+
499+
func setHeapIndex(tx *WrappedTx, i int) {
500+
// a removed tx can be nil
501+
if tx == nil {
502+
return
503+
}
504+
tx.heapIndex = i
486505
}

0 commit comments

Comments
 (0)