From 8184eea8c3661a746e3c2f673daf1d5ced32c004 Mon Sep 17 00:00:00 2001 From: jchappelow Date: Mon, 9 Dec 2024 12:47:06 -0600 Subject: [PATCH] reenable Engine and deploy/drop payloads (#1133) * reenable Engine and deploy/drop payloads * fix appHash mismatch on genesis restart * log the action inside TestValidatorStateMachine test --- app/node/build.go | 4 +- cmd/kwil-cli/cmds/database/cmd.go | 4 +- .../cmds/database/{_deploy.go => deploy.go} | 0 .../cmds/database/{_drop.go => drop.go} | 0 core/client/client.go | 15 +- core/client/types/client.go | 6 +- core/types/payload_test.go | 2 +- core/types/payloads.go | 48 +---- core/types/schema.go | 2 +- node/consensus/block_executor.go | 15 +- node/consensus/engine.go | 24 +-- node/consensus/engine_test.go | 1 + node/node.go | 7 +- node/nogossip.go | 26 ++- node/services/jsonrpc/usersvc/service.go | 4 +- node/txapp/interfaces.go | 3 + node/txapp/mempool.go | 8 +- node/txapp/routes.go | 188 ++++++++++++++++-- node/txapp/txapp.go | 4 +- 19 files changed, 246 insertions(+), 115 deletions(-) rename cmd/kwil-cli/cmds/database/{_deploy.go => deploy.go} (100%) rename cmd/kwil-cli/cmds/database/{_drop.go => drop.go} (100%) diff --git a/app/node/build.go b/app/node/build.go index 2860f7a53..781c79788 100644 --- a/app/node/build.go +++ b/app/node/build.go @@ -82,10 +82,11 @@ func buildServer(ctx context.Context, d *coreDependencies) *server { // Node node := buildNode(d, mp, bs, ce, ss, db) + appIface := &mysteryThing{txApp, ce} // RPC Services rpcSvcLogger := d.logger.New("USER") - jsonRPCTxSvc := usersvc.NewService(db, e, node, txApp, vs, rpcSvcLogger, + jsonRPCTxSvc := usersvc.NewService(db, e, node, appIface, vs, rpcSvcLogger, usersvc.WithReadTxTimeout(time.Duration(d.cfg.DB.ReadTxTimeout)), usersvc.WithPrivateMode(d.cfg.RPC.Private), usersvc.WithChallengeExpiry(d.cfg.RPC.ChallengeExpiry), @@ -112,7 +113,6 @@ func buildServer(ctx context.Context, d *coreDependencies) *server { // The admin service uses a client-style signer rather than just a private // key because it is used to sign transactions and provide an Identity for // account information (nonce and balance). - appIface := &mysteryThing{txApp, ce} txSigner := &auth.EthPersonalSigner{Key: *d.privKey.(*crypto.Secp256k1PrivateKey)} jsonAdminSvc := adminsvc.NewService(db, node, appIface, nil, txSigner, d.cfg, d.genesisCfg.ChainID, adminServerLogger) diff --git a/cmd/kwil-cli/cmds/database/cmd.go b/cmd/kwil-cli/cmds/database/cmd.go index d76f171a1..54ba49193 100644 --- a/cmd/kwil-cli/cmds/database/cmd.go +++ b/cmd/kwil-cli/cmds/database/cmd.go @@ -28,8 +28,8 @@ func NewCmdDatabase() *cobra.Command { // writeCmds create a transactions, requiring a private key for signing/ writeCmds := []*cobra.Command{ - // deployCmd(), - // dropCmd(), + deployCmd(), + dropCmd(), executeCmd(), batchCmd(), } diff --git a/cmd/kwil-cli/cmds/database/_deploy.go b/cmd/kwil-cli/cmds/database/deploy.go similarity index 100% rename from cmd/kwil-cli/cmds/database/_deploy.go rename to cmd/kwil-cli/cmds/database/deploy.go diff --git a/cmd/kwil-cli/cmds/database/_drop.go b/cmd/kwil-cli/cmds/database/drop.go similarity index 100% rename from cmd/kwil-cli/cmds/database/_drop.go rename to cmd/kwil-cli/cmds/database/drop.go diff --git a/core/client/client.go b/core/client/client.go index 7327929ac..37ca6e8fe 100644 --- a/core/client/client.go +++ b/core/client/client.go @@ -19,6 +19,7 @@ import ( "github.com/kwilteam/kwil-db/core/rpc/client/user" userClient "github.com/kwilteam/kwil-db/core/rpc/client/user/jsonrpc" "github.com/kwilteam/kwil-db/core/types" + "github.com/kwilteam/kwil-db/core/utils" ) // Client is a client that interacts with a public Kwil provider. @@ -231,13 +232,12 @@ func (c *Client) GetSchema(ctx context.Context, dbid string) (*types.Schema, err return ds, nil } -/* // DeployDatabase deploys a database. TODO: remove -func (c *Client) DeployDatabase(ctx context.Context, schema *types.Schema, opts ...clientType.TxOpt) (types.TxHash, error) { +func (c *Client) DeployDatabase(ctx context.Context, schema *types.Schema, opts ...clientType.TxOpt) (types.Hash, error) { txOpts := clientType.GetTxOpts(opts) tx, err := c.newTx(ctx, schema, txOpts) if err != nil { - return nil, err + return types.Hash{}, err } c.logger.Debug("deploying database", @@ -249,13 +249,13 @@ func (c *Client) DeployDatabase(ctx context.Context, schema *types.Schema, opts // DropDatabase drops a database by name, using the configured signer to derive // the DB ID. TODO: remove -func (c *Client) DropDatabase(ctx context.Context, name string, opts ...clientType.TxOpt) (types.TxHash, error) { +func (c *Client) DropDatabase(ctx context.Context, name string, opts ...clientType.TxOpt) (types.Hash, error) { dbid := utils.GenerateDBID(name, c.Signer.Identity()) return c.DropDatabaseID(ctx, dbid, opts...) } // DropDatabaseID drops a database by ID. TODO: remove -func (c *Client) DropDatabaseID(ctx context.Context, dbid string, opts ...clientType.TxOpt) (types.TxHash, error) { +func (c *Client) DropDatabaseID(ctx context.Context, dbid string, opts ...clientType.TxOpt) (types.Hash, error) { identifier := &types.DropSchema{ DBID: dbid, } @@ -263,7 +263,7 @@ func (c *Client) DropDatabaseID(ctx context.Context, dbid string, opts ...client txOpts := clientType.GetTxOpts(opts) tx, err := c.newTx(ctx, identifier, txOpts) if err != nil { - return nil, err + return types.Hash{}, err } c.logger.Debug("deploying database", @@ -273,12 +273,11 @@ func (c *Client) DropDatabaseID(ctx context.Context, dbid string, opts ...client res, err := c.txClient.Broadcast(ctx, tx, syncBcastFlag(txOpts.SyncBcast)) if err != nil { - return nil, err + return types.Hash{}, err } return res, nil } -*/ // Execute executes a procedure or action. // It returns the receipt, as well as outputs which is the decoded body of the receipt. diff --git a/core/client/types/client.go b/core/client/types/client.go index 2ef73d780..43b8d9546 100644 --- a/core/client/types/client.go +++ b/core/client/types/client.go @@ -20,9 +20,9 @@ type Client interface { Call(ctx context.Context, dbid string, procedure string, inputs []any) (*CallResult, error) ChainID() string ChainInfo(ctx context.Context) (*types.ChainInfo, error) - // DeployDatabase(ctx context.Context, payload *types.Schema, opts ...TxOpt) (types.Hash, error) - // DropDatabase(ctx context.Context, name string, opts ...TxOpt) (types.Hash, error) - // DropDatabaseID(ctx context.Context, dbid string, opts ...TxOpt) (types.Hash, error) + DeployDatabase(ctx context.Context, payload *types.Schema, opts ...TxOpt) (types.Hash, error) + DropDatabase(ctx context.Context, name string, opts ...TxOpt) (types.Hash, error) + DropDatabaseID(ctx context.Context, dbid string, opts ...TxOpt) (types.Hash, error) // DEPRECATED: Use Execute instead. // ExecuteAction(ctx context.Context, dbid string, action string, tuples [][]any, opts ...TxOpt) (types.Hash, error) Execute(ctx context.Context, dbid string, action string, tuples [][]any, opts ...TxOpt) (types.Hash, error) diff --git a/core/types/payload_test.go b/core/types/payload_test.go index 1ef754419..91cabf5af 100644 --- a/core/types/payload_test.go +++ b/core/types/payload_test.go @@ -36,7 +36,7 @@ func TestValidPayload(t *testing.T) { pt types.PayloadType valid bool }{ - {"kv pair payload", types.PayloadTypeKV, true}, + {"kv pair payload", types.PayloadTypeExecute, true}, {"registered payload", "testPayload", true}, {"invalid payload", types.PayloadType("unknown"), false}, } diff --git a/core/types/payloads.go b/core/types/payloads.go index af82a26c7..7ee15101f 100644 --- a/core/types/payloads.go +++ b/core/types/payloads.go @@ -29,9 +29,8 @@ type Payload interface { } const ( - PayloadTypeKV PayloadType = "kv" - // PayloadTypeDeploySchema PayloadType = "deploy_schema" - // PayloadTypeDropSchema PayloadType = "drop_schema" + PayloadTypeDeploySchema PayloadType = "deploy_schema" + PayloadTypeDropSchema PayloadType = "drop_schema" PayloadTypeExecute PayloadType = "execute" PayloadTypeTransfer PayloadType = "transfer" PayloadTypeValidatorJoin PayloadType = "validator_join" @@ -90,10 +89,9 @@ func UnmarshalPayload(payloadType PayloadType, payload []byte) (Payload, error) // payloadTypes includes native types and types registered from extensions. var payloadTypes = map[PayloadType]bool{ - PayloadTypeKV: true, // TODO: remove this later - // PayloadTypeDeploySchema: true, - // PayloadTypeDropSchema: true, - // PayloadTypeExecute: true, + PayloadTypeDeploySchema: true, + PayloadTypeDropSchema: true, + PayloadTypeExecute: true, PayloadTypeTransfer: true, PayloadTypeValidatorJoin: true, PayloadTypeValidatorLeave: true, @@ -119,14 +117,13 @@ func (p PayloadType) Valid() bool { PayloadTypeCreateResolution, PayloadTypeApproveResolution, PayloadTypeDeleteResolution, - // PayloadTypeDeploySchema, - // PayloadTypeDropSchema, - // PayloadTypeExecute, + PayloadTypeDeploySchema, + PayloadTypeDropSchema, + PayloadTypeExecute, // These should not come in user transactions, but they are not invalid // payload types in general. PayloadTypeValidatorVoteIDs, - PayloadTypeValidatorVoteBodies, - PayloadTypeKV: // TODO: remove this later + PayloadTypeValidatorVoteBodies: return true default: // check map that includes registered payloads from extensions @@ -145,32 +142,6 @@ func RegisterPayload(pType PayloadType) { payloadTypes[pType] = true } -// KVPair payload for testing purposes -type KVPayload struct { - Key []byte - Value []byte -} - -var _ Payload = &KVPayload{} - -var _ encoding.BinaryMarshaler = (*KVPayload)(nil) -var _ encoding.BinaryMarshaler = KVPayload{} - -func (p KVPayload) MarshalBinary() ([]byte, error) { - return serialize.Encode(p) -} - -var _ encoding.BinaryUnmarshaler = (*KVPayload)(nil) - -func (p *KVPayload) UnmarshalBinary(data []byte) error { - return serialize.Decode(data, p) -} - -func (p *KVPayload) Type() PayloadType { - return PayloadTypeKV -} - -/* // DropSchema is the payload that is used to drop a schema type DropSchema struct { DBID string @@ -189,7 +160,6 @@ func (s *DropSchema) UnmarshalBinary(b []byte) error { func (s *DropSchema) Type() PayloadType { return PayloadTypeDropSchema } -*/ // ActionExecution is the payload that is used to execute an action type ActionExecution struct { diff --git a/core/types/schema.go b/core/types/schema.go index e377fbcf3..30e09f126 100644 --- a/core/types/schema.go +++ b/core/types/schema.go @@ -30,7 +30,7 @@ type Schema struct { var _ Payload = (*Schema)(nil) func (s *Schema) Type() PayloadType { - return "to remove!" + return PayloadTypeDeploySchema } func (s Schema) SerializeSize() int { diff --git a/node/consensus/block_executor.go b/node/consensus/block_executor.go index 6f8264de2..b916536f9 100644 --- a/node/consensus/block_executor.go +++ b/node/consensus/block_executor.go @@ -73,6 +73,13 @@ func (ce *ConsensusEngine) executeBlock(ctx context.Context) (err error) { return fmt.Errorf("begin outer tx failed: %w", err) } + blockCtx := &common.BlockContext{ + Height: ce.state.blkProp.height, + Timestamp: blkProp.blk.Header.Timestamp.Unix(), + ChainContext: ce.chainCtx, + Proposer: ce.leader.Bytes(), + } + // TODO: log tracker txResults := make([]ktypes.TxResult, len(ce.state.blkProp.blk.Txns)) @@ -98,7 +105,7 @@ func (ce *ConsensusEngine) executeBlock(ctx context.Context) (err error) { Signer: decodedTx.Sender, Authenticator: decodedTx.Signature.Type, Caller: identifier, - // BlockContext: blkCtx, + BlockContext: blockCtx, } select { @@ -129,12 +136,6 @@ func (ce *ConsensusEngine) executeBlock(ctx context.Context) (err error) { // TODO: Notify the changesets to the migrator - blockCtx := &common.BlockContext{ // TODO: fill in the network params once we have them - Height: ce.state.blkProp.height, - Timestamp: blkProp.blk.Header.Timestamp.Unix(), - ChainContext: ce.chainCtx, - Proposer: ce.leader.Bytes(), - } _, err = ce.txapp.Finalize(ctx, ce.state.consensusTx, blockCtx) if err != nil { ce.log.Error("Failed to finalize txapp", "err", err) diff --git a/node/consensus/engine.go b/node/consensus/engine.go index 8fbe4266c..2c97df251 100644 --- a/node/consensus/engine.go +++ b/node/consensus/engine.go @@ -583,16 +583,7 @@ func (ce *ConsensusEngine) catchup(ctx context.Context) error { if appHeight > storeHeight { // This is not possible, App can't be ahead of the store - return fmt.Errorf("app height %d is greater than the store height %d", appHeight, storeHeight) - } - - if appHeight == storeHeight && !bytes.Equal(appHash, storeAppHash[:]) { - // This is not possible, PG mismatches with the Blockstore return error - return fmt.Errorf("AppHash mismatch, appHash: %x, storeAppHash: %x", appHash, storeAppHash) - } - - if appHeight > 0 { - ce.setLastCommitInfo(appHeight, blkHash, types.Hash(appHash)) + return fmt.Errorf("app height %d is greater than the store height %d (did you forget to reset postgres?)", appHeight, storeHeight) } if appHeight == -1 { @@ -601,6 +592,13 @@ func (ce *ConsensusEngine) catchup(ctx context.Context) error { if err := ce.GenesisInit(ctx); err != nil { return fmt.Errorf("error initializing the genesis state: %w", err) } + } else if appHeight > 0 { + if appHeight == storeHeight && !bytes.Equal(appHash, storeAppHash[:]) { + // This is not possible, PG mismatches with the Blockstore return error + return fmt.Errorf("AppHash mismatch, appHash: %x, storeAppHash: %v", appHash, storeAppHash) + } + + ce.setLastCommitInfo(appHeight, blkHash, types.Hash(appHash)) } // Replay the blocks from the blockstore if the app hasn't played all the blocks yet. @@ -724,16 +722,16 @@ func (ce *ConsensusEngine) doCatchup(ctx context.Context) error { if blkHash != ce.state.blkProp.blkHash { // processed incorrect block if err := ce.resetState(ctx); err != nil { - return fmt.Errorf("error aborting incorrect block execution: height: %d, blkID: %x, error: %w", ce.state.blkProp.height, blkHash, err) + return fmt.Errorf("error aborting incorrect block execution: height: %d, blkID: %v, error: %w", ce.state.blkProp.height, blkHash, err) } blk, err := types.DecodeBlock(rawBlk) if err != nil { - return fmt.Errorf("failed to decode the block, blkHeight: %d, blkID: %x, error: %w", ce.state.blkProp.height, blkHash, err) + return fmt.Errorf("failed to decode the block, blkHeight: %d, blkID: %v, error: %w", ce.state.blkProp.height, blkHash, err) } if err := ce.processAndCommit(ctx, blk, appHash); err != nil { - return fmt.Errorf("failed to replay the block: blkHeight: %d, blkID: %x, error: %w", ce.state.blkProp.height, blkHash, err) + return fmt.Errorf("failed to replay the block: blkHeight: %d, blkID: %v, error: %w", ce.state.blkProp.height, blkHash, err) } } else { if appHash == ce.state.blockRes.appHash { diff --git a/node/consensus/engine_test.go b/node/consensus/engine_test.go index 37a8fcb82..54af9e36c 100644 --- a/node/consensus/engine_test.go +++ b/node/consensus/engine_test.go @@ -571,6 +571,7 @@ func TestValidatorStateMachine(t *testing.T) { }) for _, act := range tc.actions { + t.Log("action", act.name) act.trigger(t, leader, val) require.Eventually(t, func() bool { err := act.verify(t, leader, val) diff --git a/node/node.go b/node/node.go index 032d5eaf3..8f2292728 100644 --- a/node/node.go +++ b/node/node.go @@ -40,9 +40,8 @@ import ( ) const ( - blockTxCount = 50 // for "mining" - dummyTxSize = 123_000 // for broadcast - dummyTxInterval = 1 * time.Second // broadcast freq + blockTxCount = 50 // for "mining" + txReAnnInterval = 30 * time.Second ) type peerManager interface { @@ -338,7 +337,7 @@ func (n *Node) Start(ctx context.Context, bootpeers ...string) error { // custom stream-based gossip uses txAnnStreamHandler and announceTx. // This dummy method will make create+announce new pretend transactions. // It also periodically rebroadcasts txns. - n.startTxAnns(ctx, dummyTxInterval, 30*time.Second, dummyTxSize) // nogossip.go + n.startTxAnns(ctx, txReAnnInterval) // mine is our block anns goroutine, which must be only for leader n.wg.Add(1) diff --git a/node/nogossip.go b/node/nogossip.go index b6b1349bb..4e8c97a35 100644 --- a/node/nogossip.go +++ b/node/nogossip.go @@ -2,7 +2,6 @@ package node import ( "context" - "crypto/rand" "errors" "fmt" "io" @@ -10,7 +9,6 @@ import ( "github.com/kwilteam/kwil-db/core/crypto" "github.com/kwilteam/kwil-db/core/crypto/auth" - ktypes "github.com/kwilteam/kwil-db/core/types" "github.com/kwilteam/kwil-db/node/types" "github.com/libp2p/go-libp2p/core/network" @@ -141,7 +139,7 @@ func (n *Node) advertiseTxToPeer(ctx context.Context, peerID peer.ID, txHash typ return nil } -func randomTx(size int, signer auth.Signer) ([]byte, error) { +/*func randomTx(size int, signer auth.Signer) ([]byte, error) { payload := &ktypes.KVPayload{ Key: randBytes(32), Value: randBytes(size), @@ -159,15 +157,21 @@ func randomTx(size int, signer auth.Signer) ([]byte, error) { return tx.MarshalBinary() } -// startTxAnns creates pretend transactions, adds them to the tx index, and -// announces them to peers. This also does periodic reannouncement. -func (n *Node) startTxAnns(ctx context.Context, newPeriod, reannouncePeriod time.Duration, sz int) { +func randBytes(n int) []byte { + b := make([]byte, n) + rand.Read(b) + return b +}*/ + +// startTxAnns handles periodic reannouncement. It can also be modified to +// regularly create dummy transactions. +func (n *Node) startTxAnns(ctx context.Context, reannouncePeriod time.Duration) { signer := secp256k1Signer() if signer == nil { panic("failed to create secp256k1 signer") } - n.wg.Add(1) + /*n.wg.Add(1) go func() { defer n.wg.Done() @@ -189,7 +193,7 @@ func (n *Node) startTxAnns(ctx context.Context, newPeriod, reannouncePeriod time // n.log.Infof("announcing txid %v", txid) n.announceTx(ctx, txHash, rawTx, n.host.ID()) } - }() + }()*/ n.wg.Add(1) go func() { @@ -222,12 +226,6 @@ func (n *Node) startTxAnns(ctx context.Context, newPeriod, reannouncePeriod time }() } -func randBytes(n int) []byte { - b := make([]byte, n) - rand.Read(b) - return b -} - func secp256k1Signer() *auth.EthPersonalSigner { privKey, _, err := crypto.GenerateSecp256k1Key(nil) if err != nil { diff --git a/node/services/jsonrpc/usersvc/service.go b/node/services/jsonrpc/usersvc/service.go index c5327c6d6..8113c9cab 100644 --- a/node/services/jsonrpc/usersvc/service.go +++ b/node/services/jsonrpc/usersvc/service.go @@ -42,7 +42,7 @@ type BlockchainTransactor interface { type NodeApp interface { AccountInfo(ctx context.Context, db sql.DB, identifier []byte, pending bool) (balance *big.Int, nonce int64, err error) - Price(ctx context.Context, dbTx sql.DB, tx *types.Transaction, chainContext *common.ChainContext) (*big.Int, error) + Price(ctx context.Context, dbTx sql.DB, tx *types.Transaction) (*big.Int, error) // GetMigrationMetadata(ctx context.Context) (*types.MigrationMetadata, error) } @@ -497,7 +497,7 @@ func (svc *Service) EstimatePrice(ctx context.Context, req *userjson.EstimatePri readTx := svc.db.BeginDelayedReadTx() defer readTx.Rollback(ctx) - price, err := svc.nodeApp.Price(ctx, readTx, req.Tx, nil) + price, err := svc.nodeApp.Price(ctx, readTx, req.Tx) if err != nil { svc.log.Error("failed to estimate price", "error", err) // why not tell the client though? return nil, jsonrpc.NewError(jsonrpc.ErrorTxInternal, "failed to estimate price", nil) diff --git a/node/txapp/interfaces.go b/node/txapp/interfaces.go index 755dedc5e..9c2ce8338 100644 --- a/node/txapp/interfaces.go +++ b/node/txapp/interfaces.go @@ -4,6 +4,7 @@ import ( "context" "math/big" + "github.com/kwilteam/kwil-db/common" "github.com/kwilteam/kwil-db/core/types" "github.com/kwilteam/kwil-db/node/types/sql" "github.com/kwilteam/kwil-db/node/voting" @@ -40,6 +41,8 @@ type DB interface { sql.SnapshotTxMaker } +type Engine = common.Engine // ok? or can reduce this? + var ( // getEvents gets all events, even if they have been // marked received diff --git a/node/txapp/mempool.go b/node/txapp/mempool.go index 47ff67dfa..83016f0ea 100644 --- a/node/txapp/mempool.go +++ b/node/txapp/mempool.go @@ -77,10 +77,10 @@ func (m *mempool) applyTransaction(ctx *common.TxContext, tx *types.Transaction, return errors.New("validator vote ids are not allowed during migration") case types.PayloadTypeValidatorVoteBodies: return errors.New("validator vote bodies are not allowed during migration") - // case types.PayloadTypeDeploySchema: - // return errors.New("deploy schema transactions are not allowed during migration") - // case types.PayloadTypeDropSchema: - // return errors.New("drop schema transactions are not allowed during migration") + case types.PayloadTypeDeploySchema: + return errors.New("deploy schema transactions are not allowed during migration") + case types.PayloadTypeDropSchema: + return errors.New("drop schema transactions are not allowed during migration") case types.PayloadTypeTransfer: return errors.New("transfer transactions are not allowed during migration") } diff --git a/node/txapp/routes.go b/node/txapp/routes.go index b3420bf28..0b428effb 100644 --- a/node/txapp/routes.go +++ b/node/txapp/routes.go @@ -13,15 +13,17 @@ import ( "github.com/kwilteam/kwil-db/extensions/consensus" "github.com/kwilteam/kwil-db/extensions/resolutions" "github.com/kwilteam/kwil-db/node/accounts" + "github.com/kwilteam/kwil-db/node/engine/execution" + "github.com/kwilteam/kwil-db/node/ident" "github.com/kwilteam/kwil-db/node/types/sql" "github.com/kwilteam/kwil-db/node/voting" ) func init() { err := errors.Join( - // RegisterRoute(types.PayloadTypeDeploySchema, NewRoute(&deployDatasetRoute{})), - // RegisterRoute(types.PayloadTypeDropSchema, NewRoute(&dropDatasetRoute{})), - // RegisterRoute(types.PayloadTypeExecute, NewRoute(&executeActionRoute{})), + RegisterRoute(types.PayloadTypeDeploySchema, NewRoute(&deployDatasetRoute{})), + RegisterRoute(types.PayloadTypeDropSchema, NewRoute(&dropDatasetRoute{})), + RegisterRoute(types.PayloadTypeExecute, NewRoute(&executeActionRoute{})), RegisterRoute(types.PayloadTypeTransfer, NewRoute(&transferRoute{})), RegisterRoute(types.PayloadTypeValidatorJoin, NewRoute(&validatorJoinRoute{})), RegisterRoute(types.PayloadTypeValidatorApprove, NewRoute(&validatorApproveRoute{})), @@ -76,13 +78,6 @@ type Pricer interface { Price(ctx context.Context, router *TxApp, db sql.DB, tx *types.Transaction) (*big.Int, error) } -// func codeForEngineError(err error) types.TxCode { -// if err == nil { -// return types.CodeOk -// } -// return types.CodeUnknownError -// } - // routes is a map of transaction payload types to their respective routes. This // should be updated if a coordinated height-based update introduces new routes // (or removes existing routes). @@ -177,9 +172,9 @@ func (d *baseRoute) Execute(ctx *common.TxContext, router *TxApp, db sql.DB, tx defer tx2.Rollback(ctx.Ctx) // no-op if Commit succeeded app := &common.App{ - Service: svc, - DB: tx2, - // Engine: router.Engine, + Service: svc, + DB: tx2, + Engine: router.Engine, Accounts: router.Accounts, Validators: router.Validators, } @@ -205,6 +200,173 @@ func (d *baseRoute) Execute(ctx *common.TxContext, router *TxApp, db sql.DB, tx // of the route, which is modified by the app. Alternatively, create a new // route or replace the route entirely (same payload type, new impl). +func codeForEngineError(err error) types.TxCode { + if err == nil { + return types.CodeOk + } + if errors.Is(err, execution.ErrDatasetExists) { + return types.CodeDatasetExists + } + if errors.Is(err, execution.ErrDatasetNotFound) { + return types.CodeDatasetMissing + } + if errors.Is(err, execution.ErrInvalidSchema) { + return types.CodeInvalidSchema + } + + return types.CodeUnknownError +} + +type deployDatasetRoute struct { + schema *types.Schema // set by PreTx + identifier string + authType string +} + +var _ consensus.Route = (*deployDatasetRoute)(nil) + +func (d *deployDatasetRoute) Name() string { + return types.PayloadTypeDeploySchema.String() +} + +func (d *deployDatasetRoute) Price(ctx context.Context, app *common.App, tx *types.Transaction) (*big.Int, error) { + return big.NewInt(1000000000000000000), nil +} + +func (d *deployDatasetRoute) PreTx(ctx *common.TxContext, svc *common.Service, tx *types.Transaction) (types.TxCode, error) { + if ctx.BlockContext.ChainContext.NetworkParameters.MigrationStatus == types.MigrationInProgress || + ctx.BlockContext.ChainContext.NetworkParameters.MigrationStatus == types.MigrationCompleted { + return types.CodeNetworkInMigration, errors.New("cannot deploy dataset during migration") + } + + schemaPayload := &types.Schema{} + err := schemaPayload.UnmarshalBinary(tx.Body.Payload) + if err != nil { + return types.CodeEncodingError, err + } + + d.schema = schemaPayload + + d.identifier, err = ident.Identifier(tx.Signature.Type, tx.Sender) + if err != nil { + return types.CodeUnknownError, err + } + + d.authType = tx.Signature.Type + + return 0, nil +} + +func (d *deployDatasetRoute) InTx(ctx *common.TxContext, app *common.App, tx *types.Transaction) (types.TxCode, error) { + err := app.Engine.CreateDataset(ctx, app.DB, d.schema) + if err != nil { + return codeForEngineError(err), err + } + return 0, nil +} + +type dropDatasetRoute struct { + dbid string +} + +var _ consensus.Route = (*dropDatasetRoute)(nil) + +func (d *dropDatasetRoute) Name() string { + return types.PayloadTypeDropSchema.String() +} + +func (d *dropDatasetRoute) Price(ctx context.Context, app *common.App, tx *types.Transaction) (*big.Int, error) { + return big.NewInt(10000000000000), nil +} + +func (d *dropDatasetRoute) PreTx(ctx *common.TxContext, svc *common.Service, tx *types.Transaction) (types.TxCode, error) { + if ctx.BlockContext.ChainContext.NetworkParameters.MigrationStatus == types.MigrationInProgress || + ctx.BlockContext.ChainContext.NetworkParameters.MigrationStatus == types.MigrationCompleted { + return types.CodeNetworkInMigration, errors.New("cannot drop dataset during migration") + } + + drop := &types.DropSchema{} + err := drop.UnmarshalBinary(tx.Body.Payload) + if err != nil { + return types.CodeEncodingError, err + } + + d.dbid = drop.DBID + return 0, nil +} + +func (d *dropDatasetRoute) InTx(ctx *common.TxContext, app *common.App, tx *types.Transaction) (types.TxCode, error) { + err := app.Engine.DeleteDataset(ctx, app.DB, d.dbid) + if err != nil { + return codeForEngineError(err), err + } + return 0, nil +} + +type executeActionRoute struct { + dbid string + action string + args [][]any +} + +var _ consensus.Route = (*executeActionRoute)(nil) + +func (d *executeActionRoute) Name() string { + return types.PayloadTypeExecute.String() +} + +func (d *executeActionRoute) Price(ctx context.Context, app *common.App, tx *types.Transaction) (*big.Int, error) { + return big.NewInt(2000000000000000), nil +} + +func (d *executeActionRoute) PreTx(ctx *common.TxContext, svc *common.Service, tx *types.Transaction) (types.TxCode, error) { + action := &types.ActionExecution{} + err := action.UnmarshalBinary(tx.Body.Payload) + if err != nil { + return types.CodeEncodingError, err + } + + d.action = action.Action + d.dbid = action.DBID + + // here, we decode the [][]types.EncodedTypes into [][]any + args := make([][]any, len(action.Arguments)) + for i, arg := range action.Arguments { + args[i] = make([]any, len(arg)) + for j, val := range arg { + decoded, err := val.Decode() + if err != nil { + return types.CodeEncodingError, err + } + args[i][j] = decoded + } + } + + // we want to execute the tx for as many arg arrays exist + // if there are no arg arrays, we want to execute it once + if len(args) == 0 { + args = make([][]any, 1) + } + + d.args = args + + return 0, nil +} + +func (d *executeActionRoute) InTx(ctx *common.TxContext, app *common.App, tx *types.Transaction) (types.TxCode, error) { + for i := range d.args { + _, err := app.Engine.Procedure(ctx, app.DB, &common.ExecutionData{ + Dataset: d.dbid, + Procedure: d.action, + Args: d.args[i], + }) + if err != nil { + return codeForEngineError(err), err + } + } + return 0, nil +} + type transferRoute struct { to []byte amt *big.Int diff --git a/node/txapp/txapp.go b/node/txapp/txapp.go index b8079a3fe..5126bf82b 100644 --- a/node/txapp/txapp.go +++ b/node/txapp/txapp.go @@ -26,7 +26,7 @@ import ( // maintaining a mempool for uncommitted accounts, pricing transactions, // managing atomicity of the database, and managing the validator set. type TxApp struct { - // Engine types.Engine // tracks deployed schemas + Engine Engine // tracks deployed schemas Accounts Accounts // tracks account balances and nonces Validators Validators // tracks validator power @@ -57,7 +57,7 @@ func NewTxApp(ctx context.Context, db sql.Executor, engine common.Engine, signer slices.Sort(resTypes) t := &TxApp{ - // Engine: engine, + Engine: engine, Accounts: accounts, Validators: validators,