fix: reorg detector and syncer issues on develop #272

Merged
merged 12 commits on Jan 23, 2025
10 changes: 8 additions & 2 deletions bridgesync/bridgesync.go
@@ -12,8 +12,8 @@ import (
)

const (
bridgeSyncL1 = "L1"
bridgeSyncL2 = "L2"
bridgeSyncL1 = "BridgeSyncL1"
bridgeSyncL2 = "BridgeSyncL2"
downloadBufferSize = 1000
)

@@ -45,6 +45,7 @@ func NewL1(
maxRetryAttemptsAfterError int,
originNetwork uint32,
syncFullClaims bool,
finalizedBlockType etherman.BlockNumberFinality,
) (*BridgeSync, error) {
return newBridgeSync(
ctx,
@@ -61,6 +62,7 @@ func NewL1(
maxRetryAttemptsAfterError,
originNetwork,
syncFullClaims,
finalizedBlockType,
)
}

@@ -79,6 +81,7 @@ func NewL2(
maxRetryAttemptsAfterError int,
originNetwork uint32,
syncFullClaims bool,
finalizedBlockType etherman.BlockNumberFinality,
) (*BridgeSync, error) {
return newBridgeSync(
ctx,
@@ -95,6 +98,7 @@ func NewL2(
maxRetryAttemptsAfterError,
originNetwork,
syncFullClaims,
finalizedBlockType,
)
}

@@ -113,6 +117,7 @@ func newBridgeSync(
maxRetryAttemptsAfterError int,
originNetwork uint32,
syncFullClaims bool,
finalizedBlockType etherman.BlockNumberFinality,
) (*BridgeSync, error) {
logger := log.WithFields("bridge-syncer", layerID)
processor, err := newProcessor(dbPath, logger)
@@ -151,6 +156,7 @@ func newBridgeSync(
appender,
[]common.Address{bridge},
rh,
finalizedBlockType,
)
if err != nil {
return nil, err
2 changes: 2 additions & 0 deletions bridgesync/bridgesync_test.go
@@ -56,6 +56,7 @@ func TestNewLx(t *testing.T) {
maxRetryAttemptsAfterError,
originNetwork,
false,
blockFinalityType,
)

assert.NoError(t, err)
@@ -77,6 +78,7 @@ func TestNewLx(t *testing.T) {
maxRetryAttemptsAfterError,
originNetwork,
false,
blockFinalityType,
)

assert.NoError(t, err)
2 changes: 2 additions & 0 deletions bridgesync/e2e_test.go
@@ -88,6 +88,8 @@ func TestBridgeEventE2E(t *testing.T) {
}
}

helpers.CommitBlocks(t, setup.L1Environment.SimBackend, 11, blockTime)

// Wait for syncer to catch up
time.Sleep(time.Second * 2) // sleep since the processor could be up to date but still have pending reorgs
lb, err := setup.L1Environment.SimBackend.Client().BlockNumber(ctx)
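Note: the extra helpers.CommitBlocks call added above mines additional blocks, presumably so the blocks carrying the bridge events are already considered final by the syncer before the assertions run. The helper itself is not part of this diff; the following is only a plausible sketch over go-ethereum's simulated backend, with an assumed signature and types that may differ from the repository's actual helper.

package helpers

import (
    "testing"
    "time"

    "github.com/ethereum/go-ethereum/ethclient/simulated"
)

// CommitBlocks is a sketch of the test helper used above (assumed shape):
// it mines n empty blocks on the simulated backend, pausing between commits
// so pollers such as the syncer and the reorg detector can observe each one.
func CommitBlocks(t *testing.T, backend *simulated.Backend, n int, wait time.Duration) {
    t.Helper()
    for i := 0; i < n; i++ {
        backend.Commit() // seals one block
        time.Sleep(wait)
    }
}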
3 changes: 3 additions & 0 deletions cmd/run.go
@@ -526,6 +526,7 @@ func runL1InfoTreeSyncerIfNeeded(
cfg.L1InfoTreeSync.RetryAfterErrorPeriod.Duration,
cfg.L1InfoTreeSync.MaxRetryAttemptsAfterError,
l1infotreesync.FlagNone,
etherman.FinalizedBlock,
)
if err != nil {
log.Fatal(err)
@@ -727,6 +728,7 @@ func runBridgeSyncL1IfNeeded(
cfg.MaxRetryAttemptsAfterError,
rollupID,
false,
etherman.FinalizedBlock,
)
if err != nil {
log.Fatalf("error creating bridgeSyncL1: %s", err)
@@ -762,6 +764,7 @@ func runBridgeSyncL2IfNeeded(
cfg.MaxRetryAttemptsAfterError,
rollupID,
true,
etherman.LatestBlock,
)
if err != nil {
log.Fatalf("error creating bridgeSyncL2: %s", err)
18 changes: 8 additions & 10 deletions l1infotreesync/e2e_test.go
@@ -70,8 +70,8 @@ func TestE2E(t *testing.T) {
rdm.On("AddBlockToTrack", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)

client, auth, gerAddr, verifyAddr, gerSc, verifySC := newSimulatedClient(t)
syncer, err := l1infotreesync.New(ctx, dbPath, gerAddr, verifyAddr, 10, etherman.LatestBlock, rdm, client.Client(), time.Millisecond, 0, 100*time.Millisecond, 3,
l1infotreesync.FlagAllowWrongContractsAddrs)
syncer, err := l1infotreesync.New(ctx, dbPath, gerAddr, verifyAddr, 10, etherman.LatestBlock, rdm, client.Client(), time.Millisecond, 0, 100*time.Millisecond, 25,
l1infotreesync.FlagAllowWrongContractsAddrs, etherman.SafeBlock)
require.NoError(t, err)

go syncer.Start(ctx)
@@ -159,7 +159,7 @@ func TestWithReorgs(t *testing.T) {
require.NoError(t, rd.Start(ctx))

syncer, err := l1infotreesync.New(ctx, dbPathSyncer, gerAddr, verifyAddr, 10, etherman.LatestBlock, rd, client.Client(), time.Millisecond, 0, time.Second, 25,
l1infotreesync.FlagAllowWrongContractsAddrs)
l1infotreesync.FlagAllowWrongContractsAddrs, etherman.SafeBlock)
require.NoError(t, err)
go syncer.Start(ctx)

@@ -221,9 +221,6 @@
// Block 4, 5, 6 after the fork
helpers.CommitBlocks(t, client, 3, time.Millisecond*500)

// Make sure syncer is up to date
waitForSyncerToCatchUp(ctx, t, syncer, client)

// Assert rollup exit root after the fork - should be zero since there are no events in the block after the fork
expectedRollupExitRoot, err = verifySC.GetRollupExitRoot(&bind.CallOpts{Pending: false})
require.NoError(t, err)
@@ -236,11 +233,12 @@
require.NoError(t, err)
time.Sleep(time.Millisecond * 500)

helpers.CommitBlocks(t, client, 1, time.Millisecond*100)

// create some events and update the trees
updateL1InfoTreeAndRollupExitTree(2, 1)

// Block 4, 5, 6, 7 after the fork
helpers.CommitBlocks(t, client, 4, time.Millisecond*100)
helpers.CommitBlocks(t, client, 1, time.Millisecond*100)

// Make sure syncer is up to date
waitForSyncerToCatchUp(ctx, t, syncer, client)
@@ -274,7 +272,7 @@ func TestStressAndReorgs(t *testing.T) {
require.NoError(t, rd.Start(ctx))

syncer, err := l1infotreesync.New(ctx, dbPathSyncer, gerAddr, verifyAddr, 10, etherman.LatestBlock, rd, client.Client(), time.Millisecond, 0, time.Second, 100,
l1infotreesync.FlagAllowWrongContractsAddrs)
l1infotreesync.FlagAllowWrongContractsAddrs, etherman.SafeBlock)
require.NoError(t, err)
go syncer.Start(ctx)

@@ -305,7 +303,7 @@ func TestStressAndReorgs(t *testing.T) {
}
}

helpers.CommitBlocks(t, client, 1, time.Millisecond*10)
helpers.CommitBlocks(t, client, 11, time.Millisecond*10)

waitForSyncerToCatchUp(ctx, t, syncer, client)

2 changes: 2 additions & 0 deletions l1infotreesync/l1infotreesync.go
@@ -47,6 +47,7 @@ func New(
retryAfterErrorPeriod time.Duration,
maxRetryAttemptsAfterError int,
flags CreationFlags,
finalizedBlockType etherman.BlockNumberFinality,
) (*L1InfoTreeSync, error) {
processor, err := newProcessor(dbPath)
if err != nil {
@@ -83,6 +84,7 @@ func New(
appender,
[]common.Address{globalExitRoot, rollupManager},
rh,
finalizedBlockType,
)
if err != nil {
return nil, err
4 changes: 3 additions & 1 deletion l1infotreesync/processor.go
@@ -242,7 +242,6 @@ func (p *processor) Reorg(ctx context.Context, firstReorgedBlock uint64) error {
shouldRollback := true
defer func() {
if shouldRollback {
log.Debugf("rolling back reorg, first reorged block: %d", firstReorgedBlock)
if errRllbck := tx.Rollback(); errRllbck != nil {
log.Errorf("error while rolling back tx %v", errRllbck)
}
@@ -269,6 +268,9 @@ func (p *processor) Reorg(ctx context.Context, firstReorgedBlock uint64) error {
if err := tx.Commit(); err != nil {
return err
}

shouldRollback = false

sync.UnhaltIfAffectedRows(&p.halted, &p.haltedReason, &p.mu, rowsAffected)
return nil
}
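The shouldRollback = false line added above is the key part of this hunk: it is now set only after tx.Commit() succeeds, so a failed commit (or any earlier return) still triggers the deferred rollback. Below is a self-contained sketch of that deferred-rollback pattern, using a generic database/sql transaction and illustrative names rather than the processor's actual storage code.

package processor

import (
    "context"
    "database/sql"
    "log"
)

// reorg sketches the deferred-rollback pattern used by processor.Reorg after
// this fix: shouldRollback stays true until Commit succeeds, so every early
// return, and a failed Commit, still rolls the transaction back.
func reorg(ctx context.Context, db *sql.DB, firstReorgedBlock uint64) error {
    tx, err := db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    shouldRollback := true
    defer func() {
        if shouldRollback {
            if errRb := tx.Rollback(); errRb != nil {
                log.Printf("error while rolling back tx: %v", errRb)
            }
        }
    }()

    // illustrative cleanup; the real processor also rewinds its trees
    if _, err := tx.ExecContext(ctx, "DELETE FROM block WHERE num >= $1", firstReorgedBlock); err != nil {
        return err
    }

    if err := tx.Commit(); err != nil {
        return err
    }
    shouldRollback = false // cleared only after a successful commit
    return nil
}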
77 changes: 67 additions & 10 deletions sync/evmdownloader.go
@@ -28,9 +28,10 @@ type EthClienter interface {

type EVMDownloaderInterface interface {
WaitForNewBlocks(ctx context.Context, lastBlockSeen uint64) (newLastBlock uint64)
GetEventsByBlockRange(ctx context.Context, fromBlock, toBlock uint64) []EVMBlock
GetEventsByBlockRange(ctx context.Context, fromBlock, toBlock uint64) EVMBlocks
GetLogs(ctx context.Context, fromBlock, toBlock uint64) []types.Log
GetBlockHeader(ctx context.Context, blockNum uint64) (EVMBlockHeader, bool)
GetLastFinalizedBlock(ctx context.Context) (*types.Header, error)
}

type LogAppenderMap map[common.Hash]func(b *EVMBlock, l types.Log) error
@@ -50,16 +51,35 @@ func NewEVMDownloader(
appender LogAppenderMap,
adressessToQuery []common.Address,
rh *RetryHandler,
finalizedBlockType etherman.BlockNumberFinality,
) (*EVMDownloader, error) {
logger := log.WithFields("syncer", syncerID)
finality, err := blockFinalityType.ToBlockNum()
if err != nil {
return nil, err
}

topicsToQuery := make([]common.Hash, 0, len(appender))
for topic := range appender {
topicsToQuery = append(topicsToQuery, topic)
}

fbtEthermanType := finalizedBlockType
fbt, err := finalizedBlockType.ToBlockNum()
if err != nil {
return nil, err
}

if fbt.Cmp(finality) > 0 {
// if the syncer is configured to follow the chain by Safe or Finalized block,
// the finalized block type must be at least as final as that block finality
fbt = finality
fbtEthermanType = blockFinalityType
}

logger.Infof("downloader initialized with block finality: %s, finalized block type: %s. SyncChunkSize: %d",
blockFinalityType, fbtEthermanType, syncBlockChunkSize)

return &EVMDownloader{
syncBlockChunkSize: syncBlockChunkSize,
log: logger,
@@ -72,12 +92,14 @@
adressessToQuery: adressessToQuery,
rh: rh,
log: logger,
finalizedBlockType: fbt,
},
}, nil
}
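NewEVMDownloader now resolves two labels through ToBlockNum(): the finality used to follow the chain and the new finalizedBlockType used to decide when downloaded blocks may be handed to the driver. For reference only, the following is a minimal sketch of how such labels are commonly mapped to RPC block-number sentinels, assuming etherman.BlockNumberFinality follows go-ethereum's conventions; the real type may differ.

package finality

import (
    "fmt"
    "math/big"

    "github.com/ethereum/go-ethereum/rpc"
)

// BlockNumberFinality is sketched here as a string label; the mapping uses
// go-ethereum's negative sentinels (pending=-1, latest=-2, finalized=-3, safe=-4).
type BlockNumberFinality string

const (
    LatestBlock    BlockNumberFinality = "LatestBlock"
    SafeBlock      BlockNumberFinality = "SafeBlock"
    FinalizedBlock BlockNumberFinality = "FinalizedBlock"
)

// ToBlockNum converts the label into the *big.Int block number expected by
// HeaderByNumber-style RPC calls.
func (b BlockNumberFinality) ToBlockNum() (*big.Int, error) {
    switch b {
    case LatestBlock:
        return big.NewInt(int64(rpc.LatestBlockNumber)), nil
    case SafeBlock:
        return big.NewInt(int64(rpc.SafeBlockNumber)), nil
    case FinalizedBlock:
        return big.NewInt(int64(rpc.FinalizedBlockNumber)), nil
    default:
        return nil, fmt.Errorf("unknown block finality %q", b)
    }
}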

func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, downloadedCh chan EVMBlock) {
lastBlock := d.WaitForNewBlocks(ctx, 0)

for {
select {
case <-ctx.Done():
@@ -86,28 +108,44 @@
return
default:
}

toBlock := fromBlock + d.syncBlockChunkSize
if toBlock > lastBlock {
toBlock = lastBlock
}

if fromBlock > toBlock {
d.log.Debugf(
d.log.Infof(
"waiting for new blocks, last block processed: %d, last block seen on L1: %d",
fromBlock-1, lastBlock,
)
lastBlock = d.WaitForNewBlocks(ctx, fromBlock-1)
continue
}
d.log.Debugf("getting events from block %d to %d", fromBlock, toBlock)

lastFinalizedBlock, err := d.GetLastFinalizedBlock(ctx)
if err != nil {
d.log.Error("error getting last finalized block: ", err)
continue
}

lastFinalizedBlockNumber := lastFinalizedBlock.Number.Uint64()

d.log.Infof("getting events from blocks %d to %d. lastFinalizedBlock: %d",
fromBlock, toBlock, lastFinalizedBlockNumber)
blocks := d.GetEventsByBlockRange(ctx, fromBlock, toBlock)
for _, b := range blocks {
d.log.Debugf("sending block %d to the driver (with events)", b.Num)
downloadedCh <- b

reportBlocksFn := func(numOfBlocksToReport int) {
for i := 0; i < numOfBlocksToReport; i++ {
d.log.Infof("sending block %d to the driver (with events)", blocks[i].Num)
downloadedCh <- blocks[i]
}
}
if len(blocks) == 0 || blocks[len(blocks)-1].Num < toBlock {

reportEmptyBlockFn := func(blockNum uint64) {
// Report the last downloaded block even if there are no events on it
d.log.Debugf("sending block %d to the driver (without events)", blockNum)
header, isCanceled := d.GetBlockHeader(ctx, toBlock)
header, isCanceled := d.GetBlockHeader(ctx, blockNum)
if isCanceled {
return
}
@@ -116,7 +154,21 @@
EVMBlockHeader: header,
}
}
fromBlock = toBlock + 1

if toBlock <= lastFinalizedBlockNumber {
reportBlocksFn(blocks.Len())
fromBlock = toBlock + 1

if blocks.Len() == 0 || blocks[blocks.Len()-1].Num < toBlock {
reportEmptyBlockFn(toBlock)
}
} else {
lastFinalizedBlockNum, i, found := blocks.LastFinalizedBlock(lastFinalizedBlockNumber)
if found {
reportBlocksFn(i + 1)
fromBlock = lastFinalizedBlockNum + 1
}
}
}
}
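The branch above only hands blocks to the driver once they are at or below the last finalized block; when toBlock is still past it, blocks.LastFinalizedBlock selects the prefix that is safe to deliver. That helper is not shown in this diff; the sketch below captures the behaviour the loop assumes, using reduced illustrative types rather than the repository's definitions.

package sync

// EVMBlock is reduced here to the single field this sketch needs.
type EVMBlock struct {
    Num uint64
}

// EVMBlocks mirrors the slice type introduced by this PR.
type EVMBlocks []*EVMBlock

func (b EVMBlocks) Len() int { return len(b) }

// LastFinalizedBlock returns the number and index of the last downloaded
// block whose number is <= lastFinalizedBlockNumber, and whether such a
// block exists. Download uses the index to report blocks[0..i] and the
// number to advance fromBlock past the delivered range.
func (b EVMBlocks) LastFinalizedBlock(lastFinalizedBlockNumber uint64) (uint64, int, bool) {
    for i := len(b) - 1; i >= 0; i-- {
        if b[i].Num <= lastFinalizedBlockNumber {
            return b[i].Num, i, true
        }
    }
    return 0, 0, false
}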

@@ -129,6 +181,7 @@ type EVMDownloaderImplementation struct {
adressessToQuery []common.Address
rh *RetryHandler
log *log.Logger
finalizedBlockType *big.Int
}

func NewEVMDownloaderImplementation(
@@ -154,6 +207,10 @@ func NewEVMDownloaderImplementation(
}
}

func (d *EVMDownloaderImplementation) GetLastFinalizedBlock(ctx context.Context) (*types.Header, error) {
return d.ethClient.HeaderByNumber(ctx, d.finalizedBlockType)
}
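GetLastFinalizedBlock simply asks the client for the header at the configured sentinel number. A minimal standalone usage sketch with go-ethereum's ethclient follows (placeholder endpoint, not part of this PR).

package main

import (
    "context"
    "fmt"
    "log"
    "math/big"

    "github.com/ethereum/go-ethereum/ethclient"
    "github.com/ethereum/go-ethereum/rpc"
)

func main() {
    // placeholder endpoint, for illustration only
    client, err := ethclient.Dial("https://rpc.example.invalid")
    if err != nil {
        log.Fatal(err)
    }
    // passing rpc.FinalizedBlockNumber (-3) resolves to the "finalized" tag
    header, err := client.HeaderByNumber(context.Background(), big.NewInt(int64(rpc.FinalizedBlockNumber)))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("last finalized block:", header.Number)
}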

func (d *EVMDownloaderImplementation) WaitForNewBlocks(
ctx context.Context, lastBlockSeen uint64,
) (newLastBlock uint64) {
@@ -184,7 +241,7 @@ func (d *EVMDownloaderImplementation) WaitForNewBlocks(
}
}

func (d *EVMDownloaderImplementation) GetEventsByBlockRange(ctx context.Context, fromBlock, toBlock uint64) []EVMBlock {
func (d *EVMDownloaderImplementation) GetEventsByBlockRange(ctx context.Context, fromBlock, toBlock uint64) EVMBlocks {
select {
case <-ctx.Done():
return nil