From 9d5393a982b0041209444878534966a932732450 Mon Sep 17 00:00:00 2001 From: jchappelow Date: Tue, 10 Dec 2024 17:18:03 -0600 Subject: [PATCH] node/peers: basic conn count mgr (#1137) * node/peers: basic conn count mgr * use CompressDialError when applicable --- node/node.go | 43 ++++++++++++++++++++---- node/peers/peers.go | 79 ++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 108 insertions(+), 14 deletions(-) diff --git a/node/node.go b/node/node.go index 8f2292728..8643c87cb 100644 --- a/node/node.go +++ b/node/node.go @@ -32,6 +32,7 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/protocol" noise "github.com/libp2p/go-libp2p/p2p/security/noise" "github.com/libp2p/go-libp2p/p2p/transport/tcp" @@ -48,7 +49,7 @@ type peerManager interface { network.Notifiee Start(context.Context) error ConnectedPeers() []peers.PeerInfo - KnownPeers() []peers.PeerInfo + KnownPeers() ([]peers.PeerInfo, []peers.PeerInfo, []peers.PeerInfo) } type Node struct { @@ -247,16 +248,25 @@ func (n *Node) Start(ctx context.Context, bootpeers ...string) error { return err } - bootpeers, err = peers.ConvertPeersToMultiAddr(bootpeers) + bootpeersMA, err := peers.ConvertPeersToMultiAddr(bootpeers) if err != nil { return err } // connect to bootstrap peers, if any - for _, peer := range bootpeers { - peerInfo, err := connectPeer(ctx, peer, n.host) + for i, peer := range bootpeersMA { + peerInfo, err := makePeerAddrInfo(peer) + if err != nil { + n.log.Warnf("invalid bootnode address %v from setting %v", peer, bootpeers[i]) + continue + } + err = connectPeerAddrInfo(ctx, peerInfo, n.host) if err != nil { n.log.Errorf("failed to connect to %v: %v", peer, err) + // Add it to the peer store anyway since this was specified as a + // bootnode, which is supposed to be persistent, so we should try to + // connect again later. 
+ n.host.Peerstore().AddAddrs(peerInfo.ID, peerInfo.Addrs, peerstore.PermanentAddrTTL) continue } if err = n.checkPeerProtos(ctx, peerInfo.ID); err != nil { @@ -294,6 +304,7 @@ func (n *Node) Start(ctx context.Context, bootpeers ...string) error { Addrs: peerAddrs, }); err != nil { n.log.Warnf("Unable to connect to peer %s: %v", peerID, peers.CompressDialError(err)) + continue } n.log.Infof("Connected to address book peer %v", peerID) } @@ -567,6 +578,16 @@ func newHost(ip string, port uint64, privKey crypto.PrivateKey) (host.Host, erro // cg := peers.NewProtocolGater() + // The BasicConnManager can keep connections below an upper limit, dropping + // down to some lower limit (presumably to keep a dynamic peer list), but it + // won't try to initiate new connections to reach the min. Maybe this will + // be helpful later, so leaving as a comment: + + // cm, err := connmgr.NewConnManager(60, 100, connmgr.WithGracePeriod(20*time.Minute)) // TODO: absorb this into peerman + // if err != nil { + // return nil, nil, err + // } + h, err := libp2p.New( libp2p.Transport(tcp.NewTCPTransport), libp2p.Security(noise.ID, noise.New), // modified TLS based on node-ID @@ -574,6 +595,7 @@ func newHost(ip string, port uint64, privKey crypto.PrivateKey) (host.Host, erro // listenAddrs, libp2p.Identity(privKeyP2P), // libp2p.ConnectionGater(cg), + // libp2p.ConnectionManager(cm), ) // libp2p.RandomIdentity, in-mem peer store, ... if err != nil { return nil, err @@ -605,7 +627,7 @@ func hostPort(host host.Host) ([]string, []int, []string) { return addrStr, ports, protocols } -func connectPeer(ctx context.Context, addr string, host host.Host) (*peer.AddrInfo, error) { +func makePeerAddrInfo(addr string) (*peer.AddrInfo, error) { // Turn the destination into a multiaddr. maddr, err := multiaddr.NewMultiaddr(addr) if err != nil { @@ -613,7 +635,16 @@ func connectPeer(ctx context.Context, addr string, host host.Host) (*peer.AddrIn } // Extract the peer ID from the multiaddr. 
- info, err := peer.AddrInfoFromP2pAddr(maddr) + return peer.AddrInfoFromP2pAddr(maddr) +} + +func connectPeerAddrInfo(ctx context.Context, info *peer.AddrInfo, host host.Host) error { + return host.Connect(ctx, *info) +} + +func connectPeer(ctx context.Context, addr string, host host.Host) (*peer.AddrInfo, error) { + // Extract the peer ID and address info from the multiaddr. + info, err := makePeerAddrInfo(addr) if err != nil { return nil, err } diff --git a/node/peers/peers.go b/node/peers/peers.go index 79d048111..72eb88f81 100644 --- a/node/peers/peers.go +++ b/node/peers/peers.go @@ -51,8 +51,9 @@ type PeerMan struct { requiredProtocols []protocol.ID - pex bool - addrBook string + pex bool + addrBook string + targetConnections int done chan struct{} close func() @@ -83,6 +84,7 @@ func NewPeerMan(pex bool, addrBook string, logger log.Logger, h host.Host, pex: pex, requestPeers: requestPeers, addrBook: addrBook, + targetConnections: 20, // TODO: configurable max(1, targetConnections) disconnects: make(map[peer.ID]time.Time), noReconnect: make(map[peer.ID]bool), } @@ -114,6 +116,12 @@ func (pm *PeerMan) Start(ctx context.Context) error { pm.removeOldPeers() }() + pm.wg.Add(1) + go func() { + defer pm.wg.Done() + pm.maintainMinPeers(ctx) + }() + <-ctx.Done() pm.close() @@ -123,6 +131,60 @@ func (pm *PeerMan) Start(ctx context.Context) error { return nil } +const ( + urgentConnInterval = time.Second + normalConnInterval = 20 * urgentConnInterval +) + +func (pm *PeerMan) maintainMinPeers(ctx context.Context) { + // Start with a fast iteration until we determine that we either have some + // connected peers, or we don't even have candidate addresses yet. 
+ ticker := time.NewTicker(urgentConnInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + case <-ctx.Done(): + return + } + + _, activeConns, unconnectedPeers := pm.KnownPeers() + if numActive := len(activeConns); numActive < pm.targetConnections { + if numActive == 0 && len(unconnectedPeers) == 0 { + pm.log.Warnln("No connected peers and no known addresses to dial!") + continue + } + + pm.log.Infof("Active connections: %d, below target: %d. Initiating new connections.", + numActive, pm.targetConnections) + + var added int + for _, peerInfo := range unconnectedPeers { + pid := peerInfo.ID + err := pm.h.Connect(ctx, peer.AddrInfo{ID: pid}) + if err != nil { + pm.log.Warnf("Failed to connect to peer %s: %v", pid, CompressDialError(err)) + } else { + pm.log.Infof("Connected to peer %s", pid) + added++ + } + } + + if added == 0 && numActive == 0 { + // Keep trying known peer addresses more frequently until we + // have at least one connection. + ticker.Reset(urgentConnInterval) + } else { + ticker.Reset(normalConnInterval) + } + } else { + pm.log.Debugf("Have %d connections and %d candidates of %d target", numActive, len(unconnectedPeers), pm.targetConnections) + } + + } +} + func (pm *PeerMan) startPex(ctx context.Context) { for { // discover for this node @@ -136,7 +198,7 @@ func (pm *PeerMan) startPex(ctx context.Context) { if pm.addPeerAddrs(peer) { // TODO: connection manager, with limits if err = pm.c.Connect(ctx, peer); err != nil { - pm.log.Warnf("Failed to connect to %s: %v", peer.ID, err) + pm.log.Warnf("Failed to connect to %s: %v", peer.ID, CompressDialError(err)) } } count++ @@ -219,12 +281,13 @@ func (pm *PeerMan) ConnectedPeers() []PeerInfo { // KnownPeers returns a list of peer info for all known peers (connected or just // in peer store). 
-func (pm *PeerMan) KnownPeers() []PeerInfo { +func (pm *PeerMan) KnownPeers() (all, connected, disconnected []PeerInfo) { // connected peers first peers := pm.ConnectedPeers() connectedPeers := make(map[peer.ID]bool) for _, peerInfo := range peers { connectedPeers[peerInfo.ID] = true + connected = append(connected, peerInfo) } // all others in peer store @@ -241,10 +304,11 @@ func (pm *PeerMan) KnownPeers() []PeerInfo { continue } + disconnected = append(disconnected, *peerInfo) peers = append(peers, *peerInfo) } - return peers + return peers, connected, disconnected } func CheckProtocolSupport(_ context.Context, ps peerstore.Peerstore, peerID peer.ID, protoIDs ...protocol.ID) (bool, error) { @@ -304,14 +368,14 @@ func peerInfo(ps peerstore.Peerstore, peerID peer.ID) (*PeerInfo, error) { } func (pm *PeerMan) PrintKnownPeers() { - peers := pm.KnownPeers() + peers, _, _ := pm.KnownPeers() for _, p := range peers { pm.log.Info("Known peer", "id", p.ID.String()) } } func (pm *PeerMan) savePeers() error { - peerList := pm.KnownPeers() + peerList, _, _ := pm.KnownPeers() pm.log.Infof("saving %d peers to address book", len(peerList)) if err := persistPeers(peerList, pm.addrBook); err != nil { return err @@ -493,7 +557,6 @@ func (pm *PeerMan) reconnectWithRetry(ctx context.Context, peerID peer.ID) { cancel() err = CompressDialError(err) pm.log.Infof("Failed to reconnect to peer %s (trying again in %v): %v", peerID, delay, err) - } else { cancel() pm.log.Infof("Successfully reconnected to peer %s", peerID)