Skip to content

Commit

Permalink
reenable Engine and deploy/drop payloads
Browse files Browse the repository at this point in the history
  • Loading branch information
jchappelow committed Dec 9, 2024
1 parent 79a23f4 commit ca861cf
Show file tree
Hide file tree
Showing 17 changed files with 228 additions and 93 deletions.
4 changes: 2 additions & 2 deletions app/node/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,11 @@ func buildServer(ctx context.Context, d *coreDependencies) *server {

// Node
node := buildNode(d, mp, bs, ce, ss, db)
appIface := &mysteryThing{txApp, ce}

// RPC Services
rpcSvcLogger := d.logger.New("USER")
jsonRPCTxSvc := usersvc.NewService(db, e, node, txApp, vs, rpcSvcLogger,
jsonRPCTxSvc := usersvc.NewService(db, e, node, appIface, vs, rpcSvcLogger,
usersvc.WithReadTxTimeout(time.Duration(d.cfg.DB.ReadTxTimeout)),
usersvc.WithPrivateMode(d.cfg.RPC.Private),
usersvc.WithChallengeExpiry(d.cfg.RPC.ChallengeExpiry),
Expand All @@ -112,7 +113,6 @@ func buildServer(ctx context.Context, d *coreDependencies) *server {
// The admin service uses a client-style signer rather than just a private
// key because it is used to sign transactions and provide an Identity for
// account information (nonce and balance).
appIface := &mysteryThing{txApp, ce}
txSigner := &auth.EthPersonalSigner{Key: *d.privKey.(*crypto.Secp256k1PrivateKey)}
jsonAdminSvc := adminsvc.NewService(db, node, appIface, nil, txSigner, d.cfg,
d.genesisCfg.ChainID, adminServerLogger)
Expand Down
4 changes: 2 additions & 2 deletions cmd/kwil-cli/cmds/database/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ func NewCmdDatabase() *cobra.Command {

// writeCmds create a transactions, requiring a private key for signing/
writeCmds := []*cobra.Command{
// deployCmd(),
// dropCmd(),
deployCmd(),
dropCmd(),
executeCmd(),
batchCmd(),
}
Expand Down
File renamed without changes.
File renamed without changes.
15 changes: 7 additions & 8 deletions core/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/kwilteam/kwil-db/core/rpc/client/user"
userClient "github.com/kwilteam/kwil-db/core/rpc/client/user/jsonrpc"
"github.com/kwilteam/kwil-db/core/types"
"github.com/kwilteam/kwil-db/core/utils"
)

// Client is a client that interacts with a public Kwil provider.
Expand Down Expand Up @@ -231,13 +232,12 @@ func (c *Client) GetSchema(ctx context.Context, dbid string) (*types.Schema, err
return ds, nil
}

/*
// DeployDatabase deploys a database. TODO: remove
func (c *Client) DeployDatabase(ctx context.Context, schema *types.Schema, opts ...clientType.TxOpt) (types.TxHash, error) {
func (c *Client) DeployDatabase(ctx context.Context, schema *types.Schema, opts ...clientType.TxOpt) (types.Hash, error) {
txOpts := clientType.GetTxOpts(opts)
tx, err := c.newTx(ctx, schema, txOpts)
if err != nil {
return nil, err
return types.Hash{}, err
}

c.logger.Debug("deploying database",
Expand All @@ -249,21 +249,21 @@ func (c *Client) DeployDatabase(ctx context.Context, schema *types.Schema, opts

// DropDatabase drops a database by name, using the configured signer to derive
// the DB ID. TODO: remove
func (c *Client) DropDatabase(ctx context.Context, name string, opts ...clientType.TxOpt) (types.TxHash, error) {
func (c *Client) DropDatabase(ctx context.Context, name string, opts ...clientType.TxOpt) (types.Hash, error) {
dbid := utils.GenerateDBID(name, c.Signer.Identity())
return c.DropDatabaseID(ctx, dbid, opts...)
}

// DropDatabaseID drops a database by ID. TODO: remove
func (c *Client) DropDatabaseID(ctx context.Context, dbid string, opts ...clientType.TxOpt) (types.TxHash, error) {
func (c *Client) DropDatabaseID(ctx context.Context, dbid string, opts ...clientType.TxOpt) (types.Hash, error) {
identifier := &types.DropSchema{
DBID: dbid,
}

txOpts := clientType.GetTxOpts(opts)
tx, err := c.newTx(ctx, identifier, txOpts)
if err != nil {
return nil, err
return types.Hash{}, err
}

c.logger.Debug("deploying database",
Expand All @@ -273,12 +273,11 @@ func (c *Client) DropDatabaseID(ctx context.Context, dbid string, opts ...client

res, err := c.txClient.Broadcast(ctx, tx, syncBcastFlag(txOpts.SyncBcast))
if err != nil {
return nil, err
return types.Hash{}, err
}

return res, nil
}
*/

// Execute executes a procedure or action.
// It returns the receipt, as well as outputs which is the decoded body of the receipt.
Expand Down
6 changes: 3 additions & 3 deletions core/client/types/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ type Client interface {
Call(ctx context.Context, dbid string, procedure string, inputs []any) (*CallResult, error)
ChainID() string
ChainInfo(ctx context.Context) (*types.ChainInfo, error)
// DeployDatabase(ctx context.Context, payload *types.Schema, opts ...TxOpt) (types.Hash, error)
// DropDatabase(ctx context.Context, name string, opts ...TxOpt) (types.Hash, error)
// DropDatabaseID(ctx context.Context, dbid string, opts ...TxOpt) (types.Hash, error)
DeployDatabase(ctx context.Context, payload *types.Schema, opts ...TxOpt) (types.Hash, error)
DropDatabase(ctx context.Context, name string, opts ...TxOpt) (types.Hash, error)
DropDatabaseID(ctx context.Context, dbid string, opts ...TxOpt) (types.Hash, error)
// DEPRECATED: Use Execute instead.
// ExecuteAction(ctx context.Context, dbid string, action string, tuples [][]any, opts ...TxOpt) (types.Hash, error)
Execute(ctx context.Context, dbid string, action string, tuples [][]any, opts ...TxOpt) (types.Hash, error)
Expand Down
33 changes: 3 additions & 30 deletions core/types/payloads.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ type Payload interface {
}

const (
PayloadTypeKV PayloadType = "kv"
// PayloadTypeDeploySchema PayloadType = "deploy_schema"
// PayloadTypeDropSchema PayloadType = "drop_schema"
PayloadTypeKV PayloadType = "kv"
PayloadTypeDeploySchema PayloadType = "deploy_schema"
PayloadTypeDropSchema PayloadType = "drop_schema"
PayloadTypeExecute PayloadType = "execute"
PayloadTypeTransfer PayloadType = "transfer"
PayloadTypeValidatorJoin PayloadType = "validator_join"
Expand Down Expand Up @@ -145,32 +145,6 @@ func RegisterPayload(pType PayloadType) {
payloadTypes[pType] = true
}

// KVPair payload for testing purposes
type KVPayload struct {
Key []byte
Value []byte
}

var _ Payload = &KVPayload{}

var _ encoding.BinaryMarshaler = (*KVPayload)(nil)
var _ encoding.BinaryMarshaler = KVPayload{}

func (p KVPayload) MarshalBinary() ([]byte, error) {
return serialize.Encode(p)
}

var _ encoding.BinaryUnmarshaler = (*KVPayload)(nil)

func (p *KVPayload) UnmarshalBinary(data []byte) error {
return serialize.Decode(data, p)
}

func (p *KVPayload) Type() PayloadType {
return PayloadTypeKV
}

/*
// DropSchema is the payload that is used to drop a schema
type DropSchema struct {
DBID string
Expand All @@ -189,7 +163,6 @@ func (s *DropSchema) UnmarshalBinary(b []byte) error {
func (s *DropSchema) Type() PayloadType {
return PayloadTypeDropSchema
}
*/

// ActionExecution is the payload that is used to execute an action
type ActionExecution struct {
Expand Down
2 changes: 1 addition & 1 deletion core/types/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type Schema struct {
var _ Payload = (*Schema)(nil)

func (s *Schema) Type() PayloadType {
return "to remove!"
return PayloadTypeDeploySchema
}

func (s Schema) SerializeSize() int {
Expand Down
15 changes: 8 additions & 7 deletions node/consensus/block_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ func (ce *ConsensusEngine) executeBlock(ctx context.Context) (err error) {
return fmt.Errorf("begin outer tx failed: %w", err)
}

blockCtx := &common.BlockContext{
Height: ce.state.blkProp.height,
Timestamp: blkProp.blk.Header.Timestamp.Unix(),
ChainContext: ce.chainCtx,
Proposer: ce.leader.Bytes(),
}

// TODO: log tracker

txResults := make([]ktypes.TxResult, len(ce.state.blkProp.blk.Txns))
Expand All @@ -98,7 +105,7 @@ func (ce *ConsensusEngine) executeBlock(ctx context.Context) (err error) {
Signer: decodedTx.Sender,
Authenticator: decodedTx.Signature.Type,
Caller: identifier,
// BlockContext: blkCtx,
BlockContext: blockCtx,
}

select {
Expand Down Expand Up @@ -129,12 +136,6 @@ func (ce *ConsensusEngine) executeBlock(ctx context.Context) (err error) {

// TODO: Notify the changesets to the migrator

blockCtx := &common.BlockContext{ // TODO: fill in the network params once we have them
Height: ce.state.blkProp.height,
Timestamp: blkProp.blk.Header.Timestamp.Unix(),
ChainContext: ce.chainCtx,
Proposer: ce.leader.Bytes(),
}
_, err = ce.txapp.Finalize(ctx, ce.state.consensusTx, blockCtx)
if err != nil {
ce.log.Error("Failed to finalize txapp", "err", err)
Expand Down
2 changes: 1 addition & 1 deletion node/consensus/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ func (ce *ConsensusEngine) catchup(ctx context.Context) error {

if appHeight == storeHeight && !bytes.Equal(appHash, storeAppHash[:]) {
// This is not possible, PG mismatches with the Blockstore return error
return fmt.Errorf("AppHash mismatch, appHash: %x, storeAppHash: %x", appHash, storeAppHash)
return fmt.Errorf("AppHash mismatch, appHash: %x, storeAppHash: %v", appHash, storeAppHash)
}

if appHeight > 0 {
Expand Down
7 changes: 3 additions & 4 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ import (
)

const (
blockTxCount = 50 // for "mining"
dummyTxSize = 123_000 // for broadcast
dummyTxInterval = 1 * time.Second // broadcast freq
blockTxCount = 50 // for "mining"
txReAnnInterval = 30 * time.Second
)

type peerManager interface {
Expand Down Expand Up @@ -338,7 +337,7 @@ func (n *Node) Start(ctx context.Context, bootpeers ...string) error {
// custom stream-based gossip uses txAnnStreamHandler and announceTx.
// This dummy method will make create+announce new pretend transactions.
// It also periodically rebroadcasts txns.
n.startTxAnns(ctx, dummyTxInterval, 30*time.Second, dummyTxSize) // nogossip.go
n.startTxAnns(ctx, txReAnnInterval)

// mine is our block anns goroutine, which must be only for leader
n.wg.Add(1)
Expand Down
26 changes: 12 additions & 14 deletions node/nogossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@ package node

import (
"context"
"crypto/rand"
"errors"
"fmt"
"io"
"time"

"github.com/kwilteam/kwil-db/core/crypto"
"github.com/kwilteam/kwil-db/core/crypto/auth"
ktypes "github.com/kwilteam/kwil-db/core/types"
"github.com/kwilteam/kwil-db/node/types"

"github.com/libp2p/go-libp2p/core/network"
Expand Down Expand Up @@ -141,7 +139,7 @@ func (n *Node) advertiseTxToPeer(ctx context.Context, peerID peer.ID, txHash typ
return nil
}

func randomTx(size int, signer auth.Signer) ([]byte, error) {
/*func randomTx(size int, signer auth.Signer) ([]byte, error) {
payload := &ktypes.KVPayload{
Key: randBytes(32),
Value: randBytes(size),
Expand All @@ -159,15 +157,21 @@ func randomTx(size int, signer auth.Signer) ([]byte, error) {
return tx.MarshalBinary()
}
// startTxAnns creates pretend transactions, adds them to the tx index, and
// announces them to peers. This also does periodic reannouncement.
func (n *Node) startTxAnns(ctx context.Context, newPeriod, reannouncePeriod time.Duration, sz int) {
func randBytes(n int) []byte {
b := make([]byte, n)
rand.Read(b)
return b
}*/

// startTxAnns handles periodic reannouncement. It can also be modified to
// regularly create dummy transactions.
func (n *Node) startTxAnns(ctx context.Context, reannouncePeriod time.Duration) {
signer := secp256k1Signer()
if signer == nil {
panic("failed to create secp256k1 signer")
}

n.wg.Add(1)
/*n.wg.Add(1)
go func() {
defer n.wg.Done()
Expand All @@ -189,7 +193,7 @@ func (n *Node) startTxAnns(ctx context.Context, newPeriod, reannouncePeriod time
// n.log.Infof("announcing txid %v", txid)
n.announceTx(ctx, txHash, rawTx, n.host.ID())
}
}()
}()*/

n.wg.Add(1)
go func() {
Expand Down Expand Up @@ -222,12 +226,6 @@ func (n *Node) startTxAnns(ctx context.Context, newPeriod, reannouncePeriod time
}()
}

func randBytes(n int) []byte {
b := make([]byte, n)
rand.Read(b)
return b
}

func secp256k1Signer() *auth.EthPersonalSigner {
privKey, _, err := crypto.GenerateSecp256k1Key(nil)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions node/services/jsonrpc/usersvc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type BlockchainTransactor interface {

type NodeApp interface {
AccountInfo(ctx context.Context, db sql.DB, identifier []byte, pending bool) (balance *big.Int, nonce int64, err error)
Price(ctx context.Context, dbTx sql.DB, tx *types.Transaction, chainContext *common.ChainContext) (*big.Int, error)
Price(ctx context.Context, dbTx sql.DB, tx *types.Transaction) (*big.Int, error)
// GetMigrationMetadata(ctx context.Context) (*types.MigrationMetadata, error)
}

Expand Down Expand Up @@ -497,7 +497,7 @@ func (svc *Service) EstimatePrice(ctx context.Context, req *userjson.EstimatePri
readTx := svc.db.BeginDelayedReadTx()
defer readTx.Rollback(ctx)

price, err := svc.nodeApp.Price(ctx, readTx, req.Tx, nil)
price, err := svc.nodeApp.Price(ctx, readTx, req.Tx)
if err != nil {
svc.log.Error("failed to estimate price", "error", err) // why not tell the client though?
return nil, jsonrpc.NewError(jsonrpc.ErrorTxInternal, "failed to estimate price", nil)
Expand Down
3 changes: 3 additions & 0 deletions node/txapp/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"math/big"

"github.com/kwilteam/kwil-db/common"
"github.com/kwilteam/kwil-db/core/types"
"github.com/kwilteam/kwil-db/node/types/sql"
"github.com/kwilteam/kwil-db/node/voting"
Expand Down Expand Up @@ -40,6 +41,8 @@ type DB interface {
sql.SnapshotTxMaker
}

type Engine = common.Engine // ok? or can reduce this?

var (
// getEvents gets all events, even if they have been
// marked received
Expand Down
8 changes: 4 additions & 4 deletions node/txapp/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ func (m *mempool) applyTransaction(ctx *common.TxContext, tx *types.Transaction,
return errors.New("validator vote ids are not allowed during migration")
case types.PayloadTypeValidatorVoteBodies:
return errors.New("validator vote bodies are not allowed during migration")
// case types.PayloadTypeDeploySchema:
// return errors.New("deploy schema transactions are not allowed during migration")
// case types.PayloadTypeDropSchema:
// return errors.New("drop schema transactions are not allowed during migration")
case types.PayloadTypeDeploySchema:
return errors.New("deploy schema transactions are not allowed during migration")
case types.PayloadTypeDropSchema:
return errors.New("drop schema transactions are not allowed during migration")
case types.PayloadTypeTransfer:
return errors.New("transfer transactions are not allowed during migration")
}
Expand Down
Loading

0 comments on commit ca861cf

Please sign in to comment.