diff --git a/app/node/build.go b/app/node/build.go index 1004b8048..dcaa9896e 100644 --- a/app/node/build.go +++ b/app/node/build.go @@ -232,6 +232,8 @@ func buildConsensusEngine(_ context.Context, d *coreDependencies, db *pg.DB, acc failBuild(err, "failed to parse leader public key") } + genHash := d.genesisCfg.ComputeGenesisHash() + ceCfg := &consensus.Config{ PrivateKey: d.privKey, Leader: leaderPubKey, @@ -245,6 +247,7 @@ func buildConsensusEngine(_ context.Context, d *coreDependencies, db *pg.DB, acc MaxVotesPerTx: d.genesisCfg.MaxVotesPerTx, }, }, + GenesisHash: genHash, DB: db, Accounts: accounts, BlockStore: bs, diff --git a/config/config.go b/config/config.go index 597245f33..35f2fd394 100644 --- a/config/config.go +++ b/config/config.go @@ -1,8 +1,11 @@ package config import ( + "cmp" + "encoding/binary" "encoding/json" "os" + "slices" "time" "github.com/kwilteam/kwil-db/core/log" @@ -80,6 +83,27 @@ func LoadGenesisConfig(filename string) (*GenesisConfig, error) { return &nc, nil } +// TODO: Harden this code to prevent from consensus breaking changes +func (gc *GenesisConfig) ComputeGenesisHash() types.Hash { + hasher := ktypes.NewHasher() + + hasher.Write([]byte(gc.ChainID)) + hasher.Write(gc.Leader) + + // sort the validators by public key + slices.SortFunc(gc.Validators, func(a, b ktypes.Validator) int { + return cmp.Compare(a.PubKey.String(), b.PubKey.String()) + }) + + binary.Write(hasher, binary.BigEndian, gc.MaxBlockSize) + binary.Write(hasher, binary.BigEndian, gc.JoinExpiry) + binary.Write(hasher, binary.BigEndian, gc.VoteExpiry) + binary.Write(hasher, binary.BigEndian, gc.DisabledGasCosts) + binary.Write(hasher, binary.BigEndian, gc.MaxVotesPerTx) + + return hasher.Sum(nil) +} + // const ( // defaultUserRPCPort = 8484 // defaultAdminRPCPort = 8584 diff --git a/node/consensus.go b/node/consensus.go index 386e06d3f..3c1d223b9 100644 --- a/node/consensus.go +++ b/node/consensus.go @@ -415,11 +415,6 @@ func (n *Node) startDiscoveryRequestGossip(ctx context.Context, ps *pubsub.PubSu return } - // We're only interested if we are the validator. - if n.ce.Role() != types.RoleValidator { - continue // discard, we are just relaying to leader - } - if peer.ID(discMsg.From) == me { continue } diff --git a/node/consensus/blocksync.go b/node/consensus/blocksync.go index 39e297a6b..cee15eaa6 100644 --- a/node/consensus/blocksync.go +++ b/node/consensus/blocksync.go @@ -13,10 +13,12 @@ import ( // caught up with the network, before it can start proposing blocks. // Else it can propose a block at a height that is already finalized // leading to a fork. - func (ce *ConsensusEngine) doBlockSync(ctx context.Context) error { if ce.role.Load() == types.RoleLeader { - if len(ce.validators.GetValidators()) == 1 { + // TODO: which validator set we should use here? whatever we have in the state? + // what if the current validators are not the same as the ones in the state? + // and they don't respond to the status request? + if len(ce.validatorSet) == 1 { return nil // we are the network } // figure out the best height to sync with the network diff --git a/node/consensus/engine.go b/node/consensus/engine.go index 7de30ec30..e573b3bdc 100644 --- a/node/consensus/engine.go +++ b/node/consensus/engine.go @@ -50,8 +50,9 @@ type ConsensusEngine struct { proposeTimeout time.Duration - networkHeight atomic.Int64 - validatorSet map[string]ktypes.Validator + networkHeight atomic.Int64 + validatorSet map[string]ktypes.Validator + genesisAppHash types.Hash // stores state machine state for the consensus engine state state @@ -196,6 +197,7 @@ type Config struct { // Leader is the public key of the leader. Leader crypto.PublicKey + GenesisHash types.Hash GenesisParams *GenesisParams // *config.GenesisConfig DB *pg.DB @@ -298,13 +300,14 @@ func New(cfg *Config) *ConsensusEngine { resetChan: make(chan int64, 1), bestHeightCh: make(chan *discoveryMsg, 1), // interfaces - mempool: cfg.Mempool, - blockStore: cfg.BlockStore, - txapp: cfg.TxApp, - accounts: cfg.Accounts, - validators: cfg.ValidatorStore, - snapshotter: cfg.Snapshots, - log: logger, + mempool: cfg.Mempool, + blockStore: cfg.BlockStore, + txapp: cfg.TxApp, + accounts: cfg.Accounts, + validators: cfg.ValidatorStore, + snapshotter: cfg.Snapshots, + log: logger, + genesisAppHash: cfg.GenesisHash, } ce.role.Store(role) @@ -313,6 +316,8 @@ func New(cfg *Config) *ConsensusEngine { return ce } +var initialHeight int64 = 0 // TODO: get it from genesis? + func (ce *ConsensusEngine) Start(ctx context.Context, proposerBroadcaster ProposalBroadcaster, blkAnnouncer BlkAnnouncer, ackBroadcaster AckBroadcaster, blkRequester BlkRequester, stateResetter ResetStateBroadcaster, discoveryReqBroadcaster DiscoveryReqBroadcaster) error { @@ -337,6 +342,52 @@ func (ce *ConsensusEngine) Start(ctx context.Context, proposerBroadcaster Propos return ce.runConsensusEventLoop(ctx) } +// GenesisInit initializes the node with the genesis state. This included initializing the +// votestore with the genesis validators, accounts with the genesis allocations and the +// chain meta store with the genesis network parameters. +// This is called only once when the node is bootstrapping for the first time. +func (ce *ConsensusEngine) GenesisInit(ctx context.Context) error { + genesisTx, err := ce.db.BeginTx(ctx) + if err != nil { + return err + } + defer genesisTx.Rollback(ctx) + + // TODO: genesis allocs + + // genesis validators + genVals := make([]*ktypes.Validator, 0, len(ce.validatorSet)) + for _, v := range ce.validatorSet { + genVals = append(genVals, &ktypes.Validator{ + PubKey: v.PubKey, + Power: v.Power, + }) + } + + startParams := *ce.chainCtx.NetworkParameters + + if err := ce.txapp.GenesisInit(ctx, genesisTx, genVals, nil, initialHeight, ce.chainCtx); err != nil { + return err + } + + if err := meta.SetChainState(ctx, genesisTx, initialHeight, ce.genesisAppHash[:], false); err != nil { + return fmt.Errorf("error storing the genesis state: %w", err) + } + + if err := meta.StoreDiff(ctx, genesisTx, &startParams, ce.chainCtx.NetworkParameters); err != nil { + return fmt.Errorf("error storing the genesis consensus params: %w", err) + } + + // TODO: Genesis hash and what are the mechanics for producing the first block (genesis block)? + ce.txapp.Commit() + + ce.state.lc.appHash = ce.genesisAppHash + ce.state.lc.height = initialHeight + + ce.log.Info("Initialized chain", "height", initialHeight, "appHash", ce.state.lc.appHash.String()) + return genesisTx.Commit(ctx) +} + // runEventLoop starts the event loop for the consensus engine. // Below are the event triggers that nodes can receive depending on their role: // Leader: @@ -497,13 +548,22 @@ func (ce *ConsensusEngine) catchup(ctx context.Context) error { ce.setLastCommitInfo(appHeight, blkHash, types.Hash(appHash)) } + if appHeight == -1 { + // This is the first time the node is bootstrapping + // initialize the db with the genesis state + if err := ce.GenesisInit(ctx); err != nil { + return fmt.Errorf("error initializing the genesis state: %w", err) + } + } + + // Replay the blocks from the blockstore if the app hasn't played all the blocks yet. if appHeight < storeHeight { - // Replay the blocks from the blockstore if err := ce.replayFromBlockStore(appHeight+1, storeHeight); err != nil { return err } } + // Sync with the network using the blocksync if err := ce.doBlockSync(ctx); err != nil { return err } diff --git a/node/consensus/engine_test.go b/node/consensus/engine_test.go index 4d4b40731..5912a48cb 100644 --- a/node/consensus/engine_test.go +++ b/node/consensus/engine_test.go @@ -574,7 +574,7 @@ func TestValidatorStateMachine(t *testing.T) { return false } return true - }, 2*time.Second, 100*time.Millisecond) + }, 6*time.Second, 100*time.Millisecond) } }) } @@ -658,7 +658,7 @@ func TestCELeaderTwoNodesMajorityAcks(t *testing.T) { height := n1.lastCommitHeight() fmt.Printf("Height: %d\n", height) return height == 1 - }, 2*time.Second, 100*time.Millisecond) + }, 6*time.Second, 100*time.Millisecond) } func TestCELeaderTwoNodesMajorityNacks(t *testing.T) { @@ -689,7 +689,7 @@ func TestCELeaderTwoNodesMajorityNacks(t *testing.T) { require.Eventually(t, func() bool { blockRes := n1.blockResult() return blockRes != nil && !blockRes.appHash.IsZero() - }, 2*time.Second, 100*time.Millisecond) + }, 6*time.Second, 100*time.Millisecond) _, _, b := n1.info() assert.NotNil(t, b) @@ -777,6 +777,9 @@ func (d *dummyTxApp) Price(ctx context.Context, dbTx sql.DB, tx *ktypes.Transact func (d *dummyTxApp) Commit() error { return nil } +func (d *dummyTxApp) GenesisInit(ctx context.Context, db sql.DB, validators []*ktypes.Validator, genesisAccounts []*ktypes.Account, initialHeight int64, chain *common.ChainContext) error { + return nil +} type validatorStore struct { valSet []*ktypes.Validator diff --git a/node/consensus/interfaces.go b/node/consensus/interfaces.go index 1bca9d891..aa2076c64 100644 --- a/node/consensus/interfaces.go +++ b/node/consensus/interfaces.go @@ -60,6 +60,7 @@ type TxApp interface { Execute(ctx *common.TxContext, db sql.DB, tx *ktypes.Transaction) *txapp.TxResponse Finalize(ctx context.Context, db sql.DB, block *common.BlockContext) (finalValidators []*ktypes.Validator, err error) Commit() error + GenesisInit(ctx context.Context, db sql.DB, validators []*ktypes.Validator, genesisAccounts []*ktypes.Account, initialHeight int64, chain *common.ChainContext) error Price(ctx context.Context, dbTx sql.DB, tx *ktypes.Transaction, chainContext *common.ChainContext) (*big.Int, error) } diff --git a/node/node_live_test.go b/node/node_live_test.go index c79f974ab..9c106fcfe 100644 --- a/node/node_live_test.go +++ b/node/node_live_test.go @@ -369,6 +369,10 @@ func (d *dummyTxApp) Commit() error { return nil } +func (d *dummyTxApp) GenesisInit(ctx context.Context, db sql.DB, validators []*ktypes.Validator, genesisAccounts []*ktypes.Account, initialHeight int64, chain *common.ChainContext) error { + return nil +} + type validatorStore struct { valSet []*ktypes.Validator }