From 00329ee0acbfa85df5db62020e2d7f3c4bac551d Mon Sep 17 00:00:00 2001 From: charithabandi Date: Fri, 13 Dec 2024 10:19:02 -0600 Subject: [PATCH] Start listener manager, bring back eth_deposits --- app/node/build.go | 29 +- app/node/node.go | 13 + core/rpc/client/admin/jsonrpc/client.go | 1 - core/rpc/json/admin/responses.go | 1 - core/types/types.go | 1 - extensions/listeners/eth_deposits/deposits.go | 338 ++++++++++++++++++ .../listeners/eth_deposits/deposits_test.go | 28 ++ extensions/listeners/eth_deposits/ethereum.go | 227 ++++++++++++ node/listeners/mgr.go | 7 +- node/node.go | 3 +- node/services/jsonrpc/adminsvc/service.go | 9 +- node/txapp/txapp.go | 4 + node/voting/voting.go | 5 +- 13 files changed, 640 insertions(+), 26 deletions(-) create mode 100644 extensions/listeners/eth_deposits/deposits.go create mode 100644 extensions/listeners/eth_deposits/deposits_test.go create mode 100644 extensions/listeners/eth_deposits/ethereum.go diff --git a/app/node/build.go b/app/node/build.go index c2cc31649..c7a0ffcc1 100644 --- a/app/node/build.go +++ b/app/node/build.go @@ -24,6 +24,7 @@ import ( blockprocessor "github.com/kwilteam/kwil-db/node/block_processor" "github.com/kwilteam/kwil-db/node/consensus" "github.com/kwilteam/kwil-db/node/engine/execution" + "github.com/kwilteam/kwil-db/node/listeners" "github.com/kwilteam/kwil-db/node/mempool" "github.com/kwilteam/kwil-db/node/meta" "github.com/kwilteam/kwil-db/node/pg" @@ -88,6 +89,9 @@ func buildServer(ctx context.Context, d *coreDependencies) *server { // Node node := buildNode(d, mp, bs, ce, ss, db) + // listeners + lm := buildListenerManager(d, es, txApp, node) + // RPC Services rpcSvcLogger := d.logger.New("USER") jsonRPCTxSvc := usersvc.NewService(db, e, node, bp, vs, rpcSvcLogger, @@ -135,6 +139,7 @@ func buildServer(ctx context.Context, d *coreDependencies) *server { closers: closers, node: node, ce: ce, + listeners: lm, jsonRPCServer: jsonRPCServer, jsonRPCAdminServer: jsonRPCAdminServer, dbCtx: db, @@ -202,17 +207,23 @@ func buildMetaStore(ctx context.Context, db *pg.DB) { } } +// service returns a common.Service with the given logger name +func (c *coreDependencies) service(loggerName string) *common.Service { + signer := auth.GetNodeSigner(c.privKey) + + return &common.Service{ + Logger: c.logger.New(loggerName), + GenesisConfig: c.genesisCfg, + LocalConfig: c.cfg, + Identity: signer.Identity(), + } +} + func buildTxApp(ctx context.Context, d *coreDependencies, db *pg.DB, accounts *accounts.Accounts, votestore *voting.VoteStore, engine *execution.GlobalContext) *txapp.TxApp { signer := auth.GetNodeSigner(d.privKey) - service := &common.Service{ - Logger: d.logger.New("TXAPP"), - Identity: signer.Identity(), - // TODO: pass extension configs - // ExtensionConfigs: make(map[string]map[string]string), - } - txapp, err := txapp.NewTxApp(ctx, db, engine, signer, nil, service, accounts, votestore) + txapp, err := txapp.NewTxApp(ctx, db, engine, signer, nil, d.service("TxAPP"), accounts, votestore) if err != nil { failBuild(err, "failed to create txapp") } @@ -350,6 +361,10 @@ func buildSnapshotStore(d *coreDependencies) *snapshotter.SnapshotStore { return ss } +func buildListenerManager(d *coreDependencies, ev *voting.EventStore, txapp *txapp.TxApp, node *node.Node) *listeners.ListenerManager { + return listeners.NewListenerManager(d.service("ListenerManager"), ev, txapp, node) +} + func buildJRPCAdminServer(d *coreDependencies) *rpcserver.Server { var wantTLS bool addr := d.cfg.Admin.ListenAddress diff --git a/app/node/node.go b/app/node/node.go index 6cc83deec..d16067644 100644 --- a/app/node/node.go +++ b/app/node/node.go @@ -15,6 +15,7 @@ import ( "github.com/kwilteam/kwil-db/core/log" "github.com/kwilteam/kwil-db/node" "github.com/kwilteam/kwil-db/node/consensus" + "github.com/kwilteam/kwil-db/node/listeners" rpcserver "github.com/kwilteam/kwil-db/node/services/jsonrpc" "github.com/kwilteam/kwil-db/version" @@ -35,6 +36,7 @@ type server struct { // subsystems node *node.Node ce *consensus.ConsensusEngine + listeners *listeners.ListenerManager jsonRPCServer *rpcserver.Server jsonRPCAdminServer *rpcserver.Server } @@ -164,6 +166,17 @@ func (s *server) Start(ctx context.Context) error { return nil }) + // Start listener manager + group.Go(func() error { + go func() { + <-groupCtx.Done() + s.log.Info("stop listeners") + s.listeners.Stop() + }() + return s.listeners.Start() + }) + s.log.Info("listener manager started") + // TODO: node is starting the consensus engine for ease of testing // Start the consensus engine diff --git a/core/rpc/client/admin/jsonrpc/client.go b/core/rpc/client/admin/jsonrpc/client.go index fb871fba5..d2f02a4b1 100644 --- a/core/rpc/client/admin/jsonrpc/client.go +++ b/core/rpc/client/admin/jsonrpc/client.go @@ -147,7 +147,6 @@ func (cl *Client) Status(ctx context.Context) (*adminTypes.Status, error) { Syncing: res.Sync.Syncing, }, Validator: &adminTypes.ValidatorInfo{ - Role: res.Validator.Role, PubKey: res.Validator.PubKey, Power: res.Validator.Power, }, diff --git a/core/rpc/json/admin/responses.go b/core/rpc/json/admin/responses.go index c1518291a..26f551700 100644 --- a/core/rpc/json/admin/responses.go +++ b/core/rpc/json/admin/responses.go @@ -45,7 +45,6 @@ type SyncInfo struct { type HealthResponse struct { Version string `json:"version"` Healthy bool `json:"healthy"` - Role string `json:"role"` PubKey types.HexBytes `json:"pubkey"` NumValidators int `json:"num_validators"` } diff --git a/core/types/types.go b/core/types/types.go index 928167444..d4441c387 100644 --- a/core/types/types.go +++ b/core/types/types.go @@ -40,7 +40,6 @@ type JoinRequest struct { } type Validator struct { - Role string `json:"role"` PubKey HexBytes `json:"pubkey"` Power int64 `json:"power"` } diff --git a/extensions/listeners/eth_deposits/deposits.go b/extensions/listeners/eth_deposits/deposits.go new file mode 100644 index 000000000..a9139b694 --- /dev/null +++ b/extensions/listeners/eth_deposits/deposits.go @@ -0,0 +1,338 @@ +// package ethdeposits implements an listener that listens to Ethereum events +// and triggers the creation of deposit events in Kwil. +package ethdeposits + +import ( + "context" + "encoding/binary" + "encoding/hex" + "fmt" + "strconv" + "strings" + "time" + + ethcommon "github.com/ethereum/go-ethereum/common" + "github.com/kwilteam/kwil-db/common" + "github.com/kwilteam/kwil-db/core/log" + "github.com/kwilteam/kwil-db/extensions/listeners" + "github.com/kwilteam/kwil-db/extensions/resolutions/credit" +) + +const ListenerName = "eth_deposits" + +// use golang's init function, which runs before main, to register the extension +// see more here: https://www.digitalocean.com/community/tutorials/understanding-init-in-go +func init() { + // register the listener with the name "eth_deposit" + err := listeners.RegisterListener(ListenerName, Start) + if err != nil { + panic(err) + } +} + +// Start starts the eth_deposit listener, which triggers the creation of deposit events in Kwil. +// It can be configured to listen to a certain smart contract address. It will listen for the EVM event signature +// "Credit(address,uint256)" and create a deposit event in Kwil when it sees a matching event. It uses the +// "credit_account" resolution, defined in extensions/resolutions/credit/credit.go, to create the deposit event. +// It will search for a local extension configuration named "eth_deposit". +func Start(ctx context.Context, service *common.Service, eventStore listeners.EventStore) error { + config := &EthDepositConfig{} + listenerConfig, ok := service.LocalConfig.Extensions[ListenerName] + if !ok { + service.Logger.Warn("no eth_deposit configuration found, eth_deposit oracle will not start") + return nil // no configuration, so we don't start the oracle + } + err := config.setConfig(listenerConfig) + if err != nil { + return fmt.Errorf("failed to set eth_deposit configuration: %w", err) + } + + // we need to catch up with the ethereum chain. + // we will get the last seen height from the kv store + // we will either start from the last seen height, or from the configured starting height, + // whichever is greater + lastHeight, err := getLastStoredHeight(ctx, eventStore) + if err != nil { + return fmt.Errorf("failed to get last stored height: %w", err) + } + + if config.StartingHeight > lastHeight { + lastHeight = config.StartingHeight + } + + client, err := newEthClient(ctx, config.RPCProvider, config.MaxRetries, + ethcommon.HexToAddress(config.ContractAddress), service.Logger) + if err != nil { + return fmt.Errorf("failed to create ethereum client: %w", err) + } + defer client.Close() + + // get the current block height from the Ethereum client + currentHeight, err := client.GetLatestBlock(ctx) + if err != nil { + return fmt.Errorf("failed to get current block height: %w", err) + } + service.Logger.Infof("ETH best block: %v", currentHeight) + + if lastHeight > currentHeight-config.RequiredConfirmations { + return fmt.Errorf("starting height is greater than the last confirmed eth block height") + } + + // we will now sync all logs from the starting height to the current height, + // in chunks of config.BlockSyncChunkSize + for { + if lastHeight >= currentHeight-config.RequiredConfirmations { + break + } + + // get the next block chunk. if it is greater than the current height - required confirmations, + // we will set it to the current height - required confirmations + toBlock := lastHeight + config.BlockSyncChunkSize + if toBlock > currentHeight-config.RequiredConfirmations { + toBlock = currentHeight - config.RequiredConfirmations + } + + err = processEvents(ctx, lastHeight, toBlock, client, eventStore, service.Logger) + if err != nil { + return fmt.Errorf("failed to process events: %w", err) + } + + lastHeight = toBlock + } + + // ListenToBlocks will listen to new blocks and process the events. + // It only returns when the context is cancelled, or when the client cannot recover + // from an error after the max retries. + outerErr := client.ListenToBlocks(ctx, time.Duration(config.ReconnectionInterval)*time.Second, func(newHeight int64) error { + newHeight = newHeight - config.RequiredConfirmations // account for required confirmations + + // it is possible to receive the same height twice + if newHeight <= lastHeight { + service.Logger.Info("received duplicate block height", "height", newHeight) + return nil + } + + service.Logger.Info("received new block height", "height", newHeight) + + // lastheight + 1 because we have already processed the last height + err = processEvents(ctx, lastHeight+1, newHeight, client, eventStore, service.Logger) + if err != nil { + return fmt.Errorf("failed to process events: %w", err) + } + + lastHeight = newHeight + + return nil + }) + if outerErr != nil { + return fmt.Errorf("ListenToBlocks failure: %w", outerErr) + } + + return nil +} + +// processEvents will process all events from the Ethereum client from the given +// height range. This means inserting any that have not already been processed +// for broadcast in a Kwil vote ID / approval transaction, and then storing the +// processed height. +func processEvents(ctx context.Context, from, to int64, client *ethClient, eventStore listeners.EventStore, logger log.Logger) error { + logs, err := client.GetCreditEventLogs(ctx, from, to) + if err != nil { + return fmt.Errorf("failed to get credit event logs: %w", err) + } + + for _, log := range logs { + event, err := decodeCreditEvent(&log) + if err != nil { + return fmt.Errorf("failed to decode credit event: %w", err) + } + + bts, err := event.MarshalBinary() + if err != nil { + return fmt.Errorf("failed to marshal event: %w", err) + } + + logger.Info("Flagging new account credit event for approval (to broadcast)", + "account", hex.EncodeToString(event.Account), "amount", event.Amount, "txHash", hex.EncodeToString(event.TxHash)) + err = eventStore.Broadcast(ctx, credit.CreditAccountEventType, bts) + if err != nil { + return fmt.Errorf("failed to mark new event for broadcast: %w", err) + } + } + + logger.Info("processed events", "from", from, "to", to, "events", len(logs)) + + return setLastStoredHeight(ctx, eventStore, to) +} + +// EthDepositConfig is the configuration for the eth_deposit listener. +// It can be read in from a map[string]string, which is passed from +// the node's local configuration. +type EthDepositConfig struct { + // StartingHeight is the Ethereum block height it will start listening from. + // Any events emitted before this height will be ignored. + // If not configured, it will start from block 0. + StartingHeight int64 + // ContractAddress is the Ethereum address of the smart contract it will listen to. + // It is a required configuration. + ContractAddress string + // RequiredConfirmations is the number of Ethereum blocks that must be mined before + // the listener will create a deposit event in Kwil. This is to protect against Ethereum + // network reorgs / soft forks. If not configured, it will default to 12. + // https://www.alchemy.com/overviews/what-is-a-reorg + RequiredConfirmations int64 + // RPCProvider is the URL of the Ethereum RPC endpoint it will connect to. + // This would likely be an Infura / Alchemy endpoint. + // It is a required configuration. + RPCProvider string + // ReconnectionInterval is the amount of time in seconds that the listener + // will wait before resubscribing for new Ethereum Blocks. Reconnects are + // automatically handled, but a subscription may stall, in which case we + // will make a new subscription. If the write or read on the connection to + // the RPC provider errors, the RPC client will reconnect, and we will + // continue to reestablish a new block subscription. If not configured, it + // will default to 60s. + ReconnectionInterval int64 + // MaxRetries is the total number of times the listener will attempt an RPC + // with the provider before giving up. It will exponentially back off after + // each try, starting at 1 second and doubling each time. If not configured, + // it will default to 10. + MaxRetries int64 + // BlockSyncChunkSize is the number of Ethereum blocks the listener will request from the + // Ethereum RPC endpoint at a time while catching up to the network. If not configured, + // it will default to 1,000,000. + BlockSyncChunkSize int64 +} + +// setConfig sets the configuration for the eth_deposit listener. +// If it doesn't find a required configuration, or if it finds an invalid +// configuration, it returns an error +func (e *EthDepositConfig) setConfig(m map[string]string) error { + startingHeight, ok := m["starting_height"] + if !ok { + startingHeight = "0" + } + + var err error + e.StartingHeight, err = strconv.ParseInt(startingHeight, 10, 64) + if err != nil { + return fmt.Errorf("invalid starting_height: %s", startingHeight) + } + if e.StartingHeight < 0 { + return fmt.Errorf("starting_height cannot be negative") + } + + contractAddress, ok := m["contract_address"] + if !ok { + return fmt.Errorf("no contract_address provided") + } + e.ContractAddress = contractAddress + + requiredConfirmations, ok := m["required_confirmations"] + if !ok { + requiredConfirmations = "12" + } + e.RequiredConfirmations, err = strconv.ParseInt(requiredConfirmations, 10, 64) + if err != nil { + return fmt.Errorf("invalid required_confirmations: %s", requiredConfirmations) + } + if e.RequiredConfirmations < 0 { + return fmt.Errorf("required_confirmations cannot be negative") + } + + rpc, ok := m["rpc_provider"] + if !ok { + return fmt.Errorf("no rpc_provider provided") + } + if !strings.HasPrefix(rpc, "ws") { + return fmt.Errorf("rpc_provider must be a websocket URL") + } + e.RPCProvider = rpc + + reconnectionInterval, ok := m["reconnection_interval"] + if !ok { + reconnectionInterval = "60" + } + intervalInt, err := strconv.ParseInt(reconnectionInterval, 10, 64) + if err != nil { + return fmt.Errorf("invalid reconnection_interval: %s", reconnectionInterval) + } + if intervalInt < 5 { + return fmt.Errorf("reconnection_interval must be greater than or equal to 5") + } + e.ReconnectionInterval = intervalInt + + maxRetries, ok := m["max_retries"] + if !ok { + maxRetries = "10" + } + e.MaxRetries, err = strconv.ParseInt(maxRetries, 10, 64) + if err != nil { + return fmt.Errorf("invalid max_retries: %s", maxRetries) + } + if e.MaxRetries < 0 { + return fmt.Errorf("max_retries cannot be negative") + } + + blockSyncChunkSize, ok := m["block_sync_chunk_size"] + if !ok { + blockSyncChunkSize = "1000000" // check this on goerli, it's big + } + e.BlockSyncChunkSize, err = strconv.ParseInt(blockSyncChunkSize, 10, 64) + if err != nil { + return fmt.Errorf("invalid block_sync_chunk_size: %s", blockSyncChunkSize) + } + if e.BlockSyncChunkSize <= 0 { + return fmt.Errorf("block_sync_chunk_size must be greater than 0") + } + + return nil +} + +// Map returns the configuration as a map[string]string. +// This is used for testing +func (e *EthDepositConfig) Map() map[string]string { + + return map[string]string{ + "starting_height": strconv.FormatInt(e.StartingHeight, 10), + "contract_address": e.ContractAddress, + "required_confirmations": strconv.FormatInt(e.RequiredConfirmations, 10), + "rpc_provider": e.RPCProvider, + "reconnection_interval": strconv.FormatInt(e.ReconnectionInterval, 10), + "max_retries": strconv.FormatInt(e.MaxRetries, 10), + "block_sync_chunk_size": strconv.FormatInt(e.BlockSyncChunkSize, 10), + } +} + +var ( + // lastHeightKey is the key used to store the last height processed by the listener + lastHeightKey = []byte("lh") +) + +// getLastStoredHeight gets the last height stored by the KV store +func getLastStoredHeight(ctx context.Context, eventStore listeners.EventStore) (int64, error) { + // get the last confirmed block height processed by the listener + lastHeight, err := eventStore.Get(ctx, lastHeightKey) + if err != nil { + return 0, fmt.Errorf("failed to get last block height: %w", err) + } + + if len(lastHeight) == 0 { + return 0, nil + } + + return int64(binary.LittleEndian.Uint64(lastHeight)), nil +} + +// setLastStoredHeight sets the last height stored by the KV store +func setLastStoredHeight(ctx context.Context, eventStore listeners.EventStore, height int64) error { + heightBts := make([]byte, 8) + binary.LittleEndian.PutUint64(heightBts, uint64(height)) + + // set the last confirmed block height processed by the listener + err := eventStore.Set(ctx, lastHeightKey, heightBts) + if err != nil { + return fmt.Errorf("failed to set last block height: %w", err) + } + return nil +} diff --git a/extensions/listeners/eth_deposits/deposits_test.go b/extensions/listeners/eth_deposits/deposits_test.go new file mode 100644 index 000000000..5e73f8219 --- /dev/null +++ b/extensions/listeners/eth_deposits/deposits_test.go @@ -0,0 +1,28 @@ +package ethdeposits + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +// we could use better tests for this package. +// this would require more abstractions, which would then +// take away from this being an example of a simple event listener. +func Test_Config(t *testing.T) { + cfg := &EthDepositConfig{ + StartingHeight: 45, + ContractAddress: "0x1234", + RequiredConfirmations: 10, + RPCProvider: "ws://localhost:8545", + ReconnectionInterval: 10, + MaxRetries: 5, + BlockSyncChunkSize: 100, + } + m := cfg.Map() + + cfg2 := &EthDepositConfig{} + err := cfg2.setConfig(m) + require.NoError(t, err) + require.Equal(t, cfg, cfg2) +} diff --git a/extensions/listeners/eth_deposits/ethereum.go b/extensions/listeners/eth_deposits/ethereum.go new file mode 100644 index 000000000..6c0a0a973 --- /dev/null +++ b/extensions/listeners/eth_deposits/ethereum.go @@ -0,0 +1,227 @@ +package ethdeposits + +import ( + "context" + "fmt" + "math/big" + "strings" + "time" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/accounts/abi" + ethcommon "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/jpillora/backoff" + + "github.com/kwilteam/kwil-db/core/log" + "github.com/kwilteam/kwil-db/extensions/resolutions/credit" +) + +// file contains functionality for subscribing to ethereum and reading logs + +func init() { + // parse the contract ABI + var err error + eventABI, err = abi.JSON(strings.NewReader(contractABIStr)) + if err != nil { + panic(err) + } +} + +// contractABIStr is the ABI of the smart contract the listener listens to. +// It follows the Ethereum ABI JSON format, and matches the `Credit(address,uint256)` event signature. +const contractABIStr = `[{"anonymous":false,"inputs":[{"indexed":false,"internalType":"address","name":"_from","type":"address"},{"indexed":false,"internalType":"uint256","name":"_amount","type":"uint256"}],"name":"Credit","type":"event"}]` + +// eventABI is the abi for the Credit event +var eventABI abi.ABI + +// creditEventSignature is the EVM event signature the listener listens to. +var creditEventSignature ethcommon.Hash = crypto.Keccak256Hash([]byte("Credit(address,uint256)")) + +// ethClient is a client for interacting with the ethereum blockchain +// it handles retries and resubscribing to the blockchain in case of +// transient errors +type ethClient struct { + targetAddress ethcommon.Address + maxRetries int64 + logger log.Logger + client *ethclient.Client +} + +// newEthClient creates a new ethereum client +func newEthClient(ctx context.Context, rpcurl string, maxRetries int64, targetAddress ethcommon.Address, logger log.Logger) (*ethClient, error) { + var client *ethclient.Client + + // I don't set the max retries here because this only gets run on startup + // the max retries are used for resubscribing to the blockchain + // if we fail 3 times here, it is likely a permanent error + err := retry(ctx, 3, func() error { + var innerErr error + client, innerErr = ethclient.DialContext(ctx, rpcurl) + return innerErr + }) + if err != nil { + return nil, err + } + + return ðClient{ + targetAddress: targetAddress, + maxRetries: maxRetries, + logger: logger, + client: client, + }, nil +} + +// GetLatestBlock gets the latest block number from the ethereum blockchain +func (ec *ethClient) GetLatestBlock(ctx context.Context) (int64, error) { + var blockNumber int64 + err := retry(ctx, ec.maxRetries, func() error { + header, err := ec.client.HeaderByNumber(ctx, nil) + if err != nil { + ec.logger.Error("Failed to get latest block", "error", err) + return err + } + blockNumber = header.Number.Int64() + return nil + }) + return blockNumber, err +} + +// GetCreditEventLogs gets the logs for the credit event from the ethereum blockchain. +// It can be given a start range and an end range to filter the logs by block height. +func (ec *ethClient) GetCreditEventLogs(ctx context.Context, fromBlock, toBlock int64) ([]types.Log, error) { + var logs []types.Log + err := retry(ctx, ec.maxRetries, func() error { + var err error + logs, err = ec.client.FilterLogs(ctx, ethereum.FilterQuery{ + ToBlock: big.NewInt(toBlock), + FromBlock: big.NewInt(fromBlock), + Addresses: []ethcommon.Address{ec.targetAddress}, + Topics: [][]ethcommon.Hash{{creditEventSignature}}, + }) + if err != nil { + ec.logger.Error("Failed to get credit event logs", "error", err) + } + + return err + }) + return logs, err +} + +// decodeCreditEvent decodes the credit event from the ethereum log +func decodeCreditEvent(l *types.Log) (*credit.AccountCreditResolution, error) { + data, err := eventABI.Unpack("Credit", l.Data) + if err != nil { + return nil, err + } + + // the first argument is the address, the second is the amount + address, ok := data[0].(ethcommon.Address) + if !ok { + return nil, fmt.Errorf("failed to parse credit event address") + } + amount, ok := data[1].(*big.Int) + if !ok { + return nil, fmt.Errorf("failed to parse credit event amount") + } + + return &credit.AccountCreditResolution{ + Account: address.Bytes(), + Amount: amount, + TxHash: l.TxHash.Bytes(), + }, nil +} + +// ListenToBlocks subscribes to new blocks on the ethereum blockchain. +// It takes a reconnectInterval, which is the amount of time it will wait +// to reconnect to the ethereum client if no new blocks are received. +// It takes a callback function that is called with the new block number. +// It can send duplicates, if that is received from the ethereum client. +// It will block until the context is cancelled, or until an error is +// returned from the callback function. +func (ec *ethClient) ListenToBlocks(ctx context.Context, reconnectInterval time.Duration, cb func(int64) error) error { + headers := make(chan *types.Header, 1) + sub, err := ec.client.SubscribeNewHead(ctx, headers) + if err != nil { + return err + } + + resubscribe := func() error { + var retryCount int + ec.logger.Warn("Resubscribing to Ethereum node", "attempt", retryCount) // anomalous + sub.Unsubscribe() + + return retry(ctx, ec.maxRetries, func() error { + retryCount++ + sub, err = ec.client.SubscribeNewHead(ctx, headers) + return err + }) + } + + reconn := time.NewTicker(reconnectInterval) + defer reconn.Stop() + + for { + select { + case <-ctx.Done(): + ec.logger.Debug("Context cancelled, stopping ethereum client") + return nil + case header := <-headers: + ec.logger.Debug("New block", "height", header.Number.Int64()) + err := cb(header.Number.Int64()) + if err != nil { + return err + } + + reconn.Reset(reconnectInterval) + case err := <-sub.Err(): + ec.logger.Error("Ethereum subscription error", "error", err) + err = resubscribe() + if err != nil { + return err + } + reconn.Reset(reconnectInterval) + case <-reconn.C: + ec.logger.Warn("No new blocks received, resubscribing") + err := resubscribe() + if err != nil { + return err + } + } + } +} + +// Close closes the ethereum client +func (ec *ethClient) Close() { + ec.client.Close() +} + +// retry will retry the function until it is successful, or reached the max retries +func retry(ctx context.Context, maxRetries int64, fn func() error) error { + retrier := &backoff.Backoff{ + Min: 1 * time.Second, + Max: 10 * time.Second, + Factor: 2, + Jitter: true, + } + + for { + err := fn() + if err == nil { + return nil + } + + // fail after maxRetries retries + if retrier.Attempt() > float64(maxRetries) { + return err + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(retrier.Duration()): + } + } +} diff --git a/node/listeners/mgr.go b/node/listeners/mgr.go index 9edc03887..3f970342f 100644 --- a/node/listeners/mgr.go +++ b/node/listeners/mgr.go @@ -27,7 +27,7 @@ type ListenerManager struct { // ValidatorGetter is able to read the current validator set. type ValidatorGetter interface { - GetValidators(ctx context.Context) ([]*types.Validator, error) + GetValidators() []*types.Validator SubscribeValidators() <-chan []*types.Validator } @@ -136,10 +136,7 @@ func (omgr *ListenerManager) Start() (err error) { syncCheck.Stop() valChan = omgr.vstore.SubscribeValidators() // creates a new channel in txApp - validators, err := omgr.vstore.GetValidators(ctx) - if err != nil { - return err - } + validators := omgr.vstore.GetValidators() startStop(containsMe(validators)) } } diff --git a/node/node.go b/node/node.go index 3dbb09547..4bf88d8f0 100644 --- a/node/node.go +++ b/node/node.go @@ -475,10 +475,9 @@ func (n *Node) Status(ctx context.Context) (*adminTypes.Status, error) { BestBlockHash: blkHash[:], BestBlockHeight: height, // BestBlockTime: , - Syncing: false, // n.ce.Status().Syncing ??? need a node/exec pkg for block_executor stuff? + Syncing: n.ce.InCatchup(), }, Validator: &adminTypes.ValidatorInfo{ - Role: n.ce.Role().String(), PubKey: pkBytes, // Power: 1, }, diff --git a/node/services/jsonrpc/adminsvc/service.go b/node/services/jsonrpc/adminsvc/service.go index f94eebb57..d9aa29214 100644 --- a/node/services/jsonrpc/adminsvc/service.go +++ b/node/services/jsonrpc/adminsvc/service.go @@ -16,7 +16,6 @@ import ( types "github.com/kwilteam/kwil-db/core/types/admin" "github.com/kwilteam/kwil-db/extensions/resolutions" rpcserver "github.com/kwilteam/kwil-db/node/services/jsonrpc" - nodetypes "github.com/kwilteam/kwil-db/node/types" "github.com/kwilteam/kwil-db/node/types/sql" "github.com/kwilteam/kwil-db/node/voting" "github.com/kwilteam/kwil-db/version" @@ -141,7 +140,6 @@ func (svc *Service) HealthMethod(ctx context.Context, _ *userjson.HealthRequest) Healthy: happy, Version: apiSemver, PubKey: status.Validator.PubKey, - Role: status.Validator.Role, NumValidators: len(vals.Validators), }, nil // slices.ContainsFunc(vals.Validators, func(v *ktypes.Validator) bool { return bytes.Equal(v.PubKey, status.Validator.PubKey) }) @@ -254,16 +252,12 @@ func (svc *Service) Status(ctx context.Context, req *adminjson.StatusRequest) (* } var power int64 - switch status.Validator.Role { - case nodetypes.RoleLeader.String(), nodetypes.RoleValidator.String(): - power, _ = svc.voting.GetValidatorPower(ctx, status.Validator.PubKey) - } + power, _ = svc.voting.GetValidatorPower(ctx, status.Validator.PubKey) return &adminjson.StatusResponse{ Node: status.Node, Sync: convertSyncInfo(status.Sync), Validator: &adminjson.Validator{ // TODO: weed out the type dups - Role: status.Validator.Role, PubKey: status.Validator.PubKey, Power: power, }, @@ -401,7 +395,6 @@ func (svc *Service) ListValidators(ctx context.Context, req *adminjson.ListValid pbValidators := make([]*adminjson.Validator, len(vals)) for i, vi := range vals { pbValidators[i] = &adminjson.Validator{ - Role: vi.Role, PubKey: vi.PubKey, Power: vi.Power, } diff --git a/node/txapp/txapp.go b/node/txapp/txapp.go index 38eca5152..d0e5a3fa5 100644 --- a/node/txapp/txapp.go +++ b/node/txapp/txapp.go @@ -588,6 +588,10 @@ func (r *TxApp) announceValidators() { } } +func (r *TxApp) GetValidators() []*types.Validator { + return r.Validators.GetValidators() +} + func validatorSetPower(validators []*types.Validator) int64 { var totalPower int64 for _, v := range validators { diff --git a/node/voting/voting.go b/node/voting/voting.go index 9e3878de4..bf319349e 100644 --- a/node/voting/voting.go +++ b/node/voting/voting.go @@ -603,7 +603,10 @@ func (v *VoteStore) GetValidators() []*types.Validator { vals := make([]*types.Validator, 0) for _, val := range v.validatorSet { - vals = append(vals, val) + vals = append(vals, &types.Validator{ + PubKey: slices.Clone(val.PubKey), + Power: val.Power, + }) } return vals