Skip to content

Commit

Permalink
node/peers: basic conn count mgr (#1137)
Browse files Browse the repository at this point in the history
* node/peers: basic conn count mgr

* use CompressDialError when applicable
  • Loading branch information
jchappelow authored Dec 10, 2024
1 parent 8184eea commit 9d5393a
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 14 deletions.
43 changes: 37 additions & 6 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
noise "github.com/libp2p/go-libp2p/p2p/security/noise"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
Expand All @@ -48,7 +49,7 @@ type peerManager interface {
network.Notifiee
Start(context.Context) error
ConnectedPeers() []peers.PeerInfo
KnownPeers() []peers.PeerInfo
KnownPeers() ([]peers.PeerInfo, []peers.PeerInfo, []peers.PeerInfo)
}

type Node struct {
Expand Down Expand Up @@ -247,16 +248,25 @@ func (n *Node) Start(ctx context.Context, bootpeers ...string) error {
return err
}

bootpeers, err = peers.ConvertPeersToMultiAddr(bootpeers)
bootpeersMA, err := peers.ConvertPeersToMultiAddr(bootpeers)
if err != nil {
return err
}

// connect to bootstrap peers, if any
for _, peer := range bootpeers {
peerInfo, err := connectPeer(ctx, peer, n.host)
for i, peer := range bootpeersMA {
peerInfo, err := makePeerAddrInfo(peer)
if err != nil {
n.log.Warnf("invalid bootnode address %v from setting %v", peer, bootpeers[i])
continue
}
err = connectPeerAddrInfo(ctx, peerInfo, n.host)
if err != nil {
n.log.Errorf("failed to connect to %v: %v", peer, err)
// Add it to the peer store anyway since this was specified as a
// bootnode, which is supposed to be persistent, so we should try to
// connect again later.
n.host.Peerstore().AddAddrs(peerInfo.ID, peerInfo.Addrs, peerstore.PermanentAddrTTL)
continue
}
if err = n.checkPeerProtos(ctx, peerInfo.ID); err != nil {
Expand Down Expand Up @@ -294,6 +304,7 @@ func (n *Node) Start(ctx context.Context, bootpeers ...string) error {
Addrs: peerAddrs,
}); err != nil {
n.log.Warnf("Unable to connect to peer %s: %v", peerID, peers.CompressDialError(err))
continue
}
n.log.Infof("Connected to address book peer %v", peerID)
}
Expand Down Expand Up @@ -567,13 +578,24 @@ func newHost(ip string, port uint64, privKey crypto.PrivateKey) (host.Host, erro

// cg := peers.NewProtocolGater()

// The BasicConnManager can keep connections below an upper limit, dropping
// down to some lower limit (presumably to keep a dynamic peer list), but it
// won't try to initiation new connections to reach the min. Maybe this will
// be helpful later, so leaving as a comment:

// cm, err := connmgr.NewConnManager(60, 100, connmgr.WithGracePeriod(20*time.Minute)) // TODO: absorb this into peerman
// if err != nil {
// return nil, nil, err
// }

h, err := libp2p.New(
libp2p.Transport(tcp.NewTCPTransport),
libp2p.Security(noise.ID, noise.New), // modified TLS based on node-ID
libp2p.ListenAddrs(sourceMultiAddr),
// listenAddrs,
libp2p.Identity(privKeyP2P),
// libp2p.ConnectionGater(cg),
// libp2p.ConnectionManager(cm),
) // libp2p.RandomIdentity, in-mem peer store, ...
if err != nil {
return nil, err
Expand Down Expand Up @@ -605,15 +627,24 @@ func hostPort(host host.Host) ([]string, []int, []string) {
return addrStr, ports, protocols
}

func connectPeer(ctx context.Context, addr string, host host.Host) (*peer.AddrInfo, error) {
func makePeerAddrInfo(addr string) (*peer.AddrInfo, error) {
// Turn the destination into a multiaddr.
maddr, err := multiaddr.NewMultiaddr(addr)
if err != nil {
return nil, err
}

// Extract the peer ID from the multiaddr.
info, err := peer.AddrInfoFromP2pAddr(maddr)
return peer.AddrInfoFromP2pAddr(maddr)
}

func connectPeerAddrInfo(ctx context.Context, info *peer.AddrInfo, host host.Host) error {
return host.Connect(ctx, *info)
}

func connectPeer(ctx context.Context, addr string, host host.Host) (*peer.AddrInfo, error) {
// Extract the peer ID and address info from the multiaddr.
info, err := makePeerAddrInfo(addr)
if err != nil {
return nil, err
}
Expand Down
79 changes: 71 additions & 8 deletions node/peers/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ type PeerMan struct {

requiredProtocols []protocol.ID

pex bool
addrBook string
pex bool
addrBook string
targetConnections int

done chan struct{}
close func()
Expand Down Expand Up @@ -83,6 +84,7 @@ func NewPeerMan(pex bool, addrBook string, logger log.Logger, h host.Host,
pex: pex,
requestPeers: requestPeers,
addrBook: addrBook,
targetConnections: 20, // TODO: configurable max(1, targetConnections)
disconnects: make(map[peer.ID]time.Time),
noReconnect: make(map[peer.ID]bool),
}
Expand Down Expand Up @@ -114,6 +116,12 @@ func (pm *PeerMan) Start(ctx context.Context) error {
pm.removeOldPeers()
}()

pm.wg.Add(1)
go func() {
defer pm.wg.Done()
pm.maintainMinPeers(ctx)
}()

<-ctx.Done()

pm.close()
Expand All @@ -123,6 +131,60 @@ func (pm *PeerMan) Start(ctx context.Context) error {
return nil
}

const (
urgentConnInterval = time.Second
normalConnInterval = 20 * urgentConnInterval
)

func (pm *PeerMan) maintainMinPeers(ctx context.Context) {
// Start with a fast iteration until we determine that we either have some
// connected peers, or we don't even have candidate addresses yet.
ticker := time.NewTicker(urgentConnInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
case <-ctx.Done():
return
}

_, activeConns, unconnectedPeers := pm.KnownPeers()
if numActive := len(activeConns); numActive < pm.targetConnections {
if numActive == 0 && len(unconnectedPeers) == 0 {
pm.log.Warnln("No connected peers and no known addresses to dial!")
continue
}

pm.log.Infof("Active connections: %d, below target: %d. Initiating new connections.",
numActive, pm.targetConnections)

var added int
for _, peerInfo := range unconnectedPeers {
pid := peerInfo.ID
err := pm.h.Connect(ctx, peer.AddrInfo{ID: pid})
if err != nil {
pm.log.Warnf("Failed to connect to peer %s: %v", pid, CompressDialError(err))
} else {
pm.log.Infof("Connected to peer %s", pid)
added++
}
}

if added == 0 && numActive == 0 {
// Keep trying known peer addresses more frequently until we
// have at least on connection.
ticker.Reset(urgentConnInterval)
} else {
ticker.Reset(normalConnInterval)
}
} else {
pm.log.Debugf("Have %d connections and %d candidates of %d target", numActive, len(unconnectedPeers), pm.targetConnections)
}

}
}

func (pm *PeerMan) startPex(ctx context.Context) {
for {
// discover for this node
Expand All @@ -136,7 +198,7 @@ func (pm *PeerMan) startPex(ctx context.Context) {
if pm.addPeerAddrs(peer) {
// TODO: connection manager, with limits
if err = pm.c.Connect(ctx, peer); err != nil {
pm.log.Warnf("Failed to connect to %s: %v", peer.ID, err)
pm.log.Warnf("Failed to connect to %s: %v", peer.ID, CompressDialError(err))
}
}
count++
Expand Down Expand Up @@ -219,12 +281,13 @@ func (pm *PeerMan) ConnectedPeers() []PeerInfo {

// KnownPeers returns a list of peer info for all known peers (connected or just
// in peer store).
func (pm *PeerMan) KnownPeers() []PeerInfo {
func (pm *PeerMan) KnownPeers() (all, connected, disconnected []PeerInfo) {
// connected peers first
peers := pm.ConnectedPeers()
connectedPeers := make(map[peer.ID]bool)
for _, peerInfo := range peers {
connectedPeers[peerInfo.ID] = true
connected = append(connected, peerInfo)
}

// all others in peer store
Expand All @@ -241,10 +304,11 @@ func (pm *PeerMan) KnownPeers() []PeerInfo {
continue
}

disconnected = append(disconnected, *peerInfo)
peers = append(peers, *peerInfo)
}

return peers
return peers, connected, disconnected
}

func CheckProtocolSupport(_ context.Context, ps peerstore.Peerstore, peerID peer.ID, protoIDs ...protocol.ID) (bool, error) {
Expand Down Expand Up @@ -304,14 +368,14 @@ func peerInfo(ps peerstore.Peerstore, peerID peer.ID) (*PeerInfo, error) {
}

func (pm *PeerMan) PrintKnownPeers() {
peers := pm.KnownPeers()
peers, _, _ := pm.KnownPeers()
for _, p := range peers {
pm.log.Info("Known peer", "id", p.ID.String())
}
}

func (pm *PeerMan) savePeers() error {
peerList := pm.KnownPeers()
peerList, _, _ := pm.KnownPeers()
pm.log.Infof("saving %d peers to address book", len(peerList))
if err := persistPeers(peerList, pm.addrBook); err != nil {
return err
Expand Down Expand Up @@ -493,7 +557,6 @@ func (pm *PeerMan) reconnectWithRetry(ctx context.Context, peerID peer.ID) {
cancel()
err = CompressDialError(err)
pm.log.Infof("Failed to reconnect to peer %s (trying again in %v): %v", peerID, delay, err)

} else {
cancel()
pm.log.Infof("Successfully reconnected to peer %s", peerID)
Expand Down

0 comments on commit 9d5393a

Please sign in to comment.