diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index 8e96a24dbe..aaf226223d 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -1216,26 +1216,26 @@ func (s *TransactionStreamer) PollSubmittedTransactionForFinality(ctx context.Co data, err := s.espressoClient.FetchTransactionByHash(ctx, &submittedTxHash) if err != nil { - log.Error("failed to fetch the submitted transaction hash", "err", err, "hash", submittedTxHash.String()) + log.Warn("failed to fetch the submitted transaction hash", "err", err, "hash", submittedTxHash.String()) return s.config().EspressoTxnsPollingInterval } // get the message at the submitted txn position msg, err := s.getMessageWithMetadataAndBlockHash(submittedTxnPos) if err != nil { - log.Error("failed to get espresso message at submitted txn pos", "err", err) + log.Warn("failed to get espresso message at submitted txn pos", "err", err) return s.config().EspressoTxnsPollingInterval } // parse the message to get the transaction bytes and the justification txns, jst, err := arbos.ParseEspressoMsg(msg.MessageWithMeta.Message) if err != nil { - log.Error("failed to parse espresso message", "err", err) + log.Warn("failed to parse espresso message", "err", err) return s.config().EspressoTxnsPollingInterval } espressoHeader, err := s.espressoClient.FetchHeaderByHeight(ctx, data.BlockHeight) if err != nil { - log.Error("espresso: failed to fetch header by height ", "err", err) + log.Warn("espresso: failed to fetch header by height ", "err", err) return s.config().EspressoTxnsPollingInterval } @@ -1442,7 +1442,6 @@ func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context, ig pendingTxnsPos, err := s.getEspressoPendingTxnsPos() if err != nil { - log.Warn("failed to get pending txns position", "err", err) return s.config().EspressoTxnsPollingInterval } diff --git a/ci_skip_tests b/ci_skip_tests index c24844584b..52df2f1ade 100644 --- a/ci_skip_tests +++ b/ci_skip_tests @@ -32,7 +32,8 @@ TestBlockValidatorSimpleJITOnchain TestMockChallengeManagerAsserterIncorrect TestChallengeStakersFaultyHonestActive TestChallengeManagerFullAsserterCorrectWithPublishedMachine - +TestRedisProduce +TestBlockValidatorSimpleOnchainWithPublishedMachine # These tests are specific to Espresso and we have a dedicated # CI workflow for them. See: .github/workflows/espresso-e2e.yml diff --git a/execution/gethexec/espresso_finality_node.go b/execution/gethexec/espresso_finality_node.go new file mode 100644 index 0000000000..5b92fe8771 --- /dev/null +++ b/execution/gethexec/espresso_finality_node.go @@ -0,0 +1,132 @@ +package gethexec + +import ( + "context" + "fmt" + espressoClient "github.com/EspressoSystems/espresso-sequencer-go/client" + "github.com/ethereum/go-ethereum/arbitrum_types" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/offchainlabs/nitro/arbos" + "time" + + "github.com/offchainlabs/nitro/arbos/arbostypes" + "github.com/offchainlabs/nitro/arbos/l1pricing" + "github.com/offchainlabs/nitro/util/stopwaiter" +) + +var ( + retryTime = time.Second * 1 +) + +/* +Espresso Finality Node creates blocks with finalized hotshot transactions +*/ +type EspressoFinalityNode struct { + stopwaiter.StopWaiter + + config SequencerConfigFetcher + execEngine *ExecutionEngine + namespace uint64 + + espressoClient *espressoClient.Client + nextSeqBlockNum uint64 +} + +func NewEspressoFinalityNode(execEngine *ExecutionEngine, configFetcher SequencerConfigFetcher) *EspressoFinalityNode { + config := configFetcher() + if err := config.Validate(); err != nil { + panic(err) + } + return &EspressoFinalityNode{ + execEngine: execEngine, + config: configFetcher, + namespace: config.EspressoFinalityNodeConfig.Namespace, + espressoClient: espressoClient.NewClient(config.EspressoFinalityNodeConfig.HotShotUrl), + nextSeqBlockNum: config.EspressoFinalityNodeConfig.StartBlock, + } +} + +func (n *EspressoFinalityNode) createBlock(ctx context.Context) (returnValue bool) { + if n.nextSeqBlockNum == 0 { + latestBlock, err := n.espressoClient.FetchLatestBlockHeight(ctx) + if err != nil && latestBlock == 0 { + log.Warn("unable to fetch latest hotshot block", "err", err) + return false + } + log.Info("Started espresso finality node at the latest hotshot block", "block number", latestBlock) + n.nextSeqBlockNum = latestBlock + } + + nextSeqBlockNum := n.nextSeqBlockNum + header, err := n.espressoClient.FetchHeaderByHeight(ctx, nextSeqBlockNum) + if err != nil { + arbos.LogFailedToFetchHeader(nextSeqBlockNum) + return false + } + + arbTxns, err := n.espressoClient.FetchTransactionsInBlock(ctx, header.Height, n.namespace) + if err != nil { + arbos.LogFailedToFetchTransactions(header.Height, err) + return false + } + arbHeader := &arbostypes.L1IncomingMessageHeader{ + Kind: arbostypes.L1MessageType_L2Message, + Poster: l1pricing.BatchPosterAddress, + BlockNumber: header.L1Head, + Timestamp: header.Timestamp, + RequestId: nil, + L1BaseFee: nil, + } + + // Deserialize the transactions and remove the signature from the transactions. + // Ignore the malformed transactions + txes := types.Transactions{} + for _, tx := range arbTxns.Transactions { + var out types.Transaction + // signature from the data poster is the first 65 bytes of a transaction + tx = tx[65:] + if err := out.UnmarshalBinary(tx); err != nil { + log.Warn("malformed tx found") + continue + } + txes = append(txes, &out) + } + + hooks := arbos.NoopSequencingHooks() + _, err = n.execEngine.SequenceTransactions(arbHeader, txes, hooks, false) + if err != nil { + log.Error("espresso finality node: failed to sequence transactions", "err", err) + return false + } + + return true +} + +func (n *EspressoFinalityNode) Start(ctx context.Context) error { + n.StopWaiter.Start(ctx, n) + err := n.CallIterativelySafe(func(ctx context.Context) time.Duration { + madeBlock := n.createBlock(ctx) + if madeBlock { + n.nextSeqBlockNum += 1 + return 0 + } + return retryTime + }) + if err != nil { + return fmt.Errorf("failed to start espresso finality node: %w", err) + } + return nil +} + +func (n *EspressoFinalityNode) PublishTransaction(ctx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions) error { + return nil +} + +func (n *EspressoFinalityNode) CheckHealth(ctx context.Context) error { + return nil +} + +func (n *EspressoFinalityNode) Initialize(ctx context.Context) error { + return nil +} diff --git a/execution/gethexec/node.go b/execution/gethexec/node.go index 8ee16095d9..a0da6d6144 100644 --- a/execution/gethexec/node.go +++ b/execution/gethexec/node.go @@ -185,7 +185,18 @@ func CreateExecutionNode( if err != nil { return nil, err } - txPublisher = sequencer + + // sovereign sequencer should not be configured with espresso finality node + if config.Sequencer.EnableEspressoFinalityNode && config.Sequencer.EnableEspressoSovereign { + return nil, errors.New("espresso finality node cannot be configured with espresso sovereign sequencer") + } + + if config.Sequencer.EnableEspressoFinalityNode { + espressoFinalityNode := NewEspressoFinalityNode(execEngine, seqConfigFetcher) + txPublisher = espressoFinalityNode + } else { + txPublisher = sequencer + } } else { if config.Forwarder.RedisUrl != "" { txPublisher = NewRedisTxForwarder(config.forwardingTarget, &config.Forwarder) @@ -329,6 +340,7 @@ func (n *ExecutionNode) StopAndWait() { if n.TxPublisher.Started() { n.TxPublisher.StopAndWait() } + n.Recorder.OrderlyShutdown() if n.ParentChainReader != nil && n.ParentChainReader.Started() { n.ParentChainReader.StopAndWait() diff --git a/execution/gethexec/sequencer.go b/execution/gethexec/sequencer.go index 6892210ed6..96367e8aef 100644 --- a/execution/gethexec/sequencer.go +++ b/execution/gethexec/sequencer.go @@ -80,7 +80,10 @@ type SequencerConfig struct { expectedSurplusHardThreshold int // Espresso specific flags - EnableEspressoSovereign bool `koanf:"enable-espresso-sovereign"` + EnableEspressoSovereign bool `koanf:"enable-espresso-sovereign"` + EspressoFinalityNodeConfig EspressoFinalityNodeConfig `koanf:"espresso-finality-node-config"` + // Espresso Finality Node creates blocks with finalized hotshot transactions + EnableEspressoFinalityNode bool `koanf:"enable-espresso-finality-node"` } func (c *SequencerConfig) Validate() error { @@ -101,6 +104,12 @@ func (c *SequencerConfig) Validate() error { type SequencerConfigFetcher func() *SequencerConfig +type EspressoFinalityNodeConfig struct { + HotShotUrl string `koanf:"hotshot-url"` + StartBlock uint64 `koanf:"start-block"` + Namespace uint64 `koanf:"namespace"` +} + var DefaultSequencerConfig = SequencerConfig{ Enable: false, MaxBlockSpeed: time.Millisecond * 250, @@ -119,7 +128,8 @@ var DefaultSequencerConfig = SequencerConfig{ ExpectedSurplusHardThreshold: "default", EnableProfiling: false, - EnableEspressoSovereign: false, + EnableEspressoFinalityNode: false, + EnableEspressoSovereign: false, } var TestSequencerConfig = SequencerConfig{ @@ -139,7 +149,8 @@ var TestSequencerConfig = SequencerConfig{ ExpectedSurplusHardThreshold: "default", EnableProfiling: false, - EnableEspressoSovereign: false, + EnableEspressoFinalityNode: false, + EnableEspressoSovereign: false, } func SequencerConfigAddOptions(prefix string, f *flag.FlagSet) { @@ -160,6 +171,7 @@ func SequencerConfigAddOptions(prefix string, f *flag.FlagSet) { f.Bool(prefix+".enable-profiling", DefaultSequencerConfig.EnableProfiling, "enable CPU profiling and tracing") // Espresso specific flags + f.Bool(prefix+".enable-espresso-finality-node", DefaultSequencerConfig.EnableEspressoFinalityNode, "enable espresso finality node") f.Bool(prefix+".enable-espresso-sovereign", DefaultSequencerConfig.EnableEspressoSovereign, "enable sovereign sequencer mode for the Espresso integration") } diff --git a/system_tests/espresso_finality_node_test.go b/system_tests/espresso_finality_node_test.go new file mode 100644 index 0000000000..ab8e3b1ac6 --- /dev/null +++ b/system_tests/espresso_finality_node_test.go @@ -0,0 +1,89 @@ +package arbtest + +import ( + "context" + "fmt" + "testing" + "time" +) + +func createEspressoFinalityNode(t *testing.T, builder *NodeBuilder) (*TestClient, func()) { + nodeConfig := builder.nodeConfig + execConfig := builder.execConfig + // poster config + nodeConfig.BatchPoster.Enable = true + nodeConfig.BatchPoster.ErrorDelay = 5 * time.Second + nodeConfig.BatchPoster.MaxSize = 41 + nodeConfig.BatchPoster.PollInterval = 10 * time.Second + nodeConfig.BatchPoster.MaxDelay = -1000 * time.Hour + nodeConfig.BatchPoster.LightClientAddress = lightClientAddress + nodeConfig.BatchPoster.HotShotUrl = hotShotUrl + + nodeConfig.BlockValidator.Enable = true + nodeConfig.BlockValidator.ValidationPoll = 2 * time.Second + nodeConfig.BlockValidator.ValidationServer.URL = fmt.Sprintf("ws://127.0.0.1:%d", 54327) + nodeConfig.BlockValidator.LightClientAddress = lightClientAddress + nodeConfig.BlockValidator.Espresso = true + nodeConfig.DelayedSequencer.Enable = true + nodeConfig.DelayedSequencer.FinalizeDistance = 1 + nodeConfig.Sequencer = true + nodeConfig.Dangerous.NoSequencerCoordinator = true + execConfig.Sequencer.Enable = true + execConfig.Sequencer.EnableEspressoFinalityNode = true + execConfig.Sequencer.EspressoFinalityNodeConfig.Namespace = builder.chainConfig.ChainID.Uint64() + execConfig.Sequencer.EspressoFinalityNodeConfig.StartBlock = 1 + execConfig.Sequencer.EspressoFinalityNodeConfig.HotShotUrl = hotShotUrl + + // disable sovereign sequencer + execConfig.Sequencer.EnableEspressoSovereign = false + builder.nodeConfig.TransactionStreamer.SovereignSequencerEnabled = false + + return builder.Build2ndNode(t, &SecondNodeParams{ + nodeConfig: nodeConfig, + execConfig: execConfig, + }) +} + +func TestEspressoFinalityNode(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + valNodeCleanup := createValidationNode(ctx, t, true) + defer valNodeCleanup() + + builder, cleanup := createL1AndL2Node(ctx, t) + defer cleanup() + + err := waitForL1Node(t, ctx) + Require(t, err) + + cleanEspresso := runEspresso(t, ctx) + defer cleanEspresso() + + // wait for the builder + err = waitForEspressoNode(t, ctx) + Require(t, err) + + err = checkTransferTxOnL2(t, ctx, builder.L2, "User14", builder.L2Info) + Require(t, err) + + msgCnt, err := builder.L2.ConsensusNode.TxStreamer.GetMessageCount() + Require(t, err) + + err = waitForWith(t, ctx, 6*time.Minute, 60*time.Second, func() bool { + validatedCnt := builder.L2.ConsensusNode.BlockValidator.Validated(t) + return validatedCnt == msgCnt + }) + Require(t, err) + + // start the finality node + builderEspressoFinalityNode, cleanupEspressoFinalityNode := createEspressoFinalityNode(t, builder) + defer cleanupEspressoFinalityNode() + + err = waitForWith(t, ctx, 6*time.Minute, 60*time.Second, func() bool { + msgCntFinalityNode, err := builderEspressoFinalityNode.ConsensusNode.TxStreamer.GetMessageCount() + Require(t, err) + return msgCntFinalityNode == msgCnt + }) + Require(t, err) +}