Skip to content

Commit 10254e6

Browse files
authored
return block data when deleting it (#175)
### TL;DR Enhanced `DeleteBlockData` to return deleted block data, enabling better handling of reorgs. ### What changed? Modified the `DeleteBlockData` method in the storage interface to return the deleted block data: - Updated the method signature in `IMainStorage` interface to return `([]common.BlockData, error)` - Implemented the new functionality in `ClickHouseConnector` to collect and return deleted data - Modified the individual delete methods (`deleteBlocks`, `deleteLogs`, etc.) to return the deleted entities - Updated the `MemoryConnector` implementation to match the new interface - Fixed the mock implementation to support the new signature - Updated the caller in `ReorgHandler` to handle the new return value ### How to test? 1. Run unit tests to verify the updated interface implementations 2. Test a reorg scenario to ensure deleted block data is properly returned 3. Verify that the reorg handler correctly processes the returned data ### Why make this change? This change improves the reorg handling process by providing access to the deleted block data, which can be useful for: - Auditing what was removed during a reorg - Potentially using the deleted data for other purposes - Making the reorg process more transparent and traceable - Setting the foundation for making delete and insert operations atomic
2 parents 8c78ece + b6cb3db commit 10254e6

File tree

6 files changed

+97
-39
lines changed

6 files changed

+97
-39
lines changed

internal/orchestrator/reorg_handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ func (rh *ReorgHandler) handleReorg(reorgedBlockNumbers []*big.Int) error {
278278
blocksToDelete = append(blocksToDelete, result.BlockNumber)
279279
}
280280
// TODO make delete and insert atomic
281-
if err := rh.storage.MainStorage.DeleteBlockData(rh.rpc.GetChainID(), blocksToDelete); err != nil {
281+
if _, err := rh.storage.MainStorage.DeleteBlockData(rh.rpc.GetChainID(), blocksToDelete); err != nil {
282282
return fmt.Errorf("error deleting data for blocks %v: %w", blocksToDelete, err)
283283
}
284284
if err := rh.storage.MainStorage.InsertBlockData(data); err != nil {

internal/orchestrator/reorg_handler_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -504,7 +504,7 @@ func TestHandleReorg(t *testing.T) {
504504
})
505505
mockOrchestratorStorage.EXPECT().GetLastReorgCheckedBlockNumber(big.NewInt(1)).Return(big.NewInt(3), nil)
506506

507-
mockMainStorage.EXPECT().DeleteBlockData(big.NewInt(1), mock.Anything).Return(nil)
507+
mockMainStorage.EXPECT().DeleteBlockData(big.NewInt(1), mock.Anything).Return([]common.BlockData{}, nil)
508508
mockMainStorage.EXPECT().InsertBlockData(mock.Anything).Return(nil)
509509

510510
handler := NewReorgHandler(mockRPC, mockStorage)
@@ -613,7 +613,7 @@ func TestHandleReorgWithSingleBlockReorg(t *testing.T) {
613613

614614
mockMainStorage.EXPECT().DeleteBlockData(big.NewInt(1), mock.MatchedBy(func(blocks []*big.Int) bool {
615615
return len(blocks) == 1
616-
})).Return(nil)
616+
})).Return([]common.BlockData{}, nil)
617617
mockMainStorage.EXPECT().InsertBlockData(mock.MatchedBy(func(data []common.BlockData) bool {
618618
return len(data) == 1
619619
})).Return(nil)
@@ -681,7 +681,7 @@ func TestHandleReorgWithLatestBlockReorged(t *testing.T) {
681681

682682
mockMainStorage.EXPECT().DeleteBlockData(big.NewInt(1), mock.MatchedBy(func(blocks []*big.Int) bool {
683683
return len(blocks) == 8
684-
})).Return(nil)
684+
})).Return([]common.BlockData{}, nil)
685685
mockMainStorage.EXPECT().InsertBlockData(mock.MatchedBy(func(data []common.BlockData) bool {
686686
return len(data) == 8
687687
})).Return(nil)
@@ -745,7 +745,7 @@ func TestHandleReorgWithManyBlocks(t *testing.T) {
745745

746746
mockMainStorage.EXPECT().DeleteBlockData(big.NewInt(1), mock.MatchedBy(func(blocks []*big.Int) bool {
747747
return len(blocks) == 5
748-
})).Return(nil)
748+
})).Return([]common.BlockData{}, nil)
749749
mockMainStorage.EXPECT().InsertBlockData(mock.MatchedBy(func(data []common.BlockData) bool {
750750
return len(data) == 5
751751
})).Return(nil)

internal/storage/clickhouse.go

Lines changed: 69 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -987,122 +987,168 @@ func (c *ClickHouseConnector) GetBlockHeadersDescending(chainId *big.Int, from *
987987
return blockHeaders, nil
988988
}
989989

990-
func (c *ClickHouseConnector) DeleteBlockData(chainId *big.Int, blockNumbers []*big.Int) error {
990+
func (c *ClickHouseConnector) DeleteBlockData(chainId *big.Int, blockNumbers []*big.Int) ([]common.BlockData, error) {
991991
var deleteErr error
992992
var deleteErrMutex sync.Mutex
993993
var wg sync.WaitGroup
994994
wg.Add(4)
995995

996+
// Create a map to store block data that will be deleted
997+
deletedBlockDataByNumber := make(map[*big.Int]common.BlockData)
996998
go func() {
997999
defer wg.Done()
998-
if err := c.deleteBlocks(chainId, blockNumbers); err != nil {
1000+
deletedBlocks, err := c.deleteBlocks(chainId, blockNumbers)
1001+
if err != nil {
9991002
deleteErrMutex.Lock()
10001003
deleteErr = fmt.Errorf("error deleting blocks: %v", err)
10011004
deleteErrMutex.Unlock()
10021005
}
1006+
for _, block := range deletedBlocks {
1007+
data := deletedBlockDataByNumber[block.Number]
1008+
data.Block = block
1009+
deletedBlockDataByNumber[block.Number] = data
1010+
}
10031011
}()
10041012

10051013
go func() {
10061014
defer wg.Done()
1007-
if err := c.deleteLogs(chainId, blockNumbers); err != nil {
1015+
deletedLogs, err := c.deleteLogs(chainId, blockNumbers)
1016+
if err != nil {
10081017
deleteErrMutex.Lock()
10091018
deleteErr = fmt.Errorf("error deleting logs: %v", err)
10101019
deleteErrMutex.Unlock()
10111020
}
1021+
for _, log := range deletedLogs {
1022+
data := deletedBlockDataByNumber[log.BlockNumber]
1023+
data.Logs = append(data.Logs, log)
1024+
deletedBlockDataByNumber[log.BlockNumber] = data
1025+
}
10121026
}()
10131027

10141028
go func() {
10151029
defer wg.Done()
1016-
if err := c.deleteTransactions(chainId, blockNumbers); err != nil {
1030+
deletedTransactions, err := c.deleteTransactions(chainId, blockNumbers)
1031+
if err != nil {
10171032
deleteErrMutex.Lock()
10181033
deleteErr = fmt.Errorf("error deleting transactions: %v", err)
10191034
deleteErrMutex.Unlock()
10201035
}
1036+
for _, tx := range deletedTransactions {
1037+
data := deletedBlockDataByNumber[tx.BlockNumber]
1038+
data.Transactions = append(data.Transactions, tx)
1039+
deletedBlockDataByNumber[tx.BlockNumber] = data
1040+
}
10211041
}()
10221042

10231043
go func() {
10241044
defer wg.Done()
1025-
if err := c.deleteTraces(chainId, blockNumbers); err != nil {
1045+
deletedTraces, err := c.deleteTraces(chainId, blockNumbers)
1046+
if err != nil {
10261047
deleteErrMutex.Lock()
10271048
deleteErr = fmt.Errorf("error deleting traces: %v", err)
10281049
deleteErrMutex.Unlock()
10291050
}
1051+
for _, trace := range deletedTraces {
1052+
data := deletedBlockDataByNumber[trace.BlockNumber]
1053+
data.Traces = append(data.Traces, trace)
1054+
deletedBlockDataByNumber[trace.BlockNumber] = data
1055+
}
10301056
}()
10311057

10321058
wg.Wait()
10331059

10341060
if deleteErr != nil {
1035-
return deleteErr
1061+
return nil, deleteErr
10361062
}
1037-
return nil
1063+
deletedBlockData := make([]common.BlockData, 0, len(deletedBlockDataByNumber))
1064+
for _, data := range deletedBlockDataByNumber {
1065+
deletedBlockData = append(deletedBlockData, data)
1066+
}
1067+
return deletedBlockData, nil
10381068
}
10391069

1040-
func (c *ClickHouseConnector) deleteBlocks(chainId *big.Int, blockNumbers []*big.Int) error {
1070+
func (c *ClickHouseConnector) deleteBlocks(chainId *big.Int, blockNumbers []*big.Int) ([]common.Block, error) {
10411071
blocksQueryResult, err := c.GetBlocks(QueryFilter{
10421072
ChainId: chainId,
10431073
BlockNumbers: blockNumbers,
10441074
ForceConsistentData: true,
10451075
}, "*")
10461076
if err != nil {
1047-
return err
1077+
return nil, err
10481078
}
10491079
if len(blocksQueryResult.Data) == 0 {
1050-
return nil // No blocks to delete
1080+
return nil, nil // No blocks to delete
10511081
}
1052-
return c.insertBlocks(blocksQueryResult.Data, InsertOptions{
1082+
err = c.insertBlocks(blocksQueryResult.Data, InsertOptions{
10531083
AsDeleted: true,
10541084
})
1085+
if err != nil {
1086+
return nil, err
1087+
}
1088+
return blocksQueryResult.Data, nil
10551089
}
10561090

1057-
func (c *ClickHouseConnector) deleteLogs(chainId *big.Int, blockNumbers []*big.Int) error {
1091+
func (c *ClickHouseConnector) deleteLogs(chainId *big.Int, blockNumbers []*big.Int) ([]common.Log, error) {
10581092
logsQueryResult, err := c.GetLogs(QueryFilter{
10591093
ChainId: chainId,
10601094
BlockNumbers: blockNumbers,
10611095
ForceConsistentData: true,
10621096
}, "*")
10631097
if err != nil {
1064-
return err
1098+
return nil, err
10651099
}
10661100
if len(logsQueryResult.Data) == 0 {
1067-
return nil // No logs to delete
1101+
return nil, nil // No logs to delete
10681102
}
1069-
return c.insertLogs(logsQueryResult.Data, InsertOptions{
1103+
err = c.insertLogs(logsQueryResult.Data, InsertOptions{
10701104
AsDeleted: true,
10711105
})
1106+
if err != nil {
1107+
return nil, err
1108+
}
1109+
return logsQueryResult.Data, nil
10721110
}
10731111

1074-
func (c *ClickHouseConnector) deleteTransactions(chainId *big.Int, blockNumbers []*big.Int) error {
1112+
func (c *ClickHouseConnector) deleteTransactions(chainId *big.Int, blockNumbers []*big.Int) ([]common.Transaction, error) {
10751113
txsQueryResult, err := c.GetTransactions(QueryFilter{
10761114
ChainId: chainId,
10771115
BlockNumbers: blockNumbers,
10781116
ForceConsistentData: true,
10791117
}, "*")
10801118
if err != nil {
1081-
return err
1119+
return nil, err
10821120
}
10831121
if len(txsQueryResult.Data) == 0 {
1084-
return nil // No transactions to delete
1122+
return nil, nil // No transactions to delete
10851123
}
1086-
return c.insertTransactions(txsQueryResult.Data, InsertOptions{
1124+
err = c.insertTransactions(txsQueryResult.Data, InsertOptions{
10871125
AsDeleted: true,
10881126
})
1127+
if err != nil {
1128+
return nil, err
1129+
}
1130+
return txsQueryResult.Data, nil
10891131
}
10901132

1091-
func (c *ClickHouseConnector) deleteTraces(chainId *big.Int, blockNumbers []*big.Int) error {
1133+
func (c *ClickHouseConnector) deleteTraces(chainId *big.Int, blockNumbers []*big.Int) ([]common.Trace, error) {
10921134
tracesQueryResult, err := c.GetTraces(QueryFilter{
10931135
ChainId: chainId,
10941136
BlockNumbers: blockNumbers,
10951137
ForceConsistentData: true,
10961138
}, "*")
10971139
if err != nil {
1098-
return err
1140+
return nil, err
10991141
}
11001142
if len(tracesQueryResult.Data) == 0 {
1101-
return nil // No traces to delete
1143+
return nil, nil // No traces to delete
11021144
}
1103-
return c.insertTraces(tracesQueryResult.Data, InsertOptions{
1145+
err = c.insertTraces(tracesQueryResult.Data, InsertOptions{
11041146
AsDeleted: true,
11051147
})
1148+
if err != nil {
1149+
return nil, err
1150+
}
1151+
return tracesQueryResult.Data, nil
11061152
}
11071153

11081154
// TODO make this atomic

internal/storage/connector.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ type IMainStorage interface {
9797
* Get block headers ordered from latest to oldest.
9898
*/
9999
GetBlockHeadersDescending(chainId *big.Int, from *big.Int, to *big.Int) (blockHeaders []common.BlockHeader, err error)
100-
DeleteBlockData(chainId *big.Int, blockNumbers []*big.Int) error
100+
DeleteBlockData(chainId *big.Int, blockNumbers []*big.Int) ([]common.BlockData, error)
101101

102102
GetTokenBalances(qf BalancesQueryFilter, fields ...string) (QueryResult[common.TokenBalance], error)
103103
GetTokenTransfers(qf TransfersQueryFilter, fields ...string) (QueryResult[common.TokenTransfer], error)

internal/storage/memory.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,7 @@ func (m *MemoryConnector) InsertBlockData(data []common.BlockData) error {
435435
return nil
436436
}
437437

438-
func (m *MemoryConnector) DeleteBlockData(chainId *big.Int, blockNumbers []*big.Int) error {
438+
func (m *MemoryConnector) DeleteBlockData(chainId *big.Int, blockNumbers []*big.Int) ([]common.BlockData, error) {
439439
blockNumbersToCheck := getBlockNumbersToCheck(QueryFilter{BlockNumbers: blockNumbers})
440440
for _, key := range m.cache.Keys() {
441441
prefixes := []string{fmt.Sprintf("block:%s:", chainId.String()), fmt.Sprintf("log:%s:", chainId.String()), fmt.Sprintf("transaction:%s:", chainId.String()), fmt.Sprintf("trace:%s:", chainId.String())}
@@ -444,7 +444,7 @@ func (m *MemoryConnector) DeleteBlockData(chainId *big.Int, blockNumbers []*big.
444444
m.cache.Remove(key)
445445
}
446446
}
447-
return nil
447+
return []common.BlockData{}, nil // TODO implement
448448
}
449449

450450
func (m *MemoryConnector) GetBlockHeadersDescending(chainId *big.Int, from *big.Int, to *big.Int) ([]common.BlockHeader, error) {

test/mocks/MockIMainStorage.go

Lines changed: 20 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)