Skip to content

Commit

Permalink
resetMsgs should carry txIDs to purge from the mempool
Browse files Browse the repository at this point in the history
  • Loading branch information
charithabandi committed Dec 18, 2024
1 parent 04cd13e commit 498eab5
Show file tree
Hide file tree
Showing 10 changed files with 79 additions and 44 deletions.
5 changes: 3 additions & 2 deletions node/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,9 +514,10 @@ func (n *Node) startDiscoveryResponseGossip(ctx context.Context, ps *pubsub.PubS
return nil
}

func (n *Node) sendReset(height int64) error {
func (n *Node) sendReset(height int64, txIDs []ktypes.Hash) error {
n.resetMsg <- types.ConsensusReset{
ToHeight: height,
TxIDs: txIDs,
}
return nil
}
Expand Down Expand Up @@ -586,7 +587,7 @@ func (n *Node) startConsensusResetGossip(ctx context.Context, ps *pubsub.PubSub)
fromPeerID, resetMsg.ReceivedFrom, resetMsg.Message.Data)

// source of the reset message should be the leader
n.ce.NotifyResetState(reset.ToHeight)
n.ce.NotifyResetState(reset.ToHeight, reset.TxIDs)
}
}()

Expand Down
10 changes: 5 additions & 5 deletions node/consensus/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type ConsensusEngine struct {
newRound chan struct{}
msgChan chan consensusMessage
haltChan chan string // can take a msg or reason for halting the network
resetChan chan int64 // to reset the state of the consensus engine
resetChan chan *resetMsg // to reset the state of the consensus engine
bestHeightCh chan *discoveryMsg // to sync the leader with the network

// interfaces
Expand Down Expand Up @@ -133,7 +133,7 @@ type AckBroadcaster func(ack bool, height int64, blkID types.Hash, appHash *type
// BlkRequester requests the block from the network based on the height
type BlkRequester func(ctx context.Context, height int64) (types.Hash, types.Hash, []byte, error)

type ResetStateBroadcaster func(height int64) error
type ResetStateBroadcaster func(height int64, txIDs []ktypes.Hash) error

type DiscoveryReqBroadcaster func()

Expand Down Expand Up @@ -242,7 +242,7 @@ func New(cfg *Config) *ConsensusEngine {
genesisHeight: cfg.GenesisHeight,
msgChan: make(chan consensusMessage, 1), // buffer size??
haltChan: make(chan string, 1),
resetChan: make(chan int64, 1),
resetChan: make(chan *resetMsg, 1),
bestHeightCh: make(chan *discoveryMsg, 1),
newRound: make(chan struct{}, 1),
// interfaces
Expand Down Expand Up @@ -384,8 +384,8 @@ func (ce *ConsensusEngine) resetEventLoop(ctx context.Context) error {
case <-ctx.Done():
ce.log.Info("Shutting down the reset event loop")
return nil
case height := <-ce.resetChan:
ce.resetBlockProp(ctx, height, nil)
case msg := <-ce.resetChan:
ce.resetBlockProp(ctx, msg.height, msg.txIDs)
}
}
}
Expand Down
14 changes: 7 additions & 7 deletions node/consensus/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ func TestValidatorStateMachine(t *testing.T) {
{
name: "reset",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
val.sendResetMsg(0)
val.sendResetMsg(&resetMsg{height: 0})
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Committed, 0, zeroHash)
Expand Down Expand Up @@ -464,7 +464,7 @@ func TestValidatorStateMachine(t *testing.T) {
{
name: "reset",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
val.sendResetMsg(0)
val.sendResetMsg(&resetMsg{height: 0})
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Committed, 1, zeroHash)
Expand All @@ -490,7 +490,7 @@ func TestValidatorStateMachine(t *testing.T) {
{
name: "reset",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
val.sendResetMsg(0)
val.sendResetMsg(&resetMsg{height: 0})
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Committed, 0, zeroHash)
Expand All @@ -499,7 +499,7 @@ func TestValidatorStateMachine(t *testing.T) {
{
name: "reset",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
val.sendResetMsg(0)
val.sendResetMsg(&resetMsg{height: 0})
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Committed, 0, zeroHash)
Expand Down Expand Up @@ -536,7 +536,7 @@ func TestValidatorStateMachine(t *testing.T) {
{
name: "reset",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
val.sendResetMsg(1)
val.sendResetMsg(&resetMsg{height: 1})
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Executed, 0, blkProp1.blkHash)
Expand All @@ -545,7 +545,7 @@ func TestValidatorStateMachine(t *testing.T) {
{
name: "reset",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
val.sendResetMsg(2)
val.sendResetMsg(&resetMsg{height: 2})
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Executed, 0, blkProp1.blkHash)
Expand Down Expand Up @@ -765,7 +765,7 @@ func mockVoteBroadcaster(ack bool, height int64, blkID types.Hash, appHash *type
func mockBlkAnnouncer(_ context.Context, blk *ktypes.Block, appHash types.Hash) {
}

func mockResetStateBroadcaster(_ int64) error {
func mockResetStateBroadcaster(_ int64, _ []ktypes.Hash) error {
return nil
}

Expand Down
16 changes: 7 additions & 9 deletions node/consensus/follower.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ import (
// If we receive a new proposal for the same height, abort the execution of the current proposal and
// start processing the new proposal.
func (ce *ConsensusEngine) AcceptProposal(height int64, blkID, prevBlockID types.Hash, leaderSig []byte, timestamp int64) bool {
ce.log.Info("ENTER ACCEPT PROPOSAL", "height", height, "blkID", blkID, "prevBlockID", prevBlockID)
if ce.role.Load() != types.RoleValidator { // TODO: Should sentry nodes download the proposal and forward it to the validators?
if ce.role.Load() != types.RoleValidator {
return false
}
ce.updateNetworkHeight(height - 1)
Expand All @@ -28,7 +27,7 @@ func (ce *ConsensusEngine) AcceptProposal(height int64, blkID, prevBlockID types
ce.log.Info("Accept proposal?", "height", height, "blkID", blkID, "prevHash", prevBlockID)

if height != ce.stateInfo.height+1 {
ce.log.Info("Block proposal is not for the next height", "blkPropHeight", height, "expected", ce.stateInfo.height+1)
ce.log.Debug("Block proposal is not for the next height", "blkPropHeight", height, "expected", ce.stateInfo.height+1)
return false
}

Expand All @@ -40,24 +39,23 @@ func (ce *ConsensusEngine) AcceptProposal(height int64, blkID, prevBlockID types
}

if !valid {
ce.log.Info("Invalid leader signature, ignoring the block proposal msg: ", "height", height)
ce.log.Debug("Invalid leader signature, ignoring the block proposal msg: ", "height", height)
return false
}

// Check if the validator is busy processing a block.
if ce.stateInfo.status != Committed {
// check if we are processing a different block, if yes then reset the state.
if ce.stateInfo.blkProp.blkHash != blkID && ce.stateInfo.blkProp.blk.Header.Timestamp.UnixMilli() < timestamp {
ce.log.Info("Conflicting block proposals, abort block execution and requesting the latest block: ", "height", height)
ce.log.Debug("Conflicting block proposals, abort block execution and requesting the latest block: ", "height", height)
// go ce.sendResetMsg(ce.stateInfo.height)
return true
}
ce.log.Info("Already processing the block proposal", "height", height, "blkID", blkID)
ce.log.Debug("Already processing the block proposal", "height", height, "blkID", blkID)
return false
}

// not processing any block, accept the proposal
ce.log.Info("Accepting block proposal: EXIT", "height", height, "blkID", blkID)
return true
}

Expand Down Expand Up @@ -116,8 +114,8 @@ func (ce *ConsensusEngine) AcceptCommit(height int64, blkID types.Hash, appHash
// We are currently processing a block proposal, ensure that it's the correct block proposal.
if ce.stateInfo.blkProp.blkHash != blkID {
// Rollback and reprocess the block sent as part of the commit message.
ce.log.Info("Processing incorrect block, notify consensus engine to abort: ", "height", height)
go ce.sendResetMsg(ce.stateInfo.height)
ce.log.Debug("Processing incorrect block, notify consensus engine to abort: ", "height", height)
go ce.sendResetMsg(&resetMsg{height: height})
return true // fetch the correct block
}

Expand Down
7 changes: 3 additions & 4 deletions node/consensus/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,10 @@ func (ce *ConsensusEngine) startNewRound(ctx context.Context) error {
ce.log.Errorf("Error executing the block: %v", err)
if execCtx.Err() != nil && errors.Is(err, context.Canceled) {
ce.log.Warn("Block execution cancelled by the leader", "height", blkProp.height, "hash", blkProp.blkHash)

// trigger a reset state message
go ce.rstStateBroadcaster(ce.state.lc.height)

ce.cancelFnMtx.Lock()
// trigger a reset state message to the network
go ce.rstStateBroadcaster(ce.state.lc.height, ce.longRunningTxs)

// Remove the long running transactions from the mempool
ce.log.Info("Removing long running transactions from the mempool as per leader's request", "txIDs", ce.longRunningTxs)
for _, txID := range ce.longRunningTxs {
Expand Down
16 changes: 12 additions & 4 deletions node/consensus/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ func (bam *blockAnnounce) String() string {
// due to amnesia after leader restart.
// 3. Nodes receive a blockAnn message from the leader for a different blk
// than the one the node is currently processing or waiting on.
func (ce *ConsensusEngine) sendResetMsg(height int64) {
ce.resetChan <- height
func (ce *ConsensusEngine) sendResetMsg(msg *resetMsg) {
ce.resetChan <- msg
}

// NotifyBlockProposal is used by the p2p stream handler to notify the consensus engine of a block proposal.
Expand Down Expand Up @@ -154,14 +154,22 @@ func (ce *ConsensusEngine) NotifyACK(validatorPK []byte, ack types.AckRes) {
})
}

type resetMsg struct {
height int64
txIDs []types.Hash
}

// NotifyResetState is used by the p2p stream handler to notify the consensus engine to reset the state to the specified height.
// Only a validator should receive this message to abort the current block execution.
func (ce *ConsensusEngine) NotifyResetState(height int64) {
func (ce *ConsensusEngine) NotifyResetState(height int64, txIDs []types.Hash) {
if ce.role.Load() != types.RoleValidator {
return
}

go ce.sendResetMsg(height)
go ce.sendResetMsg(&resetMsg{
height: height,
txIDs: txIDs,
})
}

type discoveryMsg struct {
Expand Down
2 changes: 1 addition & 1 deletion node/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type ConsensusEngine interface {

NotifyACK(validatorPK []byte, ack types.AckRes)

NotifyResetState(height int64)
NotifyResetState(height int64, txIDs []types.Hash)

NotifyDiscoveryMessage(validatorPK []byte, height int64)

Expand Down
12 changes: 6 additions & 6 deletions node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ type dummyCE struct {
ackHandler func(validatorPK []byte, ack types.AckRes)
blockCommitHandler func(blk *ktypes.Block, appHash types.Hash)
blockPropHandler func(blk *ktypes.Block)
resetStateHandler func(height int64)
resetStateHandler func(height int64, txIDs []types.Hash)

// mtx sync.Mutex
// gotACKs map[string]types.AckRes // from NotifyACK: string(validatorPK) -> AckRes
Expand Down Expand Up @@ -179,9 +179,9 @@ func (ce *dummyCE) AcceptACK() bool {
return true
}

func (ce *dummyCE) NotifyResetState(height int64) {
func (ce *dummyCE) NotifyResetState(height int64, txIDs []types.Hash) {
if ce.resetStateHandler != nil {
ce.resetStateHandler(height)
ce.resetStateHandler(height, txIDs)
return
}
}
Expand Down Expand Up @@ -243,8 +243,8 @@ func (f *faker) ACK(ack bool, height int64, blkID types.Hash, appHash *types.Has
return f.ackBroadcaster(ack, height, blkID, appHash)
}

func (f *faker) ResetState(height int64) {
f.stateResetter(height)
func (f *faker) ResetState(height int64, txIDs []types.Hash) {
f.stateResetter(height, txIDs)
}

func (f *faker) RequestBlock(ctx context.Context, height int64) {
Expand Down Expand Up @@ -275,7 +275,7 @@ func (f *faker) SetBlockPropHandler(blockPropHandler func(blk *ktypes.Block)) {
f.blockPropHandler = blockPropHandler
}

func (f *faker) SetResetStateHandler(resetStateHandler func(height int64)) {
func (f *faker) SetResetStateHandler(resetStateHandler func(height int64, txIDs []types.Hash)) {
f.resetStateHandler = resetStateHandler
}

Expand Down
35 changes: 32 additions & 3 deletions node/types/consensus.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,61 @@
package types

import (
"bytes"
"encoding/binary"
"errors"
"fmt"
)

type ConsensusReset struct {
ToHeight int64
TxIDs []Hash
}

func (cr ConsensusReset) String() string {
return fmt.Sprintf("ConsensusReset{Height: %d}", cr.ToHeight)
}

func (cr ConsensusReset) Bytes() []byte {
return binary.LittleEndian.AppendUint64(nil, uint64(cr.ToHeight))
buf := new(bytes.Buffer)
binary.Write(buf, binary.LittleEndian, uint64(cr.ToHeight))
binary.Write(buf, binary.LittleEndian, uint64(len(cr.TxIDs)))
for _, txID := range cr.TxIDs {
buf.Write(txID[:])
}

return buf.Bytes()
}

func (cr ConsensusReset) MarshalBinary() ([]byte, error) {
return cr.Bytes(), nil
}

func (cr *ConsensusReset) UnmarshalBinary(data []byte) error {
if len(data) != 8 {
if len(data) < 16 {
return errors.New("invalid ConsensusReset data")
}
cr.ToHeight = int64(binary.LittleEndian.Uint64(data))

buf := bytes.NewBuffer(data)

var height uint64
if err := binary.Read(buf, binary.LittleEndian, &height); err != nil {
return err
}
cr.ToHeight = int64(height)

var numTxIDs uint64
if err := binary.Read(buf, binary.LittleEndian, &numTxIDs); err != nil {
return err
}
cr.TxIDs = make([]Hash, numTxIDs)

for i := range cr.TxIDs {
if _, err := buf.Read(cr.TxIDs[i][:]); err != nil {
return err
}
}

return nil
}

Expand Down
6 changes: 3 additions & 3 deletions node/types/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,17 +105,17 @@ func TestConsensusReset_Bytes(t *testing.T) {
{
name: "standard height",
height: 1000,
want: 8,
want: 16,
},
{
name: "zero height",
height: 0,
want: 8,
want: 16,
},
{
name: "max height",
height: 1<<63 - 1,
want: 8,
want: 16,
},
}

Expand Down

0 comments on commit 498eab5

Please sign in to comment.