diff --git a/exchange/fx_exchange.go b/exchange/fx_exchange.go index 44eb573..3bb6e3e 100644 --- a/exchange/fx_exchange.go +++ b/exchange/fx_exchange.go @@ -56,7 +56,7 @@ type ( authorizedPeers map[peer.ID]struct{} authorizedPeersLock sync.RWMutex - pub *ipniPublisher + pub *hubPublisher dht *fulaDht } pushRequest struct { @@ -97,7 +97,7 @@ func NewFxExchange(h host.Host, ls ipld.LinkSystem, o ...Option) (*FxExchange, e } } //if !e.ipniPublishDisabled { - e.pub, err = newIpniPublisher(h, opts) + e.pub, err = newHubPublisher(h, opts) if err != nil { return nil, err } diff --git a/exchange/hub_publisher.go b/exchange/hub_publisher.go new file mode 100644 index 0000000..62b35ef --- /dev/null +++ b/exchange/hub_publisher.go @@ -0,0 +1,140 @@ +package exchange + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "sync/atomic" + + "github.com/ipfs/go-cid" + "github.com/ipld/go-ipld-prime" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + p2phttp "github.com/libp2p/go-libp2p-http" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/multiformats/go-multihash" +) + +type hubPublisher struct { + *options + h host.Host + buffer chan cid.Cid + ctx context.Context + cancel context.CancelFunc + client *http.Client +} + +type PutContentRequest struct { + Mutlihashes []multihash.Multihash +} + +func newHubPublisher(h host.Host, opts *options) (*hubPublisher, error) { + p := &hubPublisher{ + h: h, + options: opts, + } + p.ctx, p.cancel = context.WithCancel(context.Background()) + p.buffer = make(chan cid.Cid, p.ipniPublishChanBuffer) + + tr := &http.Transport{} + tr.RegisterProtocol("libp2p", p2phttp.NewTransport(h, p2phttp.ProtocolOption("/fx.land/hub/0.0.1"))) + p.client = &http.Client{Transport: tr} + + a, err := peer.AddrInfoFromString("/dns/hub.dev.fx.land/tcp/40004/p2p/12D3KooWFmfEsXjWotvqJ6B3ASXx1w3p6udj8R9f344a9JTu2k4R") + if err != nil { + return nil, err + } + h.Peerstore().AddAddr(a.ID, a.Addrs[0], peerstore.PermanentAddrTTL) + return p, nil +} + +func (p *hubPublisher) Start(_ context.Context) error { + go func() { + unpublished := make(map[cid.Cid]struct{}) + var publishing atomic.Bool + maybePublish := func() { + remaining := len(unpublished) + if remaining == 0 { + log.Debug("No remaining entries to publish") + return + } + if publishing.Load() { + log.Debugw("IPNI publishing in progress", "remaining", remaining) + return + } + log.Debugw("Attempting to publish links to IPNI", "count", remaining) + mhs := make([]multihash.Multihash, 0, remaining) + for c := range unpublished { + mhs = append(mhs, c.Hash()) + delete(unpublished, c) + } + publishing.Store(true) + go func(entries []multihash.Multihash) { + log.Debug("IPNI publish attempt in progress...") + defer func() { + publishing.Store(false) + log.Debug("Finished attempt to publish to IPNI.") + }() + if err := p.publish(entries); err != nil { + log.Errorw("Failed to publish to IPNI", "entriesCount", len(mhs), "err", err) + } + }(mhs) + } + for { + select { + case <-p.ctx.Done(): + log.Infow("IPNI publisher stopped", "remainingLinks", len(unpublished)) + return + case <-p.ipniPublishTicker.C: + maybePublish() + case c := <-p.buffer: + unpublished[c] = struct{}{} + maybePublish() + } + } + }() + return nil +} + +func (p *hubPublisher) notifyReceivedLink(l ipld.Link) { + if l == nil { + return + } + link, ok := l.(cidlink.Link) + if ok && + !cid.Undef.Equals(link.Cid) && + link.Cid.Prefix().MhType != multihash.IDENTITY { + p.buffer <- link.Cid + } +} + +func (p *hubPublisher) publish(mhs []multihash.Multihash) error { + r := &PutContentRequest{ + Mutlihashes: mhs, + } + var buf bytes.Buffer + if err := json.NewEncoder(&buf).Encode(&r); err != nil { + return err + } + request, err := http.NewRequest(http.MethodPut, "libp2p://12D3KooWFmfEsXjWotvqJ6B3ASXx1w3p6udj8R9f344a9JTu2k4R/content", &buf) + if err != nil { + return err + } + resp, err := p.client.Do(request) + if err != nil { + return err + } + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("receievd non OK response from hub: %d", resp.StatusCode) + } + return nil +} + +func (p *hubPublisher) shutdown() error { + p.cancel() + p.ipniPublishTicker.Stop() + return nil +} diff --git a/go.mod b/go.mod index 4a4f90c..fc581af 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( github.com/joho/godotenv v1.5.1 github.com/libp2p/go-libp2p v0.32.1 github.com/libp2p/go-libp2p-gostream v0.6.0 + github.com/libp2p/go-libp2p-http v0.5.0 github.com/libp2p/go-libp2p-kad-dht v0.25.0 github.com/libp2p/go-libp2p-pubsub v0.10.0 github.com/mdp/qrterminal v1.0.1 diff --git a/go.sum b/go.sum index 03945d4..322eef6 100644 --- a/go.sum +++ b/go.sum @@ -560,6 +560,8 @@ github.com/libp2p/go-libp2p-core v0.2.4/go.mod h1:STh4fdfa5vDYr0/SzYYeqnt+E6KfEV github.com/libp2p/go-libp2p-core v0.3.0/go.mod h1:ACp3DmS3/N64c2jDzcV429ukDpicbL6+TrrxANBjPGw= github.com/libp2p/go-libp2p-gostream v0.6.0 h1:QfAiWeQRce6pqnYfmIVWJFXNdDyfiR/qkCnjyaZUPYU= github.com/libp2p/go-libp2p-gostream v0.6.0/go.mod h1:Nywu0gYZwfj7Jc91PQvbGU8dIpqbQQkjWgDuOrFaRdA= +github.com/libp2p/go-libp2p-http v0.5.0 h1:+x0AbLaUuLBArHubbbNRTsgWz0RjNTy6DJLOxQ3/QBc= +github.com/libp2p/go-libp2p-http v0.5.0/go.mod h1:glh87nZ35XCQyFsdzZps6+F4HYI6DctVFY5u1fehwSg= github.com/libp2p/go-libp2p-kad-dht v0.25.0 h1:T2SXQ/VlXTQVLChWY/+OyOsmGMRJvB5kiR+eJt7jtvI= github.com/libp2p/go-libp2p-kad-dht v0.25.0/go.mod h1:P6fz+J+u4tPigvS5J0kxQ1isksqAhmXiS/pNaEw/nFI= github.com/libp2p/go-libp2p-kbucket v0.3.1/go.mod h1:oyjT5O7tS9CQurok++ERgc46YLwEpuGoFq9ubvoUOio=