From bea145f826d97bac81157c42ce131c5a15f0ae83 Mon Sep 17 00:00:00 2001 From: charithabandi Date: Fri, 21 Feb 2025 14:35:23 -0600 Subject: [PATCH] configurable mempool size limits --- app/node/build.go | 2 +- config/config.go | 9 ++++++++ node/consensus/engine_test.go | 6 +++++- node/mempool/mempool.go | 4 ++-- node/mempool/mempool_test.go | 40 +++++++++++++++++++---------------- node/node_live_test.go | 6 +++--- node/node_test.go | 6 +++++- 7 files changed, 47 insertions(+), 26 deletions(-) diff --git a/app/node/build.go b/app/node/build.go index a9a33c68b..194968b23 100644 --- a/app/node/build.go +++ b/app/node/build.go @@ -85,7 +85,7 @@ func buildServer(ctx context.Context, d *coreDependencies) *server { d.namespaceManager.Ready() // Mempool - mp := mempool.New() + mp := mempool.New(d.cfg.Mempool.MaxSize) // TxAPP txApp := buildTxApp(ctx, d, db, accounts, vs, e) diff --git a/config/config.go b/config/config.go index ea7b1c5ad..53528d584 100644 --- a/config/config.go +++ b/config/config.go @@ -267,6 +267,9 @@ func DefaultConfig() *Config { BlockProposalInterval: types.Duration(1 * time.Second), BlockAnnInterval: types.Duration(3 * time.Second), }, + Mempool: MempoolConfig{ + MaxSize: 200_000_000, // 200MB + }, Store: StoreConfig{ Compression: true, }, @@ -325,6 +328,7 @@ type Config struct { P2P PeerConfig `toml:"p2p" comment:"P2P related configuration"` Consensus ConsensusConfig `toml:"consensus" comment:"Consensus related configuration"` + Mempool MempoolConfig `toml:"mempool" comment:"Mempool related configuration"` DB DBConfig `toml:"db" comment:"DB (PostgreSQL) related configuration"` Store StoreConfig `toml:"store" comment:"Block store configuration"` RPC RPCConfig `toml:"rpc" comment:"User RPC service configuration"` @@ -337,6 +341,11 @@ type Config struct { Checkpoint Checkpoint `toml:"checkpoint" comment:"checkpoint info for the leader to sync to before proposing a new block"` } +type MempoolConfig struct { + // MaxSize is the maximum size of the mempool in bytes. + MaxSize int64 `toml:"max_size" comment:"maximum size of the mempool in bytes"` +} + // PeerConfig corresponds to the [p2p] section of the config. type PeerConfig struct { ListenAddress string `toml:"listen" comment:"address in host:port format to listen on for P2P connections"` diff --git a/node/consensus/engine_test.go b/node/consensus/engine_test.go index ccb3bb208..70c9831b6 100644 --- a/node/consensus/engine_test.go +++ b/node/consensus/engine_test.go @@ -41,6 +41,10 @@ import ( "github.com/stretchr/testify/require" ) +const ( + mempoolSz = 200_000_000 // 200MB +) + var ( broadcastFns = BroadcastFns{ ProposalBroadcaster: mockBlockPropBroadcaster, @@ -158,7 +162,7 @@ func generateTestCEConfig(t *testing.T, nodes int, leaderDB bool) ([]*Config, ma ceConfigs[i] = &Config{ PrivateKey: privKeys[i], Leader: pubKeys[0], - Mempool: mempool.New(), + Mempool: mempool.New(mempoolSz), BlockStore: bs, BlockProcessor: bp, // ValidatorSet: validatorSet, diff --git a/node/mempool/mempool.go b/node/mempool/mempool.go index b188f5615..adc442140 100644 --- a/node/mempool/mempool.go +++ b/node/mempool/mempool.go @@ -30,11 +30,11 @@ type sizedTx struct { // New creates a new Mempool instance with a default max size of 200MB. // See also SetMaxSize. -func New() *Mempool { +func New(sz int64) *Mempool { return &Mempool{ txns: make(map[types.Hash]*sizedTx), fetching: make(map[types.Hash]bool), - maxSize: 200_000_000, // 200MB + maxSize: sz, } } diff --git a/node/mempool/mempool_test.go b/node/mempool/mempool_test.go index 9101291e8..1376bfd2e 100644 --- a/node/mempool/mempool_test.go +++ b/node/mempool/mempool_test.go @@ -14,6 +14,10 @@ import ( "github.com/stretchr/testify/require" ) +const ( + mempoolSz = 200_000_000 // 200MB +) + func newTx(nonce uint64, sender string) *ktypes.Transaction { return &ktypes.Transaction{ Signature: &auth.Signature{}, @@ -28,7 +32,7 @@ func newTx(nonce uint64, sender string) *ktypes.Transaction { } func Test_MempoolRemove(t *testing.T) { - m := New() + m := New(mempoolSz) // Setup test transactions tx1 := types.NamedTx{ @@ -66,7 +70,7 @@ func Test_MempoolRemove(t *testing.T) { } func Test_MempoolReapN(t *testing.T) { - m := New() + m := New(mempoolSz) // Setup test transactions tx1 := types.NamedTx{ @@ -127,7 +131,7 @@ func Test_MempoolReapN(t *testing.T) { func TestMempool_Size(t *testing.T) { t.Run("size tracking with stored transactions", func(t *testing.T) { - mp := New() + mp := New(mempoolSz) // Create a test transaction tx := newTx(1, "A") @@ -151,7 +155,7 @@ func TestMempool_Size(t *testing.T) { }) t.Run("size tracking with multiple transactions", func(t *testing.T) { - mp := New() + mp := New(mempoolSz) tx1 := newTx(1, "A") tx2 := newTx(2, "B") @@ -173,7 +177,7 @@ func TestMempool_Size(t *testing.T) { // Test store same txid again, already found t.Run("size tracking with duplicate txid", func(t *testing.T) { - mp := New() + mp := New(mempoolSz) tx1 := newTx(1, "A") found, _ := mp.Store(tx1.Hash(), tx1) require.False(t, found) @@ -183,7 +187,7 @@ func TestMempool_Size(t *testing.T) { // Test Store with a low SetMaxSize t.Run("size tracking with SetMaxSize", func(t *testing.T) { - mp := New() + mp := New(mempoolSz) mp.SetMaxSize(20) tx1 := newTx(1, "abcdefghijklmnopqrstuvwxyz") _, rejected := mp.Store(tx1.Hash(), tx1) @@ -192,7 +196,7 @@ func TestMempool_Size(t *testing.T) { } func TestMempool_SizeWithRemove(t *testing.T) { - mp := New() + mp := New(mempoolSz) // Create and store two transactions tx1 := newTx(1, "A") @@ -234,7 +238,7 @@ func TestMempool_SizeWithRemove(t *testing.T) { func TestMempool_RecheckTxs(t *testing.T) { t.Run("recheck with all valid transactions", func(t *testing.T) { - mp := New() + mp := New(mempoolSz) tx1 := newTx(1, "A") tx2 := newTx(2, "B") @@ -255,7 +259,7 @@ func TestMempool_RecheckTxs(t *testing.T) { }) t.Run("recheck with all invalid transactions", func(t *testing.T) { - mp := New() + mp := New(mempoolSz) tx1 := newTx(1, "A") tx2 := newTx(2, "B") @@ -276,7 +280,7 @@ func TestMempool_RecheckTxs(t *testing.T) { }) t.Run("recheck with mixed valid/invalid transactions", func(t *testing.T) { - mp := New() + mp := New(mempoolSz) tx1 := newTx(1, "A") tx2 := newTx(2, "B") tx3 := newTx(3, "C") @@ -309,7 +313,7 @@ func TestMempool_RecheckTxs(t *testing.T) { }) t.Run("with a lot more transactions mixed invalid", func(t *testing.T) { - mp := New() + mp := New(mempoolSz) tx1 := newTx(1, "A") tx2 := newTx(2, "B") tx3 := newTx(3, "C") @@ -372,7 +376,7 @@ func TestMempool_RecheckTxs(t *testing.T) { }) t.Run("recheck with canceled context", func(t *testing.T) { - mp := New() + mp := New(mempoolSz) tx1 := newTx(1, "A") mp.Store(tx1.Hash(), tx1) @@ -396,7 +400,7 @@ func TestMempool_RecheckTxs(t *testing.T) { func TestMempool_PeekN(t *testing.T) { t.Run("peek with size limit", func(t *testing.T) { - mp := New() + mp := New(mempoolSz) tx1 := newTx(1, "A") // small tx tx2 := newTx(2, strings.Repeat("B", 1000)) // large tx tx3 := newTx(3, "C") // small tx @@ -418,7 +422,7 @@ func TestMempool_PeekN(t *testing.T) { }) t.Run("peek with zero size limit", func(t *testing.T) { - mp := New() + mp := New(mempoolSz) tx1 := newTx(1, "A") tx2 := newTx(2, "B") @@ -432,7 +436,7 @@ func TestMempool_PeekN(t *testing.T) { }) t.Run("peek with n greater than available txs", func(t *testing.T) { - mp := New() + mp := New(mempoolSz) tx1 := newTx(1, "A") mp.Store(tx1.Hash(), tx1) @@ -442,13 +446,13 @@ func TestMempool_PeekN(t *testing.T) { }) t.Run("peek with empty mempool", func(t *testing.T) { - mp := New() + mp := New(mempoolSz) txns := mp.PeekN(1, 1000) assert.Empty(t, txns) }) t.Run("peek with negative size limit", func(t *testing.T) { - mp := New() + mp := New(mempoolSz) tx1 := newTx(1, "A") tx2 := newTx(2, "B") @@ -462,7 +466,7 @@ func TestMempool_PeekN(t *testing.T) { }) t.Run("peek with zero n", func(t *testing.T) { - mp := New() + mp := New(mempoolSz) tx1 := newTx(1, "A") mp.Store(tx1.Hash(), tx1) diff --git a/node/node_live_test.go b/node/node_live_test.go index c8646eccb..6f5be2429 100644 --- a/node/node_live_test.go +++ b/node/node_live_test.go @@ -51,7 +51,7 @@ func TestSingleNodeMocknet(t *testing.T) { pk1, h1 := newTestHost(t, mn) bs1 := memstore.NewMemBS() - mp1 := mempool.New() + mp1 := mempool.New(mempoolSz) priv1, _ := pk1.Raw() db1 := initDB(t, "5432", "kwil_test_db") @@ -199,7 +199,7 @@ func TestDualNodeMocknet(t *testing.T) { pk1, h1 := newTestHost(t, mn) bs1 := memstore.NewMemBS() - mp1 := mempool.New() + mp1 := mempool.New(mempoolSz) db1 := initDB(t, "5432", "kwil_test_db") func() { @@ -217,7 +217,7 @@ func TestDualNodeMocknet(t *testing.T) { pk2, h2 := newTestHost(t, mn) bs2 := memstore.NewMemBS() - mp2 := mempool.New() + mp2 := mempool.New(mempoolSz) db2 := initDB(t, "5432", "kwil_test_db2") // NOTE: using the same postgres host is a little wild priv2, _ := pk2.Raw() diff --git a/node/node_test.go b/node/node_test.go index 6849ea74b..beebc6a4a 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -35,6 +35,10 @@ import ( ma "github.com/multiformats/go-multiaddr" ) +const ( + mempoolSz = 200_000_000 // 200MB +) + var blackholeIP6 = net.ParseIP("100::") func newTestHost(t *testing.T, mn mock.Mocknet) (p2pcrypto.PrivKey, host.Host) { @@ -110,7 +114,7 @@ func makeTestHosts(t *testing.T, nNodes, nExtraHosts int, blockInterval time.Dur // DB unused DBConfig: &defaultConfigSet.DB, Statesync: &defaultConfigSet.StateSync, - Mempool: mempool.New(), + Mempool: mempool.New(mempoolSz), BlockStore: bs, Snapshotter: newSnapshotStore(bs), Consensus: ce,