Skip to content

Commit 3cc1293

Browse files
authored
Use write-lock in (*TxPriorityQueue).ReapMax funcs (#209)
ReapMaxBytesMaxGas and ReapMaxTxs funcs in TxPriorityQueue claim > Transactions returned are not removed from the mempool transaction > store or indexes. However, they use a priority queue to accomplish the claim > Transaction are retrieved in priority order. This is accomplished by popping all items out of the whole heap, and then pushing then back in sequentially. A copy of the heap cannot be obtained otherwise. Both of the mentioned functions use a read-lock (RLock) when doing this. This results in a potential scenario where multiple executions of the ReapMax can be started in parallel, and both would be popping items out of the priority queue. In practice, this can be abused by executing the `unconfirmed_txs` RPC call repeatedly. Based on our observations, running it multiple times per millisecond results in multiple threads picking it up at the same time. Such a scenario can be obtained via the WebSocket interface, and spamming `unconfirmed_txs` calls there. The behavior that happens is a `Panic in WSJSONRPC handler` when a queue item unexpectedly disappears for `mempool.(*TxPriorityQueue).Swap`. (`runtime error: index out of range [0] with length 0`) This can additionally lead to a `CONSENSUS FAILURE!!!` if the race condition occurs for `internal/consensus.(*State).finalizeCommit` when it tries to do `mempool.(*TxPriorityQueue).RemoveTx`, but the ReapMax has already removed all elements from the underlying heap. (`runtime error: index out of range [-1]`) This commit switches the lock type to a write-lock (Lock) to ensure no parallel modifications take place. This commit additionally updates the tests to allow parallel execution of the func calls in testing, as to prevent regressions (in case someone wants to downgrade the locks without considering the implications from the underlying heap usage).
1 parent 8061a47 commit 3cc1293

File tree

2 files changed

+66
-34
lines changed

2 files changed

+66
-34
lines changed

internal/mempool/mempool.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -362,8 +362,8 @@ func (txmp *TxMempool) Flush() {
362362
// - Transactions returned are not removed from the mempool transaction
363363
// store or indexes.
364364
func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs {
365-
txmp.mtx.RLock()
366-
defer txmp.mtx.RUnlock()
365+
txmp.mtx.Lock()
366+
defer txmp.mtx.Unlock()
367367

368368
var (
369369
totalGas int64
@@ -417,8 +417,8 @@ func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs {
417417
// - Transactions returned are not removed from the mempool transaction
418418
// store or indexes.
419419
func (txmp *TxMempool) ReapMaxTxs(max int) types.Txs {
420-
txmp.mtx.RLock()
421-
defer txmp.mtx.RUnlock()
420+
txmp.mtx.Lock()
421+
defer txmp.mtx.Unlock()
422422

423423
numTxs := txmp.priorityIndex.NumTxs()
424424
if max < 0 {

internal/mempool/mempool_test.go

+62-30
Original file line numberDiff line numberDiff line change
@@ -304,27 +304,43 @@ func TestTxMempool_ReapMaxBytesMaxGas(t *testing.T) {
304304
require.Equal(t, priorities[:len(reapedPriorities)], reapedPriorities)
305305
}
306306

307+
var wg sync.WaitGroup
308+
307309
// reap by gas capacity only
308-
reapedTxs := txmp.ReapMaxBytesMaxGas(-1, 50)
309-
ensurePrioritized(reapedTxs)
310-
require.Equal(t, len(tTxs), txmp.Size())
311-
require.Equal(t, int64(5690), txmp.SizeBytes())
312-
require.Len(t, reapedTxs, 50)
310+
wg.Add(1)
311+
go func() {
312+
defer wg.Done()
313+
reapedTxs := txmp.ReapMaxBytesMaxGas(-1, 50)
314+
ensurePrioritized(reapedTxs)
315+
require.Equal(t, len(tTxs), txmp.Size())
316+
require.Equal(t, int64(5690), txmp.SizeBytes())
317+
require.Len(t, reapedTxs, 50)
318+
}()
313319

314320
// reap by transaction bytes only
315-
reapedTxs = txmp.ReapMaxBytesMaxGas(1000, -1)
316-
ensurePrioritized(reapedTxs)
317-
require.Equal(t, len(tTxs), txmp.Size())
318-
require.Equal(t, int64(5690), txmp.SizeBytes())
319-
require.GreaterOrEqual(t, len(reapedTxs), 16)
321+
wg.Add(1)
322+
go func() {
323+
defer wg.Done()
324+
reapedTxs := txmp.ReapMaxBytesMaxGas(1000, -1)
325+
ensurePrioritized(reapedTxs)
326+
require.Equal(t, len(tTxs), txmp.Size())
327+
require.Equal(t, int64(5690), txmp.SizeBytes())
328+
require.GreaterOrEqual(t, len(reapedTxs), 16)
329+
}()
320330

321331
// Reap by both transaction bytes and gas, where the size yields 31 reaped
322332
// transactions and the gas limit reaps 25 transactions.
323-
reapedTxs = txmp.ReapMaxBytesMaxGas(1500, 30)
324-
ensurePrioritized(reapedTxs)
325-
require.Equal(t, len(tTxs), txmp.Size())
326-
require.Equal(t, int64(5690), txmp.SizeBytes())
327-
require.Len(t, reapedTxs, 25)
333+
wg.Add(1)
334+
go func() {
335+
defer wg.Done()
336+
reapedTxs := txmp.ReapMaxBytesMaxGas(1500, 30)
337+
ensurePrioritized(reapedTxs)
338+
require.Equal(t, len(tTxs), txmp.Size())
339+
require.Equal(t, int64(5690), txmp.SizeBytes())
340+
require.Len(t, reapedTxs, 25)
341+
}()
342+
343+
wg.Wait()
328344
}
329345

330346
func TestTxMempool_ReapMaxTxs(t *testing.T) {
@@ -363,26 +379,42 @@ func TestTxMempool_ReapMaxTxs(t *testing.T) {
363379
require.Equal(t, priorities[:len(reapedPriorities)], reapedPriorities)
364380
}
365381

382+
var wg sync.WaitGroup
383+
366384
// reap all transactions
367-
reapedTxs := txmp.ReapMaxTxs(-1)
368-
ensurePrioritized(reapedTxs)
369-
require.Equal(t, len(tTxs), txmp.Size())
370-
require.Equal(t, int64(5690), txmp.SizeBytes())
371-
require.Len(t, reapedTxs, len(tTxs))
385+
wg.Add(1)
386+
go func() {
387+
defer wg.Done()
388+
reapedTxs := txmp.ReapMaxTxs(-1)
389+
ensurePrioritized(reapedTxs)
390+
require.Equal(t, len(tTxs), txmp.Size())
391+
require.Equal(t, int64(5690), txmp.SizeBytes())
392+
require.Len(t, reapedTxs, len(tTxs))
393+
}()
372394

373395
// reap a single transaction
374-
reapedTxs = txmp.ReapMaxTxs(1)
375-
ensurePrioritized(reapedTxs)
376-
require.Equal(t, len(tTxs), txmp.Size())
377-
require.Equal(t, int64(5690), txmp.SizeBytes())
378-
require.Len(t, reapedTxs, 1)
396+
wg.Add(1)
397+
go func() {
398+
defer wg.Done()
399+
reapedTxs := txmp.ReapMaxTxs(1)
400+
ensurePrioritized(reapedTxs)
401+
require.Equal(t, len(tTxs), txmp.Size())
402+
require.Equal(t, int64(5690), txmp.SizeBytes())
403+
require.Len(t, reapedTxs, 1)
404+
}()
379405

380406
// reap half of the transactions
381-
reapedTxs = txmp.ReapMaxTxs(len(tTxs) / 2)
382-
ensurePrioritized(reapedTxs)
383-
require.Equal(t, len(tTxs), txmp.Size())
384-
require.Equal(t, int64(5690), txmp.SizeBytes())
385-
require.Len(t, reapedTxs, len(tTxs)/2)
407+
wg.Add(1)
408+
go func() {
409+
defer wg.Done()
410+
reapedTxs := txmp.ReapMaxTxs(len(tTxs) / 2)
411+
ensurePrioritized(reapedTxs)
412+
require.Equal(t, len(tTxs), txmp.Size())
413+
require.Equal(t, int64(5690), txmp.SizeBytes())
414+
require.Len(t, reapedTxs, len(tTxs)/2)
415+
}()
416+
417+
wg.Wait()
386418
}
387419

388420
func TestTxMempool_CheckTxExceedsMaxSize(t *testing.T) {

0 commit comments

Comments
 (0)