Skip to content

Commit 400fe74

Browse files
stevenlanders, dssei, sigv, yzang2019
authored
[seiv2] Merge main (#217)
* reformat logs to use simple concatenation with separators (#207) * Use write-lock in (*TxPriorityQueue).ReapMax funcs (#209) ReapMaxBytesMaxGas and ReapMaxTxs funcs in TxPriorityQueue claim > Transactions returned are not removed from the mempool transaction > store or indexes. However, they use a priority queue to accomplish the claim > Transaction are retrieved in priority order. This is accomplished by popping all items out of the whole heap, and then pushing them back in sequentially. A copy of the heap cannot be obtained otherwise. Both of the mentioned functions use a read-lock (RLock) when doing this. This results in a potential scenario where multiple executions of the ReapMax can be started in parallel, and both would be popping items out of the priority queue. In practice, this can be abused by executing the `unconfirmed_txs` RPC call repeatedly. Based on our observations, running it multiple times per millisecond results in multiple threads picking it up at the same time. Such a scenario can be obtained via the WebSocket interface, and spamming `unconfirmed_txs` calls there. The behavior that happens is a `Panic in WSJSONRPC handler` when a queue item unexpectedly disappears for `mempool.(*TxPriorityQueue).Swap`. (`runtime error: index out of range [0] with length 0`) This can additionally lead to a `CONSENSUS FAILURE!!!` if the race condition occurs for `internal/consensus.(*State).finalizeCommit` when it tries to do `mempool.(*TxPriorityQueue).RemoveTx`, but the ReapMax has already removed all elements from the underlying heap. (`runtime error: index out of range [-1]`) This commit switches the lock type to a write-lock (Lock) to ensure no parallel modifications take place. This commit additionally updates the tests to allow parallel execution of the func calls in testing, so as to prevent regressions (in case someone wants to downgrade the locks without considering the implications from the underlying heap usage).
* Fix root dir for tendermint reindex command (#210) * Replay events during restart to avoid tx missing (#211) --------- Co-authored-by: Denys S <150304777+dssei@users.noreply.github.com> Co-authored-by: Valters Jansons <sigv@users.noreply.github.com> Co-authored-by: Yiming Zang <50607998+yzang2019@users.noreply.github.com>
1 parent fbc1bc0 commit 400fe74

File tree

6 files changed

+44
-29
lines changed

6 files changed

+44
-29
lines changed

cmd/tendermint/commands/reindex_event.go

+3
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/tendermint/tendermint/internal/state/indexer/sink/kv"
1818
"github.com/tendermint/tendermint/internal/state/indexer/sink/psql"
1919
"github.com/tendermint/tendermint/internal/store"
20+
"github.com/tendermint/tendermint/libs/cli"
2021
"github.com/tendermint/tendermint/libs/log"
2122
"github.com/tendermint/tendermint/libs/os"
2223
"github.com/tendermint/tendermint/rpc/coretypes"
@@ -52,6 +53,8 @@ either or both arguments.
5253
tendermint reindex-event --start-height 2 --end-height 10
5354
`,
5455
RunE: func(cmd *cobra.Command, args []string) error {
56+
home, err := cmd.Flags().GetString(cli.HomeFlag)
57+
conf.RootDir = home
5558
bs, ss, err := loadStateAndBlockStore(conf)
5659
if err != nil {
5760
return fmt.Errorf("%s: %w", reindexFailed, err)

config/config.go

+1
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ func DefaultBaseConfig() BaseConfig {
250250
FilterPeers: false,
251251
DBBackend: "goleveldb",
252252
DBPath: "data",
253+
RootDir: "/root/.sei",
253254
}
254255
}
255256

internal/consensus/replay.go

+21-1
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,11 @@ func (h *Handshaker) ReplayBlocks(
395395
return h.replayBlocks(ctx, state, appClient, appBlockHeight, storeBlockHeight, false)
396396

397397
} else if appBlockHeight == storeBlockHeight {
398-
// We're good!
398+
// We're good! But we need to reindex events
399+
err := h.replayEvents(appBlockHeight)
400+
if err != nil {
401+
return nil, err
402+
}
399403
if err := checkAppHashEqualsOneFromState(appHash, state); err != nil {
400404
return nil, err
401405
}
@@ -550,6 +554,22 @@ func (h *Handshaker) replayBlock(
550554
return state, nil
551555
}
552556

557+
// replayEvents will be called during restart to avoid tx missing to be indexed
558+
func (h *Handshaker) replayEvents(height int64) error {
559+
block := h.store.LoadBlock(height)
560+
meta := h.store.LoadBlockMeta(height)
561+
res, err := h.stateStore.LoadFinalizeBlockResponses(height)
562+
if err != nil {
563+
return err
564+
}
565+
validatorUpdates, err := types.PB2TM.ValidatorUpdates(res.ValidatorUpdates)
566+
if err != nil {
567+
return err
568+
}
569+
sm.FireEvents(h.logger, h.eventBus, block, meta.BlockID, res, validatorUpdates)
570+
return nil
571+
}
572+
553573
func checkAppHashEqualsOneFromBlock(appHash []byte, block *types.Block) error {
554574
if !bytes.Equal(appHash, block.AppHash) {
555575
return fmt.Errorf(`block.AppHash does not match AppHash after replay. Got '%X', expected '%X'.

internal/mempool/mempool.go

+9-10
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ package mempool
33
import (
44
"bytes"
55
"context"
6-
"encoding/json"
76
"errors"
87
"fmt"
8+
"strings"
99
"sync"
1010
"sync/atomic"
1111
"time"
@@ -1036,17 +1036,16 @@ func (txmp *TxMempool) GetPeerFailedCheckTxCount(nodeID types.NodeID) uint64 {
10361036

10371037
// AppendCheckTxErr wraps error message into an ABCIMessageLogs json string
10381038
func (txmp *TxMempool) AppendCheckTxErr(existingLogs string, log string) string {
1039-
var logs []map[string]interface{}
1040-
json.Unmarshal([]byte(existingLogs), &logs)
1039+
var builder strings.Builder
10411040

1042-
// Append the new ABCIMessageLog to the slice
1043-
logs = append(logs, map[string]interface{}{
1044-
"log": log,
1045-
})
1041+
builder.WriteString(existingLogs)
1042+
// If there are already logs, append the new log with a separator
1043+
if builder.Len() > 0 {
1044+
builder.WriteString("; ")
1045+
}
1046+
builder.WriteString(log)
10461047

1047-
// Marshal the updated slice back into a JSON string
1048-
jsonData, _ := json.Marshal(logs)
1049-
return string(jsonData)
1048+
return builder.String()
10501049
}
10511050

10521051
func (txmp *TxMempool) handlePendingTransactions() {

internal/mempool/mempool_test.go

+7-15
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package mempool
33
import (
44
"bytes"
55
"context"
6-
"encoding/json"
76
"errors"
87
"fmt"
98
"math/rand"
@@ -947,24 +946,17 @@ func TestAppendCheckTxErr(t *testing.T) {
947946
}
948947
t.Cleanup(client.Wait)
949948
txmp := setup(t, client, 500)
950-
existingData := `[{"log":"existing error log"}]`
949+
existingLogData := "existing error log"
950+
newLogData := "sample error log"
951951

952952
// Append new error
953-
result := txmp.AppendCheckTxErr(existingData, "sample error msg")
953+
actualResult := txmp.AppendCheckTxErr(existingLogData, newLogData)
954+
expectedResult := fmt.Sprintf("%s; %s", existingLogData, newLogData)
954955

955-
// Unmarshal the result
956-
var data []map[string]interface{}
957-
err := json.Unmarshal([]byte(result), &data)
958-
require.NoError(t, err)
959-
require.Equal(t, len(data), 2)
960-
require.Equal(t, data[1]["log"], "sample error msg")
956+
require.Equal(t, expectedResult, actualResult)
961957

962958
// Append new error to empty log
963-
result = txmp.AppendCheckTxErr("", "sample error msg")
959+
actualResult = txmp.AppendCheckTxErr("", newLogData)
964960

965-
// Unmarshal the result
966-
err = json.Unmarshal([]byte(result), &data)
967-
require.NoError(t, err)
968-
require.Equal(t, len(data), 1)
969-
require.Equal(t, data[0]["log"], "sample error msg")
961+
require.Equal(t, newLogData, actualResult)
970962
}

internal/state/execution.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,7 @@ func (blockExec *BlockExecutor) ApplyBlock(
360360

361361
// Events are fired after everything else.
362362
// NOTE: if we crash between Commit and Save, events wont be fired during replay
363-
fireEvents(blockExec.logger, blockExec.eventBus, block, blockID, fBlockRes, validatorUpdates)
363+
FireEvents(blockExec.logger, blockExec.eventBus, block, blockID, fBlockRes, validatorUpdates)
364364

365365
return state, nil
366366
}
@@ -687,7 +687,7 @@ func (state State) Update(
687687
// Fire NewBlock, NewBlockHeader.
688688
// Fire TxEvent for every tx.
689689
// NOTE: if Tendermint crashes before commit, some or all of these events may be published again.
690-
func fireEvents(
690+
func FireEvents(
691691
logger log.Logger,
692692
eventBus types.BlockEventPublisher,
693693
block *types.Block,
@@ -811,7 +811,7 @@ func ExecCommitBlock(
811811
}
812812

813813
blockID := types.BlockID{Hash: block.Hash(), PartSetHeader: bps.Header()}
814-
fireEvents(be.logger, be.eventBus, block, blockID, finalizeBlockResponse, validatorUpdates)
814+
FireEvents(be.logger, be.eventBus, block, blockID, finalizeBlockResponse, validatorUpdates)
815815
}
816816

817817
// Commit block

0 commit comments

Comments
 (0)