diff --git a/app/node/build.go b/app/node/build.go
index 8a2e123ed..9f0a30108 100644
--- a/app/node/build.go
+++ b/app/node/build.go
@@ -90,7 +90,7 @@ func buildServer(ctx context.Context, d *coreDependencies) *server {
 	migrator := buildMigrator(d, db, accounts, vs)
 
 	// BlockProcessor
-	bp := buildBlockProcessor(ctx, d, db, txApp, accounts, vs, ss, es, migrator)
+	bp := buildBlockProcessor(ctx, d, db, txApp, accounts, vs, ss, es, migrator, bs)
 
 	// Consensus
 	ce := buildConsensusEngine(ctx, d, db, mp, bs, bp, valSet)
@@ -357,10 +357,10 @@ func buildTxApp(ctx context.Context, d *coreDependencies, db *pg.DB, accounts *a
 	return txapp
 }
 
-func buildBlockProcessor(ctx context.Context, d *coreDependencies, db *pg.DB, txapp *txapp.TxApp, accounts *accounts.Accounts, vs *voting.VoteStore, ss *snapshotter.SnapshotStore, es *voting.EventStore, migrator *migrations.Migrator) *blockprocessor.BlockProcessor {
+func buildBlockProcessor(ctx context.Context, d *coreDependencies, db *pg.DB, txapp *txapp.TxApp, accounts *accounts.Accounts, vs *voting.VoteStore, ss *snapshotter.SnapshotStore, es *voting.EventStore, migrator *migrations.Migrator, bs *store.BlockStore) *blockprocessor.BlockProcessor {
 	signer := auth.GetNodeSigner(d.privKey)
 
-	bp, err := blockprocessor.NewBlockProcessor(ctx, db, txapp, accounts, vs, ss, es, migrator, d.genesisCfg, signer, d.logger.New("BP"))
+	bp, err := blockprocessor.NewBlockProcessor(ctx, db, txapp, accounts, vs, ss, es, migrator, bs, d.genesisCfg, signer, d.logger.New("BP"))
 	if err != nil {
 		failBuild(err, "failed to create block processor")
 	}
@@ -409,16 +409,18 @@ func buildConsensusEngine(_ context.Context, d *coreDependencies, db *pg.DB,
 	}
 
 	ceCfg := &consensus.Config{
-		PrivateKey:     d.privKey,
-		Leader:         leaderPubKey,
-		DB:             db,
-		BlockStore:     bs,
-		BlockProcessor: bp,
-		Mempool:        mempool,
-		ValidatorSet:   valSet,
-		Logger:         d.logger.New("CONS"),
-		ProposeTimeout: time.Duration(d.cfg.Consensus.ProposeTimeout),
-		GenesisHeight:  d.genesisCfg.InitialHeight,
+		PrivateKey:            d.privKey,
+		Leader:                leaderPubKey,
+		DB:                    db,
+		BlockStore:            bs,
+		BlockProcessor:        bp,
+		Mempool:               mempool,
+		ValidatorSet:          valSet,
+		Logger:                d.logger.New("CONS"),
+		ProposeTimeout:        time.Duration(d.cfg.Consensus.ProposeTimeout),
+		BlockProposalInterval: time.Duration(d.cfg.Consensus.BlockProposalInterval),
+		BlockAnnInterval:      time.Duration(d.cfg.Consensus.BlockAnnInterval),
+		GenesisHeight:         d.genesisCfg.InitialHeight,
 	}
 
 	ce := consensus.New(ceCfg)
diff --git a/config/config.go b/config/config.go
index ed34ddc60..c34470694 100644
--- a/config/config.go
+++ b/config/config.go
@@ -160,9 +160,11 @@ func DefaultConfig() *Config {
 			BootNodes: []string{},
 		},
 		Consensus: ConsensusConfig{
-			ProposeTimeout: Duration(1000 * time.Millisecond),
-			MaxBlockSize:   50_000_000,
-			MaxTxsPerBlock: 20_000,
+			ProposeTimeout:        Duration(1000 * time.Millisecond),
+			MaxBlockSize:          50_000_000,
+			MaxTxsPerBlock:        20_000,
+			BlockProposalInterval: Duration(1 * time.Second),
+			BlockAnnInterval:      Duration(3 * time.Second),
 		},
 		DB: DBConfig{
 			Host: "127.0.0.1",
@@ -260,7 +262,16 @@ type ConsensusConfig struct {
 	ProposeTimeout Duration `toml:"propose_timeout" comment:"timeout for proposing a block (applies to leader)"`
 	MaxBlockSize   uint64   `toml:"max_block_size" comment:"max size of a block in bytes"`
 	MaxTxsPerBlock uint64   `toml:"max_txs_per_block" comment:"max number of transactions per block"`
-	// ? reannounce intervals?
+	// reannounce intervals
+
+	// BlockProposalInterval is the interval between block proposal reannouncements by the leader.
+	// This impacts the time it takes for an out-of-sync validator to receive the current block proposal,
+	// thereby impacting the block times. Default is 1 second.
+	BlockProposalInterval Duration `toml:"block_proposal_interval" comment:"interval between block proposal reannouncements by the leader"`
+	// BlockAnnInterval is the frequency with which block commit messages are reannounced by the leader,
+	// and votes are reannounced by validators. Default is 3 seconds. This impacts the time it takes for
+	// out-of-sync nodes to catch up with the latest block.
+	BlockAnnInterval Duration `toml:"block_ann_interval" comment:"interval between block commit reannouncements by the leader, and vote reannouncements by validators"`
 }
 
 type RPCConfig struct {
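For reference, the two new settings would render into the node's TOML config alongside the existing consensus knobs roughly as below. This is a sketch only: the `[consensus]` section name and the duration string format are assumed from the toml tags and the `Duration` type above, not taken from a generated file.

```toml
# Hypothetical config.toml fragment showing the new reannouncement intervals (defaults shown).
[consensus]
propose_timeout = "1s"           # timeout for proposing a block (applies to leader)
max_block_size = 50_000_000      # max size of a block in bytes
max_txs_per_block = 20_000       # max number of transactions per block
block_proposal_interval = "1s"   # block proposal reannouncements by the leader
block_ann_interval = "3s"        # block commit / vote reannouncements
```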
diff --git a/core/types/block.go b/core/types/block.go
index 17c57d0a8..dd4aae249 100644
--- a/core/types/block.go
+++ b/core/types/block.go
@@ -19,21 +19,21 @@ const (
 var ErrNotFound = errors.New("not found")
 
 type BlockHeader struct {
-	Version     uint16
-	Height      int64
-	NumTxns     uint32
-	PrevHash    Hash // previous block's hash
-	PrevAppHash Hash // app hash after last block
+	Version  uint16
+	Height   int64
+	NumTxns  uint32
+	PrevHash Hash // previous block's hash
+	// app hash after last block.
+	// calculated based on updates to the PG state, accounts, validators, chain state and txResults.
+	PrevAppHash Hash
 	Timestamp   time.Time
 	MerkleRoot  Hash // Merkle tree reference to hash of all transactions for the block
 
-	// Proposer []byte should be leader, so probably pointless here
-	ValidatorSetHash Hash // Hash of the validator set for the block
+	// Hash of the current validator set for the block
+	ValidatorSetHash Hash
 
-	// ConsensusUpdates []ConsensusUpdate
-
-	// ChainStateHash Hash // if we want to keep the "application" hash separate from state of consensus engine
-	// PrevExecResultHash Hash
+	// ConsensusParams updates for the block, empty if no updates, ignored for this release
+	// ConsensusParamsUpdates *ConsensusParams
 }
 
 type Block struct {
diff --git a/node/block_processor/interfaces.go b/node/block_processor/interfaces.go
index 47b61f60e..cc9b7f0fc 100644
--- a/node/block_processor/interfaces.go
+++ b/node/block_processor/interfaces.go
@@ -87,6 +87,10 @@ var (
 type MigratorModule interface {
 	NotifyHeight(ctx context.Context, block *common.BlockContext, db migrations.Database) error
 	StoreChangesets(height int64, changes <-chan any) error
-	PersistLastChangesetHeight(ctx context.Context, tx sql.Executor) error
+	PersistLastChangesetHeight(ctx context.Context, tx sql.Executor, height int64) error
 	GetMigrationMetadata(ctx context.Context, status types.MigrationStatus) (*types.MigrationMetadata, error)
 }
+
+type BlockStore interface {
+	GetByHeight(height int64) (types.Hash, *ktypes.Block, types.Hash, error)
+}
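Because the constructor now takes a `BlockStore`, anything building a `BlockProcessor` directly (tests in particular) needs a value satisfying this one-method interface. A minimal in-memory stub might look like the sketch below; it is a hypothetical helper, not part of this change, it assumes the same `types`/`ktypes` aliases and an `fmt` import as elsewhere in this package, and it assumes the three returns are block hash, block, and app hash, which is how processor.go consumes the third value.

```go
// memBlockStore is a hypothetical in-memory stub for the BlockStore interface above.
type memBlockStore struct {
	blkHashes map[int64]types.Hash    // block hash by height
	blocks    map[int64]*ktypes.Block // block by height
	appHashes map[int64]types.Hash    // app hash recorded when the block was committed
}

func (m *memBlockStore) GetByHeight(height int64) (types.Hash, *ktypes.Block, types.Hash, error) {
	blk, ok := m.blocks[height]
	if !ok {
		return types.Hash{}, nil, types.Hash{}, fmt.Errorf("no block at height %d", height)
	}
	return m.blkHashes[height], blk, m.appHashes[height], nil
}
```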
diff --git a/node/block_processor/processor.go b/node/block_processor/processor.go
index 79c76c2fc..049691027 100644
--- a/node/block_processor/processor.go
+++ b/node/block_processor/processor.go
@@ -69,7 +69,7 @@ type BlockProcessor struct {
 
 type BroadcastTxFn func(ctx context.Context, tx *ktypes.Transaction, sync uint8) (*ktypes.ResultBroadcastTx, error)
 
-func NewBlockProcessor(ctx context.Context, db DB, txapp TxApp, accounts Accounts, vs ValidatorModule, sp SnapshotModule, es EventStore, migrator MigratorModule, genesisCfg *config.GenesisConfig, signer auth.Signer, logger log.Logger) (*BlockProcessor, error) {
+func NewBlockProcessor(ctx context.Context, db DB, txapp TxApp, accounts Accounts, vs ValidatorModule, sp SnapshotModule, es EventStore, migrator MigratorModule, bs BlockStore, genesisCfg *config.GenesisConfig, signer auth.Signer, logger log.Logger) (*BlockProcessor, error) {
 	// get network parameters from the chain context
 	bp := &BlockProcessor{
 		db: db,
@@ -95,10 +95,29 @@ func NewBlockProcessor(ctx context.Context, db DB, txapp TxApp, accounts Account
 	}
 	defer tx.Rollback(ctx)
 
-	height, appHash, _, err := meta.GetChainState(ctx, tx)
+	height, appHash, dirty, err := meta.GetChainState(ctx, tx)
 	if err != nil {
 		return nil, fmt.Errorf("failed to get chain state: %w", err)
 	}
+	if dirty {
+		// app state is in a partially committed state, recover the chain state.
+		_, _, hash, err := bs.GetByHeight(height)
+		if err != nil {
+			return nil, err
+		}
+
+		if err := meta.SetChainState(ctx, tx, height, hash[:], false); err != nil {
+			return nil, err
+		}
+
+		copy(appHash, hash[:])
+
+		// also update the last changeset height in the migrator
+		if err := bp.migrator.PersistLastChangesetHeight(ctx, tx, height); err != nil {
+			return nil, err
+		}
+	}
+
 	bp.height.Store(height)
 	copy(bp.appHash[:], appHash)
@@ -523,7 +542,7 @@ func (bp *BlockProcessor) Commit(ctx context.Context, req *ktypes.CommitRequest)
 		return err
 	}
 
-	if err := bp.migrator.PersistLastChangesetHeight(ctxS, tx); err != nil {
+	if err := bp.migrator.PersistLastChangesetHeight(ctxS, tx, req.Height); err != nil {
 		err2 := tx.Rollback(ctxS)
 		if err2 != nil {
 			bp.log.Error("Failed to rollback the transaction", "err", err2)
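The dirty-flag branch above encodes a specific recovery assumption: if a crash leaves the chain state marked dirty at some height, the block store already holds the block and app hash for that height, presumably because the block is persisted to the block store before the final chain-state commit completes, so the app state can simply be realigned to it. A condensed restatement of that rule (pseudo-Go summary of the hunk above, not additional code):

```go
// Simplified restatement of the startup recovery in NewBlockProcessor.
height, appHash, dirty, _ := meta.GetChainState(ctx, tx)
if dirty {
	_, _, storedAppHash, err := bs.GetByHeight(height) // block store is the source of truth here
	if err != nil {
		return nil, err
	}
	meta.SetChainState(ctx, tx, height, storedAppHash[:], false) // realign app state, clear the dirty flag
	copy(appHash, storedAppHash[:])
	migrator.PersistLastChangesetHeight(ctx, tx, height) // keep changeset bookkeeping consistent
}
```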
diff --git a/node/consensus/block.go b/node/consensus/block.go
index 909909108..3d278a876 100644
--- a/node/consensus/block.go
+++ b/node/consensus/block.go
@@ -38,7 +38,12 @@ func (ce *ConsensusEngine) validateBlock(blk *ktypes.Block) error {
 		return fmt.Errorf("merkleroot mismatch, expected %v, got %v", merkleRoot, blk.Header.MerkleRoot)
 	}
 
-	// Verify other stuff such as validatorsetHash, signature of the block etc.
+	// Verify the current validator set for the block
+	valSetHash := ce.validatorSetHash()
+	if valSetHash != blk.Header.ValidatorSetHash {
+		return fmt.Errorf("validator set hash mismatch, expected %s, got %s", valSetHash.String(), blk.Header.ValidatorSetHash.String())
+	}
+
 	return nil
 }
 
@@ -121,7 +126,7 @@ func (ce *ConsensusEngine) commit(ctx context.Context) error {
 	// update the role of the node based on the final validator set at the end of the commit.
 	ce.updateValidatorSetAndRole()
 
-	ce.log.Info("Committed Block", "height", height, "hash", blkProp.blkHash, "appHash", appHash.String())
+	ce.log.Info("Committed Block", "height", height, "hash", blkProp.blkHash.String(), "appHash", appHash.String())
 
 	return nil
 }
@@ -143,7 +148,6 @@ func (ce *ConsensusEngine) rollbackState(ctx context.Context) error {
 	}
 
 	ce.resetState()
-
 	return nil
 }
diff --git a/node/consensus/engine.go b/node/consensus/engine.go
index 6e537d9bc..cdb4d33cd 100644
--- a/node/consensus/engine.go
+++ b/node/consensus/engine.go
@@ -45,8 +45,18 @@ type ConsensusEngine struct {
 	leader  crypto.PublicKey
 	log     log.Logger
 
+	// proposeTimeout specifies the time duration to wait before proposing a new block for the next height.
+	// Default is 1 second.
 	proposeTimeout time.Duration
+
+	// blkProposalInterval specifies the time duration to wait before reannouncing the block proposal message.
+	// This is only applicable for the leader. This timeout influences how quickly out-of-sync nodes can
+	// catch up with the consensus rounds, thereby influencing the block time. Default is 1 second.
+	blkProposalInterval time.Duration
+
+	// blkAnnInterval specifies the time duration to wait before reannouncing the block announce message.
+	blkAnnInterval time.Duration
+
 	genesisHeight int64 // height of the genesis block
 	networkHeight atomic.Int64
 	validatorSet  map[string]ktypes.Validator // key: hex encoded pubkey
@@ -101,24 +111,26 @@ type Config struct {
 	PrivateKey crypto.PrivateKey
 	// Leader is the public key of the leader.
 	Leader crypto.PublicKey
-
+	// GenesisHeight is the initial height of the network.
 	GenesisHeight int64
-
-	DB *pg.DB
-	// Mempool is the mempool of the node.
-	Mempool Mempool
-	// BlockStore is the blockstore of the node.
-	BlockStore BlockStore
-
-	BlockProcessor BlockProcessor
-	// ValidatorSet is the set of validators in the network.
 	ValidatorSet map[string]ktypes.Validator
-	// Logger is the logger of the node.
-	Logger log.Logger
-	// ProposeTimeout is the timeout for proposing a block.
 	ProposeTimeout time.Duration
+	// BlockProposalInterval is the frequency at which block proposal messages are reannounced by the Leader.
+	BlockProposalInterval time.Duration
+	// BlockAnnInterval is the frequency at which block commit messages are reannounced by the Leader.
+	// This is also the frequency at which the validators reannounce the ack messages.
+	BlockAnnInterval time.Duration
+	// CatchUpInterval is the frequency at which the node attempts to catch up with the network if lagging.
+	// CatchUpInterval time.Duration
+
+	// Interfaces
+	DB             *pg.DB
+	Mempool        Mempool
+	BlockStore     BlockStore
+	BlockProcessor BlockProcessor
+	Logger         log.Logger
 }
 
 // ProposalBroadcaster broadcasts the new block proposal message to the network
@@ -218,11 +230,13 @@ func New(cfg *Config) *ConsensusEngine {
 
 	// rethink how this state is initialized
 	ce := &ConsensusEngine{
-		pubKey:         pubKey,
-		privKey:        cfg.PrivateKey,
-		leader:         cfg.Leader,
-		proposeTimeout: cfg.ProposeTimeout,
-		db:             cfg.DB,
+		pubKey:              pubKey,
+		privKey:             cfg.PrivateKey,
+		leader:              cfg.Leader,
+		proposeTimeout:      cfg.ProposeTimeout,
+		blkProposalInterval: cfg.BlockProposalInterval,
+		blkAnnInterval:      cfg.BlockAnnInterval,
+		db:                  cfg.DB,
 		state: state{
 			blkProp:  nil,
 			blockRes: nil,
@@ -337,11 +351,10 @@ func (ce *ConsensusEngine) close() {
 // Apart from the above events, the node also periodically checks if it needs to
 // catchup with the network and reannounce the messages.
 func (ce *ConsensusEngine) runConsensusEventLoop(ctx context.Context) error {
-	// TODO: make these configurable?
 	ce.log.Info("Starting the consensus event loop...")
-	catchUpTicker := time.NewTicker(5 * time.Second)
-	reannounceTicker := time.NewTicker(3 * time.Second)
-	blkPropTicker := time.NewTicker(1 * time.Second)
+	catchUpTicker := time.NewTicker(5 * time.Second)        // Should this be configurable?
+	reannounceTicker := time.NewTicker(ce.blkAnnInterval)   // 3 secs (default)
+	blkPropTicker := time.NewTicker(ce.blkProposalInterval) // 1 sec (default)
 
 	for {
 		select {
@@ -458,7 +471,7 @@ func (ce *ConsensusEngine) catchup(ctx context.Context) error {
 	}
 
 	if dirty {
-		ce.log.Info("App state is dirty, partially committed??") // TODO: what to be done here??
+		return fmt.Errorf("app state is dirty, error in the blockprocessor initialization, height: %d, appHash: %x", appHeight, appHash)
 	}
 
 	ce.log.Info("Initial Node state: ", "appHeight", appHeight, "storeHeight", storeHeight, "appHash", appHash, "storeAppHash", storeAppHash)
@@ -594,7 +607,7 @@ func (ce *ConsensusEngine) reannounceMsgs(ctx context.Context) {
 	if ce.role.Load() == types.RoleLeader && ce.state.lc.height > 0 {
 		// Announce block commit message for the last committed block
 		if ce.state.lc.blk != nil {
-			go ce.blkAnnouncer(ctx, ce.state.lc.blk, ce.state.lc.appHash) // TODO: can be made infrequent
+			go ce.blkAnnouncer(ctx, ce.state.lc.blk, ce.state.lc.appHash)
 		}
 		return
 	}
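These knobs are plumbed from `ConsensusConfig` (config.go above) through build.go into this `Config`, and they drive the reannounce and proposal tickers in `runConsensusEventLoop`. As a sketch of how an embedder or test could override the defaults at the app-config level (field and type names come from config.go; the values are illustrative only, not part of this change):

```go
// Illustrative override of the consensus timing knobs; values are examples only.
cfg := config.DefaultConfig()
cfg.Consensus.ProposeTimeout = config.Duration(500 * time.Millisecond)  // leader's propose timeout
cfg.Consensus.BlockProposalInterval = config.Duration(2 * time.Second)  // proposal reannouncement (leader)
cfg.Consensus.BlockAnnInterval = config.Duration(6 * time.Second)       // commit/vote reannouncement
```

Keeping BlockAnnInterval the largest of the three mirrors the shipped defaults (1s / 1s / 3s).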
+ return fmt.Errorf("app state is dirty, error in the blockprocessor initialization, height: %d, appHash: %x", appHeight, appHash) } ce.log.Info("Initial Node state: ", "appHeight", appHeight, "storeHeight", storeHeight, "appHash", appHash, "storeAppHash", storeAppHash) @@ -594,7 +607,7 @@ func (ce *ConsensusEngine) reannounceMsgs(ctx context.Context) { if ce.role.Load() == types.RoleLeader && ce.state.lc.height > 0 { // Announce block commit message for the last committed block if ce.state.lc.blk != nil { - go ce.blkAnnouncer(ctx, ce.state.lc.blk, ce.state.lc.appHash) // TODO: can be made infrequent + go ce.blkAnnouncer(ctx, ce.state.lc.blk, ce.state.lc.appHash) } return } diff --git a/node/consensus/engine_test.go b/node/consensus/engine_test.go index da653d6b4..b9611549d 100644 --- a/node/consensus/engine_test.go +++ b/node/consensus/engine_test.go @@ -129,7 +129,7 @@ func generateTestCEConfig(t *testing.T, nodes int, leaderDB bool) []*Config { ev := &mockEventStore{} m := &mockMigrator{} - bp, err := blockprocessor.NewBlockProcessor(ctx, db, txapp, accounts, v, ss, ev, m, genCfg, signer, log.New(log.WithName("BP"))) + bp, err := blockprocessor.NewBlockProcessor(ctx, db, txapp, accounts, v, ss, ev, m, bs, genCfg, signer, log.New(log.WithName("BP"))) assert.NoError(t, err) bp.SetNetworkParameters(&common.NetworkParameters{ MaxBlockSize: genCfg.MaxBlockSize, @@ -140,14 +140,16 @@ func generateTestCEConfig(t *testing.T, nodes int, leaderDB bool) []*Config { }) ceConfigs[i] = &Config{ - PrivateKey: privKeys[i], - Leader: pubKeys[0], - Mempool: mempool.New(), - BlockStore: bs, - BlockProcessor: bp, - ValidatorSet: validatorSet, - Logger: logger, - ProposeTimeout: 1 * time.Second, + PrivateKey: privKeys[i], + Leader: pubKeys[0], + Mempool: mempool.New(), + BlockStore: bs, + BlockProcessor: bp, + ValidatorSet: validatorSet, + Logger: logger, + ProposeTimeout: 1 * time.Second, + BlockProposalInterval: 1 * time.Second, + BlockAnnInterval: 3 * time.Second, } closers = append(closers, func() { @@ -914,7 +916,7 @@ func (m *mockMigrator) StoreChangesets(height int64, changes <-chan any) error { return nil } -func (m *mockMigrator) PersistLastChangesetHeight(ctx context.Context, tx sql.Executor) error { +func (m *mockMigrator) PersistLastChangesetHeight(ctx context.Context, tx sql.Executor, height int64) error { return nil } diff --git a/node/consensus/leader.go b/node/consensus/leader.go index f142d53d3..c952d2ecf 100644 --- a/node/consensus/leader.go +++ b/node/consensus/leader.go @@ -25,7 +25,7 @@ import ( // - This phase includes committing the block to the block store, clearing the mempool, // updating the chain state, creating snapshots, committing the pg db state, etc. // -// The Leader can also issue ResetState messages using "kwil-admin reset " +// The Leader can also issue ResetState messages using "kwil-admin reset " // When a leader receives a ResetState request, it will broadcast the ResetState message to the network to // halt the current block execution if the current block height equals reset-block-height + 1. 
diff --git a/node/consensus/leader.go b/node/consensus/leader.go
index f142d53d3..c952d2ecf 100644
--- a/node/consensus/leader.go
+++ b/node/consensus/leader.go
@@ -25,7 +25,7 @@ import (
 //   - This phase includes committing the block to the block store, clearing the mempool,
 //     updating the chain state, creating snapshots, committing the pg db state, etc.
 //
-// The Leader can also issue ResetState messages using "kwil-admin reset "
+// When a leader receives a ResetState request, it will broadcast the ResetState message to the network to
 // halt the current block execution if the current block height equals reset-block-height + 1. The leader will stop processing
 // the current block, revert any state changes made, and remove the problematic transactions from the mempool before
@@ -162,9 +162,8 @@ func (ce *ConsensusEngine) createBlockProposal(ctx context.Context) (*blockPropo
 		ce.mempool.Remove(types.Hash(tx))
 	}
 
-	blk := ktypes.NewBlock(ce.state.lc.height+1, ce.state.lc.blkHash, ce.state.lc.appHash, ce.ValidatorSetHash(), time.Now(), finalTxs)
-
-	// ValSet + valUpdatesHash
+	valSetHash := ce.validatorSetHash()
+	blk := ktypes.NewBlock(ce.state.lc.height+1, ce.state.lc.blkHash, ce.state.lc.appHash, valSetHash, time.Now(), finalTxs)
 
 	// Sign the block
 	if err := blk.Sign(ce.privKey); err != nil {
@@ -274,7 +273,7 @@ func (ce *ConsensusEngine) processVotes(ctx context.Context) error {
 	return nil
 }
 
-func (ce *ConsensusEngine) ValidatorSetHash() types.Hash {
+func (ce *ConsensusEngine) validatorSetHash() types.Hash {
 	hasher := ktypes.NewHasher()
 
 	keys := make([]string, 0, len(ce.validatorSet))
diff --git a/node/migrations/migrator.go b/node/migrations/migrator.go
index cd398d09c..aea4cf617 100644
--- a/node/migrations/migrator.go
+++ b/node/migrations/migrator.go
@@ -258,11 +258,12 @@ func (m *Migrator) generateGenesisConfig(snapshotHash []byte, logger log.Logger)
 	return nil
 }
 
-func (m *Migrator) PersistLastChangesetHeight(ctx context.Context, tx sql.Executor) error {
+func (m *Migrator) PersistLastChangesetHeight(ctx context.Context, tx sql.Executor, height int64) error {
 	m.mu.RLock()
 	defer m.mu.RUnlock()
 
-	return setLastStoredChangeset(ctx, tx, m.lastChangeset)
+	m.lastChangeset = height // safety update for an inconsistent bootup; it should already be updated by the writer
+	return setLastStoredChangeset(ctx, tx, height)
 }
 
 // GetMigrationMetadata gets the metadata for the genesis snapshot,
diff --git a/node/node_live_test.go b/node/node_live_test.go
index 7b2a43088..63a4c5dda 100644
--- a/node/node_live_test.go
+++ b/node/node_live_test.go
@@ -102,19 +102,21 @@ func TestSingleNodeMocknet(t *testing.T) {
 	require.NoError(t, err)
 
 	bpl := log.New(log.WithName("BP1"), log.WithWriter(os.Stdout), log.WithLevel(log.LevelDebug), log.WithFormat(log.FormatUnstructured))
-	bp, err := blockprocessor.NewBlockProcessor(ctx, db1, newDummyTxApp(valSetList), &mockAccounts{}, vsReal, ss, es, migrator, genCfg, signer1, bpl)
+	bp, err := blockprocessor.NewBlockProcessor(ctx, db1, newDummyTxApp(valSetList), &mockAccounts{}, vsReal, ss, es, migrator, bs1, genCfg, signer1, bpl)
 	require.NoError(t, err)
 
 	ceCfg1 := &consensus.Config{
-		PrivateKey:     privKeys[0],
-		ValidatorSet:   valSet,
-		Leader:         privKeys[0].Public(),
-		Mempool:        mp1,
-		BlockStore:     bs1,
-		BlockProcessor: bp,
-		Logger:         log.New(log.WithName("CE1"), log.WithWriter(os.Stdout), log.WithLevel(log.LevelDebug), log.WithFormat(log.FormatUnstructured)),
-		ProposeTimeout: 1 * time.Second,
-		DB:             db1,
+		PrivateKey:            privKeys[0],
+		ValidatorSet:          valSet,
+		Leader:                privKeys[0].Public(),
+		Mempool:               mp1,
+		BlockStore:            bs1,
+		BlockProcessor:        bp,
+		Logger:                log.New(log.WithName("CE1"), log.WithWriter(os.Stdout), log.WithLevel(log.LevelDebug), log.WithFormat(log.FormatUnstructured)),
+		ProposeTimeout:        1 * time.Second,
+		BlockProposalInterval: 1 * time.Second,
+		BlockAnnInterval:      3 * time.Second,
+		DB:                    db1,
 	}
 	ce1 := consensus.New(ceCfg1)
 
 	defaultConfigSet := config.DefaultConfig()
@@ -232,19 +234,21 @@ func TestDualNodeMocknet(t *testing.T) {
 	require.NoError(t, err)
 
 	bpl1 := log.New(log.WithName("BP1"), log.WithWriter(os.Stdout), log.WithLevel(log.LevelDebug), log.WithFormat(log.FormatUnstructured))
-	bp1, err := blockprocessor.NewBlockProcessor(ctx, db1, newDummyTxApp(valSetList), accounts1, vstore1, ss, es1, migrator, genCfg, signer1, bpl1)
+	bp1, err := blockprocessor.NewBlockProcessor(ctx, db1, newDummyTxApp(valSetList), accounts1, vstore1, ss, es1, migrator, bs1, genCfg, signer1, bpl1)
 	require.NoError(t, err)
 
 	ceCfg1 := &consensus.Config{
-		PrivateKey:     privKeys[0],
-		ValidatorSet:   valSet,
-		Leader:         privKeys[0].Public(),
-		Mempool:        mp1,
-		BlockStore:     bs1,
-		Logger:         log.New(log.WithName("CE1"), log.WithWriter(os.Stdout), log.WithLevel(log.LevelDebug), log.WithFormat(log.FormatUnstructured)),
-		ProposeTimeout: 1 * time.Second,
-		DB:             db1,
-		BlockProcessor: bp1,
+		PrivateKey:            privKeys[0],
+		ValidatorSet:          valSet,
+		Leader:                privKeys[0].Public(),
+		Mempool:               mp1,
+		BlockStore:            bs1,
+		Logger:                log.New(log.WithName("CE1"), log.WithWriter(os.Stdout), log.WithLevel(log.LevelDebug), log.WithFormat(log.FormatUnstructured)),
+		ProposeTimeout:        1 * time.Second,
+		BlockProposalInterval: 1 * time.Second,
+		BlockAnnInterval:      3 * time.Second,
+		DB:                    db1,
+		BlockProcessor:        bp1,
 	}
 	ce1 := consensus.New(ceCfg1)
 
 	defaultConfigSet := config.DefaultConfig()
@@ -289,19 +293,21 @@ func TestDualNodeMocknet(t *testing.T) {
 	require.NoError(t, err)
 
 	bpl2 := log.New(log.WithName("BP2"), log.WithWriter(os.Stdout), log.WithLevel(log.LevelDebug), log.WithFormat(log.FormatUnstructured))
-	bp2, err := blockprocessor.NewBlockProcessor(ctx, db2, newDummyTxApp(valSetList), accounts2, vstore2, ss, es2, migrator2, genCfg, signer2, bpl2)
+	bp2, err := blockprocessor.NewBlockProcessor(ctx, db2, newDummyTxApp(valSetList), accounts2, vstore2, ss, es2, migrator2, bs2, genCfg, signer2, bpl2)
 	require.NoError(t, err)
 
 	ceCfg2 := &consensus.Config{
-		PrivateKey:     privKeys[1],
-		ValidatorSet:   valSet,
-		Leader:         privKeys[0].Public(),
-		Mempool:        mp2,
-		BlockStore:     bs2,
-		BlockProcessor: bp2,
-		Logger:         log.New(log.WithName("CE2"), log.WithWriter(os.Stdout), log.WithLevel(log.LevelDebug), log.WithFormat(log.FormatUnstructured)),
-		ProposeTimeout: 1 * time.Second,
-		DB:             db2,
+		PrivateKey:            privKeys[1],
+		ValidatorSet:          valSet,
+		Leader:                privKeys[0].Public(),
+		Mempool:               mp2,
+		BlockStore:            bs2,
+		BlockProcessor:        bp2,
+		Logger:                log.New(log.WithName("CE2"), log.WithWriter(os.Stdout), log.WithLevel(log.LevelDebug), log.WithFormat(log.FormatUnstructured)),
+		ProposeTimeout:        1 * time.Second,
+		BlockProposalInterval: 1 * time.Second,
+		BlockAnnInterval:      3 * time.Second,
+		DB:                    db2,
 	}
 	ce2 := consensus.New(ceCfg2)
diff --git a/node/services/jsonrpc/adminsvc/service.go b/node/services/jsonrpc/adminsvc/service.go
index 2e354e697..3b1b43d36 100644
--- a/node/services/jsonrpc/adminsvc/service.go
+++ b/node/services/jsonrpc/adminsvc/service.go
@@ -386,7 +386,7 @@ func (svc *Service) JoinStatus(ctx context.Context, req *adminjson.JoinStatusReq
 	voters := svc.voting.GetValidators()
 
-	pendingJoin, err := toPendingInfo(resolution, voters)
+	pendingJoin, err := svc.toPendingInfo(resolution, voters)
 	if err != nil {
 		svc.log.Error("failed to convert join request", "error", err)
 		return nil, jsonrpc.NewError(jsonrpc.ErrorResultEncoding, "failed to convert join request", nil)
 	}
@@ -431,7 +431,7 @@ func (svc *Service) ListPendingJoins(ctx context.Context, req *adminjson.ListJoi
 	pbJoins := make([]*adminjson.PendingJoin, len(activeJoins))
 	for i, ji := range activeJoins {
-		pbJoins[i], err = toPendingInfo(ji, voters)
+		pbJoins[i], err = svc.toPendingInfo(ji, voters)
 		if err != nil {
 			svc.log.Error("failed to convert join request", "error", err)
 			return nil, jsonrpc.NewError(jsonrpc.ErrorResultEncoding, "failed to convert join request", nil)
 		}
@@ -444,12 +444,24 @@ func (svc *Service) ListPendingJoins(ctx context.Context, req *adminjson.ListJoi
 }
 
 // toPendingInfo gets the pending information for an active join from a resolution
-func toPendingInfo(resolution *resolutions.Resolution, allVoters []*ktypes.Validator) (*adminjson.PendingJoin, error) {
+func (svc *Service) toPendingInfo(resolution *resolutions.Resolution, allVoters []*ktypes.Validator) (*adminjson.PendingJoin, error) {
 	resolutionBody := &voting.UpdatePowerRequest{}
 	if err := resolutionBody.UnmarshalBinary(resolution.Body); err != nil {
 		return nil, fmt.Errorf("failed to unmarshal join request")
 	}
 
+	board, approvals := svc.approvalsInfo(resolution, allVoters)
+
+	return &adminjson.PendingJoin{
+		Candidate: resolutionBody.PubKey,
+		Power:     resolutionBody.Power,
+		ExpiresAt: resolution.ExpirationHeight,
+		Board:     board,
+		Approved:  approvals,
+	}, nil
+}
+
+func (svc *Service) approvalsInfo(resolution *resolutions.Resolution, allVoters []*ktypes.Validator) ([]ktypes.HexBytes, []bool) {
 	// to create the board, we will take a list of all approvers and append the voters.
 	// we will then remove any duplicates the second time we see them.
 	// this will result with all approvers at the start of the list, and all voters at the end.
@@ -475,13 +487,7 @@ func toPendingInfo(resolution *resolutions.Resolution, allVoters []*ktypes.Valid
 		found[string(board[i])] = struct{}{}
 	}
 
-	return &adminjson.PendingJoin{
-		Candidate: resolutionBody.PubKey,
-		Power:     resolutionBody.Power,
-		ExpiresAt: resolution.ExpirationHeight,
-		Board:     board,
-		Approved:  approvals,
-	}, nil
+	return board, approvals
 }
 
 func (svc *Service) GetConfig(ctx context.Context, req *adminjson.GetConfigRequest) (*adminjson.GetConfigResponse, *jsonrpc.Error) {
@@ -558,18 +564,16 @@ func (svc *Service) ResolutionStatus(ctx context.Context, req *adminjson.Resolut
 		return nil, jsonrpc.NewError(jsonrpc.ErrorDBInternal, "failed to retrieve resolution", nil)
 	}
 
-	// get the status of the resolution
-	// expiresAt, board, approvals, err := voting.ResolutionStatus(ctx, readTx, resolution)
-	// if err != nil {
-	// 	return nil, jsonrpc.NewError(jsonrpc.ErrorDBInternal, "failed to retrieve resolution status", nil)
-	// }
+	voters := svc.voting.GetValidators()
+
+	board, approvals := svc.approvalsInfo(resolution, voters)
 
 	return &adminjson.ResolutionStatusResponse{
 		Status: &ktypes.PendingResolution{
 			ResolutionID: req.ResolutionID,
 			ExpiresAt:    resolution.ExpirationHeight,
-			Board:        nil, // resolution.Voters ???
-			Approved:     nil, // ???
+			Board:        board,
+			Approved:     approvals,
 		},
 	}, nil
 }
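With this refactor, both `toPendingInfo` and `ResolutionStatus` return the `Board`/`Approved` pair produced by `approvalsInfo`: two parallel slices, with approvers at the front and the remaining voters after them. A client consuming either response could pair them up as in this hypothetical snippet (`status` stands for the returned `PendingJoin` or `PendingResolution`):

```go
// Hypothetical client-side use of the parallel Board/Approved slices.
for i, member := range status.Board {
	fmt.Printf("validator %x approved=%v\n", member, status.Approved[i])
}
```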
diff --git a/node/voting/events.go b/node/voting/events.go
index dacc2f81d..48aa07788 100644
--- a/node/voting/events.go
+++ b/node/voting/events.go
@@ -17,7 +17,6 @@ import (
 	"github.com/kwilteam/kwil-db/node/versioning"
 )
 
-// TODO: To implement EventBroadcaster-> broadcast VoteID Transactions, p2p service handlers has to be implemented
 // Life cycle of an event:
 // 1. Events received from an external source is stored in the event store.
 // 2. Block proposer will create resolutions for the events observed.