Skip to content

Commit

Permalink
Full node with espresso finality (#237)
Browse files Browse the repository at this point in the history
* wip

* wip

* Add espresso finality node test

* fix ci

* fix ci

* pin to before marketplace version

* fix tests

* add comments

* address comments

* address comments

* address comments

* cleanup

* fix test

* fmt

* rerun ci

* ci skip tests
  • Loading branch information
Sneh1999 authored Sep 30, 2024
1 parent 8d4bc6c commit 56b4889
Show file tree
Hide file tree
Showing 6 changed files with 255 additions and 10 deletions.
9 changes: 4 additions & 5 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1216,26 +1216,26 @@ func (s *TransactionStreamer) PollSubmittedTransactionForFinality(ctx context.Co

data, err := s.espressoClient.FetchTransactionByHash(ctx, &submittedTxHash)
if err != nil {
log.Error("failed to fetch the submitted transaction hash", "err", err, "hash", submittedTxHash.String())
log.Warn("failed to fetch the submitted transaction hash", "err", err, "hash", submittedTxHash.String())
return s.config().EspressoTxnsPollingInterval
}
// get the message at the submitted txn position
msg, err := s.getMessageWithMetadataAndBlockHash(submittedTxnPos)
if err != nil {
log.Error("failed to get espresso message at submitted txn pos", "err", err)
log.Warn("failed to get espresso message at submitted txn pos", "err", err)
return s.config().EspressoTxnsPollingInterval
}

// parse the message to get the transaction bytes and the justification
txns, jst, err := arbos.ParseEspressoMsg(msg.MessageWithMeta.Message)
if err != nil {
log.Error("failed to parse espresso message", "err", err)
log.Warn("failed to parse espresso message", "err", err)
return s.config().EspressoTxnsPollingInterval
}

espressoHeader, err := s.espressoClient.FetchHeaderByHeight(ctx, data.BlockHeight)
if err != nil {
log.Error("espresso: failed to fetch header by height ", "err", err)
log.Warn("espresso: failed to fetch header by height ", "err", err)
return s.config().EspressoTxnsPollingInterval
}

Expand Down Expand Up @@ -1442,7 +1442,6 @@ func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context, ig

pendingTxnsPos, err := s.getEspressoPendingTxnsPos()
if err != nil {
log.Warn("failed to get pending txns position", "err", err)
return s.config().EspressoTxnsPollingInterval
}

Expand Down
3 changes: 2 additions & 1 deletion ci_skip_tests
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ TestBlockValidatorSimpleJITOnchain
TestMockChallengeManagerAsserterIncorrect
TestChallengeStakersFaultyHonestActive
TestChallengeManagerFullAsserterCorrectWithPublishedMachine

TestRedisProduce
TestBlockValidatorSimpleOnchainWithPublishedMachine

# These tests are specific to Espresso and we have a dedicated
# CI workflow for them. See: .github/workflows/espresso-e2e.yml
Expand Down
132 changes: 132 additions & 0 deletions execution/gethexec/espresso_finality_node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package gethexec

import (
"context"
"fmt"
espressoClient "github.com/EspressoSystems/espresso-sequencer-go/client"
"github.com/ethereum/go-ethereum/arbitrum_types"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/offchainlabs/nitro/arbos"
"time"

"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbos/l1pricing"
"github.com/offchainlabs/nitro/util/stopwaiter"
)

var (
retryTime = time.Second * 1
)

/*
Espresso Finality Node creates blocks with finalized hotshot transactions
*/
type EspressoFinalityNode struct {
stopwaiter.StopWaiter

config SequencerConfigFetcher
execEngine *ExecutionEngine
namespace uint64

espressoClient *espressoClient.Client
nextSeqBlockNum uint64
}

func NewEspressoFinalityNode(execEngine *ExecutionEngine, configFetcher SequencerConfigFetcher) *EspressoFinalityNode {
config := configFetcher()
if err := config.Validate(); err != nil {
panic(err)
}
return &EspressoFinalityNode{
execEngine: execEngine,
config: configFetcher,
namespace: config.EspressoFinalityNodeConfig.Namespace,
espressoClient: espressoClient.NewClient(config.EspressoFinalityNodeConfig.HotShotUrl),
nextSeqBlockNum: config.EspressoFinalityNodeConfig.StartBlock,
}
}

func (n *EspressoFinalityNode) createBlock(ctx context.Context) (returnValue bool) {
if n.nextSeqBlockNum == 0 {
latestBlock, err := n.espressoClient.FetchLatestBlockHeight(ctx)
if err != nil && latestBlock == 0 {
log.Warn("unable to fetch latest hotshot block", "err", err)
return false
}
log.Info("Started espresso finality node at the latest hotshot block", "block number", latestBlock)
n.nextSeqBlockNum = latestBlock
}

nextSeqBlockNum := n.nextSeqBlockNum
header, err := n.espressoClient.FetchHeaderByHeight(ctx, nextSeqBlockNum)
if err != nil {
arbos.LogFailedToFetchHeader(nextSeqBlockNum)
return false
}

arbTxns, err := n.espressoClient.FetchTransactionsInBlock(ctx, header.Height, n.namespace)
if err != nil {
arbos.LogFailedToFetchTransactions(header.Height, err)
return false
}
arbHeader := &arbostypes.L1IncomingMessageHeader{
Kind: arbostypes.L1MessageType_L2Message,
Poster: l1pricing.BatchPosterAddress,
BlockNumber: header.L1Head,
Timestamp: header.Timestamp,
RequestId: nil,
L1BaseFee: nil,
}

// Deserialize the transactions and remove the signature from the transactions.
// Ignore the malformed transactions
txes := types.Transactions{}
for _, tx := range arbTxns.Transactions {
var out types.Transaction
// signature from the data poster is the first 65 bytes of a transaction
tx = tx[65:]
if err := out.UnmarshalBinary(tx); err != nil {
log.Warn("malformed tx found")
continue
}
txes = append(txes, &out)
}

hooks := arbos.NoopSequencingHooks()
_, err = n.execEngine.SequenceTransactions(arbHeader, txes, hooks, false)
if err != nil {
log.Error("espresso finality node: failed to sequence transactions", "err", err)
return false
}

return true
}

func (n *EspressoFinalityNode) Start(ctx context.Context) error {
n.StopWaiter.Start(ctx, n)
err := n.CallIterativelySafe(func(ctx context.Context) time.Duration {
madeBlock := n.createBlock(ctx)
if madeBlock {
n.nextSeqBlockNum += 1
return 0
}
return retryTime
})
if err != nil {
return fmt.Errorf("failed to start espresso finality node: %w", err)
}
return nil
}

func (n *EspressoFinalityNode) PublishTransaction(ctx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions) error {
return nil
}

func (n *EspressoFinalityNode) CheckHealth(ctx context.Context) error {
return nil
}

func (n *EspressoFinalityNode) Initialize(ctx context.Context) error {
return nil
}
14 changes: 13 additions & 1 deletion execution/gethexec/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,18 @@ func CreateExecutionNode(
if err != nil {
return nil, err
}
txPublisher = sequencer

// sovereign sequencer should not be configured with espresso finality node
if config.Sequencer.EnableEspressoFinalityNode && config.Sequencer.EnableEspressoSovereign {
return nil, errors.New("espresso finality node cannot be configured with espresso sovereign sequencer")
}

if config.Sequencer.EnableEspressoFinalityNode {
espressoFinalityNode := NewEspressoFinalityNode(execEngine, seqConfigFetcher)
txPublisher = espressoFinalityNode
} else {
txPublisher = sequencer
}
} else {
if config.Forwarder.RedisUrl != "" {
txPublisher = NewRedisTxForwarder(config.forwardingTarget, &config.Forwarder)
Expand Down Expand Up @@ -329,6 +340,7 @@ func (n *ExecutionNode) StopAndWait() {
if n.TxPublisher.Started() {
n.TxPublisher.StopAndWait()
}

n.Recorder.OrderlyShutdown()
if n.ParentChainReader != nil && n.ParentChainReader.Started() {
n.ParentChainReader.StopAndWait()
Expand Down
18 changes: 15 additions & 3 deletions execution/gethexec/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ type SequencerConfig struct {
expectedSurplusHardThreshold int

// Espresso specific flags
EnableEspressoSovereign bool `koanf:"enable-espresso-sovereign"`
EnableEspressoSovereign bool `koanf:"enable-espresso-sovereign"`
EspressoFinalityNodeConfig EspressoFinalityNodeConfig `koanf:"espresso-finality-node-config"`
// Espresso Finality Node creates blocks with finalized hotshot transactions
EnableEspressoFinalityNode bool `koanf:"enable-espresso-finality-node"`
}

func (c *SequencerConfig) Validate() error {
Expand All @@ -101,6 +104,12 @@ func (c *SequencerConfig) Validate() error {

type SequencerConfigFetcher func() *SequencerConfig

type EspressoFinalityNodeConfig struct {
HotShotUrl string `koanf:"hotshot-url"`
StartBlock uint64 `koanf:"start-block"`
Namespace uint64 `koanf:"namespace"`
}

var DefaultSequencerConfig = SequencerConfig{
Enable: false,
MaxBlockSpeed: time.Millisecond * 250,
Expand All @@ -119,7 +128,8 @@ var DefaultSequencerConfig = SequencerConfig{
ExpectedSurplusHardThreshold: "default",
EnableProfiling: false,

EnableEspressoSovereign: false,
EnableEspressoFinalityNode: false,
EnableEspressoSovereign: false,
}

var TestSequencerConfig = SequencerConfig{
Expand All @@ -139,7 +149,8 @@ var TestSequencerConfig = SequencerConfig{
ExpectedSurplusHardThreshold: "default",
EnableProfiling: false,

EnableEspressoSovereign: false,
EnableEspressoFinalityNode: false,
EnableEspressoSovereign: false,
}

func SequencerConfigAddOptions(prefix string, f *flag.FlagSet) {
Expand All @@ -160,6 +171,7 @@ func SequencerConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Bool(prefix+".enable-profiling", DefaultSequencerConfig.EnableProfiling, "enable CPU profiling and tracing")

// Espresso specific flags
f.Bool(prefix+".enable-espresso-finality-node", DefaultSequencerConfig.EnableEspressoFinalityNode, "enable espresso finality node")
f.Bool(prefix+".enable-espresso-sovereign", DefaultSequencerConfig.EnableEspressoSovereign, "enable sovereign sequencer mode for the Espresso integration")
}

Expand Down
89 changes: 89 additions & 0 deletions system_tests/espresso_finality_node_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package arbtest

import (
"context"
"fmt"
"testing"
"time"
)

func createEspressoFinalityNode(t *testing.T, builder *NodeBuilder) (*TestClient, func()) {
nodeConfig := builder.nodeConfig
execConfig := builder.execConfig
// poster config
nodeConfig.BatchPoster.Enable = true
nodeConfig.BatchPoster.ErrorDelay = 5 * time.Second
nodeConfig.BatchPoster.MaxSize = 41
nodeConfig.BatchPoster.PollInterval = 10 * time.Second
nodeConfig.BatchPoster.MaxDelay = -1000 * time.Hour
nodeConfig.BatchPoster.LightClientAddress = lightClientAddress
nodeConfig.BatchPoster.HotShotUrl = hotShotUrl

nodeConfig.BlockValidator.Enable = true
nodeConfig.BlockValidator.ValidationPoll = 2 * time.Second
nodeConfig.BlockValidator.ValidationServer.URL = fmt.Sprintf("ws://127.0.0.1:%d", 54327)
nodeConfig.BlockValidator.LightClientAddress = lightClientAddress
nodeConfig.BlockValidator.Espresso = true
nodeConfig.DelayedSequencer.Enable = true
nodeConfig.DelayedSequencer.FinalizeDistance = 1
nodeConfig.Sequencer = true
nodeConfig.Dangerous.NoSequencerCoordinator = true
execConfig.Sequencer.Enable = true
execConfig.Sequencer.EnableEspressoFinalityNode = true
execConfig.Sequencer.EspressoFinalityNodeConfig.Namespace = builder.chainConfig.ChainID.Uint64()
execConfig.Sequencer.EspressoFinalityNodeConfig.StartBlock = 1
execConfig.Sequencer.EspressoFinalityNodeConfig.HotShotUrl = hotShotUrl

// disable sovereign sequencer
execConfig.Sequencer.EnableEspressoSovereign = false
builder.nodeConfig.TransactionStreamer.SovereignSequencerEnabled = false

return builder.Build2ndNode(t, &SecondNodeParams{
nodeConfig: nodeConfig,
execConfig: execConfig,
})
}

func TestEspressoFinalityNode(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

valNodeCleanup := createValidationNode(ctx, t, true)
defer valNodeCleanup()

builder, cleanup := createL1AndL2Node(ctx, t)
defer cleanup()

err := waitForL1Node(t, ctx)
Require(t, err)

cleanEspresso := runEspresso(t, ctx)
defer cleanEspresso()

// wait for the builder
err = waitForEspressoNode(t, ctx)
Require(t, err)

err = checkTransferTxOnL2(t, ctx, builder.L2, "User14", builder.L2Info)
Require(t, err)

msgCnt, err := builder.L2.ConsensusNode.TxStreamer.GetMessageCount()
Require(t, err)

err = waitForWith(t, ctx, 6*time.Minute, 60*time.Second, func() bool {
validatedCnt := builder.L2.ConsensusNode.BlockValidator.Validated(t)
return validatedCnt == msgCnt
})
Require(t, err)

// start the finality node
builderEspressoFinalityNode, cleanupEspressoFinalityNode := createEspressoFinalityNode(t, builder)
defer cleanupEspressoFinalityNode()

err = waitForWith(t, ctx, 6*time.Minute, 60*time.Second, func() bool {
msgCntFinalityNode, err := builderEspressoFinalityNode.ConsensusNode.TxStreamer.GetMessageCount()
Require(t, err)
return msgCntFinalityNode == msgCnt
})
Require(t, err)
}

0 comments on commit 56b4889

Please sign in to comment.