Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a new config to speed up block sync #244

Merged
merged 4 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,9 @@ type P2PConfig struct { //nolint: maligned
// Comma separated list of nodes to keep persistent connections to
PersistentPeers string `mapstructure:"persistent-peers"`

// Comma separated list of nodes for block sync only
BlockSyncPeers string `mapstructure:"blocksync-peers"`

// UPNP port forwarding
UPNP bool `mapstructure:"upnp"`

Expand Down Expand Up @@ -712,7 +715,7 @@ func DefaultP2PConfig() *P2PConfig {
RecvRate: 5120000, // 5 mB/s
PexReactor: true,
AllowDuplicateIP: false,
HandshakeTimeout: 20 * time.Second,
HandshakeTimeout: 5 * time.Second,
DialTimeout: 3 * time.Second,
TestDialFail: false,
QueueType: "simple-priority",
Expand Down
3 changes: 3 additions & 0 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,9 @@ bootstrap-peers = "{{ .P2P.BootstrapPeers }}"
# Comma separated list of nodes to keep persistent connections to
persistent-peers = "{{ .P2P.PersistentPeers }}"

# Comma separated list of nodes for block sync only
blocksync-peers = "{{ .P2P.BlockSyncPeers }}"

# UPNP port forwarding
upnp = {{ .P2P.UPNP }}

Expand Down
16 changes: 11 additions & 5 deletions internal/blocksync/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,8 @@
*/

const (
requestInterval = 10 * time.Millisecond
inactiveSleepInterval = 1 * time.Second
maxTotalRequesters = 600
requestInterval = 100 * time.Millisecond
maxTotalRequesters = 50
maxPeerErrBuffer = 1000
maxPendingRequests = maxTotalRequesters
maxPendingRequestsPerPeer = 20
Expand All @@ -54,7 +53,7 @@
BadBlock RetryReason = "BadBlock"
)

var peerTimeout = 10 * time.Second // not const so we can override with tests
var peerTimeout = 2 * time.Second // not const so we can override with tests

/*
Peers self report their heights when we join the block pool.
Expand Down Expand Up @@ -356,6 +355,12 @@
pool.mtx.Lock()
defer pool.mtx.Unlock()

blockSyncPeers := pool.peerManager.GetBlockSyncPeers()
if len(blockSyncPeers) > 0 && !blockSyncPeers[peerID] {
pool.logger.Info(fmt.Sprintf("Skip adding peer %s for blocksync, num of blocksync peers: %d, num of pool peers: %d", peerID, len(blockSyncPeers), len(pool.peers)))
return
}

Check warning on line 362 in internal/blocksync/pool.go

View check run for this annotation

Codecov / codecov/patch

internal/blocksync/pool.go#L360-L362

Added lines #L360 - L362 were not covered by tests

peer := pool.peers[peerID]
if peer != nil {
peer.base = base
Expand All @@ -370,7 +375,7 @@
logger: pool.logger.With("peer", peerID),
startAt: time.Now(),
}

pool.logger.Info(fmt.Sprintf("Adding peer %s to blocksync pool", peerID))
pool.peers[peerID] = peer
}

Expand Down Expand Up @@ -431,6 +436,7 @@
for peer := range peers {
sortedPeers = append(sortedPeers, peer)
}

// Sort from high to low score
sort.Slice(sortedPeers, func(i, j int) bool {
return pool.peerManager.Score(sortedPeers[i]) > pool.peerManager.Score(sortedPeers[j])
Expand Down
6 changes: 3 additions & 3 deletions internal/blocksync/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,10 +560,11 @@
continue

case r.pool.IsCaughtUp() && r.previousMaxPeerHeight <= r.pool.MaxPeerHeight():
r.logger.Info("switching to consensus reactor", "height", height)
r.logger.Info("switching to consensus reactor after caught up", "height", height)

Check warning on line 563 in internal/blocksync/reactor.go

View check run for this annotation

Codecov / codecov/patch

internal/blocksync/reactor.go#L563

Added line #L563 was not covered by tests

case time.Since(lastAdvance) > syncTimeout:
r.logger.Error("no progress since last advance", "last_advance", lastAdvance)
continue

Check warning on line 567 in internal/blocksync/reactor.go

View check run for this annotation

Codecov / codecov/patch

internal/blocksync/reactor.go#L567

Added line #L567 was not covered by tests

default:
r.logger.Info(
Expand Down Expand Up @@ -611,8 +612,7 @@
// See https://github.com/tendermint/tendermint/pull/8433#discussion_r866790631
panic(fmt.Errorf("peeked first block without extended commit at height %d - possible node store corruption", first.Height))
} else if first == nil || second == nil {
// we need to have fetched two consecutive blocks in order to
// perform blocksync verification
// we need to have fetched two consecutive blocks in order to perform blocksync verification
continue
}

Expand Down
31 changes: 29 additions & 2 deletions internal/p2p/peermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
const (
// retryNever is returned by retryDelay() when retries are disabled.
retryNever time.Duration = math.MaxInt64
// DefaultScore is the default score for a peer during initialization
// DefaultMutableScore is the default score for a peer during initialization
DefaultMutableScore int64 = 10
)

Expand Down Expand Up @@ -101,6 +101,9 @@
// Peers to which a connection will be (re)established ignoring any existing limits
UnconditionalPeers []types.NodeID

// Only include those peers for block sync
BlockSyncPeers []types.NodeID

// MaxPeers is the maximum number of peers to track information about, i.e.
// store in the peer store. When exceeded, the lowest-scored unconnected peers
// will be deleted. 0 means no limit.
Expand Down Expand Up @@ -157,6 +160,9 @@

// List of node IDs, to which a connection will be (re)established ignoring any existing limits
unconditionalPeers map[types.NodeID]struct{}

// blocksyncPeers provides fast blocksyncPeers lookups.
blocksyncPeers map[types.NodeID]bool
}

// Validate validates the options.
Expand Down Expand Up @@ -217,6 +223,13 @@
return o.persistentPeers[id]
}

func (o *PeerManagerOptions) isBlockSync(id types.NodeID) bool {
if o.blocksyncPeers == nil {
panic("isBlockSync() called before optimize()")

Check warning on line 228 in internal/p2p/peermanager.go

View check run for this annotation

Codecov / codecov/patch

internal/p2p/peermanager.go#L228

Added line #L228 was not covered by tests
}
return o.blocksyncPeers[id]
}

func (o *PeerManagerOptions) isUnconditional(id types.NodeID) bool {
if o.unconditionalPeers == nil {
panic("isUnconditional() called before optimize()")
Expand All @@ -234,6 +247,11 @@
o.persistentPeers[p] = true
}

o.blocksyncPeers = make(map[types.NodeID]bool, len(o.BlockSyncPeers))
for _, p := range o.BlockSyncPeers {
o.blocksyncPeers[p] = true
}

Check warning on line 253 in internal/p2p/peermanager.go

View check run for this annotation

Codecov / codecov/patch

internal/p2p/peermanager.go#L252-L253

Added lines #L252 - L253 were not covered by tests

o.unconditionalPeers = make(map[types.NodeID]struct{}, len(o.UnconditionalPeers))
for _, p := range o.UnconditionalPeers {
o.unconditionalPeers[p] = struct{}{}
Expand Down Expand Up @@ -367,6 +385,9 @@
for _, id := range m.options.UnconditionalPeers {
configure[id] = true
}
for _, id := range m.options.BlockSyncPeers {
configure[id] = true
}

Check warning on line 390 in internal/p2p/peermanager.go

View check run for this annotation

Codecov / codecov/patch

internal/p2p/peermanager.go#L389-L390

Added lines #L389 - L390 were not covered by tests
for id := range m.options.PeerScores {
configure[id] = true
}
Expand All @@ -384,6 +405,7 @@
func (m *PeerManager) configurePeer(peer peerInfo) peerInfo {
peer.Persistent = m.options.isPersistent(peer.ID)
peer.Unconditional = m.options.isUnconditional(peer.ID)
peer.BlockSync = m.options.isBlockSync(peer.ID)
peer.FixedScore = m.options.PeerScores[peer.ID]
return peer
}
Expand Down Expand Up @@ -464,6 +486,10 @@
return true, nil
}

func (m *PeerManager) GetBlockSyncPeers() map[types.NodeID]bool {
return m.options.blocksyncPeers

Check warning on line 490 in internal/p2p/peermanager.go

View check run for this annotation

Codecov / codecov/patch

internal/p2p/peermanager.go#L489-L490

Added lines #L489 - L490 were not covered by tests
}

// PeerRatio returns the ratio of peer addresses stored to the maximum size.
func (m *PeerManager) PeerRatio() float64 {
m.mtx.Lock()
Expand Down Expand Up @@ -1318,6 +1344,7 @@
// These fields are ephemeral, i.e. not persisted to the database.
Persistent bool
Unconditional bool
BlockSync bool
Seed bool
Height int64
FixedScore PeerScore // mainly for tests
Expand Down Expand Up @@ -1388,7 +1415,7 @@
}

score := p.MutableScore
if p.Persistent {
if p.Persistent || p.BlockSync {
score = int64(PeerScorePersistent)
}

Expand Down
10 changes: 10 additions & 0 deletions node/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,16 @@
peers = append(peers, address)
}

for _, p := range tmstrings.SplitAndTrimEmpty(cfg.P2P.BlockSyncPeers, ",", " ") {
address, err := p2p.ParseNodeAddress(p)
if err != nil {
return nil, func() error { return nil }, fmt.Errorf("invalid peer address %q: %w", p, err)

Check warning on line 261 in node/setup.go

View check run for this annotation

Codecov / codecov/patch

node/setup.go#L259-L261

Added lines #L259 - L261 were not covered by tests
}

peers = append(peers, address)
options.BlockSyncPeers = append(options.BlockSyncPeers, address.NodeID)

Check warning on line 265 in node/setup.go

View check run for this annotation

Codecov / codecov/patch

node/setup.go#L264-L265

Added lines #L264 - L265 were not covered by tests
}

for _, p := range tmstrings.SplitAndTrimEmpty(cfg.P2P.UnconditionalPeerIDs, ",", " ") {
options.UnconditionalPeers = append(options.UnconditionalPeers, types.NodeID(p))
}
Expand Down
Loading