Skip to content

Commit

Permalink
reenable Engine and deploy/drop payloads (#1133)
Browse files Browse the repository at this point in the history
* reenable Engine and deploy/drop payloads

* fix appHash mismatch on genesis restart

* log the action inside TestValidatorStateMachine test
  • Loading branch information
jchappelow authored Dec 9, 2024
1 parent 79a23f4 commit 8184eea
Show file tree
Hide file tree
Showing 19 changed files with 246 additions and 115 deletions.
4 changes: 2 additions & 2 deletions app/node/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,11 @@ func buildServer(ctx context.Context, d *coreDependencies) *server {

// Node
node := buildNode(d, mp, bs, ce, ss, db)
appIface := &mysteryThing{txApp, ce}

// RPC Services
rpcSvcLogger := d.logger.New("USER")
jsonRPCTxSvc := usersvc.NewService(db, e, node, txApp, vs, rpcSvcLogger,
jsonRPCTxSvc := usersvc.NewService(db, e, node, appIface, vs, rpcSvcLogger,
usersvc.WithReadTxTimeout(time.Duration(d.cfg.DB.ReadTxTimeout)),
usersvc.WithPrivateMode(d.cfg.RPC.Private),
usersvc.WithChallengeExpiry(d.cfg.RPC.ChallengeExpiry),
Expand All @@ -112,7 +113,6 @@ func buildServer(ctx context.Context, d *coreDependencies) *server {
// The admin service uses a client-style signer rather than just a private
// key because it is used to sign transactions and provide an Identity for
// account information (nonce and balance).
appIface := &mysteryThing{txApp, ce}
txSigner := &auth.EthPersonalSigner{Key: *d.privKey.(*crypto.Secp256k1PrivateKey)}
jsonAdminSvc := adminsvc.NewService(db, node, appIface, nil, txSigner, d.cfg,
d.genesisCfg.ChainID, adminServerLogger)
Expand Down
4 changes: 2 additions & 2 deletions cmd/kwil-cli/cmds/database/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ func NewCmdDatabase() *cobra.Command {

// writeCmds create a transactions, requiring a private key for signing/
writeCmds := []*cobra.Command{
// deployCmd(),
// dropCmd(),
deployCmd(),
dropCmd(),
executeCmd(),
batchCmd(),
}
Expand Down
File renamed without changes.
File renamed without changes.
15 changes: 7 additions & 8 deletions core/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/kwilteam/kwil-db/core/rpc/client/user"
userClient "github.com/kwilteam/kwil-db/core/rpc/client/user/jsonrpc"
"github.com/kwilteam/kwil-db/core/types"
"github.com/kwilteam/kwil-db/core/utils"
)

// Client is a client that interacts with a public Kwil provider.
Expand Down Expand Up @@ -231,13 +232,12 @@ func (c *Client) GetSchema(ctx context.Context, dbid string) (*types.Schema, err
return ds, nil
}

/*
// DeployDatabase deploys a database. TODO: remove
func (c *Client) DeployDatabase(ctx context.Context, schema *types.Schema, opts ...clientType.TxOpt) (types.TxHash, error) {
func (c *Client) DeployDatabase(ctx context.Context, schema *types.Schema, opts ...clientType.TxOpt) (types.Hash, error) {
txOpts := clientType.GetTxOpts(opts)
tx, err := c.newTx(ctx, schema, txOpts)
if err != nil {
return nil, err
return types.Hash{}, err
}

c.logger.Debug("deploying database",
Expand All @@ -249,21 +249,21 @@ func (c *Client) DeployDatabase(ctx context.Context, schema *types.Schema, opts

// DropDatabase drops a database by name, using the configured signer to derive
// the DB ID. TODO: remove
func (c *Client) DropDatabase(ctx context.Context, name string, opts ...clientType.TxOpt) (types.TxHash, error) {
func (c *Client) DropDatabase(ctx context.Context, name string, opts ...clientType.TxOpt) (types.Hash, error) {
dbid := utils.GenerateDBID(name, c.Signer.Identity())
return c.DropDatabaseID(ctx, dbid, opts...)
}

// DropDatabaseID drops a database by ID. TODO: remove
func (c *Client) DropDatabaseID(ctx context.Context, dbid string, opts ...clientType.TxOpt) (types.TxHash, error) {
func (c *Client) DropDatabaseID(ctx context.Context, dbid string, opts ...clientType.TxOpt) (types.Hash, error) {
identifier := &types.DropSchema{
DBID: dbid,
}

txOpts := clientType.GetTxOpts(opts)
tx, err := c.newTx(ctx, identifier, txOpts)
if err != nil {
return nil, err
return types.Hash{}, err
}

c.logger.Debug("deploying database",
Expand All @@ -273,12 +273,11 @@ func (c *Client) DropDatabaseID(ctx context.Context, dbid string, opts ...client

res, err := c.txClient.Broadcast(ctx, tx, syncBcastFlag(txOpts.SyncBcast))
if err != nil {
return nil, err
return types.Hash{}, err
}

return res, nil
}
*/

// Execute executes a procedure or action.
// It returns the receipt, as well as outputs which is the decoded body of the receipt.
Expand Down
6 changes: 3 additions & 3 deletions core/client/types/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ type Client interface {
Call(ctx context.Context, dbid string, procedure string, inputs []any) (*CallResult, error)
ChainID() string
ChainInfo(ctx context.Context) (*types.ChainInfo, error)
// DeployDatabase(ctx context.Context, payload *types.Schema, opts ...TxOpt) (types.Hash, error)
// DropDatabase(ctx context.Context, name string, opts ...TxOpt) (types.Hash, error)
// DropDatabaseID(ctx context.Context, dbid string, opts ...TxOpt) (types.Hash, error)
DeployDatabase(ctx context.Context, payload *types.Schema, opts ...TxOpt) (types.Hash, error)
DropDatabase(ctx context.Context, name string, opts ...TxOpt) (types.Hash, error)
DropDatabaseID(ctx context.Context, dbid string, opts ...TxOpt) (types.Hash, error)
// DEPRECATED: Use Execute instead.
// ExecuteAction(ctx context.Context, dbid string, action string, tuples [][]any, opts ...TxOpt) (types.Hash, error)
Execute(ctx context.Context, dbid string, action string, tuples [][]any, opts ...TxOpt) (types.Hash, error)
Expand Down
2 changes: 1 addition & 1 deletion core/types/payload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestValidPayload(t *testing.T) {
pt types.PayloadType
valid bool
}{
{"kv pair payload", types.PayloadTypeKV, true},
{"kv pair payload", types.PayloadTypeExecute, true},
{"registered payload", "testPayload", true},
{"invalid payload", types.PayloadType("unknown"), false},
}
Expand Down
48 changes: 9 additions & 39 deletions core/types/payloads.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@ type Payload interface {
}

const (
PayloadTypeKV PayloadType = "kv"
// PayloadTypeDeploySchema PayloadType = "deploy_schema"
// PayloadTypeDropSchema PayloadType = "drop_schema"
PayloadTypeDeploySchema PayloadType = "deploy_schema"
PayloadTypeDropSchema PayloadType = "drop_schema"
PayloadTypeExecute PayloadType = "execute"
PayloadTypeTransfer PayloadType = "transfer"
PayloadTypeValidatorJoin PayloadType = "validator_join"
Expand Down Expand Up @@ -90,10 +89,9 @@ func UnmarshalPayload(payloadType PayloadType, payload []byte) (Payload, error)

// payloadTypes includes native types and types registered from extensions.
var payloadTypes = map[PayloadType]bool{
PayloadTypeKV: true, // TODO: remove this later
// PayloadTypeDeploySchema: true,
// PayloadTypeDropSchema: true,
// PayloadTypeExecute: true,
PayloadTypeDeploySchema: true,
PayloadTypeDropSchema: true,
PayloadTypeExecute: true,
PayloadTypeTransfer: true,
PayloadTypeValidatorJoin: true,
PayloadTypeValidatorLeave: true,
Expand All @@ -119,14 +117,13 @@ func (p PayloadType) Valid() bool {
PayloadTypeCreateResolution,
PayloadTypeApproveResolution,
PayloadTypeDeleteResolution,
// PayloadTypeDeploySchema,
// PayloadTypeDropSchema,
// PayloadTypeExecute,
PayloadTypeDeploySchema,
PayloadTypeDropSchema,
PayloadTypeExecute,
// These should not come in user transactions, but they are not invalid
// payload types in general.
PayloadTypeValidatorVoteIDs,
PayloadTypeValidatorVoteBodies,
PayloadTypeKV: // TODO: remove this later
PayloadTypeValidatorVoteBodies:

return true
default: // check map that includes registered payloads from extensions
Expand All @@ -145,32 +142,6 @@ func RegisterPayload(pType PayloadType) {
payloadTypes[pType] = true
}

// KVPair payload for testing purposes
type KVPayload struct {
Key []byte
Value []byte
}

var _ Payload = &KVPayload{}

var _ encoding.BinaryMarshaler = (*KVPayload)(nil)
var _ encoding.BinaryMarshaler = KVPayload{}

func (p KVPayload) MarshalBinary() ([]byte, error) {
return serialize.Encode(p)
}

var _ encoding.BinaryUnmarshaler = (*KVPayload)(nil)

func (p *KVPayload) UnmarshalBinary(data []byte) error {
return serialize.Decode(data, p)
}

func (p *KVPayload) Type() PayloadType {
return PayloadTypeKV
}

/*
// DropSchema is the payload that is used to drop a schema
type DropSchema struct {
DBID string
Expand All @@ -189,7 +160,6 @@ func (s *DropSchema) UnmarshalBinary(b []byte) error {
func (s *DropSchema) Type() PayloadType {
return PayloadTypeDropSchema
}
*/

// ActionExecution is the payload that is used to execute an action
type ActionExecution struct {
Expand Down
2 changes: 1 addition & 1 deletion core/types/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type Schema struct {
var _ Payload = (*Schema)(nil)

func (s *Schema) Type() PayloadType {
return "to remove!"
return PayloadTypeDeploySchema
}

func (s Schema) SerializeSize() int {
Expand Down
15 changes: 8 additions & 7 deletions node/consensus/block_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ func (ce *ConsensusEngine) executeBlock(ctx context.Context) (err error) {
return fmt.Errorf("begin outer tx failed: %w", err)
}

blockCtx := &common.BlockContext{
Height: ce.state.blkProp.height,
Timestamp: blkProp.blk.Header.Timestamp.Unix(),
ChainContext: ce.chainCtx,
Proposer: ce.leader.Bytes(),
}

// TODO: log tracker

txResults := make([]ktypes.TxResult, len(ce.state.blkProp.blk.Txns))
Expand All @@ -98,7 +105,7 @@ func (ce *ConsensusEngine) executeBlock(ctx context.Context) (err error) {
Signer: decodedTx.Sender,
Authenticator: decodedTx.Signature.Type,
Caller: identifier,
// BlockContext: blkCtx,
BlockContext: blockCtx,
}

select {
Expand Down Expand Up @@ -129,12 +136,6 @@ func (ce *ConsensusEngine) executeBlock(ctx context.Context) (err error) {

// TODO: Notify the changesets to the migrator

blockCtx := &common.BlockContext{ // TODO: fill in the network params once we have them
Height: ce.state.blkProp.height,
Timestamp: blkProp.blk.Header.Timestamp.Unix(),
ChainContext: ce.chainCtx,
Proposer: ce.leader.Bytes(),
}
_, err = ce.txapp.Finalize(ctx, ce.state.consensusTx, blockCtx)
if err != nil {
ce.log.Error("Failed to finalize txapp", "err", err)
Expand Down
24 changes: 11 additions & 13 deletions node/consensus/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,16 +583,7 @@ func (ce *ConsensusEngine) catchup(ctx context.Context) error {

if appHeight > storeHeight {
// This is not possible, App can't be ahead of the store
return fmt.Errorf("app height %d is greater than the store height %d", appHeight, storeHeight)
}

if appHeight == storeHeight && !bytes.Equal(appHash, storeAppHash[:]) {
// This is not possible, PG mismatches with the Blockstore return error
return fmt.Errorf("AppHash mismatch, appHash: %x, storeAppHash: %x", appHash, storeAppHash)
}

if appHeight > 0 {
ce.setLastCommitInfo(appHeight, blkHash, types.Hash(appHash))
return fmt.Errorf("app height %d is greater than the store height %d (did you forget to reset postgres?)", appHeight, storeHeight)
}

if appHeight == -1 {
Expand All @@ -601,6 +592,13 @@ func (ce *ConsensusEngine) catchup(ctx context.Context) error {
if err := ce.GenesisInit(ctx); err != nil {
return fmt.Errorf("error initializing the genesis state: %w", err)
}
} else if appHeight > 0 {
if appHeight == storeHeight && !bytes.Equal(appHash, storeAppHash[:]) {
// This is not possible, PG mismatches with the Blockstore return error
return fmt.Errorf("AppHash mismatch, appHash: %x, storeAppHash: %v", appHash, storeAppHash)
}

ce.setLastCommitInfo(appHeight, blkHash, types.Hash(appHash))
}

// Replay the blocks from the blockstore if the app hasn't played all the blocks yet.
Expand Down Expand Up @@ -724,16 +722,16 @@ func (ce *ConsensusEngine) doCatchup(ctx context.Context) error {

if blkHash != ce.state.blkProp.blkHash { // processed incorrect block
if err := ce.resetState(ctx); err != nil {
return fmt.Errorf("error aborting incorrect block execution: height: %d, blkID: %x, error: %w", ce.state.blkProp.height, blkHash, err)
return fmt.Errorf("error aborting incorrect block execution: height: %d, blkID: %v, error: %w", ce.state.blkProp.height, blkHash, err)
}

blk, err := types.DecodeBlock(rawBlk)
if err != nil {
return fmt.Errorf("failed to decode the block, blkHeight: %d, blkID: %x, error: %w", ce.state.blkProp.height, blkHash, err)
return fmt.Errorf("failed to decode the block, blkHeight: %d, blkID: %v, error: %w", ce.state.blkProp.height, blkHash, err)
}

if err := ce.processAndCommit(ctx, blk, appHash); err != nil {
return fmt.Errorf("failed to replay the block: blkHeight: %d, blkID: %x, error: %w", ce.state.blkProp.height, blkHash, err)
return fmt.Errorf("failed to replay the block: blkHeight: %d, blkID: %v, error: %w", ce.state.blkProp.height, blkHash, err)
}
} else {
if appHash == ce.state.blockRes.appHash {
Expand Down
1 change: 1 addition & 0 deletions node/consensus/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,7 @@ func TestValidatorStateMachine(t *testing.T) {
})

for _, act := range tc.actions {
t.Log("action", act.name)
act.trigger(t, leader, val)
require.Eventually(t, func() bool {
err := act.verify(t, leader, val)
Expand Down
7 changes: 3 additions & 4 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ import (
)

const (
blockTxCount = 50 // for "mining"
dummyTxSize = 123_000 // for broadcast
dummyTxInterval = 1 * time.Second // broadcast freq
blockTxCount = 50 // for "mining"
txReAnnInterval = 30 * time.Second
)

type peerManager interface {
Expand Down Expand Up @@ -338,7 +337,7 @@ func (n *Node) Start(ctx context.Context, bootpeers ...string) error {
// custom stream-based gossip uses txAnnStreamHandler and announceTx.
// This dummy method will make create+announce new pretend transactions.
// It also periodically rebroadcasts txns.
n.startTxAnns(ctx, dummyTxInterval, 30*time.Second, dummyTxSize) // nogossip.go
n.startTxAnns(ctx, txReAnnInterval)

// mine is our block anns goroutine, which must be only for leader
n.wg.Add(1)
Expand Down
Loading

0 comments on commit 8184eea

Please sign in to comment.