Skip to content

Commit 90e34dd

Browse files
authored
handle false positive clickhouse errors during inserts (#195)
### TL;DR Added error handling for block data insertion that checks if data was actually inserted despite reported errors. ### What changed? - Added a new `getMaxBlockNumberConsistent` method that retrieves the maximum block number with sequential consistency guarantees - Enhanced the `InsertBlockData` error handling to verify if data was actually inserted despite reported errors - When an error occurs during batch insertion, the code now: 1. Determines the highest block number in the batch 2. Waits briefly (500ms) 3. Checks if the highest block from the batch matches the max consistent block in the database 4. Only returns an error if the verification fails, otherwise logs the error but continues execution ### How to test? 1. Trigger a scenario where ClickHouse reports an error during batch insertion but actually inserts the data 2. Verify that the process continues execution rather than failing 3. Check logs for the message "Failure while inserting block data, but insert still succeeded" ### Why make this change? ClickHouse can sometimes report errors during batch insertions even when the data was successfully inserted in the background. This change prevents unnecessary failures by verifying the actual state of the database before deciding whether to propagate the error, improving the reliability of the indexer.
2 parents 13fa6ae + 17e2e67 commit 90e34dd

File tree

1 file changed

+36
-2
lines changed

1 file changed

+36
-2
lines changed

internal/storage/clickhouse.go

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -788,6 +788,19 @@ func (c *ClickHouseConnector) GetMaxBlockNumber(chainId *big.Int) (maxBlockNumbe
788788
return maxBlockNumber, nil
789789
}
790790

791+
func (c *ClickHouseConnector) getMaxBlockNumberConsistent(chainId *big.Int) (maxBlockNumber *big.Int, err error) {
792+
tableName := c.getTableName(chainId, "blocks")
793+
query := fmt.Sprintf("SELECT block_number FROM %s.%s WHERE chain_id = ? ORDER BY block_number DESC LIMIT 1 SETTINGS select_sequential_consistency = 1", c.cfg.Database, tableName)
794+
err = c.conn.QueryRow(context.Background(), query, chainId).Scan(&maxBlockNumber)
795+
if err != nil {
796+
if err == sql.ErrNoRows {
797+
return big.NewInt(0), nil
798+
}
799+
return nil, err
800+
}
801+
return maxBlockNumber, nil
802+
}
803+
791804
func (c *ClickHouseConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error) {
792805
query := fmt.Sprintf("SELECT block_number FROM %s.block_data WHERE is_deleted = 0", c.cfg.Database)
793806
if chainId.Sign() > 0 {
@@ -1136,7 +1149,8 @@ func (c *ClickHouseConnector) InsertBlockData(data []common.BlockData) error {
11361149
return nil
11371150
}
11381151

1139-
tableName := c.getTableName(data[0].Block.ChainId, "inserts_null_table")
1152+
chainId := data[0].Block.ChainId
1153+
tableName := c.getTableName(chainId, "inserts_null_table")
11401154
columns := []string{
11411155
"chain_id", "block", "transactions", "logs", "traces", "sign", "insert_timestamp",
11421156
}
@@ -1286,7 +1300,27 @@ func (c *ClickHouseConnector) InsertBlockData(data []common.BlockData) error {
12861300
}
12871301

12881302
if err := batch.Send(); err != nil {
1289-
return err
1303+
// if insert errors, it can actually still succeed in the background
1304+
// so we need to check if the consistent highest block matches the batch before we return an error
1305+
var highestBlockInBatch *big.Int
1306+
for _, blockData := range data[i:end] {
1307+
if highestBlockInBatch == nil || blockData.Block.Number.Cmp(highestBlockInBatch) > 0 {
1308+
highestBlockInBatch = blockData.Block.Number
1309+
}
1310+
}
1311+
1312+
time.Sleep(500 * time.Millisecond)
1313+
1314+
// Check if this matches the max consistent block
1315+
maxConsistentBlock, maxBlockErr := c.getMaxBlockNumberConsistent(chainId)
1316+
if maxBlockErr != nil || maxConsistentBlock.Cmp(highestBlockInBatch) != 0 {
1317+
if maxBlockErr != nil {
1318+
zLog.Error().Err(maxBlockErr).Msgf("Error getting consistent max block number for chain %s", chainId.String())
1319+
}
1320+
return err
1321+
} else {
1322+
zLog.Info().Err(err).Msgf("Failure while inserting block data, but insert still succeeded")
1323+
}
12901324
}
12911325
}
12921326

0 commit comments

Comments
 (0)