diff --git a/aggregator/aggregator.go b/aggregator/aggregator.go index 398333bed..7e23f91d9 100644 --- a/aggregator/aggregator.go +++ b/aggregator/aggregator.go @@ -256,7 +256,7 @@ func (a *Aggregator) handleReorg(reorgData synchronizer.ReorgExecutionResult) { // Halt the aggregator a.halted.Store(true) for { - log.Warnf("Halting the aggregator due to a L1 reorg. Reorged data has been deleted so it is safe to manually restart the aggregator.") + log.Errorf("Halting the aggregator due to a L1 reorg. Reorged data has been deleted so it is safe to manually restart the aggregator.") time.Sleep(10 * time.Second) // nolint:gomnd } } @@ -380,9 +380,11 @@ func (a *Aggregator) handleRollbackBatches(rollbackData synchronizer.RollbackBat if err == nil { log.Info("Handling rollback batches event finished successfully") } else { + // Halt the aggregator + a.halted.Store(true) for { - log.Errorf("Error handling rollback batches event: %v", err) - time.Sleep(a.cfg.RetryTime.Duration) + log.Errorf("Halting the aggregator due to an error handling rollback batches event: %v", err) + time.Sleep(10 * time.Second) // nolint:gomnd } } } @@ -687,6 +689,9 @@ func (a *Aggregator) Start() error { } // Start stream client + a.streamClientMutex.Lock() + defer a.streamClientMutex.Unlock() + err = a.streamClient.Start() if err != nil { log.Fatalf("failed to start stream client, error: %v", err)