Skip to content

Commit 0dc2ec9

Browse files
authored
Merge pull request #5 from sei-protocol/tony-chen-fix-connections
Fix open connection race condition
2 parents 783df1e + 172da15 commit 0dc2ec9

16 files changed

+248
-291
lines changed

internal/blocksync/reactor.go

+11-21
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"github.com/tendermint/tendermint/internal/consensus"
1212
"github.com/tendermint/tendermint/internal/eventbus"
1313
"github.com/tendermint/tendermint/internal/p2p"
14-
"github.com/tendermint/tendermint/internal/p2p/conn"
1514
sm "github.com/tendermint/tendermint/internal/state"
1615
"github.com/tendermint/tendermint/internal/store"
1716
"github.com/tendermint/tendermint/libs/log"
@@ -81,8 +80,8 @@ type Reactor struct {
8180
consReactor consensusReactor
8281
blockSync *atomicBool
8382

84-
chCreator p2p.ChannelCreator
8583
peerEvents p2p.PeerEventSubscriber
84+
channel *p2p.Channel
8685

8786
requestsCh <-chan BlockRequest
8887
errorsCh <-chan peerError
@@ -100,7 +99,6 @@ func NewReactor(
10099
blockExec *sm.BlockExecutor,
101100
store *store.BlockStore,
102101
consReactor consensusReactor,
103-
channelCreator p2p.ChannelCreator,
104102
peerEvents p2p.PeerEventSubscriber,
105103
blockSync bool,
106104
metrics *consensus.Metrics,
@@ -113,7 +111,6 @@ func NewReactor(
113111
store: store,
114112
consReactor: consReactor,
115113
blockSync: newAtomicBool(blockSync),
116-
chCreator: channelCreator,
117114
peerEvents: peerEvents,
118115
metrics: metrics,
119116
eventBus: eventBus,
@@ -123,6 +120,10 @@ func NewReactor(
123120
return r
124121
}
125122

123+
func (r *Reactor) SetChannel(ch *p2p.Channel) {
124+
r.channel = ch
125+
}
126+
126127
// OnStart starts separate go routines for each p2p Channel and listens for
127128
// envelopes on each. In addition, it also listens for peer updates and handles
128129
// messages on that p2p channel accordingly. The caller must be sure to execute
@@ -131,12 +132,6 @@ func NewReactor(
131132
// If blockSync is enabled, we also start the pool and the pool processing
132133
// goroutine. If the pool fails to start, an error is returned.
133134
func (r *Reactor) OnStart(ctx context.Context) error {
134-
blockSyncCh, err := r.chCreator(ctx, GetChannelDescriptor())
135-
if err != nil {
136-
return err
137-
}
138-
r.chCreator = func(context.Context, *conn.ChannelDescriptor) (*p2p.Channel, error) { return blockSyncCh, nil }
139-
140135
state, err := r.stateStore.Load()
141136
if err != nil {
142137
return err
@@ -162,13 +157,13 @@ func (r *Reactor) OnStart(ctx context.Context) error {
162157
if err := r.pool.Start(ctx); err != nil {
163158
return err
164159
}
165-
go r.requestRoutine(ctx, blockSyncCh)
160+
go r.requestRoutine(ctx, r.channel)
166161

167-
go r.poolRoutine(ctx, false, blockSyncCh)
162+
go r.poolRoutine(ctx, false, r.channel)
168163
}
169164

170-
go r.processBlockSyncCh(ctx, blockSyncCh)
171-
go r.processPeerUpdates(ctx, r.peerEvents(ctx), blockSyncCh)
165+
go r.processBlockSyncCh(ctx, r.channel)
166+
go r.processPeerUpdates(ctx, r.peerEvents(ctx), r.channel)
172167

173168
return nil
174169
}
@@ -378,13 +373,8 @@ func (r *Reactor) SwitchToBlockSync(ctx context.Context, state sm.State) error {
378373

379374
r.syncStartTime = time.Now()
380375

381-
bsCh, err := r.chCreator(ctx, GetChannelDescriptor())
382-
if err != nil {
383-
return err
384-
}
385-
386-
go r.requestRoutine(ctx, bsCh)
387-
go r.poolRoutine(ctx, true, bsCh)
376+
go r.requestRoutine(ctx, r.channel)
377+
go r.poolRoutine(ctx, true, r.channel)
388378

389379
if err := r.PublishStatus(types.EventDataBlockSyncStatus{
390380
Complete: false,

internal/blocksync/reactor_test.go

-1
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,6 @@ func makeReactor(
149149
blockExec,
150150
blockStore,
151151
nil,
152-
channelCreator,
153152
peerEvents,
154153
true,
155154
consensus.NopMetrics(),

internal/consensus/reactor.go

+75-77
Original file line numberDiff line numberDiff line change
@@ -27,49 +27,54 @@ var (
2727
_ p2p.Wrapper = (*tmcons.Message)(nil)
2828
)
2929

30-
// GetChannelDescriptor produces an instance of a descriptor for this
31-
// package's required channels.
32-
func getChannelDescriptors() map[p2p.ChannelID]*p2p.ChannelDescriptor {
33-
return map[p2p.ChannelID]*p2p.ChannelDescriptor{
34-
StateChannel: {
35-
ID: StateChannel,
36-
MessageType: new(tmcons.Message),
37-
Priority: 8,
38-
SendQueueCapacity: 64,
39-
RecvMessageCapacity: maxMsgSize,
40-
RecvBufferCapacity: 128,
41-
Name: "state",
42-
},
43-
DataChannel: {
44-
// TODO: Consider a split between gossiping current block and catchup
45-
// stuff. Once we gossip the whole block there is nothing left to send
46-
// until next height or round.
47-
ID: DataChannel,
48-
MessageType: new(tmcons.Message),
49-
Priority: 12,
50-
SendQueueCapacity: 64,
51-
RecvBufferCapacity: 512,
52-
RecvMessageCapacity: maxMsgSize,
53-
Name: "data",
54-
},
55-
VoteChannel: {
56-
ID: VoteChannel,
57-
MessageType: new(tmcons.Message),
58-
Priority: 10,
59-
SendQueueCapacity: 64,
60-
RecvBufferCapacity: 128,
61-
RecvMessageCapacity: maxMsgSize,
62-
Name: "vote",
63-
},
64-
VoteSetBitsChannel: {
65-
ID: VoteSetBitsChannel,
66-
MessageType: new(tmcons.Message),
67-
Priority: 5,
68-
SendQueueCapacity: 8,
69-
RecvBufferCapacity: 128,
70-
RecvMessageCapacity: maxMsgSize,
71-
Name: "voteSet",
72-
},
30+
func GetStateChannelDescriptor() *p2p.ChannelDescriptor {
31+
return &p2p.ChannelDescriptor{
32+
ID: StateChannel,
33+
MessageType: new(tmcons.Message),
34+
Priority: 8,
35+
SendQueueCapacity: 64,
36+
RecvMessageCapacity: maxMsgSize,
37+
RecvBufferCapacity: 128,
38+
Name: "state",
39+
}
40+
}
41+
42+
func GetDataChannelDescriptor() *p2p.ChannelDescriptor {
43+
return &p2p.ChannelDescriptor{
44+
// TODO: Consider a split between gossiping current block and catchup
45+
// stuff. Once we gossip the whole block there is nothing left to send
46+
// until next height or round.
47+
ID: DataChannel,
48+
MessageType: new(tmcons.Message),
49+
Priority: 12,
50+
SendQueueCapacity: 64,
51+
RecvBufferCapacity: 512,
52+
RecvMessageCapacity: maxMsgSize,
53+
Name: "data",
54+
}
55+
}
56+
57+
func GetVoteChannelDescriptor() *p2p.ChannelDescriptor {
58+
return &p2p.ChannelDescriptor{
59+
ID: VoteChannel,
60+
MessageType: new(tmcons.Message),
61+
Priority: 10,
62+
SendQueueCapacity: 64,
63+
RecvBufferCapacity: 128,
64+
RecvMessageCapacity: maxMsgSize,
65+
Name: "vote",
66+
}
67+
}
68+
69+
func GetVoteSetChannelDescriptor() *p2p.ChannelDescriptor {
70+
return &p2p.ChannelDescriptor{
71+
ID: VoteSetBitsChannel,
72+
MessageType: new(tmcons.Message),
73+
Priority: 5,
74+
SendQueueCapacity: 8,
75+
RecvBufferCapacity: 128,
76+
RecvMessageCapacity: maxMsgSize,
77+
Name: "voteSet",
7378
}
7479
}
7580

@@ -103,8 +108,9 @@ type BlockSyncReactor interface {
103108
GetRemainingSyncTime() time.Duration
104109
}
105110

106-
//go:generate ../../scripts/mockery_generate.sh ConsSyncReactor
107111
// ConsSyncReactor defines an interface used for testing abilities of node.startStateSync.
112+
//
113+
//go:generate ../../scripts/mockery_generate.sh ConsSyncReactor
108114
type ConsSyncReactor interface {
109115
SwitchToConsensus(sm.State, bool)
110116
SetStateSyncingMetrics(float64)
@@ -127,7 +133,8 @@ type Reactor struct {
127133
readySignal chan struct{} // closed when the node is ready to start consensus
128134

129135
peerEvents p2p.PeerEventSubscriber
130-
chCreator p2p.ChannelCreator
136+
137+
channels *channelBundle
131138
}
132139

133140
// NewReactor returns a reference to a new consensus reactor, which implements
@@ -137,7 +144,6 @@ type Reactor struct {
137144
func NewReactor(
138145
logger log.Logger,
139146
cs *State,
140-
channelCreator p2p.ChannelCreator,
141147
peerEvents p2p.PeerEventSubscriber,
142148
eventBus *eventbus.EventBus,
143149
waitSync bool,
@@ -152,8 +158,8 @@ func NewReactor(
152158
eventBus: eventBus,
153159
Metrics: metrics,
154160
peerEvents: peerEvents,
155-
chCreator: channelCreator,
156161
readySignal: make(chan struct{}),
162+
channels: &channelBundle{},
157163
}
158164
r.BaseService = *service.NewBaseService(logger, "Consensus", r)
159165

@@ -171,6 +177,22 @@ type channelBundle struct {
171177
votSet *p2p.Channel
172178
}
173179

180+
func (r *Reactor) SetStateChannel(ch *p2p.Channel) {
181+
r.channels.state = ch
182+
}
183+
184+
func (r *Reactor) SetDataChannel(ch *p2p.Channel) {
185+
r.channels.data = ch
186+
}
187+
188+
func (r *Reactor) SetVoteChannel(ch *p2p.Channel) {
189+
r.channels.vote = ch
190+
}
191+
192+
func (r *Reactor) SetVoteSetChannel(ch *p2p.Channel) {
193+
r.channels.votSet = ch
194+
}
195+
174196
// OnStart starts separate go routines for each p2p Channel and listens for
175197
// envelopes on each. In addition, it also listens for peer updates and handles
176198
// messages on that p2p channel accordingly. The caller must be sure to execute
@@ -180,37 +202,13 @@ func (r *Reactor) OnStart(ctx context.Context) error {
180202

181203
peerUpdates := r.peerEvents(ctx)
182204

183-
var chBundle channelBundle
184-
var err error
185-
186-
chans := getChannelDescriptors()
187-
chBundle.state, err = r.chCreator(ctx, chans[StateChannel])
188-
if err != nil {
189-
return err
190-
}
191-
192-
chBundle.data, err = r.chCreator(ctx, chans[DataChannel])
193-
if err != nil {
194-
return err
195-
}
196-
197-
chBundle.vote, err = r.chCreator(ctx, chans[VoteChannel])
198-
if err != nil {
199-
return err
200-
}
201-
202-
chBundle.votSet, err = r.chCreator(ctx, chans[VoteSetBitsChannel])
203-
if err != nil {
204-
return err
205-
}
206-
207205
// start routine that computes peer statistics for evaluating peer quality
208206
//
209207
// TODO: Evaluate if we need this to be synchronized via WaitGroup as to not
210208
// leak the goroutine when stopping the reactor.
211209
go r.peerStatsRoutine(ctx, peerUpdates)
212210

213-
r.subscribeToBroadcastEvents(ctx, chBundle.state)
211+
r.subscribeToBroadcastEvents(ctx, r.channels.state)
214212

215213
if !r.WaitSync() {
216214
if err := r.state.Start(ctx); err != nil {
@@ -222,11 +220,11 @@ func (r *Reactor) OnStart(ctx context.Context) error {
222220

223221
go r.updateRoundStateRoutine(ctx)
224222

225-
go r.processStateCh(ctx, chBundle)
226-
go r.processDataCh(ctx, chBundle)
227-
go r.processVoteCh(ctx, chBundle)
228-
go r.processVoteSetBitsCh(ctx, chBundle)
229-
go r.processPeerUpdates(ctx, peerUpdates, chBundle)
223+
go r.processStateCh(ctx, *r.channels)
224+
go r.processDataCh(ctx, *r.channels)
225+
go r.processVoteCh(ctx, *r.channels)
226+
go r.processVoteSetBitsCh(ctx, *r.channels)
227+
go r.processPeerUpdates(ctx, peerUpdates, *r.channels)
230228

231229
return nil
232230
}

internal/consensus/reactor_test.go

-19
Original file line numberDiff line numberDiff line change
@@ -85,31 +85,13 @@ func setup(
8585
ctx, cancel := context.WithCancel(ctx)
8686
t.Cleanup(cancel)
8787

88-
chCreator := func(nodeID types.NodeID) p2p.ChannelCreator {
89-
return func(ctx context.Context, desc *p2p.ChannelDescriptor) (*p2p.Channel, error) {
90-
switch desc.ID {
91-
case StateChannel:
92-
return rts.stateChannels[nodeID], nil
93-
case DataChannel:
94-
return rts.dataChannels[nodeID], nil
95-
case VoteChannel:
96-
return rts.voteChannels[nodeID], nil
97-
case VoteSetBitsChannel:
98-
return rts.voteSetBitsChannels[nodeID], nil
99-
default:
100-
return nil, fmt.Errorf("invalid channel; %v", desc.ID)
101-
}
102-
}
103-
}
104-
10588
i := 0
10689
for nodeID, node := range rts.network.Nodes {
10790
state := states[i]
10891

10992
reactor := NewReactor(
11093
state.logger.With("node", nodeID),
11194
state,
112-
chCreator(nodeID),
11395
func(ctx context.Context) *p2p.PeerUpdates { return node.MakePeerUpdates(ctx, t) },
11496
state.eventBus,
11597
true,
@@ -696,7 +678,6 @@ func TestSwitchToConsensusVoteExtensions(t *testing.T) {
696678
log.NewNopLogger(),
697679
cs,
698680
nil,
699-
nil,
700681
cs.eventBus,
701682
true,
702683
NopMetrics(),

0 commit comments

Comments
 (0)