Skip to content

Commit

Permalink
configurable mempool size limits
Browse files Browse the repository at this point in the history
  • Loading branch information
charithabandi committed Feb 21, 2025
1 parent 67bc6bb commit bea145f
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 26 deletions.
2 changes: 1 addition & 1 deletion app/node/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func buildServer(ctx context.Context, d *coreDependencies) *server {
d.namespaceManager.Ready()

// Mempool
mp := mempool.New()
mp := mempool.New(d.cfg.Mempool.MaxSize)

// TxAPP
txApp := buildTxApp(ctx, d, db, accounts, vs, e)
Expand Down
9 changes: 9 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,9 @@ func DefaultConfig() *Config {
BlockProposalInterval: types.Duration(1 * time.Second),
BlockAnnInterval: types.Duration(3 * time.Second),
},
Mempool: MempoolConfig{
MaxSize: 200_000_000, // 200MB
},
Store: StoreConfig{
Compression: true,
},
Expand Down Expand Up @@ -325,6 +328,7 @@ type Config struct {

P2P PeerConfig `toml:"p2p" comment:"P2P related configuration"`
Consensus ConsensusConfig `toml:"consensus" comment:"Consensus related configuration"`
Mempool MempoolConfig `toml:"mempool" comment:"Mempool related configuration"`
DB DBConfig `toml:"db" comment:"DB (PostgreSQL) related configuration"`
Store StoreConfig `toml:"store" comment:"Block store configuration"`
RPC RPCConfig `toml:"rpc" comment:"User RPC service configuration"`
Expand All @@ -337,6 +341,11 @@ type Config struct {
Checkpoint Checkpoint `toml:"checkpoint" comment:"checkpoint info for the leader to sync to before proposing a new block"`
}

type MempoolConfig struct {
// MaxSize is the maximum size of the mempool in bytes.
MaxSize int64 `toml:"max_size" comment:"maximum size of the mempool in bytes"`
}

// PeerConfig corresponds to the [p2p] section of the config.
type PeerConfig struct {
ListenAddress string `toml:"listen" comment:"address in host:port format to listen on for P2P connections"`
Expand Down
6 changes: 5 additions & 1 deletion node/consensus/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ import (
"github.com/stretchr/testify/require"
)

const (
mempoolSz = 200_000_000 // 200MB
)

var (
broadcastFns = BroadcastFns{
ProposalBroadcaster: mockBlockPropBroadcaster,
Expand Down Expand Up @@ -158,7 +162,7 @@ func generateTestCEConfig(t *testing.T, nodes int, leaderDB bool) ([]*Config, ma
ceConfigs[i] = &Config{
PrivateKey: privKeys[i],
Leader: pubKeys[0],
Mempool: mempool.New(),
Mempool: mempool.New(mempoolSz),
BlockStore: bs,
BlockProcessor: bp,
// ValidatorSet: validatorSet,
Expand Down
4 changes: 2 additions & 2 deletions node/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ type sizedTx struct {

// New creates a new Mempool instance with a default max size of 200MB.
// See also SetMaxSize.
func New() *Mempool {
func New(sz int64) *Mempool {
return &Mempool{
txns: make(map[types.Hash]*sizedTx),
fetching: make(map[types.Hash]bool),
maxSize: 200_000_000, // 200MB
maxSize: sz,
}
}

Expand Down
40 changes: 22 additions & 18 deletions node/mempool/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ import (
"github.com/stretchr/testify/require"
)

const (
mempoolSz = 200_000_000 // 200MB
)

func newTx(nonce uint64, sender string) *ktypes.Transaction {
return &ktypes.Transaction{
Signature: &auth.Signature{},
Expand All @@ -28,7 +32,7 @@ func newTx(nonce uint64, sender string) *ktypes.Transaction {
}

func Test_MempoolRemove(t *testing.T) {
m := New()
m := New(mempoolSz)

// Setup test transactions
tx1 := types.NamedTx{
Expand Down Expand Up @@ -66,7 +70,7 @@ func Test_MempoolRemove(t *testing.T) {
}

func Test_MempoolReapN(t *testing.T) {
m := New()
m := New(mempoolSz)

// Setup test transactions
tx1 := types.NamedTx{
Expand Down Expand Up @@ -127,7 +131,7 @@ func Test_MempoolReapN(t *testing.T) {

func TestMempool_Size(t *testing.T) {
t.Run("size tracking with stored transactions", func(t *testing.T) {
mp := New()
mp := New(mempoolSz)

// Create a test transaction
tx := newTx(1, "A")
Expand All @@ -151,7 +155,7 @@ func TestMempool_Size(t *testing.T) {
})

t.Run("size tracking with multiple transactions", func(t *testing.T) {
mp := New()
mp := New(mempoolSz)

tx1 := newTx(1, "A")
tx2 := newTx(2, "B")
Expand All @@ -173,7 +177,7 @@ func TestMempool_Size(t *testing.T) {

// Test store same txid again, already found
t.Run("size tracking with duplicate txid", func(t *testing.T) {
mp := New()
mp := New(mempoolSz)
tx1 := newTx(1, "A")
found, _ := mp.Store(tx1.Hash(), tx1)
require.False(t, found)
Expand All @@ -183,7 +187,7 @@ func TestMempool_Size(t *testing.T) {

// Test Store with a low SetMaxSize
t.Run("size tracking with SetMaxSize", func(t *testing.T) {
mp := New()
mp := New(mempoolSz)
mp.SetMaxSize(20)
tx1 := newTx(1, "abcdefghijklmnopqrstuvwxyz")
_, rejected := mp.Store(tx1.Hash(), tx1)
Expand All @@ -192,7 +196,7 @@ func TestMempool_Size(t *testing.T) {
}

func TestMempool_SizeWithRemove(t *testing.T) {
mp := New()
mp := New(mempoolSz)

// Create and store two transactions
tx1 := newTx(1, "A")
Expand Down Expand Up @@ -234,7 +238,7 @@ func TestMempool_SizeWithRemove(t *testing.T) {

func TestMempool_RecheckTxs(t *testing.T) {
t.Run("recheck with all valid transactions", func(t *testing.T) {
mp := New()
mp := New(mempoolSz)
tx1 := newTx(1, "A")
tx2 := newTx(2, "B")

Expand All @@ -255,7 +259,7 @@ func TestMempool_RecheckTxs(t *testing.T) {
})

t.Run("recheck with all invalid transactions", func(t *testing.T) {
mp := New()
mp := New(mempoolSz)
tx1 := newTx(1, "A")
tx2 := newTx(2, "B")

Expand All @@ -276,7 +280,7 @@ func TestMempool_RecheckTxs(t *testing.T) {
})

t.Run("recheck with mixed valid/invalid transactions", func(t *testing.T) {
mp := New()
mp := New(mempoolSz)
tx1 := newTx(1, "A")
tx2 := newTx(2, "B")
tx3 := newTx(3, "C")
Expand Down Expand Up @@ -309,7 +313,7 @@ func TestMempool_RecheckTxs(t *testing.T) {
})

t.Run("with a lot more transactions mixed invalid", func(t *testing.T) {
mp := New()
mp := New(mempoolSz)
tx1 := newTx(1, "A")
tx2 := newTx(2, "B")
tx3 := newTx(3, "C")
Expand Down Expand Up @@ -372,7 +376,7 @@ func TestMempool_RecheckTxs(t *testing.T) {
})

t.Run("recheck with canceled context", func(t *testing.T) {
mp := New()
mp := New(mempoolSz)
tx1 := newTx(1, "A")
mp.Store(tx1.Hash(), tx1)

Expand All @@ -396,7 +400,7 @@ func TestMempool_RecheckTxs(t *testing.T) {

func TestMempool_PeekN(t *testing.T) {
t.Run("peek with size limit", func(t *testing.T) {
mp := New()
mp := New(mempoolSz)
tx1 := newTx(1, "A") // small tx
tx2 := newTx(2, strings.Repeat("B", 1000)) // large tx
tx3 := newTx(3, "C") // small tx
Expand All @@ -418,7 +422,7 @@ func TestMempool_PeekN(t *testing.T) {
})

t.Run("peek with zero size limit", func(t *testing.T) {
mp := New()
mp := New(mempoolSz)
tx1 := newTx(1, "A")
tx2 := newTx(2, "B")

Expand All @@ -432,7 +436,7 @@ func TestMempool_PeekN(t *testing.T) {
})

t.Run("peek with n greater than available txs", func(t *testing.T) {
mp := New()
mp := New(mempoolSz)
tx1 := newTx(1, "A")
mp.Store(tx1.Hash(), tx1)

Expand All @@ -442,13 +446,13 @@ func TestMempool_PeekN(t *testing.T) {
})

t.Run("peek with empty mempool", func(t *testing.T) {
mp := New()
mp := New(mempoolSz)
txns := mp.PeekN(1, 1000)
assert.Empty(t, txns)
})

t.Run("peek with negative size limit", func(t *testing.T) {
mp := New()
mp := New(mempoolSz)
tx1 := newTx(1, "A")
tx2 := newTx(2, "B")

Expand All @@ -462,7 +466,7 @@ func TestMempool_PeekN(t *testing.T) {
})

t.Run("peek with zero n", func(t *testing.T) {
mp := New()
mp := New(mempoolSz)
tx1 := newTx(1, "A")
mp.Store(tx1.Hash(), tx1)

Expand Down
6 changes: 3 additions & 3 deletions node/node_live_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestSingleNodeMocknet(t *testing.T) {

pk1, h1 := newTestHost(t, mn)
bs1 := memstore.NewMemBS()
mp1 := mempool.New()
mp1 := mempool.New(mempoolSz)
priv1, _ := pk1.Raw()

db1 := initDB(t, "5432", "kwil_test_db")
Expand Down Expand Up @@ -199,7 +199,7 @@ func TestDualNodeMocknet(t *testing.T) {

pk1, h1 := newTestHost(t, mn)
bs1 := memstore.NewMemBS()
mp1 := mempool.New()
mp1 := mempool.New(mempoolSz)

db1 := initDB(t, "5432", "kwil_test_db")
func() {
Expand All @@ -217,7 +217,7 @@ func TestDualNodeMocknet(t *testing.T) {

pk2, h2 := newTestHost(t, mn)
bs2 := memstore.NewMemBS()
mp2 := mempool.New()
mp2 := mempool.New(mempoolSz)
db2 := initDB(t, "5432", "kwil_test_db2") // NOTE: using the same postgres host is a little wild

priv2, _ := pk2.Raw()
Expand Down
6 changes: 5 additions & 1 deletion node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ import (
ma "github.com/multiformats/go-multiaddr"
)

const (
mempoolSz = 200_000_000 // 200MB
)

var blackholeIP6 = net.ParseIP("100::")

func newTestHost(t *testing.T, mn mock.Mocknet) (p2pcrypto.PrivKey, host.Host) {
Expand Down Expand Up @@ -110,7 +114,7 @@ func makeTestHosts(t *testing.T, nNodes, nExtraHosts int, blockInterval time.Dur
// DB unused
DBConfig: &defaultConfigSet.DB,
Statesync: &defaultConfigSet.StateSync,
Mempool: mempool.New(),
Mempool: mempool.New(mempoolSz),
BlockStore: bs,
Snapshotter: newSnapshotStore(bs),
Consensus: ce,
Expand Down

0 comments on commit bea145f

Please sign in to comment.