@@ -43,6 +43,9 @@ type TxMempool struct {
43
43
// sizeBytes defines the total size of the mempool (sum of all tx bytes)
44
44
sizeBytes int64
45
45
46
+ // pendingSizeBytes defines the total size of the pending set (sum of all tx bytes)
47
+ pendingSizeBytes int64
48
+
46
49
// cache defines a fixed-size cache of already seen transactions as this
47
50
// reduces pressure on the proxyApp.
48
51
cache TxCache
@@ -177,9 +180,11 @@ func (txmp *TxMempool) Unlock() {
177
180
// Size returns the number of valid transactions in the mempool. It is
178
181
// thread-safe.
179
182
func (txmp * TxMempool ) Size () int {
180
- txSize := txmp .txStore .Size ()
181
- pendingSize := txmp .pendingTxs .Size ()
182
- return txSize + pendingSize
183
+ return txmp .SizeWithoutPending () + txmp .PendingSize ()
184
+ }
185
+
186
+ func (txmp * TxMempool ) SizeWithoutPending () int {
187
+ return txmp .txStore .Size ()
183
188
}
184
189
185
190
// PendingSize returns the number of pending transactions in the mempool.
@@ -193,6 +198,10 @@ func (txmp *TxMempool) SizeBytes() int64 {
193
198
return atomic .LoadInt64 (& txmp .sizeBytes )
194
199
}
195
200
201
+ func (txmp * TxMempool ) PendingSizeBytes () int64 {
202
+ return atomic .LoadInt64 (& txmp .pendingSizeBytes )
203
+ }
204
+
196
205
// FlushAppConn executes FlushSync on the mempool's proxyAppConn.
197
206
//
198
207
// NOTE: The caller must obtain a write-lock prior to execution.
@@ -326,6 +335,11 @@ func (txmp *TxMempool) CheckTx(
326
335
if res .Checker == nil {
327
336
return errors .New ("no checker available for pending transaction" )
328
337
}
338
+ if err := txmp .canAddPendingTx (wtx ); err != nil {
339
+ // TODO: eviction strategy for pending transactions
340
+ return err
341
+ }
342
+ atomic .AddInt64 (& txmp .pendingSizeBytes , int64 (wtx .Size ()))
329
343
txmp .pendingTxs .Insert (wtx , res , txInfo )
330
344
}
331
345
}
@@ -410,7 +424,7 @@ func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs {
410
424
)
411
425
412
426
var txs []types.Tx
413
- if uint64 (txmp .Size ()) < txmp .config .TxNotifyThreshold {
427
+ if uint64 (txmp .SizeWithoutPending ()) < txmp .config .TxNotifyThreshold {
414
428
// do not reap anything if threshold is not met
415
429
return txs
416
430
}
@@ -522,7 +536,7 @@ func (txmp *TxMempool) Update(
522
536
}
523
537
}
524
538
525
- txmp .metrics .Size .Set (float64 (txmp .Size ()))
539
+ txmp .metrics .Size .Set (float64 (txmp .SizeWithoutPending ()))
526
540
txmp .metrics .PendingSize .Set (float64 (txmp .PendingSize ()))
527
541
return nil
528
542
}
@@ -640,7 +654,7 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, res *abci.ResponseCheck
640
654
}
641
655
642
656
txmp .metrics .TxSizeBytes .Observe (float64 (wtx .Size ()))
643
- txmp .metrics .Size .Set (float64 (txmp .Size ()))
657
+ txmp .metrics .Size .Set (float64 (txmp .SizeWithoutPending ()))
644
658
txmp .metrics .PendingSize .Set (float64 (txmp .PendingSize ()))
645
659
646
660
if txmp .insertTx (wtx ) {
@@ -649,7 +663,7 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, res *abci.ResponseCheck
649
663
"priority" , wtx .priority ,
650
664
"tx" , fmt .Sprintf ("%X" , wtx .tx .Hash ()),
651
665
"height" , txmp .height ,
652
- "num_txs" , txmp .Size (),
666
+ "num_txs" , txmp .SizeWithoutPending (),
653
667
)
654
668
txmp .notifyTxsAvailable ()
655
669
}
@@ -745,12 +759,12 @@ func (txmp *TxMempool) handleRecheckResult(tx types.Tx, res *abci.ResponseCheckT
745
759
if txmp .recheckCursor == nil {
746
760
txmp .logger .Debug ("finished rechecking transactions" )
747
761
748
- if txmp .Size () > 0 {
762
+ if txmp .SizeWithoutPending () > 0 {
749
763
txmp .notifyTxsAvailable ()
750
764
}
751
765
}
752
766
753
- txmp .metrics .Size .Set (float64 (txmp .Size ()))
767
+ txmp .metrics .Size .Set (float64 (txmp .SizeWithoutPending ()))
754
768
txmp .metrics .PendingSize .Set (float64 (txmp .PendingSize ()))
755
769
}
756
770
@@ -803,7 +817,7 @@ func (txmp *TxMempool) updateReCheckTxs(ctx context.Context) {
803
817
// the transaction can be inserted into the mempool.
804
818
func (txmp * TxMempool ) canAddTx (wtx * WrappedTx ) error {
805
819
var (
806
- numTxs = txmp .Size ()
820
+ numTxs = txmp .SizeWithoutPending ()
807
821
sizeBytes = txmp .SizeBytes ()
808
822
)
809
823
@@ -819,6 +833,24 @@ func (txmp *TxMempool) canAddTx(wtx *WrappedTx) error {
819
833
return nil
820
834
}
821
835
836
+ func (txmp * TxMempool ) canAddPendingTx (wtx * WrappedTx ) error {
837
+ var (
838
+ numTxs = txmp .PendingSize ()
839
+ sizeBytes = txmp .PendingSizeBytes ()
840
+ )
841
+
842
+ if numTxs >= txmp .config .PendingSize || int64 (wtx .Size ())+ sizeBytes > txmp .config .MaxPendingTxsBytes {
843
+ return types.ErrMempoolPendingIsFull {
844
+ NumTxs : numTxs ,
845
+ MaxTxs : txmp .config .PendingSize ,
846
+ TxsBytes : sizeBytes ,
847
+ MaxTxsBytes : txmp .config .MaxPendingTxsBytes ,
848
+ }
849
+ }
850
+
851
+ return nil
852
+ }
853
+
822
854
func (txmp * TxMempool ) insertTx (wtx * WrappedTx ) bool {
823
855
if txmp .isInMempool (wtx .tx ) {
824
856
return false
@@ -935,13 +967,14 @@ func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) {
935
967
}
936
968
937
969
// remove pending txs that have expired
938
- txmp .pendingTxs .PurgeExpired (txmp .config .TTLNumBlocks , blockHeight , txmp .config .TTLDuration , now , func (wtx * WrappedTx ) {
970
+ txmp .pendingTxs .PurgeExpired (txmp .config .PendingTTLNumBlocks , blockHeight , txmp .config .PendingTTLDuration , now , func (wtx * WrappedTx ) {
971
+ atomic .AddInt64 (& txmp .pendingSizeBytes , int64 (- wtx .Size ()))
939
972
txmp .expire (blockHeight , wtx )
940
973
})
941
974
}
942
975
943
976
func (txmp * TxMempool ) notifyTxsAvailable () {
944
- if txmp .Size () == 0 {
977
+ if txmp .SizeWithoutPending () == 0 {
945
978
return
946
979
}
947
980
@@ -979,12 +1012,14 @@ func (txmp *TxMempool) AppendCheckTxErr(existingLogs string, log string) string
979
1012
func (txmp * TxMempool ) handlePendingTransactions () {
980
1013
accepted , rejected := txmp .pendingTxs .EvaluatePendingTransactions ()
981
1014
for _ , tx := range accepted {
1015
+ atomic .AddInt64 (& txmp .pendingSizeBytes , int64 (- tx .tx .Size ()))
982
1016
if err := txmp .addNewTransaction (tx .tx , tx .checkTxResponse .ResponseCheckTx , tx .txInfo ); err != nil {
983
1017
txmp .logger .Error (fmt .Sprintf ("error adding pending transaction: %s" , err ))
984
1018
}
985
1019
}
986
- if ! txmp .config .KeepInvalidTxsInCache {
987
- for _ , tx := range rejected {
1020
+ for _ , tx := range rejected {
1021
+ atomic .AddInt64 (& txmp .pendingSizeBytes , int64 (- tx .tx .Size ()))
1022
+ if ! txmp .config .KeepInvalidTxsInCache {
988
1023
tx .tx .removeHandler (true )
989
1024
}
990
1025
}
0 commit comments