Skip to content

Commit

Permalink
Event broadcasting support:
Browse files Browse the repository at this point in the history
Ability to configure and run extensions
Order Block transactions in nonce order and enforce consensus limits such as maxBlockSz, maxVotesPerTx
Leader to induce ValidatorVoteBodies Tx if there are any events without resolutions
Validators can broadcast their vote for the existing resolutions
ethDeposits extension
  • Loading branch information
charithabandi committed Dec 13, 2024
1 parent 5e075f0 commit c0bfdb3
Show file tree
Hide file tree
Showing 33 changed files with 1,724 additions and 448 deletions.
38 changes: 27 additions & 11 deletions app/node/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
blockprocessor "github.com/kwilteam/kwil-db/node/block_processor"
"github.com/kwilteam/kwil-db/node/consensus"
"github.com/kwilteam/kwil-db/node/engine/execution"
"github.com/kwilteam/kwil-db/node/listeners"
"github.com/kwilteam/kwil-db/node/mempool"
"github.com/kwilteam/kwil-db/node/meta"
"github.com/kwilteam/kwil-db/node/pg"
Expand Down Expand Up @@ -71,7 +72,7 @@ func buildServer(ctx context.Context, d *coreDependencies) *server {
accounts := buildAccountStore(ctx, d, db)

// eventstore, votestore
_, vs := buildVoteStore(ctx, d, closers) // ev, vs
es, vs := buildVoteStore(ctx, d, closers) // ev, vs

// TxAPP
txApp := buildTxApp(ctx, d, db, accounts, vs, e)
Expand All @@ -80,14 +81,17 @@ func buildServer(ctx context.Context, d *coreDependencies) *server {
ss := buildSnapshotStore(d)

// BlockProcessor
bp := buildBlockProcessor(ctx, d, db, txApp, accounts, vs, ss)
bp := buildBlockProcessor(ctx, d, db, txApp, accounts, vs, ss, es)

// Consensus
ce := buildConsensusEngine(ctx, d, db, mp, bs, bp, valSet)

// Node
node := buildNode(d, mp, bs, ce, ss, db)

// listeners
lm := buildListenerManager(d, es, txApp, node)

// RPC Services
rpcSvcLogger := d.logger.New("USER")
jsonRPCTxSvc := usersvc.NewService(db, e, node, bp, vs, rpcSvcLogger,
Expand Down Expand Up @@ -135,6 +139,7 @@ func buildServer(ctx context.Context, d *coreDependencies) *server {
closers: closers,
node: node,
ce: ce,
listeners: lm,
jsonRPCServer: jsonRPCServer,
jsonRPCAdminServer: jsonRPCAdminServer,
dbCtx: db,
Expand Down Expand Up @@ -202,26 +207,33 @@ func buildMetaStore(ctx context.Context, db *pg.DB) {
}
}

// service returns a common.Service with the given logger name
func (c *coreDependencies) service(loggerName string) *common.Service {
signer := auth.GetNodeSigner(c.privKey)

return &common.Service{
Logger: c.logger.New(loggerName),
GenesisConfig: c.genesisCfg,
LocalConfig: c.cfg,
Identity: signer.Identity(),
}
}

func buildTxApp(ctx context.Context, d *coreDependencies, db *pg.DB, accounts *accounts.Accounts,
votestore *voting.VoteStore, engine *execution.GlobalContext) *txapp.TxApp {
signer := auth.GetNodeSigner(d.privKey)
service := &common.Service{
Logger: d.logger.New("TXAPP"),
Identity: signer.Identity(),
// TODO: pass extension configs
// ExtensionConfigs: make(map[string]map[string]string),
}

txapp, err := txapp.NewTxApp(ctx, db, engine, signer, nil, service, accounts, votestore)
txapp, err := txapp.NewTxApp(ctx, db, engine, signer, nil, d.service("TxAPP"), accounts, votestore)
if err != nil {
failBuild(err, "failed to create txapp")
}

return txapp
}

func buildBlockProcessor(ctx context.Context, d *coreDependencies, db *pg.DB, txapp *txapp.TxApp, accounts *accounts.Accounts, vs *voting.VoteStore, ss *snapshotter.SnapshotStore) *blockprocessor.BlockProcessor {
bp, err := blockprocessor.NewBlockProcessor(ctx, db, txapp, accounts, vs, ss, d.genesisCfg, d.logger.New("BP"))
func buildBlockProcessor(ctx context.Context, d *coreDependencies, db *pg.DB, txapp *txapp.TxApp, accounts *accounts.Accounts, vs *voting.VoteStore, ss *snapshotter.SnapshotStore, es *voting.EventStore) *blockprocessor.BlockProcessor {
signer := auth.GetNodeSigner(d.privKey)
bp, err := blockprocessor.NewBlockProcessor(ctx, db, txapp, accounts, vs, ss, es, d.genesisCfg, signer, d.logger.New("BP"))
if err != nil {
failBuild(err, "failed to create block processor")
}
Expand Down Expand Up @@ -349,6 +361,10 @@ func buildSnapshotStore(d *coreDependencies) *snapshotter.SnapshotStore {
return ss
}

func buildListenerManager(d *coreDependencies, ev *voting.EventStore, txapp *txapp.TxApp, node *node.Node) *listeners.ListenerManager {
return listeners.NewListenerManager(d.service("ListenerManager"), ev, txapp, node)
}

func buildJRPCAdminServer(d *coreDependencies) *rpcserver.Server {
var wantTLS bool
addr := d.cfg.Admin.ListenAddress
Expand Down
13 changes: 13 additions & 0 deletions app/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/kwilteam/kwil-db/core/log"
"github.com/kwilteam/kwil-db/node"
"github.com/kwilteam/kwil-db/node/consensus"
"github.com/kwilteam/kwil-db/node/listeners"
rpcserver "github.com/kwilteam/kwil-db/node/services/jsonrpc"
"github.com/kwilteam/kwil-db/version"

Expand All @@ -35,6 +36,7 @@ type server struct {
// subsystems
node *node.Node
ce *consensus.ConsensusEngine
listeners *listeners.ListenerManager
jsonRPCServer *rpcserver.Server
jsonRPCAdminServer *rpcserver.Server
}
Expand Down Expand Up @@ -164,6 +166,17 @@ func (s *server) Start(ctx context.Context) error {
return nil
})

// Start listener manager
group.Go(func() error {
go func() {
<-groupCtx.Done()
s.log.Info("stop listeners")
s.listeners.Stop()
}()
return s.listeners.Start()
})
s.log.Info("listener manager started")

// TODO: node is starting the consensus engine for ease of testing
// Start the consensus engine

Expand Down
16 changes: 9 additions & 7 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ func DefaultConfig() *Config {
DiscoveryTimeout: Duration(30 * time.Second),
MaxRetries: 3,
},
Extensions: make(map[string]map[string]string),
}
}

Expand All @@ -170,13 +171,14 @@ type Config struct {
// ProfileMode string `toml:"profile_mode"`
// ProfileFile string `toml:"profile_file"`

P2P PeerConfig `toml:"p2p" comment:"P2P related configuration"`
Consensus ConsensusConfig `toml:"consensus" comment:"Consensus related configuration"`
DB DBConfig `toml:"db" comment:"DB (PostgreSQL) related configuration"`
RPC RPCConfig `toml:"rpc" comment:"User RPC service configuration"`
Admin AdminConfig `toml:"admin" comment:"Admin RPC service configuration"`
Snapshots SnapshotConfig `toml:"snapshots" comment:"Snapshot creation and provider configuration"`
StateSync StateSyncConfig `toml:"state_sync" comment:"Statesync configuration (vs block sync)"`
P2P PeerConfig `toml:"p2p" comment:"P2P related configuration"`
Consensus ConsensusConfig `toml:"consensus" comment:"Consensus related configuration"`
DB DBConfig `toml:"db" comment:"DB (PostgreSQL) related configuration"`
RPC RPCConfig `toml:"rpc" comment:"User RPC service configuration"`
Admin AdminConfig `toml:"admin" comment:"Admin RPC service configuration"`
Snapshots SnapshotConfig `toml:"snapshots" comment:"Snapshot creation and provider configuration"`
StateSync StateSyncConfig `toml:"state_sync" comment:"Statesync configuration (vs block sync)"`
Extensions map[string]map[string]string `toml:"extensions" comment:"extension configuration"`
}

// PeerConfig corresponds to the [peer] section of the config.
Expand Down
1 change: 0 additions & 1 deletion core/rpc/client/admin/jsonrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ func (cl *Client) Status(ctx context.Context) (*adminTypes.Status, error) {
Syncing: res.Sync.Syncing,
},
Validator: &adminTypes.ValidatorInfo{
Role: res.Validator.Role,
PubKey: res.Validator.PubKey,
Power: res.Validator.Power,
},
Expand Down
1 change: 0 additions & 1 deletion core/rpc/json/admin/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ type SyncInfo struct {
type HealthResponse struct {
Version string `json:"version"`
Healthy bool `json:"healthy"`
Role string `json:"role"`
PubKey types.HexBytes `json:"pubkey"`
NumValidators int `json:"num_validators"`
}
Expand Down
32 changes: 32 additions & 0 deletions core/types/payload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,35 @@ func TestMarshalUnmarshalPayload(t *testing.T) {

assert.Equal(t, tp.val, tp2.val)
}

func TestValidatorVoteBodyMarshalUnmarshal(t *testing.T) {
voteBody := &types.ValidatorVoteBodies{
Events: []*types.VotableEvent{
{
Type: "emptydata",
Body: []byte(""),
},
{
Type: "test",
Body: []byte("test"),
},
{
Type: "test2",
Body: []byte("random large data, random large data,random large data,random large data,random large data,random large data,random large data,random large data,random large data,random large data,random large data,random large data,random large data,"),
},
},
}

data, err := voteBody.MarshalBinary()
require.NoError(t, err)

voteBody2 := &types.ValidatorVoteBodies{}
err = voteBody2.UnmarshalBinary(data)
require.NoError(t, err)

require.NotNil(t, voteBody2)
require.NotNil(t, voteBody2.Events)
require.Len(t, voteBody2.Events, 3)

require.Equal(t, voteBody.Events, voteBody2.Events)
}
57 changes: 55 additions & 2 deletions core/types/payloads.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package types

import (
"bytes"
"encoding"
"encoding/binary"
"errors"
Expand Down Expand Up @@ -606,15 +607,67 @@ type ValidatorVoteBodies struct {
var _ Payload = (*ValidatorVoteBodies)(nil)

func (v *ValidatorVoteBodies) MarshalBinary() ([]byte, error) {
return serialize.Encode(v)
buf := new(bytes.Buffer)
// Length of events (uint32)
if err := binary.Write(buf, binary.LittleEndian, uint32(len(v.Events))); err != nil {
return nil, err
}
for _, event := range v.Events {
// Length of event type (uint32)
if err := binary.Write(buf, binary.LittleEndian, uint32(len(event.Type))); err != nil {
return nil, err
}
// Event type
if _, err := buf.WriteString(event.Type); err != nil {
return nil, err
}
// Length of event body (uint32)
if err := binary.Write(buf, binary.LittleEndian, uint32(len(event.Body))); err != nil {
return nil, err
}
// Event body
if _, err := buf.Write(event.Body); err != nil {
return nil, err
}
}

return buf.Bytes(), nil
}

func (v *ValidatorVoteBodies) Type() PayloadType {
return PayloadTypeValidatorVoteBodies
}

func (v *ValidatorVoteBodies) UnmarshalBinary(p0 []byte) error {
return serialize.Decode(p0, v)
buf := bytes.NewBuffer(p0)
var numEvents uint32
if err := binary.Read(buf, binary.LittleEndian, &numEvents); err != nil {
return err
}
v.Events = make([]*VotableEvent, numEvents)
for i := range v.Events {
var eventTypeLen uint32
if err := binary.Read(buf, binary.LittleEndian, &eventTypeLen); err != nil {
return err
}
eventType := make([]byte, eventTypeLen)
if _, err := buf.Read(eventType); err != nil {
return err
}
var eventBodyLen uint32
if err := binary.Read(buf, binary.LittleEndian, &eventBodyLen); err != nil {
return err
}
eventBody := make([]byte, eventBodyLen)
if _, err := buf.Read(eventBody); err != nil {
return err
}
v.Events[i] = &VotableEvent{
Type: string(eventType),
Body: eventBody,
}
}
return nil
}

// CreateResolution is a payload for creating a new resolution.
Expand Down
1 change: 0 additions & 1 deletion core/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ type JoinRequest struct {
}

type Validator struct {
Role string `json:"role"`
PubKey HexBytes `json:"pubkey"`
Power int64 `json:"power"`
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func init() {
// It will search for a local extension configuration named "eth_deposit".
func Start(ctx context.Context, service *common.Service, eventStore listeners.EventStore) error {
config := &EthDepositConfig{}
listenerConfig, ok := service.LocalConfig.AppConfig.Extensions[ListenerName]
listenerConfig, ok := service.LocalConfig.Extensions[ListenerName]
if !ok {
service.Logger.Warn("no eth_deposit configuration found, eth_deposit oracle will not start")
return nil // no configuration, so we don't start the oracle
Expand Down Expand Up @@ -72,7 +72,7 @@ func Start(ctx context.Context, service *common.Service, eventStore listeners.Ev
if err != nil {
return fmt.Errorf("failed to get current block height: %w", err)
}
service.Logger.S.Infof("ETH best block: %v", currentHeight)
service.Logger.Infof("ETH best block: %v", currentHeight)

if lastHeight > currentHeight-config.RequiredConfirmations {
return fmt.Errorf("starting height is greater than the last confirmed eth block height")
Expand Down Expand Up @@ -135,7 +135,7 @@ func Start(ctx context.Context, service *common.Service, eventStore listeners.Ev
// height range. This means inserting any that have not already been processed
// for broadcast in a Kwil vote ID / approval transaction, and then storing the
// processed height.
func processEvents(ctx context.Context, from, to int64, client *ethClient, eventStore listeners.EventStore, logger log.SugaredLogger) error {
func processEvents(ctx context.Context, from, to int64, client *ethClient, eventStore listeners.EventStore, logger log.Logger) error {
logs, err := client.GetCreditEventLogs(ctx, from, to)
if err != nil {
return fmt.Errorf("failed to get credit event logs: %w", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ var creditEventSignature ethcommon.Hash = crypto.Keccak256Hash([]byte("Credit(ad
type ethClient struct {
targetAddress ethcommon.Address
maxRetries int64
logger log.SugaredLogger
logger log.Logger
client *ethclient.Client
}

// newEthClient creates a new ethereum client
func newEthClient(ctx context.Context, rpcurl string, maxRetries int64, targetAddress ethcommon.Address, logger log.SugaredLogger) (*ethClient, error) {
func newEthClient(ctx context.Context, rpcurl string, maxRetries int64, targetAddress ethcommon.Address, logger log.Logger) (*ethClient, error) {
var client *ethclient.Client

// I don't set the max retries here because this only gets run on startup
Expand Down
Loading

0 comments on commit c0bfdb3

Please sign in to comment.