Skip to content

Commit

Permalink
Espresso streamer tests (#511)
Browse files Browse the repository at this point in the history
* Changes to espresso_streamer to facilitate mock testing

* Initial testing framework for espresso streamer

* Fix compiltion of caff node

* Fix test compilation

* merge

* fix merge

* espresso streamer test working

* Add espresso streamer test to CI

* lint

* Add espresso streamer tests

* Lint

---------

Co-authored-by: Zach Showalter <zacshowa@gmail.com>
Co-authored-by: Jeremy <297323986@qq.com>
  • Loading branch information
3 people authored Feb 27, 2025
1 parent 9f86753 commit 70370fd
Show file tree
Hide file tree
Showing 5 changed files with 266 additions and 56 deletions.
113 changes: 70 additions & 43 deletions espressostreamer/espresso_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,26 @@ import (
"time"

espressoClient "github.com/EspressoSystems/espresso-sequencer-go/client"
espressoTypes "github.com/EspressoSystems/espresso-sequencer-go/types"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"

"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/solgen/go/bridgegen"
"github.com/offchainlabs/nitro/util/stopwaiter"
)

type EspressoTEEVerifierInterface interface {
Verify(opts *bind.CallOpts, rawQuote []byte, reportDataHash [32]byte) error
}

type EspressoClientInterface interface {
FetchLatestBlockHeight(ctx context.Context) (uint64, error)
FetchTransactionsInBlock(ctx context.Context, blockHeight uint64, namespace uint64) (espressoClient.TransactionsInBlock, error)
}

type MessageWithMetadataAndPos struct {
MessageWithMeta arbostypes.MessageWithMetadata
Pos uint64
Expand All @@ -26,27 +35,28 @@ type MessageWithMetadataAndPos struct {

type EspressoStreamer struct {
stopwaiter.StopWaiter
espressoClient *espressoClient.MultipleNodesClient
espressoClient EspressoClientInterface
nextHotshotBlockNum uint64
currentMessagePos uint64
namespace uint64
retryTime time.Duration
pollingHotshotPollingInterval time.Duration
messageWithMetadataAndPos []*MessageWithMetadataAndPos
espressoTEEVerifierCaller bridgegen.EspressoTEEVerifier
espressoTEEVerifierCaller EspressoTEEVerifierInterface

messageMutex sync.Mutex
}

func NewEspressoStreamer(namespace uint64, hotshotUrls []string,
func NewEspressoStreamer(namespace uint64,
nextHotshotBlockNum uint64,
retryTime time.Duration,
pollingHotshotPollingInterval time.Duration,
espressoTEEVerifierCaller bridgegen.EspressoTEEVerifier,
espressoTEEVerifierCaller EspressoTEEVerifierInterface,
espressoClientInterface EspressoClientInterface,
) *EspressoStreamer {

return &EspressoStreamer{
espressoClient: espressoClient.NewMultipleNodesClient(hotshotUrls),
espressoClient: espressoClientInterface,
nextHotshotBlockNum: nextHotshotBlockNum,
retryTime: retryTime,
pollingHotshotPollingInterval: pollingHotshotPollingInterval,
Expand Down Expand Up @@ -93,12 +103,60 @@ func (s *EspressoStreamer) verifyAttestationQuote(attestation []byte, userDataHa
return nil
}

/**
func (s *EspressoStreamer) parseEspressoTransaction(tx espressoTypes.Bytes) ([]*MessageWithMetadataAndPos, error) {
attestation, userDataHash, indices, messages, err := arbutil.ParseHotShotPayload(tx)
if err != nil {
log.Warn("failed to parse hotshot payload", "err", err)
return nil, err
}
// if attestation verification fails, we should skip this message
// Parse the messages
if len(userDataHash) != 32 {
log.Warn("user data hash is not 32 bytes")
return nil, fmt.Errorf("user data hash is not 32 bytes")
}
userDataHashArr := [32]byte(userDataHash)
err = s.verifyAttestationQuote(attestation, userDataHashArr)
if err != nil {
log.Warn("failed to verify attestation quote", "err", err)
return nil, err
}
result := []*MessageWithMetadataAndPos{}

for i, message := range messages {
var messageWithMetadata arbostypes.MessageWithMetadata
err = rlp.DecodeBytes(message, &messageWithMetadata)
if err != nil {
log.Warn("failed to decode message", "err", err)
// Instead of returnning an error, we should just skip this message
continue
}
if indices[i] < s.currentMessagePos {
log.Warn("message index is less than current message pos, skipping", "messageIndex", indices[i], "currentMessagePos", s.currentMessagePos)
continue
}
result = append(result, &MessageWithMetadataAndPos{
MessageWithMeta: messageWithMetadata,
Pos: indices[i],
HotshotHeight: s.nextHotshotBlockNum,
})
log.Info("Added message to queue", "message", indices[i])
}
return result, nil
}

/*
*
* Create a queue of messages from the hotshot to be processed by the node
* It will sort the messages by the message index
* and store the messages in `messagesWithMetadata` queue
*
* Expose the *parseHotShotPayloadFn* to the caller for testing purposes
*/
func (s *EspressoStreamer) queueMessagesFromHotshot(ctx context.Context) error {
func (s *EspressoStreamer) QueueMessagesFromHotshot(
ctx context.Context,
parseHotShotPayloadFn func(tx espressoTypes.Bytes) ([]*MessageWithMetadataAndPos, error),
) error {
// Note: Adding the lock on top level
// because s.nextHotshotBlockNum is updated if n.nextHotshotBlockNum == 0
s.messageMutex.Lock()
Expand Down Expand Up @@ -132,43 +190,12 @@ func (s *EspressoStreamer) queueMessagesFromHotshot(ctx context.Context) error {
}

for _, tx := range arbTxns.Transactions {
// Parse hotshot payload
attestation, userDataHash, indices, messages, err := arbutil.ParseHotShotPayload(tx)
messages, err := parseHotShotPayloadFn(tx)
if err != nil {
log.Warn("failed to parse hotshot payload", "err", err)
log.Warn("failed to verify espresso transaction", "err", err)
continue
}
// if attestation verification fails, we should skip this message
// Parse the messages
if len(userDataHash) != 32 {
log.Warn("user data hash is not 32 bytes")
continue
}
userDataHashArr := [32]byte(userDataHash)
err = s.verifyAttestationQuote(attestation, userDataHashArr)
if err != nil {
log.Warn("failed to verify attestation quote", "err", err)
continue
}
for i, message := range messages {
var messageWithMetadata arbostypes.MessageWithMetadata
err = rlp.DecodeBytes(message, &messageWithMetadata)
if err != nil {
log.Warn("failed to decode message", "err", err)
// Instead of returnning an error, we should just skip this message
continue
}
if indices[i] < s.currentMessagePos {
log.Warn("message index is less than current message pos, skipping", "messageIndex", indices[i], "currentMessagePos", s.currentMessagePos)
continue
}
s.messageWithMetadataAndPos = append(s.messageWithMetadataAndPos, &MessageWithMetadataAndPos{
MessageWithMeta: messageWithMetadata,
Pos: indices[i],
HotshotHeight: s.nextHotshotBlockNum,
})
log.Info("Added message to queue", "message", indices[i])
}
s.messageWithMetadataAndPos = append(s.messageWithMetadataAndPos, messages...)
}

s.nextHotshotBlockNum += 1
Expand All @@ -179,7 +206,7 @@ func (s *EspressoStreamer) queueMessagesFromHotshot(ctx context.Context) error {
func (s *EspressoStreamer) Start(ctxIn context.Context) error {
s.StopWaiter.Start(ctxIn, s)
err := s.CallIterativelySafe(func(ctx context.Context) time.Duration {
err := s.queueMessagesFromHotshot(ctx)
err := s.QueueMessagesFromHotshot(ctx, s.parseEspressoTransaction)
if err != nil {
log.Error("error while queueing messages from hotshot", "err", err)
return s.retryTime
Expand Down
Loading

0 comments on commit 70370fd

Please sign in to comment.