diff --git a/.gitignore b/.gitignore index d480dffb1..6ed37dc0d 100644 --- a/.gitignore +++ b/.gitignore @@ -55,4 +55,7 @@ swagger-codegen-cli.jar vendor/ go.work -go.work.sum \ No newline at end of file +go.work.sum + +# mergetool artifacts +*.orig diff --git a/app/node/build.go b/app/node/build.go index 6dbd843df..f6da6b87c 100644 --- a/app/node/build.go +++ b/app/node/build.go @@ -2,15 +2,23 @@ package node import ( "context" + "crypto/tls" + "crypto/x509" "encoding/hex" "errors" "fmt" + "math/big" + "net" + "net/url" "os" "path/filepath" + "strings" + "time" "github.com/kwilteam/kwil-db/common" "github.com/kwilteam/kwil-db/core/crypto" "github.com/kwilteam/kwil-db/core/crypto/auth" + "github.com/kwilteam/kwil-db/core/types" ktypes "github.com/kwilteam/kwil-db/core/types" "github.com/kwilteam/kwil-db/extensions/precompiles" "github.com/kwilteam/kwil-db/node" @@ -23,11 +31,13 @@ import ( "github.com/kwilteam/kwil-db/node/snapshotter" "github.com/kwilteam/kwil-db/node/store" "github.com/kwilteam/kwil-db/node/txapp" + "github.com/kwilteam/kwil-db/node/types/sql" "github.com/kwilteam/kwil-db/node/voting" rpcserver "github.com/kwilteam/kwil-db/node/services/jsonrpc" + "github.com/kwilteam/kwil-db/node/services/jsonrpc/adminsvc" "github.com/kwilteam/kwil-db/node/services/jsonrpc/funcsvc" - usersvc "github.com/kwilteam/kwil-db/node/services/jsonrpc/usersvc" + "github.com/kwilteam/kwil-db/node/services/jsonrpc/usersvc" ) func buildServer(ctx context.Context, d *coreDependencies) *server { @@ -74,16 +84,16 @@ func buildServer(ctx context.Context, d *coreDependencies) *server { node := buildNode(d, mp, bs, ce, ss, db) // RPC Services - rpcSvcLogger := d.logger.New("user-json-svc") + rpcSvcLogger := d.logger.New("USER") jsonRPCTxSvc := usersvc.NewService(db, e, node, txApp, vs, rpcSvcLogger, - // usersvc.WithReadTxTimeout(time.Duration(d.cfg.AppConfig.ReadTxTimeout)), + usersvc.WithReadTxTimeout(time.Duration(d.cfg.DB.ReadTxTimeout)), usersvc.WithPrivateMode(d.cfg.RPC.Private), - // usersvc.WithChallengeExpiry(time.Duration(d.cfg.AppConfig.ChallengeExpiry)), - // usersvc.WithChallengeRateLimit(d.cfg.AppConfig.ChallengeRateLimit), + usersvc.WithChallengeExpiry(d.cfg.RPC.ChallengeExpiry), + usersvc.WithChallengeRateLimit(d.cfg.RPC.ChallengeRateLimit), // usersvc.WithBlockAgeHealth(6*totalConsensusTimeouts.Dur()), ) - rpcServerLogger := d.logger.New("user-jsonrprc-server") + rpcServerLogger := d.logger.New("RPC") jsonRPCServer, err := rpcserver.NewServer(d.cfg.RPC.ListenAddress, rpcServerLogger, rpcserver.WithTimeout(d.cfg.RPC.Timeout), rpcserver.WithReqSizeLimit(d.cfg.RPC.MaxReqSize), @@ -95,27 +105,52 @@ func buildServer(ctx context.Context, d *coreDependencies) *server { jsonRPCServer.RegisterSvc(jsonRPCTxSvc) jsonRPCServer.RegisterSvc(&funcsvc.Service{}) - // admin service and server - // signer := buildSigner(d) - // jsonAdminSvc := adminsvc.NewService(db, wrappedCmtClient, txApp, abciApp, p2p, nil, d.cfg, - // d.genesisCfg.ChainID, *d.log.Named("admin-json-svc")) - // jsonRPCAdminServer := buildJRPCAdminServer(d) - // jsonRPCAdminServer.RegisterSvc(jsonAdminSvc) - // jsonRPCAdminServer.RegisterSvc(jsonRPCTxSvc) - // jsonRPCAdminServer.RegisterSvc(&funcsvc.Service{}) + var jsonRPCAdminServer *rpcserver.Server + if d.cfg.Admin.Enable { + // admin service and server + adminServerLogger := d.logger.New("ADMIN") + // The admin service uses a client-style signer rather than just a private + // key because it is used to sign transactions and provide an Identity for + // account information (nonce and balance). + appIface := &mysteryThing{txApp, ce} + txSigner := &auth.EthPersonalSigner{Key: *d.privKey.(*crypto.Secp256k1PrivateKey)} + jsonAdminSvc := adminsvc.NewService(db, node, appIface, nil, txSigner, d.cfg, + d.genesisCfg.ChainID, adminServerLogger) + jsonRPCAdminServer = buildJRPCAdminServer(d) + jsonRPCAdminServer.RegisterSvc(jsonAdminSvc) + jsonRPCAdminServer.RegisterSvc(jsonRPCTxSvc) + jsonRPCAdminServer.RegisterSvc(&funcsvc.Service{}) + } s := &server{ - cfg: d.cfg, - closers: closers, - node: node, - ce: ce, - dbCtx: db, - log: d.logger, + cfg: d.cfg, + closers: closers, + node: node, + ce: ce, + jsonRPCServer: jsonRPCServer, + jsonRPCAdminServer: jsonRPCAdminServer, + dbCtx: db, + log: d.logger, } return s } +var _ adminsvc.App = (*mysteryThing)(nil) + +type mysteryThing struct { + txApp *txapp.TxApp + ce *consensus.ConsensusEngine +} + +func (mt *mysteryThing) AccountInfo(ctx context.Context, db sql.DB, identifier []byte, pending bool) (balance *big.Int, nonce int64, err error) { + return mt.txApp.AccountInfo(ctx, db, identifier, pending) +} + +func (mt *mysteryThing) Price(ctx context.Context, db sql.DB, tx *types.Transaction) (*big.Int, error) { + return mt.ce.Price(ctx, db, tx) +} + func buildDB(ctx context.Context, d *coreDependencies, closers *closeFuncs) *pg.DB { // TODO: restore from snapshots @@ -198,8 +233,18 @@ func buildConsensusEngine(_ context.Context, d *coreDependencies, db *pg.DB, acc } ceCfg := &consensus.Config{ - PrivateKey: d.privKey, - Leader: leaderPubKey, + PrivateKey: d.privKey, + Leader: leaderPubKey, + GenesisParams: &consensus.GenesisParams{ + ChainID: d.genesisCfg.ChainID, + Params: &consensus.NetworkParams{ + MaxBlockSize: d.genesisCfg.MaxBlockSize, + JoinExpiry: d.genesisCfg.JoinExpiry, + VoteExpiry: d.genesisCfg.VoteExpiry, + DisabledGasCosts: d.genesisCfg.DisabledGasCosts, + MaxVotesPerTx: d.genesisCfg.MaxVotesPerTx, + }, + }, DB: db, Accounts: accounts, BlockStore: bs, @@ -311,3 +356,161 @@ func buildSnapshotStore(d *coreDependencies) *snapshotter.SnapshotStore { return ss } + +func buildJRPCAdminServer(d *coreDependencies) *rpcserver.Server { + var wantTLS bool + addr := d.cfg.Admin.ListenAddress + host, port, err := net.SplitHostPort(addr) + if err != nil { + if strings.Contains(err.Error(), "missing port in address") { + host = addr + port = "8484" + } else if strings.Contains(err.Error(), "too many colons in address") { + u, err := url.Parse(addr) + if err != nil { + failBuild(err, "unknown admin service address "+addr) + } + host, port = u.Hostname(), u.Port() + wantTLS = u.Scheme == "https" + } else { + failBuild(err, "unknown admin service address "+addr) + } + } + + opts := []rpcserver.Opt{rpcserver.WithTimeout(10 * time.Minute)} // this is an administrator + + adminPass := d.cfg.Admin.Pass + if adminPass != "" { + opts = append(opts, rpcserver.WithPass(adminPass)) + } + + // Require TLS only if not UNIX or not loopback TCP interface. + if isUNIX := strings.HasPrefix(host, "/"); isUNIX { + addr = host + // no port and no TLS + if wantTLS { + failBuild(errors.New("unix socket with TLS is not supported"), "") + } + } else { // TCP + addr = net.JoinHostPort(host, port) + + var loopback bool + if netAddr, err := net.ResolveIPAddr("ip", host); err != nil { + d.logger.Warn("unresolvable host, assuming not loopback, but will likely fail to listen", + "host", host, "error", err) + } else { // e.g. "localhost" usually resolves to a loopback IP address + loopback = netAddr.IP.IsLoopback() + } + if !loopback || wantTLS { // use TLS for encryption, maybe also client auth + if d.cfg.Admin.NoTLS { + d.logger.Warn("disabling TLS on non-loopback admin service listen address", + "addr", addr, "with_password", adminPass != "") + } else { + withClientAuth := adminPass == "" // no basic http auth => use transport layer auth + opts = append(opts, rpcserver.WithTLS(tlsConfig(d, withClientAuth))) + } + } + } + + // Note that rpcserver.WithPass is not mutually exclusive with TLS in + // general, only mutual TLS. It could be a simpler alternative to mutual + // TLS, or just coupled with TLS termination on a local reverse proxy. + opts = append(opts, rpcserver.WithServerInfo(&adminsvc.SpecInfo)) + svcLogger := d.logger.New("ADMINRPC") + jsonRPCAdminServer, err := rpcserver.NewServer(addr, svcLogger, opts...) + if err != nil { + failBuild(err, "unable to create json-rpc server") + } + + return jsonRPCAdminServer +} + +func loadTLSCertificate(keyFile, certFile, hostname string) (*tls.Certificate, error) { + keyExists, certExists := fileExists(keyFile), fileExists(certFile) + if certExists != keyExists { // one but not both + return nil, fmt.Errorf("missing a key/cert pair file") + + } + if !keyExists { + // Auto-generate a new key/cert pair using any provided host name in the + // "Subject Alternate Name" section of the certificate (either IP or a + // hostname like kwild23.applicationX.org). + var extraHosts []string + if hostname != "" { + extraHosts = []string{hostname} + } + if err := genCertPair(certFile, keyFile, extraHosts); err != nil { + return nil, fmt.Errorf("failed to generate TLS key pair: %v", err) + } + // TODO: generate a separate CA certificate. Browsers don't like that + // the site certificate is also a CA, but Go clients are fine with it. + } + keyPair, err := tls.LoadX509KeyPair(certFile, keyFile) + if err != nil { + return nil, fmt.Errorf("failed to load TLS key pair: %v", err) + } + return &keyPair, nil +} + +// tlsConfig returns a tls.Config to be used with the admin RPC service. If +// withClientAuth is true, the config will require client authentication (mutual +// TLS), otherwise it is standard TLS for encryption and server authentication. +func tlsConfig(d *coreDependencies, withClientAuth bool) *tls.Config { + if d.adminKey == nil { + return nil + } + if !withClientAuth { + // TLS only for encryption and authentication of server to client. + return &tls.Config{ + Certificates: []tls.Certificate{*d.adminKey}, + } + } // else try to load authorized client certs/pubkeys + + var err error + // client certs + caCertPool := x509.NewCertPool() + var clientsCerts []byte + if clientsFile := filepath.Join(d.rootDir, defaultAdminClients); fileExists(clientsFile) { + clientsCerts, err = os.ReadFile(clientsFile) + if err != nil { + failBuild(err, "failed to load client CAs file") + } + } else /*else if d.autogen { + clientCredsFileBase := filepath.Join(d.rootDir, "auth") + clientCertFile, clientKeyFile := clientCredsFileBase+".cert", clientCredsFileBase+".key" + err = transport.GenTLSKeyPair(clientCertFile, clientKeyFile, "local kwild CA", nil) + if err != nil { + failBuild(err, "failed to generate admin client credentials") + } + d.logger.Info("generated admin service client key pair", log.String("cert", clientCertFile), log.String("key", clientKeyFile)) + if clientsCerts, err = os.ReadFile(clientCertFile); err != nil { + failBuild(err, "failed to read auto-generate client certificate") + } + if err = os.WriteFile(clientsFile, clientsCerts, 0644); err != nil { + failBuild(err, "failed to write client CAs file") + } + d.logger.Info("generated admin service client CAs file", log.String("file", clientsFile)) + } */ + { + d.logger.Info("No admin client CAs file. Use kwil-admin's node gen-auth-key command to generate") + } + + if len(clientsCerts) > 0 && !caCertPool.AppendCertsFromPEM(clientsCerts) { + failBuild(err, "invalid client CAs file") + } + + // TLS configuration for mTLS (mutual TLS) protocol-level authentication + return &tls.Config{ + Certificates: []tls.Certificate{*d.adminKey}, + ClientAuth: tls.RequireAndVerifyClientCert, + ClientCAs: caCertPool, + } +} + +func fileExists(file string) bool { + fi, err := os.Stat(file) + if err != nil { + return false + } + return !fi.IsDir() +} diff --git a/app/node/conf/conf.go b/app/node/conf/conf.go index e8627e316..fd5001133 100644 --- a/app/node/conf/conf.go +++ b/app/node/conf/conf.go @@ -110,9 +110,6 @@ func PreRunBindFlags(cmd *cobra.Command, args []string) error { if err != nil { return fmt.Errorf("error loading config: %v", err) } - if k.Bool("debug") { - k.Set("log_level", "debug") - } return nil } diff --git a/app/node/deps.go b/app/node/deps.go index 62af2d3ff..7012f8608 100644 --- a/app/node/deps.go +++ b/app/node/deps.go @@ -2,6 +2,7 @@ package node import ( "context" + "crypto/tls" "errors" "slices" @@ -61,6 +62,9 @@ type coreDependencies struct { genesisCfg *config.GenesisConfig privKey crypto.PrivateKey + adminKey *tls.Certificate + // autogen bool + logger log.Logger dbOpener dbOpener poolOpener poolOpener diff --git a/app/node/node.go b/app/node/node.go index 040b9b89e..6cc83deec 100644 --- a/app/node/node.go +++ b/app/node/node.go @@ -2,6 +2,7 @@ package node import ( "context" + "crypto/tls" "encoding/hex" "errors" "fmt" @@ -14,6 +15,7 @@ import ( "github.com/kwilteam/kwil-db/core/log" "github.com/kwilteam/kwil-db/node" "github.com/kwilteam/kwil-db/node/consensus" + rpcserver "github.com/kwilteam/kwil-db/node/services/jsonrpc" "github.com/kwilteam/kwil-db/version" "golang.org/x/sync/errgroup" @@ -30,9 +32,11 @@ type server struct { Err() error } - // Modules - node *node.Node - ce *consensus.ConsensusEngine + // subsystems + node *node.Node + ce *consensus.ConsensusEngine + jsonRPCServer *rpcserver.Server + jsonRPCAdminServer *rpcserver.Server } func runNode(ctx context.Context, rootDir string, cfg *config.Config) error { @@ -55,7 +59,7 @@ func runNode(ctx context.Context, rootDir string, cfg *config.Config) error { logger.Infof("Starting kwild version %v", version.KwilVersion) - genFile := filepath.Join(rootDir, config.GenesisFileName) + genFile := rootedPath(config.GenesisFileName, rootDir) logger.Infof("Loading the genesis configuration from %s", genFile) @@ -72,11 +76,25 @@ func runNode(ctx context.Context, rootDir string, cfg *config.Config) error { logger.Info("Parsing the pubkey", "key", hex.EncodeToString(pubKey)) + var tlsKeyPair *tls.Certificate + logger.Info("loading TLS key pair for the admin server", "key_file", cfg.Admin.TLSKeyFile, + "cert_file", cfg.Admin.TLSKeyFile) + if cfg.Admin.TLSKeyFile != "" || cfg.Admin.TLSCertFile != "" { + customHostname := "" // cfg TODO + keyFile := rootedPath(cfg.Admin.TLSKeyFile, rootDir) + certFile := rootedPath(cfg.Admin.TLSCertFile, rootDir) + tlsKeyPair, err = loadTLSCertificate(keyFile, certFile, customHostname) + if err != nil { + return err + } + } + host, port, user, pass := cfg.DB.Host, cfg.DB.Port, cfg.DB.User, cfg.DB.Pass d := &coreDependencies{ ctx: ctx, rootDir: rootDir, + adminKey: tlsKeyPair, cfg: cfg, genesisCfg: genConfig, privKey: privKey, @@ -124,6 +142,17 @@ func (s *server) Start(ctx context.Context) error { }) // start rpc services + group.Go(func() error { + s.log.Info("starting user json-rpc server", "listen", s.cfg.RPC.ListenAddress) + return s.jsonRPCServer.Serve(groupCtx) + }) + + if s.cfg.Admin.Enable { + group.Go(func() error { + s.log.Info("starting admin json-rpc server", "listen", s.cfg.Admin.ListenAddress) + return s.jsonRPCAdminServer.Serve(groupCtx) + }) + } // start node (p2p) group.Go(func() error { @@ -153,3 +182,12 @@ func (s *server) Start(ctx context.Context) error { return nil } + +// rootedPath returns an absolute path for the given path, relative to the root +// directory if it was a relative path. +func rootedPath(path, rootDir string) string { + if filepath.IsAbs(path) { + return path + } + return filepath.Join(rootDir, path) +} diff --git a/app/node/start.go b/app/node/start.go index 8c903e0ff..bff949a54 100644 --- a/app/node/start.go +++ b/app/node/start.go @@ -30,8 +30,7 @@ func StartCmd() *cobra.Command { } cfg := conf.ActiveConfig() - root2 := conf.RootDir() - fmt.Println(rootDir, "vs", root2) + // root2 := conf.RootDir(); fmt.Println(rootDir, "vs", root2) bind.Debugf("effective node config (toml):\n%s", bind.LazyPrinter(func() string { rawToml, err := cfg.ToTOML() diff --git a/_previous/cmd/kwild/server/tls.go b/app/node/tls.go similarity index 90% rename from _previous/cmd/kwild/server/tls.go rename to app/node/tls.go index 88d1657ae..6e4da3b80 100644 --- a/_previous/cmd/kwild/server/tls.go +++ b/app/node/tls.go @@ -1,4 +1,4 @@ -package server +package node import ( "github.com/kwilteam/kwil-db/core/rpc/transport" @@ -12,6 +12,6 @@ const defaultAdminClients = "clients.pem" // genCertPair generates a key/cert pair to the paths provided. func genCertPair(certFile, keyFile string, altDNSNames []string) error { - org := "kwild gRPC server autogenerated cert" + org := "kwild server autogenerated cert" return transport.GenTLSKeyPair(certFile, keyFile, org, altDNSNames) } diff --git a/app/setup/reset.go b/app/setup/reset.go index 6dc1e65e4..e7e9d8eb6 100644 --- a/app/setup/reset.go +++ b/app/setup/reset.go @@ -31,23 +31,19 @@ func ResetCmd() *cobra.Command { return fmt.Errorf("root directory %s does not exist", rootDir) } - // remove state.json file if it exists - stateFile := filepath.Join(rootDir, "state.json") - if _, err := os.Stat(stateFile); err == nil { - if err := os.Remove(stateFile); err != nil { - return err - } - fmt.Println("state.json removed") + // TODO: reset app DB + + if !all { + return nil } // remove the blockstore if all is set chainDir := filepath.Join(rootDir, "blockstore") - if _, err := os.Stat(chainDir); err == nil { - if err := os.RemoveAll(chainDir); err != nil { - return err - } - fmt.Println("blockstore removed") + if err := os.RemoveAll(chainDir); err != nil { + return err } + fmt.Println("blockstore removed") + return nil }, } diff --git a/app/setup/testnet.go b/app/setup/testnet.go index 4787171b5..a06b239f5 100644 --- a/app/setup/testnet.go +++ b/app/setup/testnet.go @@ -5,8 +5,10 @@ import ( "encoding/binary" "fmt" "math/rand/v2" + "net" "os" "path/filepath" + "strconv" "github.com/kwilteam/kwil-db/app/custom" "github.com/kwilteam/kwil-db/app/shared/bind" @@ -24,23 +26,29 @@ func TestnetCmd() *cobra.Command { var numVals, numNVals int var noPex bool var startingPort uint64 + var outDir string cmd := &cobra.Command{ Use: "testnet", Short: "Generate configuration for multiple nodes", RunE: func(cmd *cobra.Command, args []string) error { - rootDir, err := bind.RootDir(cmd) - if err != nil { - return err // the parent command needs to set a persistent flag named "root" - } - return generateNodeConfig(rootDir, numVals, numNVals, noPex, startingPort) + return generateNodeConfig(outDir, numVals, numNVals, noPex, startingPort) }, } + // root dir does not apply to this command. NOTE: probably this means we + // need a top level "node" command with root and all the other config binds. + // On the other hand, `setup reset` applies to a specific node instance... + cmd.SetHelpFunc(func(c *cobra.Command, s []string) { + cmd.InheritedFlags().MarkHidden(bind.RootFlagName) + cmd.Parent().HelpFunc()(c, s) + }) + cmd.Flags().IntVarP(&numVals, "vals", "v", 3, "number of validators (includes the one leader)") - cmd.Flags().IntVarP(&numNVals, "non-vals", "n", 0, "number of non-validators") + cmd.Flags().IntVarP(&numNVals, "non-vals", "n", 0, "number of non-validators (default 0)") cmd.Flags().BoolVar(&noPex, "no-pex", false, "disable peer exchange") - cmd.Flags().Uint64VarP(&startingPort, "port", "p", 6600, "starting port for the nodes") + cmd.Flags().Uint64VarP(&startingPort, "port", "p", 6600, "starting P2P port for the nodes") + cmd.Flags().StringVarP(&outDir, "out-dir", "o", ".testnet", "output directory for generated node root directories") return cmd } @@ -75,8 +83,14 @@ func generateNodeConfig(rootDir string, numVals, numNVals int, noPex bool, start leaderPubType := leaderPub.Type() genConfig := &config.GenesisConfig{ - Leader: leaderPub.Bytes(), // rethink this so it can be different key types? - Validators: make([]ktypes.Validator, numVals), + ChainID: "kwil-testnet", + Leader: leaderPub.Bytes(), // rethink this so it can be different key types? + Validators: make([]ktypes.Validator, numVals), + DisabledGasCosts: true, + JoinExpiry: 14400, + VoteExpiry: 108000, + MaxBlockSize: 6 * 1024 * 1024, + MaxVotesPerTx: 200, } for i := range numVals { @@ -93,10 +107,11 @@ func generateNodeConfig(rootDir string, numVals, numNVals int, noPex bool, start return err } - privKey := keys[i].Bytes() - cfg := custom.DefaultConfig() // not config.DefaultConfig(), so custom command config is used - cfg.PrivateKey = privKey + + cfg.PrivateKey = keys[i].Bytes() + + // P2P cfg.P2P.Port = startingPort + uint64(i) cfg.P2P.IP = "127.0.0.1" cfg.P2P.Pex = !noPex @@ -106,6 +121,15 @@ func generateNodeConfig(rootDir string, numVals, numNVals int, noPex bool, start leaderPub.Bytes(), leaderPubType, cfg.P2P.IP, int(startingPort))} } + // DB + cfg.DB.Port = strconv.Itoa(5432 + i) + + // RPC + cfg.RPC.ListenAddress = net.JoinHostPort("0.0.0.0", strconv.FormatUint(uint64(8484+i), 10)) + + // Admin RPC + cfg.Admin.ListenAddress = net.JoinHostPort("127.0.0.1", strconv.FormatUint(uint64(8584+i), 10)) + if err := cfg.SaveAs(filepath.Join(nodeDir, config.ConfigFileName)); err != nil { return err } diff --git a/config/config.go b/config/config.go index 6b6cdcb71..597245f33 100644 --- a/config/config.go +++ b/config/config.go @@ -36,12 +36,25 @@ func (d *Duration) UnmarshalText(text []byte) error { } type GenesisConfig struct { + ChainID string `json:"chain_id"` // Leader is the leader's public key. Leader types.HexBytes `json:"leader"` // Validators is the list of genesis validators (including the leader). Validators []ktypes.Validator `json:"validators"` - // TODO: more params like max block size, etc. + // MaxBlockSize is the maximum size of a block in bytes. + MaxBlockSize int64 `json:"max_block_size"` + // JoinExpiry is the number of blocks after which the validators + // join request expires if not approved. + JoinExpiry int64 `json:"join_expiry"` + // VoteExpiry is the default number of blocks after which the validators + // vote expires if not approved. + VoteExpiry int64 `json:"vote_expiry"` + // DisabledGasCosts dictates whether gas costs are disabled. + DisabledGasCosts bool `json:"disabled_gas_costs"` + // MaxVotesPerTx is the maximum number of votes that can be included in a + // single transaction. + MaxVotesPerTx int64 `json:"max_votes_per_tx"` } func (nc *GenesisConfig) SaveAs(filename string) error { @@ -67,6 +80,12 @@ func LoadGenesisConfig(filename string) (*GenesisConfig, error) { return &nc, nil } +// const ( +// defaultUserRPCPort = 8484 +// defaultAdminRPCPort = 8584 +// defaultP2PRPCPort = 6600 +// ) + // DefaultConfig generates an instance of the default config. func DefaultConfig() *Config { return &Config{ @@ -93,6 +112,22 @@ func DefaultConfig() *Config { ReadTxTimeout: Duration(45 * time.Second), MaxConns: 60, }, + RPC: RPCConfig{ + ListenAddress: "0.0.0.0:8484", + Timeout: 20 * time.Second, + MaxReqSize: 6_000_000, + Private: false, + ChallengeExpiry: 30 * time.Second, + ChallengeRateLimit: 10, + }, + Admin: AdminConfig{ + Enable: true, + ListenAddress: "/tmp/kwil2-admin.socket", + Pass: "", + NoTLS: false, + TLSCertFile: "admin.cert", + TLSKeyFile: "admin.key", + }, Snapshots: SnapshotConfig{ Enable: false, RecurringHeight: 14400, @@ -135,6 +170,7 @@ type Config struct { Consensus ConsensusConfig `koanf:"consensus" toml:"consensus"` DB DBConfig `koanf:"db" toml:"db"` RPC RPCConfig `koanf:"rpc" toml:"rpc"` + Admin AdminConfig `koanf:"admin" toml:"admin"` Snapshots SnapshotConfig `koanf:"snapshots" toml:"snapshots"` StateSync StateSyncConfig `koanf:"state_sync" toml:"state_sync"` } @@ -187,6 +223,15 @@ type RPCConfig struct { ChallengeRateLimit float64 `koanf:"challenge_rate_limit" toml:"challenge_rate_limit"` } +type AdminConfig struct { + Enable bool `koanf:"enable" toml:"enable"` + ListenAddress string `koanf:"listen" toml:"listen"` + Pass string `koanf:"pass" toml:"pass"` + NoTLS bool `koanf:"notls" toml:"notls"` + TLSCertFile string `koanf:"cert" toml:"cert"` + TLSKeyFile string `koanf:"key" toml:"key"` +} + type SnapshotConfig struct { Enable bool `koanf:"enable" toml:"enable"` RecurringHeight uint64 `koanf:"recurring_height" toml:"recurring_height"` diff --git a/node/admin/client.go b/node/admin/client.go index 64ae7a485..86478e00f 100644 --- a/node/admin/client.go +++ b/node/admin/client.go @@ -125,14 +125,14 @@ func prepareHTTPDialerURL(target string) (*neturl.URL, dialerFunc, error) { } else if parsedURL.Port() == "" { // Taking this liberty for admin tool, which is unlikely to be exposed // with DNS and implied port 80/443. - parsedURL.Host += ":8485" + parsedURL.Host += ":8584" } return parsedURL, dialer, nil } // NewClient creates a new admin client . The target arg is usually either -// "127.0.0.1:8485" or "/path/to/socket.socket". The scheme http:// or https:// +// "127.0.0.1:8585" or "/path/to/socket.socket". The scheme http:// or https:// // may be included, to dictate if TLS is required or not. If no scheme is given, // http:// is assumed. UNIX socket transport may not use TLS. The endpoint path // is a separate argument to distinguish it from the UNIX socket file path. diff --git a/node/admin/client_test.go b/node/admin/client_test.go index 2294f20b2..9abe9b459 100644 --- a/node/admin/client_test.go +++ b/node/admin/client_test.go @@ -18,21 +18,21 @@ func Test_prepareHTTPDialerURL(t *testing.T) { { "just ip", "127.0.0.1", - "http://127.0.0.1:8485", + "http://127.0.0.1:8584", false, false, }, { "just hostname", "localhost", - "http://localhost:8485", + "http://localhost:8584", false, false, }, { "http hostname", "http://localhost", - "http://localhost:8485", + "http://localhost:8584", false, false, }, diff --git a/node/consensus/block_executor.go b/node/consensus/block_executor.go index aa29a4dac..15c284beb 100644 --- a/node/consensus/block_executor.go +++ b/node/consensus/block_executor.go @@ -6,6 +6,7 @@ import ( "encoding/binary" "encoding/hex" "fmt" + "math/big" "sort" "github.com/kwilteam/kwil-db/common" @@ -13,6 +14,7 @@ import ( ktypes "github.com/kwilteam/kwil-db/core/types" "github.com/kwilteam/kwil-db/node/meta" "github.com/kwilteam/kwil-db/node/types" + "github.com/kwilteam/kwil-db/node/types/sql" ) // Block processing methods @@ -129,15 +131,10 @@ func (ce *ConsensusEngine) executeBlock() (err error) { // TODO: Notify the changesets to the migrator blockCtx := &common.BlockContext{ // TODO: fill in the network params once we have them - Height: ce.state.blkProp.height, - ChainContext: &common.ChainContext{ - NetworkParameters: &common.NetworkParameters{ - DisabledGasCosts: true, - JoinExpiry: 14400, - MaxBlockSize: 6 * 1024 * 1024, - }, - }, - Proposer: ce.leader.Bytes(), + Height: ce.state.blkProp.height, + Timestamp: blkProp.blk.Header.Timestamp.Unix(), + ChainContext: ce.chainCtx, + Proposer: ce.leader.Bytes(), } _, err = ce.txapp.Finalize(ctx, ce.state.consensusTx, blockCtx) if err != nil { @@ -192,6 +189,11 @@ func (ce *ConsensusEngine) executeBlock() (err error) { return nil } +// We probably don't want to do this long term +func (ce *ConsensusEngine) Price(ctx context.Context, dbTx sql.DB, tx *ktypes.Transaction) (*big.Int, error) { + return ce.txapp.Price(ctx, dbTx, tx, ce.chainCtx) +} + // nextAppHash calculates the appHash that encapsulates the state changes occurred during the block execution. // sha256(prevAppHash || changesetHash || valUpdatesHash || accountsHash || txResultsHash) func (ce *ConsensusEngine) nextAppHash(prevAppHash, changesetHash, valUpdatesHash, accountsHash, txResultsHash types.Hash) types.Hash { diff --git a/node/consensus/engine.go b/node/consensus/engine.go index c77d3b3ba..7de30ec30 100644 --- a/node/consensus/engine.go +++ b/node/consensus/engine.go @@ -12,6 +12,7 @@ import ( "sync/atomic" "time" + "github.com/kwilteam/kwil-db/common" "github.com/kwilteam/kwil-db/core/crypto" "github.com/kwilteam/kwil-db/core/crypto/auth" "github.com/kwilteam/kwil-db/core/log" @@ -59,6 +60,8 @@ type ConsensusEngine struct { // copy of the state info for the p2p layer usage. stateInfo StateInfo + chainCtx *common.ChainContext + // Channels msgChan chan consensusMessage haltChan chan struct{} // can take a msg or reason for halting the network @@ -153,6 +156,39 @@ type lastCommit struct { blk *types.Block // why is this needed? can be fetched from the blockstore too. } +type NetworkParams struct { + // MaxBlockSize is the maximum size of a block in bytes. + MaxBlockSize int64 + // JoinExpiry is the number of blocks after which the validators + // join request expires if not approved. + JoinExpiry int64 + // VoteExpiry is the default number of blocks after which the validators + // vote expires if not approved. + VoteExpiry int64 + // DisabledGasCosts dictates whether gas costs are disabled. + DisabledGasCosts bool + // MaxVotesPerTx is the maximum number of votes that can be included in a + // single transaction. + MaxVotesPerTx int64 +} + +type GenesisParams struct { + ChainID string + Params *NetworkParams +} + +// TODO: remove this +var defaultGenesisParams = &GenesisParams{ + ChainID: "test-chain", + Params: &NetworkParams{ + DisabledGasCosts: true, + JoinExpiry: 14400, + VoteExpiry: 108000, + MaxBlockSize: 6 * 1024 * 1024, + MaxVotesPerTx: 200, + }, +} + // Config is the struct given to the constructor, [New]. type Config struct { // Signer is the private key of the node. @@ -160,6 +196,8 @@ type Config struct { // Leader is the public key of the leader. Leader crypto.PublicKey + GenesisParams *GenesisParams // *config.GenesisConfig + DB *pg.DB // Mempool is the mempool of the node. Mempool Mempool @@ -216,6 +254,10 @@ func New(cfg *Config) *ConsensusEngine { signer := auth.GetSigner(cfg.PrivateKey) + if cfg.GenesisParams == nil { + cfg.GenesisParams = defaultGenesisParams // TODO: remove + } + // rethink how this state is initialized ce := &ConsensusEngine{ signer: signer, @@ -239,6 +281,17 @@ func New(cfg *Config) *ConsensusEngine { status: Committed, blkProp: nil, }, + chainCtx: &common.ChainContext{ + ChainID: cfg.GenesisParams.ChainID, + NetworkParameters: &common.NetworkParameters{ + MaxBlockSize: cfg.GenesisParams.Params.MaxBlockSize, + JoinExpiry: cfg.GenesisParams.Params.JoinExpiry, + VoteExpiry: cfg.GenesisParams.Params.VoteExpiry, + DisabledGasCosts: cfg.GenesisParams.Params.DisabledGasCosts, + MaxVotesPerTx: cfg.GenesisParams.Params.MaxVotesPerTx, + }, + // MigrationParams: + }, validatorSet: maps.Clone(cfg.ValidatorSet), msgChan: make(chan consensusMessage, 1), // buffer size?? haltChan: make(chan struct{}, 1), diff --git a/node/consensus/engine_test.go b/node/consensus/engine_test.go index b04d8eba6..4d4b40731 100644 --- a/node/consensus/engine_test.go +++ b/node/consensus/engine_test.go @@ -9,6 +9,7 @@ import ( "encoding/hex" "errors" "fmt" + "math/big" "os" "path/filepath" "sync" @@ -109,6 +110,7 @@ func generateTestCEConfig(t *testing.T, nodes int, leaderDB bool) []*Config { assert.NoError(t, err) ceConfigs[i] = &Config{ + GenesisParams: defaultGenesisParams, PrivateKey: privKeys[i], Leader: pubKeys[0], Mempool: mempool.New(), @@ -768,6 +770,10 @@ func (d *dummyTxApp) Finalize(ctx context.Context, db sql.DB, block *common.Bloc return d.vals, nil } +func (d *dummyTxApp) Price(ctx context.Context, dbTx sql.DB, tx *ktypes.Transaction, chainContext *common.ChainContext) (*big.Int, error) { + return big.NewInt(0), nil +} + func (d *dummyTxApp) Commit() error { return nil } diff --git a/node/consensus/interfaces.go b/node/consensus/interfaces.go index 1d60edd63..1bca9d891 100644 --- a/node/consensus/interfaces.go +++ b/node/consensus/interfaces.go @@ -2,6 +2,7 @@ package consensus import ( "context" + "math/big" "github.com/kwilteam/kwil-db/common" ktypes "github.com/kwilteam/kwil-db/core/types" @@ -59,6 +60,8 @@ type TxApp interface { Execute(ctx *common.TxContext, db sql.DB, tx *ktypes.Transaction) *txapp.TxResponse Finalize(ctx context.Context, db sql.DB, block *common.BlockContext) (finalValidators []*ktypes.Validator, err error) Commit() error + + Price(ctx context.Context, dbTx sql.DB, tx *ktypes.Transaction, chainContext *common.ChainContext) (*big.Int, error) } // Question: diff --git a/node/node_live_test.go b/node/node_live_test.go index bbe87ab7a..771ad7be4 100644 --- a/node/node_live_test.go +++ b/node/node_live_test.go @@ -5,6 +5,7 @@ package node import ( "context" "encoding/hex" + "math/big" "os" "sync" "testing" @@ -259,6 +260,10 @@ func (d *dummyTxApp) Finalize(ctx context.Context, db sql.DB, block *common.Bloc return d.vals, nil } +func (d *dummyTxApp) Price(ctx context.Context, dbTx sql.DB, tx *ktypes.Transaction, chainContext *common.ChainContext) (*big.Int, error) { + return big.NewInt(0), nil +} + func (d *dummyTxApp) Commit() error { return nil } diff --git a/node/pg/replmon.go b/node/pg/replmon.go index 670327020..e5c6c029f 100644 --- a/node/pg/replmon.go +++ b/node/pg/replmon.go @@ -80,7 +80,6 @@ func newReplMon(ctx context.Context, host, port, user, pass, dbName string, sche var slotName = publicationName + random.String(8) // arbitrary, so just avoid collisions commitChan, errChan, quit, err := startRepl(ctx, conn, publicationName, slotName, schemaFilter, cs) if err != nil { - quit() conn.Close(ctx) return nil, err } diff --git a/node/services/jsonrpc/adminsvc/service.go b/node/services/jsonrpc/adminsvc/service.go index 97e47eb5c..e8f7279b3 100644 --- a/node/services/jsonrpc/adminsvc/service.go +++ b/node/services/jsonrpc/adminsvc/service.go @@ -24,20 +24,28 @@ import ( // BlockchainTransactor specifies the methods required for the admin service to // interact with the blockchain. -type BlockchainTransactor interface { +type Node interface { Status(context.Context) (*types.Status, error) Peers(context.Context) ([]*types.PeerInfo, error) - BroadcastTx(ctx context.Context, tx []byte, sync uint8) (*coretypes.ResultBroadcastTx, error) + BroadcastTx(ctx context.Context, tx *coretypes.Transaction, sync uint8) (*coretypes.ResultBroadcastTx, error) } -type TxApp interface { +type P2P interface { + // AddPeer adds a peer to the node's peer list and persists it. + AddPeer(ctx context.Context, nodeID string) error + + // RemovePeer removes a peer from the node's peer list permanently. + RemovePeer(ctx context.Context, nodeID string) error + + // ListPeers returns the list of peers in the node's whitelist. + ListPeers(ctx context.Context) []string +} + +type App interface { // AccountInfo returns the unconfirmed account info for the given identifier. // If unconfirmed is true, the account found in the mempool is returned. // Otherwise, the account found in the blockchain is returned. AccountInfo(ctx context.Context, db sql.DB, identifier []byte, unconfirmed bool) (balance *big.Int, nonce int64, err error) -} - -type Pricer interface { Price(ctx context.Context, db sql.DB, tx *coretypes.Transaction) (*big.Int, error) } @@ -47,23 +55,13 @@ type Validators interface { GetValidators() []*coretypes.Validator } -type P2P interface { - // AddPeer adds a peer to the node's peer list and persists it. - AddAndPersistPeer(ctx context.Context, nodeID string) error - // RemovePeer removes a peer from the node's peer list permanently. - RemovePersistedPeer(ctx context.Context, nodeID string) error - // ListPeers returns the list of peers in the node's whitelist. - ListPeers(ctx context.Context) []string -} - type Service struct { log log.Logger - blockchain BlockchainTransactor // node is the local node that can accept transactions. - TxApp TxApp + blockchain Node // node is the local node that can accept transactions. + app App voting Validators db sql.DelayedReadTxMaker - pricer Pricer p2p P2P cfg *config.Config @@ -223,16 +221,15 @@ func (svc *Service) Handlers() map[jsonrpc.Method]rpcserver.MethodHandler { } // NewService constructs a new Service. -func NewService(db sql.DelayedReadTxMaker, blockchain BlockchainTransactor, txApp TxApp, - pricer Pricer, p2p P2P, signer auth.Signer, cfg *config.Config, +func NewService(db sql.DelayedReadTxMaker, blockchain Node, app App, + p2p P2P, txSigner auth.Signer, cfg *config.Config, chainID string, logger log.Logger) *Service { return &Service{ blockchain: blockchain, - TxApp: txApp, - signer: signer, - chainID: chainID, - pricer: pricer, p2p: p2p, + app: app, + signer: txSigner, + chainID: chainID, cfg: cfg, log: logger, db: db, @@ -289,7 +286,7 @@ func (svc *Service) sendTx(ctx context.Context, payload coretypes.Payload) (*use defer readTx.Rollback(ctx) // Get the latest nonce for the account, if it exists. - _, nonce, err := svc.TxApp.AccountInfo(ctx, readTx, svc.signer.Identity(), true) + _, nonce, err := svc.app.AccountInfo(ctx, readTx, svc.signer.Identity(), true) if err != nil { return nil, jsonrpc.NewError(jsonrpc.ErrorAccountInternal, "account info error", nil) } @@ -299,7 +296,7 @@ func (svc *Service) sendTx(ctx context.Context, payload coretypes.Payload) (*use return nil, jsonrpc.NewError(jsonrpc.ErrorInternal, "unable to create transaction", nil) } - fee, err := svc.pricer.Price(ctx, readTx, tx) + fee, err := svc.app.Price(ctx, readTx, tx) if err != nil { return nil, jsonrpc.NewError(jsonrpc.ErrorTxInternal, "unable to price transaction", nil) } @@ -311,13 +308,8 @@ func (svc *Service) sendTx(ctx context.Context, payload coretypes.Payload) (*use if err != nil { return nil, jsonrpc.NewError(jsonrpc.ErrorInternal, "signing transaction failed", nil) } - encodedTx, err := tx.MarshalBinary() - if err != nil { - svc.log.Error("failed to serialize transaction data", "error", err) - return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, "failed to serialize transaction data", nil) - } - res, err := svc.blockchain.BroadcastTx(ctx, encodedTx, uint8(userjson.BroadcastSyncSync)) + res, err := svc.blockchain.BroadcastTx(ctx, tx, uint8(userjson.BroadcastSyncSync)) if err != nil { svc.log.Error("failed to broadcast tx", "error", err) return nil, jsonrpc.NewError(jsonrpc.ErrorTxInternal, "failed to broadcast transaction", nil) @@ -490,7 +482,7 @@ func (svc *Service) GetConfig(ctx context.Context, req *adminjson.GetConfigReque } func (svc *Service) AddPeer(ctx context.Context, req *adminjson.PeerRequest) (*adminjson.PeerResponse, *jsonrpc.Error) { - err := svc.p2p.AddAndPersistPeer(ctx, req.PeerID) + err := svc.p2p.AddPeer(ctx, req.PeerID) if err != nil { return nil, jsonrpc.NewError(jsonrpc.ErrorInternal, "failed to add a peer. Reason: "+err.Error(), nil) } @@ -498,8 +490,7 @@ func (svc *Service) AddPeer(ctx context.Context, req *adminjson.PeerRequest) (*a } func (svc *Service) RemovePeer(ctx context.Context, req *adminjson.PeerRequest) (*adminjson.PeerResponse, *jsonrpc.Error) { - fmt.Println("RemovePeer : ", req.PeerID) - err := svc.p2p.RemovePersistedPeer(ctx, req.PeerID) + err := svc.p2p.RemovePeer(ctx, req.PeerID) if err != nil { svc.log.Error("failed to remove peer", "error", err) return nil, jsonrpc.NewError(jsonrpc.ErrorInternal, "failed to remove peer : "+err.Error(), nil) diff --git a/node/services/jsonrpc/server.go b/node/services/jsonrpc/server.go index f79aef82a..844a5ea92 100644 --- a/node/services/jsonrpc/server.go +++ b/node/services/jsonrpc/server.go @@ -152,7 +152,7 @@ func checkAddr(addr string) (string, bool, error) { if err != nil { if strings.Contains(err.Error(), "missing port in address") { host = addr - port = "8485" + port = "8484" } else if strings.Contains(err.Error(), "too many colons in address") { u, err := url.Parse(addr) if err != nil {