Skip to content

Commit 17e2e67

Browse files
committed
handle clickhouse errors during inserts
1 parent 13fa6ae commit 17e2e67

File tree

1 file changed

+36
-2
lines changed

1 file changed

+36
-2
lines changed

internal/storage/clickhouse.go

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -788,6 +788,19 @@ func (c *ClickHouseConnector) GetMaxBlockNumber(chainId *big.Int) (maxBlockNumbe
788788
return maxBlockNumber, nil
789789
}
790790

791+
func (c *ClickHouseConnector) getMaxBlockNumberConsistent(chainId *big.Int) (maxBlockNumber *big.Int, err error) {
792+
tableName := c.getTableName(chainId, "blocks")
793+
query := fmt.Sprintf("SELECT block_number FROM %s.%s WHERE chain_id = ? ORDER BY block_number DESC LIMIT 1 SETTINGS select_sequential_consistency = 1", c.cfg.Database, tableName)
794+
err = c.conn.QueryRow(context.Background(), query, chainId).Scan(&maxBlockNumber)
795+
if err != nil {
796+
if err == sql.ErrNoRows {
797+
return big.NewInt(0), nil
798+
}
799+
return nil, err
800+
}
801+
return maxBlockNumber, nil
802+
}
803+
791804
func (c *ClickHouseConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error) {
792805
query := fmt.Sprintf("SELECT block_number FROM %s.block_data WHERE is_deleted = 0", c.cfg.Database)
793806
if chainId.Sign() > 0 {
@@ -1136,7 +1149,8 @@ func (c *ClickHouseConnector) InsertBlockData(data []common.BlockData) error {
11361149
return nil
11371150
}
11381151

1139-
tableName := c.getTableName(data[0].Block.ChainId, "inserts_null_table")
1152+
chainId := data[0].Block.ChainId
1153+
tableName := c.getTableName(chainId, "inserts_null_table")
11401154
columns := []string{
11411155
"chain_id", "block", "transactions", "logs", "traces", "sign", "insert_timestamp",
11421156
}
@@ -1286,7 +1300,27 @@ func (c *ClickHouseConnector) InsertBlockData(data []common.BlockData) error {
12861300
}
12871301

12881302
if err := batch.Send(); err != nil {
1289-
return err
1303+
// if insert errors, it can actually still succeed in the background
1304+
// so we need to check if the consistent highest block matches the batch before we return an error
1305+
var highestBlockInBatch *big.Int
1306+
for _, blockData := range data[i:end] {
1307+
if highestBlockInBatch == nil || blockData.Block.Number.Cmp(highestBlockInBatch) > 0 {
1308+
highestBlockInBatch = blockData.Block.Number
1309+
}
1310+
}
1311+
1312+
time.Sleep(500 * time.Millisecond)
1313+
1314+
// Check if this matches the max consistent block
1315+
maxConsistentBlock, maxBlockErr := c.getMaxBlockNumberConsistent(chainId)
1316+
if maxBlockErr != nil || maxConsistentBlock.Cmp(highestBlockInBatch) != 0 {
1317+
if maxBlockErr != nil {
1318+
zLog.Error().Err(maxBlockErr).Msgf("Error getting consistent max block number for chain %s", chainId.String())
1319+
}
1320+
return err
1321+
} else {
1322+
zLog.Info().Err(err).Msgf("Failure while inserting block data, but insert still succeeded")
1323+
}
12901324
}
12911325
}
12921326

0 commit comments

Comments
 (0)