Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: epoch notifier #144

Merged
merged 27 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
ff7641c
feat: Epoch Notifier
joanestebanr Oct 31, 2024
6919628
fix: epoch, rebase error
joanestebanr Oct 31, 2024
497488d
feat: rename ClockConfiguration to EpochConfiguration that is the fin…
joanestebanr Nov 7, 2024
0b8ba05
fix: lint
joanestebanr Nov 7, 2024
0fe7463
fix: adapt to rebase
joanestebanr Nov 8, 2024
321dec5
fix: add a new bridge test ETH L1 -> L2 -> L1
joanestebanr Nov 11, 2024
692c305
feat: remove second go-routine. The check of previous certificate is …
joanestebanr Nov 11, 2024
053b649
feat: remove logs
joanestebanr Nov 11, 2024
438371f
feat: remove logs
joanestebanr Nov 11, 2024
3e4025b
feat: set for L1InfoTreeSync SyncBlockChunkSize=100
joanestebanr Nov 11, 2024
b49fc8e
fix: output of local_config script
joanestebanr Nov 11, 2024
aaf9254
fix: unittest
joanestebanr Nov 11, 2024
8b382e3
fix: unittest
joanestebanr Nov 11, 2024
ad267df
fix: wip
joanestebanr Nov 11, 2024
8f5cd47
fix: wip
joanestebanr Nov 11, 2024
a1524fe
feat: merge 'transfer ETH L1 to L2 to L1' test with 'Native gas token…
joanestebanr Nov 11, 2024
223a93b
fix: e2e
joanestebanr Nov 11, 2024
f680873
fix: PR comments
joanestebanr Nov 11, 2024
1ebcc5a
fix: e2e
joanestebanr Nov 11, 2024
705d5cc
fix: e2e
joanestebanr Nov 11, 2024
5047239
fix: e2e
joanestebanr Nov 11, 2024
251cb70
fix: e2e
joanestebanr Nov 11, 2024
12ddfe6
fix: e2e
joanestebanr Nov 11, 2024
06c121a
fix: e2e
joanestebanr Nov 11, 2024
41378a7
feat: notify epoch is a percentage of epoch
joanestebanr Nov 12, 2024
3cab2b3
feat: edge case of 99%
joanestebanr Nov 12, 2024
bb7e5d4
feat: PR comment, checkPendingCertificatesStatus returns bool and error
joanestebanr Nov 12, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion agglayer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,22 @@ import (

const errCodeAgglayerRateLimitExceeded int = -10007

var ErrAgglayerRateLimitExceeded = fmt.Errorf("agglayer rate limit exceeded")
var (
ErrAgglayerRateLimitExceeded = fmt.Errorf("agglayer rate limit exceeded")
jSONRPCCall = rpc.JSONRPCCall
)

type AggLayerClientGetEpochConfiguration interface {
GetEpochConfiguration() (*ClockConfiguration, error)
}

// AgglayerClientInterface is the interface that defines the methods that the AggLayerClient will implement
type AgglayerClientInterface interface {
SendTx(signedTx SignedTx) (common.Hash, error)
WaitTxToBeMined(hash common.Hash, ctx context.Context) error
SendCertificate(certificate *SignedCertificate) (common.Hash, error)
GetCertificateHeader(certificateHash common.Hash) (*CertificateHeader, error)
AggLayerClientGetEpochConfiguration
}

// AggLayerClient is the client that will be used to interact with the AggLayer
Expand Down Expand Up @@ -130,3 +138,23 @@ func (c *AggLayerClient) GetCertificateHeader(certificateHash common.Hash) (*Cer

return result, nil
}

// GetEpochConfiguration returns the clock configuration of AggLayer
func (c *AggLayerClient) GetEpochConfiguration() (*ClockConfiguration, error) {
response, err := jSONRPCCall(c.url, "interop_getEpochConfiguration")
if err != nil {
return nil, err
}

if response.Error != nil {
return nil, fmt.Errorf("GetEpochConfiguration code=%d msg=%s", response.Error.Code, response.Error.Message)
}

var result *ClockConfiguration
err = json.Unmarshal(response.Result, &result)
if err != nil {
return nil, err
}

return result, nil
}
76 changes: 76 additions & 0 deletions agglayer/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package agglayer

import (
"fmt"
"testing"

"github.com/0xPolygon/cdk-rpc/rpc"
"github.com/stretchr/testify/require"
)

const (
testURL = "http://localhost:8080"
)

func TestExploratoryClient(t *testing.T) {
t.Skip("This test is for exploratory purposes only")
sut := NewAggLayerClient("http://127.0.0.1:32853")
config, err := sut.GetEpochConfiguration()
require.NoError(t, err)
require.NotNil(t, config)
fmt.Printf("Config: %s", config.String())
}

func TestGetEpochConfigurationResponseWithError(t *testing.T) {
sut := NewAggLayerClient(testURL)
response := rpc.Response{
Error: &rpc.ErrorObject{},
}
jSONRPCCall = func(url, method string, params ...interface{}) (rpc.Response, error) {
return response, nil
}
clockConfig, err := sut.GetEpochConfiguration()
require.Nil(t, clockConfig)
require.Error(t, err)
}

func TestGetEpochConfigurationResponseBadJson(t *testing.T) {
sut := NewAggLayerClient(testURL)
response := rpc.Response{
Result: []byte(`{`),
}
jSONRPCCall = func(url, method string, params ...interface{}) (rpc.Response, error) {
return response, nil
}
clockConfig, err := sut.GetEpochConfiguration()
require.Nil(t, clockConfig)
require.Error(t, err)
}

func TestGetEpochConfigurationErrorResponse(t *testing.T) {
sut := NewAggLayerClient(testURL)

jSONRPCCall = func(url, method string, params ...interface{}) (rpc.Response, error) {
return rpc.Response{}, fmt.Errorf("unittest error")
}
clockConfig, err := sut.GetEpochConfiguration()
require.Nil(t, clockConfig)
require.Error(t, err)
}

func TestGetEpochConfigurationOkResponse(t *testing.T) {
sut := NewAggLayerClient(testURL)
response := rpc.Response{
Result: []byte(`{"epoch_duration": 1, "genesis_block": 1}`),
}
jSONRPCCall = func(url, method string, params ...interface{}) (rpc.Response, error) {
return response, nil
}
clockConfig, err := sut.GetEpochConfiguration()
require.NotNil(t, clockConfig)
require.NoError(t, err)
require.Equal(t, ClockConfiguration{
EpochDuration: 1,
GenesisBlock: 1,
}, *clockConfig)
}
32 changes: 31 additions & 1 deletion agglayer/mock_agglayer_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions agglayer/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,3 +531,14 @@ func (c CertificateHeader) String() string {
return fmt.Sprintf("Height: %d, CertificateID: %s, NewLocalExitRoot: %s",
c.Height, c.CertificateID.String(), c.NewLocalExitRoot.String())
}

// ClockConfiguration represents the configuration of the epoch clock
// returned by the interop_GetEpochConfiguration RPC call
type ClockConfiguration struct {
EpochDuration uint64 `json:"epoch_duration"`
GenesisBlock uint64 `json:"genesis_block"`
}

func (c ClockConfiguration) String() string {
return fmt.Sprintf("EpochDuration: %d, GenesisBlock: %d", c.EpochDuration, c.GenesisBlock)
}
79 changes: 43 additions & 36 deletions aggsender/aggsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ import (
"fmt"
"math/big"
"os"
"slices"
"time"

"github.com/0xPolygon/cdk/agglayer"
"github.com/0xPolygon/cdk/aggsender/db"
aggsendertypes "github.com/0xPolygon/cdk/aggsender/types"
"github.com/0xPolygon/cdk/aggsender/types"
"github.com/0xPolygon/cdk/bridgesync"
cdkcommon "github.com/0xPolygon/cdk/common"
"github.com/0xPolygon/cdk/l1infotreesync"
Expand All @@ -33,10 +34,11 @@ var (

// AggSender is a component that will send certificates to the aggLayer
type AggSender struct {
log aggsendertypes.Logger
log types.Logger

l2Syncer aggsendertypes.L2BridgeSyncer
l1infoTreeSyncer aggsendertypes.L1InfoTreeSyncer
l2Syncer types.L2BridgeSyncer
l1infoTreeSyncer types.L1InfoTreeSyncer
epochNotifier types.EpochNotifier

storage db.AggSenderStorage
aggLayerClient agglayer.AgglayerClientInterface
Expand All @@ -53,7 +55,8 @@ func New(
cfg Config,
aggLayerClient agglayer.AgglayerClientInterface,
l1InfoTreeSyncer *l1infotreesync.L1InfoTreeSync,
l2Syncer *bridgesync.BridgeSync) (*AggSender, error) {
l2Syncer *bridgesync.BridgeSync,
epochNotifier types.EpochNotifier) (*AggSender, error) {
storage, err := db.NewAggSenderSQLStorage(logger, cfg.StoragePath)
if err != nil {
return nil, err
Expand All @@ -74,24 +77,30 @@ func New(
aggLayerClient: aggLayerClient,
l1infoTreeSyncer: l1InfoTreeSyncer,
sequencerKey: sequencerPrivateKey,
epochNotifier: epochNotifier,
}, nil
}

// Start starts the AggSender
func (a *AggSender) Start(ctx context.Context) {
go a.sendCertificates(ctx)
go a.checkIfCertificatesAreSettled(ctx)
a.sendCertificates(ctx)
}

// sendCertificates sends certificates to the aggLayer
func (a *AggSender) sendCertificates(ctx context.Context) {
ticker := time.NewTicker(a.cfg.BlockGetInterval.Duration)

chEpoch := a.epochNotifier.Subscribe("aggsender")
for {
select {
case <-ticker.C:
if _, err := a.sendCertificate(ctx); err != nil {
log.Error(err)
case epoch := <-chEpoch:
a.log.Infof("Epoch received: %s", epoch.String())
thereArePendingCerts, err := a.checkPendingCertificatesStatus(ctx)
if err == nil && !thereArePendingCerts {
if _, err := a.sendCertificate(ctx); err != nil {
log.Error(err)
}
} else {
log.Warnf("Skipping epoch %s because there are pending certificates %v or error: %w",
epoch.String(), thereArePendingCerts, err)
}
case <-ctx.Done():
a.log.Info("AggSender stopped")
Expand Down Expand Up @@ -183,7 +192,7 @@ func (a *AggSender) sendCertificate(ctx context.Context) (*agglayer.SignedCertif
}

createdTime := time.Now().UTC().UnixMilli()
certInfo := aggsendertypes.CertificateInfo{
certInfo := types.CertificateInfo{
Height: certificate.Height,
CertificateID: certificateHash,
NewLocalExitRoot: certificate.NewLocalExitRoot,
Expand Down Expand Up @@ -224,7 +233,7 @@ func (a *AggSender) saveCertificateToFile(signedCertificate *agglayer.SignedCert

// getNextHeightAndPreviousLER returns the height and previous LER for the new certificate
func (a *AggSender) getNextHeightAndPreviousLER(
lastSentCertificateInfo *aggsendertypes.CertificateInfo) (uint64, common.Hash) {
lastSentCertificateInfo *types.CertificateInfo) (uint64, common.Hash) {
height := lastSentCertificateInfo.Height + 1
if lastSentCertificateInfo.Status == agglayer.InError {
// previous certificate was in error, so we need to resend it
Expand All @@ -247,7 +256,7 @@ func (a *AggSender) getNextHeightAndPreviousLER(
func (a *AggSender) buildCertificate(ctx context.Context,
bridges []bridgesync.Bridge,
claims []bridgesync.Claim,
lastSentCertificateInfo aggsendertypes.CertificateInfo,
lastSentCertificateInfo types.CertificateInfo,
toBlock uint64) (*agglayer.Certificate, error) {
if len(bridges) == 0 && len(claims) == 0 {
return nil, errNoBridgesAndClaims
Expand Down Expand Up @@ -475,34 +484,30 @@ func (a *AggSender) signCertificate(certificate *agglayer.Certificate) (*agglaye
}, nil
}

// checkIfCertificatesAreSettled checks if certificates are settled
func (a *AggSender) checkIfCertificatesAreSettled(ctx context.Context) {
ticker := time.NewTicker(a.cfg.CheckSettledInterval.Duration)
for {
select {
case <-ticker.C:
a.checkPendingCertificatesStatus(ctx)
case <-ctx.Done():
return
}
}
}

// checkPendingCertificatesStatus checks the status of pending certificates
// and updates in the storage if it changed on agglayer
func (a *AggSender) checkPendingCertificatesStatus(ctx context.Context) {
// It returns:
// bool -> if there are pending certificates
// error -> if there was an error
func (a *AggSender) checkPendingCertificatesStatus(ctx context.Context) (bool, error) {
pendingCertificates, err := a.storage.GetCertificatesByStatus(nonSettledStatuses)
if err != nil {
a.log.Errorf("error getting pending certificates: %w", err)
return
err = fmt.Errorf("error getting pending certificates: %w", err)
a.log.Error(err)
return true, err
}
thereArePendingCertificates := false
a.log.Debugf("checkPendingCertificatesStatus num of pendingCertificates: %d", len(pendingCertificates))
for _, certificate := range pendingCertificates {
certificateHeader, err := a.aggLayerClient.GetCertificateHeader(certificate.CertificateID)
if err != nil {
a.log.Errorf("error getting certificate header of %s from agglayer: %w",
certificate.String(), err)
continue
err = fmt.Errorf("error getting certificate header of %d/%s from agglayer: %w",
certificate.Height, certificate.String(), err)
a.log.Error(err)
return true, err
}
if slices.Contains(nonSettledStatuses, certificateHeader.Status) {
thereArePendingCertificates = true
}
a.log.Debugf("aggLayerClient.GetCertificateHeader status [%s] of certificate %s ",
certificateHeader.Status,
Expand All @@ -516,11 +521,13 @@ func (a *AggSender) checkPendingCertificatesStatus(ctx context.Context) {
certificate.UpdatedAt = time.Now().UTC().UnixMilli()

if err := a.storage.UpdateCertificateStatus(ctx, *certificate); err != nil {
a.log.Errorf("error updating certificate %s status in storage: %w", certificateHeader.String(), err)
continue
err = fmt.Errorf("error updating certificate %s status in storage: %w", certificateHeader.String(), err)
a.log.Error(err)
return true, err
}
}
}
return thereArePendingCertificates, nil
}

// shouldSendCertificate checks if a certificate should be sent at given time
Expand Down
Loading
Loading