crash recovery and other minor fixes #1173

Merged · 1 commit · Dec 19, 2024
28 changes: 15 additions & 13 deletions app/node/build.go
@@ -90,7 +90,7 @@ func buildServer(ctx context.Context, d *coreDependencies) *server {
migrator := buildMigrator(d, db, accounts, vs)

// BlockProcessor
bp := buildBlockProcessor(ctx, d, db, txApp, accounts, vs, ss, es, migrator)
bp := buildBlockProcessor(ctx, d, db, txApp, accounts, vs, ss, es, migrator, bs)

// Consensus
ce := buildConsensusEngine(ctx, d, db, mp, bs, bp, valSet)
@@ -357,10 +357,10 @@ func buildTxApp(ctx context.Context, d *coreDependencies, db *pg.DB, accounts *a
return txapp
}

func buildBlockProcessor(ctx context.Context, d *coreDependencies, db *pg.DB, txapp *txapp.TxApp, accounts *accounts.Accounts, vs *voting.VoteStore, ss *snapshotter.SnapshotStore, es *voting.EventStore, migrator *migrations.Migrator) *blockprocessor.BlockProcessor {
func buildBlockProcessor(ctx context.Context, d *coreDependencies, db *pg.DB, txapp *txapp.TxApp, accounts *accounts.Accounts, vs *voting.VoteStore, ss *snapshotter.SnapshotStore, es *voting.EventStore, migrator *migrations.Migrator, bs *store.BlockStore) *blockprocessor.BlockProcessor {
signer := auth.GetNodeSigner(d.privKey)

bp, err := blockprocessor.NewBlockProcessor(ctx, db, txapp, accounts, vs, ss, es, migrator, d.genesisCfg, signer, d.logger.New("BP"))
bp, err := blockprocessor.NewBlockProcessor(ctx, db, txapp, accounts, vs, ss, es, migrator, bs, d.genesisCfg, signer, d.logger.New("BP"))
if err != nil {
failBuild(err, "failed to create block processor")
}
@@ -409,16 +409,18 @@ func buildConsensusEngine(_ context.Context, d *coreDependencies, db *pg.DB,
}

ceCfg := &consensus.Config{
PrivateKey: d.privKey,
Leader: leaderPubKey,
DB: db,
BlockStore: bs,
BlockProcessor: bp,
Mempool: mempool,
ValidatorSet: valSet,
Logger: d.logger.New("CONS"),
ProposeTimeout: time.Duration(d.cfg.Consensus.ProposeTimeout),
GenesisHeight: d.genesisCfg.InitialHeight,
PrivateKey: d.privKey,
Leader: leaderPubKey,
DB: db,
BlockStore: bs,
BlockProcessor: bp,
Mempool: mempool,
ValidatorSet: valSet,
Logger: d.logger.New("CONS"),
ProposeTimeout: time.Duration(d.cfg.Consensus.ProposeTimeout),
BlockProposalInterval: time.Duration(d.cfg.Consensus.BlockProposalInterval),
BlockAnnInterval: time.Duration(d.cfg.Consensus.BlockAnnInterval),
GenesisHeight: d.genesisCfg.InitialHeight,
}

ce := consensus.New(ceCfg)
19 changes: 15 additions & 4 deletions config/config.go
@@ -160,9 +160,11 @@ func DefaultConfig() *Config {
BootNodes: []string{},
},
Consensus: ConsensusConfig{
ProposeTimeout: Duration(1000 * time.Millisecond),
MaxBlockSize: 50_000_000,
MaxTxsPerBlock: 20_000,
ProposeTimeout: Duration(1000 * time.Millisecond),
MaxBlockSize: 50_000_000,
MaxTxsPerBlock: 20_000,
BlockProposalInterval: Duration(1 * time.Second),
BlockAnnInterval: Duration(3 * time.Second),
},
DB: DBConfig{
Host: "127.0.0.1",
@@ -260,7 +262,16 @@ type ConsensusConfig struct {
ProposeTimeout Duration `toml:"propose_timeout" comment:"timeout for proposing a block (applies to leader)"`
MaxBlockSize uint64 `toml:"max_block_size" comment:"max size of a block in bytes"`
MaxTxsPerBlock uint64 `toml:"max_txs_per_block" comment:"max number of transactions per block"`
// ? reannounce intervals?
// reannounce intervals

// BlockProposalInterval is the interval between block proposal reannouncements by the leader.
// This impacts the time it takes for an out-of-sync validator to receive the current block proposal,
// thereby impacting block times. Default is 1 second.
BlockProposalInterval Duration `toml:"block_proposal_interval" comment:"interval between block proposal reannouncements by the leader"`
// BlockAnnInterval is the frequency at which block commit messages are reannounced by the leader,
// and votes are reannounced by validators. Default is 3 seconds. This impacts the time it takes for
// out-of-sync nodes to catch up with the latest block.
BlockAnnInterval Duration `toml:"block_ann_interval" comment:"interval between block commit reannouncements by the leader, and votes reannouncements by validators"`
}

type RPCConfig struct {
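For orientation, here is a minimal sketch of overriding the two new defaults programmatically. The field names and the `Duration` conversions come from this diff; the module import path is an assumption, and the values are purely illustrative:

```go
package main

import (
	"fmt"
	"time"

	"github.com/kwilteam/kwil-db/config" // import path assumed from the repo layout
)

func main() {
	cfg := config.DefaultConfig()

	// Tighten the reannouncement cadence for a low-latency network;
	// the defaults introduced in this PR are 1s and 3s respectively.
	cfg.Consensus.BlockProposalInterval = config.Duration(500 * time.Millisecond)
	cfg.Consensus.BlockAnnInterval = config.Duration(2 * time.Second)

	fmt.Println(time.Duration(cfg.Consensus.BlockProposalInterval)) // 500ms
}
```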
22 changes: 11 additions & 11 deletions core/types/block.go
@@ -19,21 +19,21 @@ const (
var ErrNotFound = errors.New("not found")

type BlockHeader struct {
Version uint16
Height int64
NumTxns uint32
PrevHash Hash // previous block's hash
PrevAppHash Hash // app hash after last block
Version uint16
Height int64
NumTxns uint32
PrevHash Hash // previous block's hash
// app hash after last block.
// calculated based on updates to the PG state, accounts, validators, chain state and txResults.
PrevAppHash Hash
Timestamp time.Time
MerkleRoot Hash // Merkle tree reference to hash of all transactions for the block

// Proposer []byte should be leader, so probably pointless here
ValidatorSetHash Hash // Hash of the validator set for the block
// Hash of the current validator set for the block
ValidatorSetHash Hash

// ConsensusUpdates []ConsensusUpdate

// ChainStateHash Hash // if we want to keep the "application" hash separate from state of consensus engine
// PrevExecResultHash Hash
// ConsensusParams updates for the block, empty if no updates, ignored for this release
// ConsensusParamsUpdates *ConsensusParams
}

type Block struct {
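As a side note on the header fields above: each block commits both to its predecessor's hash (PrevHash) and to the app hash produced by executing that predecessor (PrevAppHash). A toy illustration of that linkage, where sha256 and the byte encoding are stand-ins rather than the repo's actual hasher:

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

type header struct {
	height      int64
	prevHash    [32]byte // hash of the previous block
	prevAppHash [32]byte // app state hash after executing the previous block
}

func (h header) hash() [32]byte {
	// stand-in encoding: height byte || prevHash || prevAppHash
	b := append([]byte{byte(h.height)}, h.prevHash[:]...)
	return sha256.Sum256(append(b, h.prevAppHash[:]...))
}

func main() {
	genesis := header{height: 1}
	next := header{
		height:      2,
		prevHash:    genesis.hash(),
		prevAppHash: sha256.Sum256([]byte("state@1")),
	}
	fmt.Printf("block 2 links to %x\n", next.prevHash[:4])
}
```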
6 changes: 5 additions & 1 deletion node/block_processor/interfaces.go
@@ -87,6 +87,10 @@ var (
type MigratorModule interface {
NotifyHeight(ctx context.Context, block *common.BlockContext, db migrations.Database) error
StoreChangesets(height int64, changes <-chan any) error
PersistLastChangesetHeight(ctx context.Context, tx sql.Executor) error
PersistLastChangesetHeight(ctx context.Context, tx sql.Executor, height int64) error
GetMigrationMetadata(ctx context.Context, status types.MigrationStatus) (*types.MigrationMetadata, error)
}

type BlockStore interface {
GetByHeight(height int64) (types.Hash, *ktypes.Block, types.Hash, error)
}
25 changes: 22 additions & 3 deletions node/block_processor/processor.go
@@ -69,7 +69,7 @@ type BlockProcessor struct {

type BroadcastTxFn func(ctx context.Context, tx *ktypes.Transaction, sync uint8) (*ktypes.ResultBroadcastTx, error)

func NewBlockProcessor(ctx context.Context, db DB, txapp TxApp, accounts Accounts, vs ValidatorModule, sp SnapshotModule, es EventStore, migrator MigratorModule, genesisCfg *config.GenesisConfig, signer auth.Signer, logger log.Logger) (*BlockProcessor, error) {
func NewBlockProcessor(ctx context.Context, db DB, txapp TxApp, accounts Accounts, vs ValidatorModule, sp SnapshotModule, es EventStore, migrator MigratorModule, bs BlockStore, genesisCfg *config.GenesisConfig, signer auth.Signer, logger log.Logger) (*BlockProcessor, error) {
// get network parameters from the chain context
bp := &BlockProcessor{
db: db,
@@ -95,10 +95,29 @@ func NewBlockProcessor(ctx context.Context, db DB, txapp TxApp, accounts Account
}
defer tx.Rollback(ctx)

height, appHash, _, err := meta.GetChainState(ctx, tx)
height, appHash, dirty, err := meta.GetChainState(ctx, tx)
if err != nil {
return nil, fmt.Errorf("failed to get chain state: %w", err)
}
if dirty {
// app state is in a partially committed state, recover the chain state.
_, _, hash, err := bs.GetByHeight(height)
if err != nil {
return nil, err
}

if err := meta.SetChainState(ctx, tx, height, hash[:], false); err != nil {
return nil, err
}

copy(appHash, hash[:])

// also update the last changeset height in the migrator
if err := bp.migrator.PersistLastChangesetHeight(ctx, tx, height); err != nil {
return nil, err
}
}

bp.height.Store(height)
copy(bp.appHash[:], appHash)

@@ -523,7 +542,7 @@ func (bp *BlockProcessor) Commit(ctx context.Context, req *ktypes.CommitRequest)
return err
}

if err := bp.migrator.PersistLastChangesetHeight(ctxS, tx); err != nil {
if err := bp.migrator.PersistLastChangesetHeight(ctxS, tx, req.Height); err != nil {
err2 := tx.Rollback(ctxS)
if err2 != nil {
bp.log.Error("Failed to rollback the transaction", "err", err2)
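The dirty-flag recovery added above follows a two-phase pattern: the chain-state row is marked dirty before the app-state commit begins and cleared once it fully lands, so a crash in between leaves a detectable marker, and on restart the block store (which committed first) is treated as the source of truth. A self-contained toy of that invariant, with all types hypothetical rather than the kwil-db APIs:

```go
package main

import "fmt"

// chainState mirrors the metadata consulted at startup: last committed
// height, its app hash, and a dirty flag set while a commit is in flight.
type chainState struct {
	height  int64
	appHash [32]byte
	dirty   bool
}

// recoverState repairs a partially committed state: when dirty is set,
// the app hash recorded in the block store at the same height is adopted
// and the flag is cleared, matching the processor.go logic above.
func recoverState(cs *chainState, storeHashAt func(int64) ([32]byte, error)) error {
	if !cs.dirty {
		return nil
	}
	h, err := storeHashAt(cs.height)
	if err != nil {
		return fmt.Errorf("block store lookup at height %d: %w", cs.height, err)
	}
	cs.appHash = h
	cs.dirty = false
	return nil
}

func main() {
	cs := &chainState{height: 42, dirty: true}
	_ = recoverState(cs, func(int64) ([32]byte, error) { return [32]byte{0xab}, nil })
	fmt.Printf("recovered: height=%d appHash=%x dirty=%v\n", cs.height, cs.appHash[:2], cs.dirty)
}
```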
10 changes: 7 additions & 3 deletions node/consensus/block.go
@@ -38,7 +38,12 @@ func (ce *ConsensusEngine) validateBlock(blk *ktypes.Block) error {
return fmt.Errorf("merkleroot mismatch, expected %v, got %v", merkleRoot, blk.Header.MerkleRoot)
}

// Verify other stuff such as validatorsetHash, signature of the block etc.
// Verify the current validator set for the block
valSetHash := ce.validatorSetHash()
if valSetHash != blk.Header.ValidatorSetHash {
return fmt.Errorf("validator set hash mismatch, expected %s, got %s", valSetHash.String(), blk.Header.ValidatorSetHash.String())
}

return nil
}

@@ -121,7 +126,7 @@ func (ce *ConsensusEngine) commit(ctx context.Context) error {
// update the role of the node based on the final validator set at the end of the commit.
ce.updateValidatorSetAndRole()

ce.log.Info("Committed Block", "height", height, "hash", blkProp.blkHash, "appHash", appHash.String())
ce.log.Info("Committed Block", "height", height, "hash", blkProp.blkHash.String(), "appHash", appHash.String())
return nil
}

@@ -143,7 +148,6 @@ func (ce *ConsensusEngine) rollbackState(ctx context.Context) error {
}

ce.resetState()

return nil
}

61 changes: 37 additions & 24 deletions node/consensus/engine.go
@@ -45,8 +45,18 @@ type ConsensusEngine struct {
leader crypto.PublicKey
log log.Logger

// proposeTimeout specifies the time duration to wait before proposing a new block for the next height.
// Default is 1 second.
proposeTimeout time.Duration

// blkProposalInterval specifies the time duration to wait before reannouncing the block proposal message.
// This is only applicable to the leader. This interval influences how quickly out-of-sync nodes can
// catch up with the consensus rounds, thereby influencing the block time. Default is 1 second.
blkProposalInterval time.Duration

// blkAnnInterval specifies the time duration to wait before reannouncing the block announce message. Default is 3 seconds.
blkAnnInterval time.Duration

genesisHeight int64 // height of the genesis block
networkHeight atomic.Int64
validatorSet map[string]ktypes.Validator // key: hex encoded pubkey
@@ -101,24 +111,26 @@ type Config struct {
PrivateKey crypto.PrivateKey
// Leader is the public key of the leader.
Leader crypto.PublicKey

// GenesisHeight is the initial height of the network.
GenesisHeight int64

DB *pg.DB
// Mempool is the mempool of the node.
Mempool Mempool
// BlockStore is the blockstore of the node.
BlockStore BlockStore

BlockProcessor BlockProcessor

// ValidatorSet is the set of validators in the network.
ValidatorSet map[string]ktypes.Validator
// Logger is the logger of the node.
Logger log.Logger

// ProposeTimeout is the timeout for proposing a block.
ProposeTimeout time.Duration
// BlockProposalInterval is the frequency at which block proposal messages are reannounced by the leader.
BlockProposalInterval time.Duration
// BlockAnnInterval is the frequency at which block commit messages are reannounced by the leader.
// This is also the frequency at which validators reannounce their ack messages.
BlockAnnInterval time.Duration
// CatchUpInterval is the frequency at which the node attempts to catch up with the network if it is lagging.
// CatchUpInterval time.Duration

// Interfaces
DB *pg.DB
Mempool Mempool
BlockStore BlockStore
BlockProcessor BlockProcessor
Logger log.Logger
}

// ProposalBroadcaster broadcasts the new block proposal message to the network
@@ -218,11 +230,13 @@ func New(cfg *Config) *ConsensusEngine {

// rethink how this state is initialized
ce := &ConsensusEngine{
pubKey: pubKey,
privKey: cfg.PrivateKey,
leader: cfg.Leader,
proposeTimeout: cfg.ProposeTimeout,
db: cfg.DB,
pubKey: pubKey,
privKey: cfg.PrivateKey,
leader: cfg.Leader,
proposeTimeout: cfg.ProposeTimeout,
blkProposalInterval: cfg.BlockProposalInterval,
blkAnnInterval: cfg.BlockAnnInterval,
db: cfg.DB,
state: state{
blkProp: nil,
blockRes: nil,
@@ -337,11 +351,10 @@ func (ce *ConsensusEngine) close() {
// Apart from the above events, the node also periodically checks if it needs to
// catch up with the network and reannounce the messages.
func (ce *ConsensusEngine) runConsensusEventLoop(ctx context.Context) error {
// TODO: make these configurable?
ce.log.Info("Starting the consensus event loop...")
catchUpTicker := time.NewTicker(5 * time.Second)
reannounceTicker := time.NewTicker(3 * time.Second)
blkPropTicker := time.NewTicker(1 * time.Second)
catchUpTicker := time.NewTicker(5 * time.Second) // Should this be configurable??
reannounceTicker := time.NewTicker(ce.blkAnnInterval) // 3 secs (default)
blkPropTicker := time.NewTicker(ce.blkProposalInterval) // 1 sec (default)

for {
select {
@@ -458,7 +471,7 @@ func (ce *ConsensusEngine) catchup(ctx context.Context) error {
}

if dirty {
ce.log.Info("App state is dirty, partially committed??") // TODO: what to be done here??
return fmt.Errorf("app state is dirty, error in the blockprocessor initialization, height: %d, appHash: %x", appHeight, appHash)
}

ce.log.Info("Initial Node state: ", "appHeight", appHeight, "storeHeight", storeHeight, "appHash", appHash, "storeAppHash", storeAppHash)
@@ -594,7 +607,7 @@ func (ce *ConsensusEngine) reannounceMsgs(ctx context.Context) {
if ce.role.Load() == types.RoleLeader && ce.state.lc.height > 0 {
// Announce block commit message for the last committed block
if ce.state.lc.blk != nil {
go ce.blkAnnouncer(ctx, ce.state.lc.blk, ce.state.lc.appHash) // TODO: can be made infrequent
go ce.blkAnnouncer(ctx, ce.state.lc.blk, ce.state.lc.appHash)
}
return
}
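The event loop now derives its reannouncement cadence from the two new config fields instead of hard-coded constants. A stripped-down sketch of that ticker wiring; the handler bodies and function name are illustrative only, not the engine's actual internals:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runLoop mimics the shape of runConsensusEventLoop: a fixed catch-up cadence,
// plus config-driven cadences for block-announce and proposal reannouncements.
func runLoop(ctx context.Context, blkAnnInterval, blkPropInterval time.Duration) {
	catchUp := time.NewTicker(5 * time.Second)
	reannounce := time.NewTicker(blkAnnInterval)
	blkProp := time.NewTicker(blkPropInterval)
	defer catchUp.Stop()
	defer reannounce.Stop()
	defer blkProp.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-catchUp.C:
			fmt.Println("catch up with the network")
		case <-reannounce.C:
			fmt.Println("reannounce block commits / acks")
		case <-blkProp.C:
			fmt.Println("reannounce block proposal (leader)")
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
	defer cancel()
	runLoop(ctx, 3*time.Second, 1*time.Second) // the PR's defaults
}
```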
22 changes: 12 additions & 10 deletions node/consensus/engine_test.go
@@ -129,7 +129,7 @@ func generateTestCEConfig(t *testing.T, nodes int, leaderDB bool) []*Config {
ev := &mockEventStore{}
m := &mockMigrator{}

bp, err := blockprocessor.NewBlockProcessor(ctx, db, txapp, accounts, v, ss, ev, m, genCfg, signer, log.New(log.WithName("BP")))
bp, err := blockprocessor.NewBlockProcessor(ctx, db, txapp, accounts, v, ss, ev, m, bs, genCfg, signer, log.New(log.WithName("BP")))
assert.NoError(t, err)
bp.SetNetworkParameters(&common.NetworkParameters{
MaxBlockSize: genCfg.MaxBlockSize,
@@ -140,14 +140,16 @@ func generateTestCEConfig(t *testing.T, nodes int, leaderDB bool) []*Config {
})

ceConfigs[i] = &Config{
PrivateKey: privKeys[i],
Leader: pubKeys[0],
Mempool: mempool.New(),
BlockStore: bs,
BlockProcessor: bp,
ValidatorSet: validatorSet,
Logger: logger,
ProposeTimeout: 1 * time.Second,
PrivateKey: privKeys[i],
Leader: pubKeys[0],
Mempool: mempool.New(),
BlockStore: bs,
BlockProcessor: bp,
ValidatorSet: validatorSet,
Logger: logger,
ProposeTimeout: 1 * time.Second,
BlockProposalInterval: 1 * time.Second,
BlockAnnInterval: 3 * time.Second,
}

closers = append(closers, func() {
@@ -914,7 +916,7 @@ func (m *mockMigrator) StoreChangesets(height int64, changes <-chan any) error {
return nil
}

func (m *mockMigrator) PersistLastChangesetHeight(ctx context.Context, tx sql.Executor) error {
func (m *mockMigrator) PersistLastChangesetHeight(ctx context.Context, tx sql.Executor, height int64) error {
return nil
}

9 changes: 4 additions & 5 deletions node/consensus/leader.go
@@ -25,7 +25,7 @@ import (
// - This phase includes committing the block to the block store, clearing the mempool,
// updating the chain state, creating snapshots, committing the pg db state, etc.
//
// The Leader can also issue ResetState messages using "kwil-admin reset <reset-block-height> <problematic-tx-list>"
// The Leader can also issue ResetState messages using "kwil-admin reset <reset-block-height> <longrunning-tx-list>"
// When a leader receives a ResetState request, it will broadcast the ResetState message to the network to
// halt the current block execution if the current block height equals reset-block-height + 1. The leader will stop processing
// the current block, revert any state changes made, and remove the problematic transactions from the mempool before
@@ -162,9 +162,8 @@ func (ce *ConsensusEngine) createBlockProposal(ctx context.Context) (*blockPropo
ce.mempool.Remove(types.Hash(tx))
}

blk := ktypes.NewBlock(ce.state.lc.height+1, ce.state.lc.blkHash, ce.state.lc.appHash, ce.ValidatorSetHash(), time.Now(), finalTxs)

// ValSet + valUpdatesHash
valSetHash := ce.validatorSetHash()
blk := ktypes.NewBlock(ce.state.lc.height+1, ce.state.lc.blkHash, ce.state.lc.appHash, valSetHash, time.Now(), finalTxs)

// Sign the block
if err := blk.Sign(ce.privKey); err != nil {
@@ -274,7 +273,7 @@ func (ce *ConsensusEngine) processVotes(ctx context.Context) error {
return nil
}

func (ce *ConsensusEngine) ValidatorSetHash() types.Hash {
func (ce *ConsensusEngine) validatorSetHash() types.Hash {
hasher := ktypes.NewHasher()

keys := make([]string, 0, len(ce.validatorSet))
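validatorSetHash (now unexported) must be deterministic across nodes, which is why the diff sorts the hex-encoded pubkeys before hashing. A minimal sketch of that pattern; sha256 and the per-entry encoding are assumptions, not kwil-db's exact scheme:

```go
package main

import (
	"crypto/sha256"
	"fmt"
	"sort"
)

type validator struct {
	pubKey []byte
	power  int64
}

// setHash folds a validator set into one digest. Sorting the hex keys first
// makes the result independent of Go's randomized map iteration order, so
// every node computes the same ValidatorSetHash for the same set.
func setHash(set map[string]validator) [32]byte {
	keys := make([]string, 0, len(set))
	for k := range set {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	h := sha256.New()
	for _, k := range keys {
		v := set[k]
		h.Write(v.pubKey)
		fmt.Fprintf(h, "%d", v.power) // stand-in encoding for the voting power
	}
	var out [32]byte
	copy(out[:], h.Sum(nil))
	return out
}

func main() {
	set := map[string]validator{
		"aa": {pubKey: []byte{0xaa}, power: 1},
		"bb": {pubKey: []byte{0xbb}, power: 1},
	}
	fmt.Printf("%x\n", setHash(set))
}
```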