Skip to content

Commit

Permalink
change erc20-bridge confiog
Browse files Browse the repository at this point in the history
  • Loading branch information
Yaiba committed Feb 20, 2025
1 parent da36253 commit fba0ec0
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 121 deletions.
17 changes: 7 additions & 10 deletions app/node/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func buildServer(ctx context.Context, d *coreDependencies) *server {
// Consensus
ce := buildConsensusEngine(ctx, d, db, mp, bs, bp)

// Erc20 reward signer service
// Erc20 bridge signer service
erc20RWSignerMgr := buildErc20RWignerMgr(d)

// Node
Expand Down Expand Up @@ -509,13 +509,10 @@ func buildConsensusEngine(_ context.Context, d *coreDependencies, db *pg.DB,
}

func buildErc20RWignerMgr(d *coreDependencies) *signersvc.ServiceMgr {
cfg := d.cfg.Erc20BridgeSigner
if !cfg.Enable {
return nil
}
cfg := d.cfg.Erc20Bridge

if err := cfg.Validate(); err != nil {
failBuild(err, "invalid erc20 reward signer config")
failBuild(err, "invalid erc20 bridge config")
}

// create shared state
Expand All @@ -524,21 +521,21 @@ func buildErc20RWignerMgr(d *coreDependencies) *signersvc.ServiceMgr {
if !fileExists(stateFile) {
emptyFile, err := os.Create(stateFile)
if err != nil {
failBuild(err, "Failed to create erc20 reward signer state file")
failBuild(err, "Failed to create erc20 bridge signer state file")
}
_ = emptyFile.Close()
}

state, err := signersvc.LoadStateFromFile(stateFile)
if err != nil {
failBuild(err, "Failed to load erc20 reward signer state file")
failBuild(err, "Failed to load erc20 bridge signer state file")
}

rpcUrl := "http://" + d.cfg.RPC.ListenAddress

mgr, err := signersvc.NewServiceMgr(rpcUrl, cfg.Targets, cfg.EthRpcs, cfg.PrivateKeys, time.Duration(cfg.SyncEvery), state, d.logger.New("EVMRW"))
mgr, err := signersvc.NewServiceMgr(rpcUrl, cfg, state, d.logger.New("EVMRW"))
if err != nil {
failBuild(err, "Failed to create erc20 reward signer service manager")
failBuild(err, "Failed to create erc20 bridge signer service manager")
}

return mgr
Expand Down
2 changes: 1 addition & 1 deletion app/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func (s *server) Start(ctx context.Context) error {
})
s.log.Info("listener manager started")

// Start erc20 reward signer svc
// Start erc20 bridge signer svc
if s.erc20RWSigner != nil {
group.Go(func() error {
return s.erc20RWSigner.Start(groupCtx)
Expand Down
70 changes: 25 additions & 45 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,12 +311,10 @@ func DefaultConfig() *Config {
Height: 0,
Hash: types.Hash{},
},
Erc20BridgeSigner: ERC20BridgeSignerConfig{
Enable: false,
PrivateKeys: nil,
Targets: nil,
// the reasonable value is the block time
SyncEvery: types.Duration(1 * time.Minute),
Erc20Bridge: ERC20BridgeConfig{
RPC: make(map[string]string),
BlockSyncChuckSize: make(map[string]string),
Signer: make(map[string]string),
},
}
}
Expand All @@ -330,19 +328,19 @@ type Config struct {
ProfileMode string `toml:"profile_mode,commented" comment:"profile mode (http, cpu, mem, mutex, or block)"`
ProfileFile string `toml:"profile_file,commented" comment:"profile output file path (e.g. cpu.pprof)"`

P2P PeerConfig `toml:"p2p" comment:"P2P related configuration"`
Consensus ConsensusConfig `toml:"consensus" comment:"Consensus related configuration"`
DB DBConfig `toml:"db" comment:"DB (PostgreSQL) related configuration"`
Store StoreConfig `toml:"store" comment:"Block store configuration"`
RPC RPCConfig `toml:"rpc" comment:"User RPC service configuration"`
Admin AdminConfig `toml:"admin" comment:"Admin RPC service configuration"`
Snapshots SnapshotConfig `toml:"snapshots" comment:"Snapshot creation and provider configuration"`
StateSync StateSyncConfig `toml:"state_sync" comment:"Statesync configuration (vs block sync)"`
Extensions map[string]map[string]string `toml:"extensions" comment:"extension configuration"`
GenesisState string `toml:"genesis_state" comment:"path to the genesis state file, relative to the root directory"`
Migrations MigrationConfig `toml:"migrations" comment:"zero downtime migration configuration"`
Checkpoint Checkpoint `toml:"checkpoint" comment:"checkpoint info for the leader to sync to before proposing a new block"`
Erc20BridgeSigner ERC20BridgeSignerConfig `toml:"erc20_bridge_signer" comment:"ERC20 bridge signer service configuration"`
P2P PeerConfig `toml:"p2p" comment:"P2P related configuration"`
Consensus ConsensusConfig `toml:"consensus" comment:"Consensus related configuration"`
DB DBConfig `toml:"db" comment:"DB (PostgreSQL) related configuration"`
Store StoreConfig `toml:"store" comment:"Block store configuration"`
RPC RPCConfig `toml:"rpc" comment:"User RPC service configuration"`
Admin AdminConfig `toml:"admin" comment:"Admin RPC service configuration"`
Snapshots SnapshotConfig `toml:"snapshots" comment:"Snapshot creation and provider configuration"`
StateSync StateSyncConfig `toml:"state_sync" comment:"Statesync configuration (vs block sync)"`
Extensions map[string]map[string]string `toml:"extensions" comment:"extension configuration"`
GenesisState string `toml:"genesis_state" comment:"path to the genesis state file, relative to the root directory"`
Migrations MigrationConfig `toml:"migrations" comment:"zero downtime migration configuration"`
Checkpoint Checkpoint `toml:"checkpoint" comment:"checkpoint info for the leader to sync to before proposing a new block"`
Erc20Bridge ERC20BridgeConfig `toml:"erc20_bridge" comment:"ERC20 bridge configuration"`
}

// PeerConfig corresponds to the [p2p] section of the config.
Expand Down Expand Up @@ -451,34 +449,16 @@ type Checkpoint struct {
Hash types.Hash `toml:"hash" comment:"checkpoint block hash."`
}

type ERC20BridgeSignerConfig struct {
Enable bool `toml:"enable" comment:"enable the ERC20 bridge signer service"`
Targets []string `toml:"targets" comment:"target reward ext alias for the ERC20 reward"`
PrivateKeys []string `toml:"private_keys" comment:"private key for the ERC20 reward target"`
EthRpcs []string `toml:"eth_rpcs" comment:"eth rpc address for the ERC20 reward target"`
SyncEvery types.Duration `toml:"sync_every" comment:"sync interval; a recommend value is same as the block time"`
type ERC20BridgeConfig struct {
RPC map[string]string `toml:"rpc" comment:"evm RPC; format: chain_name='rpc_url'"`
BlockSyncChuckSize map[string]string `toml:"block_sync_chuck_size" comment:"block sync chunk size; format: chain_name='chunk_size'"`
Signer map[string]string `toml:"signer" comment:"signer service configuration; format: chain_name='target:file_path_to_private_key'"`
}

func (cfg ERC20BridgeSignerConfig) Validate() error {
if (len(cfg.PrivateKeys) != len(cfg.Targets)) && (len(cfg.EthRpcs) != len(cfg.Targets)) {
return fmt.Errorf("private keys and targets and eth_rpcs must be configured in triples")
}

if len(cfg.Targets) == 0 {
return fmt.Errorf("no target configured")
}

for i, target := range cfg.Targets {
if target == "" {
return fmt.Errorf("target %dth is empty", i)
}

if cfg.PrivateKeys[i] == "" {
return fmt.Errorf("private key %dth is empty", i)
}

if cfg.EthRpcs[i] == "" {
return fmt.Errorf("eth rpc %dth is empty", i)
func (cfg ERC20BridgeConfig) Validate() error {
for chain, _ := range cfg.Signer {

Check failure on line 459 in config/config.go

View workflow job for this annotation

GitHub Actions / test

File is not properly formatted (gofmt)
if _, ok := cfg.RPC[chain]; !ok {
return fmt.Errorf("signer service: chain '%s' is not in rpc", chain)
}
}

Expand Down
86 changes: 59 additions & 27 deletions node/exts/erc20-bridge/signersvc/signer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,18 @@ import (
"encoding/hex"
"fmt"
"math/big"
"os"
"path/filepath"
"slices"
"strings"
"sync"
"time"

ethAccounts "github.com/ethereum/go-ethereum/accounts"
ethCommon "github.com/ethereum/go-ethereum/common"
ethCrypto "github.com/ethereum/go-ethereum/crypto"

"github.com/kwilteam/kwil-db/config"
"github.com/kwilteam/kwil-db/core/client"
clientType "github.com/kwilteam/kwil-db/core/client/types"
"github.com/kwilteam/kwil-db/core/crypto"
Expand Down Expand Up @@ -49,13 +52,12 @@ type rewardSigner struct {
safe *Safe

logger log.Logger
every time.Duration
state *State
}

// newRewardSigner returns a new rewardSigner.
func newRewardSigner(kwilRpc string, target string, ethRpc string, pkStr string,
every time.Duration, state *State, logger log.Logger) (*rewardSigner, error) {
state *State, logger log.Logger) (*rewardSigner, error) {
if logger == nil {
logger = log.DiscardLogger
}
Expand All @@ -79,7 +81,6 @@ func newRewardSigner(kwilRpc string, target string, ethRpc string, pkStr string,
signerAddr: address,
state: state,
logger: logger,
every: every,
target: target,
}, nil
}
Expand All @@ -89,19 +90,19 @@ func (s *rewardSigner) init() error {

pkBytes, err := hex.DecodeString(s.signerPkStr)
if err != nil {
return fmt.Errorf("decode erc20 reward signer private key failed: %w", err)
return fmt.Errorf("decode erc20 bridge signer private key failed: %w", err)
}

key, err := crypto.UnmarshalSecp256k1PrivateKey(pkBytes)
if err != nil {
return fmt.Errorf("parse erc20 reward signer private key failed: %w", err)
return fmt.Errorf("parse erc20 bridge signer private key failed: %w", err)
}

opts := &clientType.Options{Signer: &auth.EthPersonalSigner{Key: *key}}

clt, err := client.NewClient(ctx, s.kwilRpc, opts)
if err != nil {
return fmt.Errorf("create erc20 reward signer api client failed: %w", err)
return fmt.Errorf("create erc20 bridge signer api client failed: %w", err)
}

s.kwil = newERC20RWExtAPI(clt, s.target)
Expand Down Expand Up @@ -318,48 +319,79 @@ func (s *rewardSigner) sync(ctx context.Context) {

// ServiceMgr manages multiple rewardSigner instances running in parallel.
type ServiceMgr struct {
signers []*rewardSigner
logger log.Logger
syncInterval time.Duration
signers []*rewardSigner
logger log.Logger
}

func NewServiceMgr(
kwilRpc string,
targets []string,
ethRpcs []string,
pkStrs []string,
syncEvery time.Duration,
cfg config.ERC20BridgeConfig,
state *State,
logger log.Logger) (*ServiceMgr, error) {

signers := make([]*rewardSigner, len(targets))
for i, target := range targets {
pk := pkStrs[i]
svc, err := newRewardSigner(kwilRpc, target, ethRpcs[i], pk,
syncEvery, state, logger.New("EVMRW."+target))
signerCfgDelimiter := ":"

var signers []*rewardSigner
for chain, value := range cfg.Signer {
chainRpc, ok := cfg.RPC[chain]
if !ok {
return nil, fmt.Errorf("chain %s not found in rpc config", chain)
}

// we need http endpoint
if strings.HasPrefix(chainRpc, "wss://") {
chainRpc = strings.Replace(chainRpc, "wss://", "https://", 1)
}
if strings.HasPrefix(chainRpc, "ws") {
chainRpc = strings.Replace(chainRpc, "ws://", "http://", 1)
}

if !strings.Contains(value, signerCfgDelimiter) {
return nil, fmt.Errorf("invalid signer config: %s", value)
}

segs := strings.SplitN(value, signerCfgDelimiter, 2)

target := segs[0]
pkPath := segs[1]

if !ethCommon.FileExist(pkPath) {
return nil, fmt.Errorf("private key file %s not found", pkPath)
}

pkBytes, err := os.ReadFile(pkPath)
if err != nil {
return nil, fmt.Errorf("read private key file %s failed: %w", pkPath, err)
}

svc, err := newRewardSigner(kwilRpc, target, chainRpc, strings.TrimSpace(string(pkBytes)),
state, logger.New("EVMRW."+target))
if err != nil {
return nil, fmt.Errorf("create erc20 reward signer service failed: %w", err)
return nil, fmt.Errorf("create erc20 bridge signer service failed: %w", err)
}

signers[i] = svc
signers = append(signers, svc)
}

return &ServiceMgr{
signers: signers,
logger: logger,
signers: signers,
logger: logger,
syncInterval: time.Minute, // default to 1m
}, nil
}

// Start runs all rewardSigners. It returns error if there are issues initializing the rewardSigner;
// no errors are returned after the rewardSigner is running.
func (s *ServiceMgr) Start(ctx context.Context) error {
func (m *ServiceMgr) Start(ctx context.Context) error {
// since we need to wait on RPC running, we move the initialization logic into `init`

// To be able to run with docker, we need to apply a retry logic, since a new
// docker instance has no erc20 instance configured, but we need to config the
// erc20 instance target.
for { // naive way to keep trying the init
var err error
for _, s := range s.signers {
for _, s := range m.signers {
err = s.init()
if err != nil {
break
Expand All @@ -372,18 +404,18 @@ func (s *ServiceMgr) Start(ctx context.Context) error {

// if any error happens in init, we try again
time.Sleep(time.Second * 5)
s.logger.Warn("failed to initialize erc20 reward signer, will retry", "error", err.Error())
m.logger.Warn("failed to initialize erc20 bridge signer, will retry", "error", err.Error())
}

wg := &sync.WaitGroup{}

for _, s := range s.signers {
for _, s := range m.signers {
wg.Add(1)
go func() {
defer wg.Done()

s.logger.Info("start watching erc20 reward epoches")
tick := time.NewTicker(s.every)
tick := time.NewTicker(m.syncInterval)

for {
s.sync(ctx)
Expand All @@ -401,7 +433,7 @@ func (s *ServiceMgr) Start(ctx context.Context) error {
<-ctx.Done()
wg.Wait()

s.logger.Infof("Erc20 reward signer service shutting down...")
m.logger.Infof("Erc20 bridge signer service shutting down...")

return nil
}
2 changes: 1 addition & 1 deletion node/exts/evm-sync/evm_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (s *statePoller) runPollFuncs(ctx context.Context, service *common.Service,
}

func makeNewClient(ctx context.Context, service *common.Service, chain chains.Chain) (*ethclient.Client, error) {
chainConf, err := getChainConf(service.LocalConfig.Extensions, chain)
chainConf, err := getChainConf(service.LocalConfig.Erc20Bridge, chain)
if err != nil {
return nil, fmt.Errorf("failed to get chain config for %s: %v", chain, err)
}
Expand Down
2 changes: 1 addition & 1 deletion node/exts/evm-sync/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ type listenerInfo struct {
func (l *listenerInfo) listen(ctx context.Context, service *common.Service, eventstore listeners.EventStore, syncConf *syncConfig) {
logger := service.Logger.New(l.uniqueName + "." + string(l.chain.Name))

chainConf, err := getChainConf(service.LocalConfig.Extensions, l.chain.Name)
chainConf, err := getChainConf(service.LocalConfig.Erc20Bridge, l.chain.Name)
if err != nil {
logger.Error("failed to get chain config", "err", err)
return
Expand Down
Loading

0 comments on commit fba0ec0

Please sign in to comment.