Skip to content

Commit

Permalink
feat: resetCurrentBatchData
Browse files Browse the repository at this point in the history
  • Loading branch information
ToniRamirezM committed Aug 30, 2024
1 parent 79607c2 commit 512da7c
Showing 1 changed file with 16 additions and 13 deletions.
29 changes: 16 additions & 13 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,13 @@ type Aggregator struct {
l1Syncr synchronizer.Synchronizer
halted atomic.Bool

streamClientMutex sync.Mutex
streamClientMutex *sync.Mutex

profitabilityChecker aggregatorTxProfitabilityChecker
timeSendFinalProof time.Time
timeCleanupLockedProofs types.Duration
stateDBMutex sync.Mutex
timeSendFinalProofMutex sync.RWMutex
stateDBMutex *sync.Mutex
timeSendFinalProofMutex *sync.RWMutex

// Data stream handling variables
currentBatchStreamData []byte
Expand Down Expand Up @@ -180,8 +180,11 @@ func New(
etherman: etherman,
ethTxManager: ethTxManager,
streamClient: streamClient,
streamClientMutex: &sync.Mutex{},
l1Syncr: l1Syncr,
profitabilityChecker: profitabilityChecker,
stateDBMutex: &sync.Mutex{},
timeSendFinalProofMutex: &sync.RWMutex{},
timeCleanupLockedProofs: cfg.CleanupLockedProofsInterval,
finalProof: make(chan finalProofMsg),
currentBatchStreamData: []byte{},
Expand All @@ -204,6 +207,14 @@ func New(
return a, nil
}

func (a *Aggregator) resetCurrentBatchData() {
a.currentBatchStreamData = []byte{}
a.currentStreamBatchRaw = state.BatchRawV2{
Blocks: make([]state.L2BlockRaw, 0),
}
a.currentStreamL2Block = state.L2BlockRaw{}
}

func (a *Aggregator) retrieveWitness() {
var success bool
for {
Expand Down Expand Up @@ -275,7 +286,7 @@ func (a *Aggregator) handleRollbackBatches(rollbackData synchronizer.RollbackBat
// Stop Reading the data stream
err = a.streamClient.ExecCommandStop()
if err != nil {
log.Errorf("failed to stop the data stream: %v.", err)
log.Errorf("failed to stop data stream: %v.", err)
} else {
log.Info("Data stream client stopped")
}
Expand Down Expand Up @@ -338,6 +349,7 @@ func (a *Aggregator) handleRollbackBatches(rollbackData synchronizer.RollbackBat
if err == nil {
// Reset current batch data previously read from the data stream
a.resetCurrentBatchData()
a.currentStreamBatch = state.Batch{}
log.Info("Current batch data reset")

var marshalledBookMark []byte
Expand Down Expand Up @@ -602,15 +614,6 @@ func (a *Aggregator) handleReceivedDataStream(entry *datastreamer.FileEntry, cli
return nil
}

func (a *Aggregator) resetCurrentBatchData() {
a.currentBatchStreamData = []byte{}
a.currentStreamBatch = state.Batch{}
a.currentStreamBatchRaw = state.BatchRawV2{
Blocks: make([]state.L2BlockRaw, 0),
}
a.currentStreamL2Block = state.L2BlockRaw{}
}

// Start starts the aggregator
func (a *Aggregator) Start() error {
// Initial L1 Sync blocking
Expand Down

0 comments on commit 512da7c

Please sign in to comment.