diff --git a/aggregator/aggregator.go b/aggregator/aggregator.go index c92533f74..2b90826ee 100644 --- a/aggregator/aggregator.go +++ b/aggregator/aggregator.go @@ -69,13 +69,13 @@ type Aggregator struct { l1Syncr synchronizer.Synchronizer halted atomic.Bool - streamClientMutex sync.Mutex + streamClientMutex *sync.Mutex profitabilityChecker aggregatorTxProfitabilityChecker timeSendFinalProof time.Time timeCleanupLockedProofs types.Duration - stateDBMutex sync.Mutex - timeSendFinalProofMutex sync.RWMutex + stateDBMutex *sync.Mutex + timeSendFinalProofMutex *sync.RWMutex // Data stream handling variables currentBatchStreamData []byte @@ -180,8 +180,11 @@ func New( etherman: etherman, ethTxManager: ethTxManager, streamClient: streamClient, + streamClientMutex: &sync.Mutex{}, l1Syncr: l1Syncr, profitabilityChecker: profitabilityChecker, + stateDBMutex: &sync.Mutex{}, + timeSendFinalProofMutex: &sync.RWMutex{}, timeCleanupLockedProofs: cfg.CleanupLockedProofsInterval, finalProof: make(chan finalProofMsg), currentBatchStreamData: []byte{}, @@ -204,6 +207,14 @@ func New( return a, nil } +func (a *Aggregator) resetCurrentBatchData() { + a.currentBatchStreamData = []byte{} + a.currentStreamBatchRaw = state.BatchRawV2{ + Blocks: make([]state.L2BlockRaw, 0), + } + a.currentStreamL2Block = state.L2BlockRaw{} +} + func (a *Aggregator) retrieveWitness() { var success bool for { @@ -275,7 +286,7 @@ func (a *Aggregator) handleRollbackBatches(rollbackData synchronizer.RollbackBat // Stop Reading the data stream err = a.streamClient.ExecCommandStop() if err != nil { - log.Errorf("failed to stop the data stream: %v.", err) + log.Errorf("failed to stop data stream: %v.", err) } else { log.Info("Data stream client stopped") } @@ -338,6 +349,7 @@ func (a *Aggregator) handleRollbackBatches(rollbackData synchronizer.RollbackBat if err == nil { // Reset current batch data previously read from the data stream a.resetCurrentBatchData() + a.currentStreamBatch = state.Batch{} log.Info("Current batch data reset") var marshalledBookMark []byte @@ -602,15 +614,6 @@ func (a *Aggregator) handleReceivedDataStream(entry *datastreamer.FileEntry, cli return nil } -func (a *Aggregator) resetCurrentBatchData() { - a.currentBatchStreamData = []byte{} - a.currentStreamBatch = state.Batch{} - a.currentStreamBatchRaw = state.BatchRawV2{ - Blocks: make([]state.L2BlockRaw, 0), - } - a.currentStreamL2Block = state.L2BlockRaw{} -} - // Start starts the aggregator func (a *Aggregator) Start() error { // Initial L1 Sync blocking