Skip to content

Commit

Permalink
Implement hub publisher (#195)
Browse files Browse the repository at this point in the history
  • Loading branch information
masih authored Dec 12, 2023
1 parent f6cc8f2 commit fcafb93
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 2 deletions.
4 changes: 2 additions & 2 deletions exchange/fx_exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type (

authorizedPeers map[peer.ID]struct{}
authorizedPeersLock sync.RWMutex
pub *ipniPublisher
pub *hubPublisher
dht *fulaDht
}
pushRequest struct {
Expand Down Expand Up @@ -97,7 +97,7 @@ func NewFxExchange(h host.Host, ls ipld.LinkSystem, o ...Option) (*FxExchange, e
}
}
//if !e.ipniPublishDisabled {
e.pub, err = newIpniPublisher(h, opts)
e.pub, err = newHubPublisher(h, opts)
if err != nil {
return nil, err
}
Expand Down
140 changes: 140 additions & 0 deletions exchange/hub_publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package exchange

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"sync/atomic"

"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
p2phttp "github.com/libp2p/go-libp2p-http"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/multiformats/go-multihash"
)

type hubPublisher struct {
*options
h host.Host
buffer chan cid.Cid
ctx context.Context
cancel context.CancelFunc
client *http.Client
}

type PutContentRequest struct {
Mutlihashes []multihash.Multihash
}

func newHubPublisher(h host.Host, opts *options) (*hubPublisher, error) {
p := &hubPublisher{
h: h,
options: opts,
}
p.ctx, p.cancel = context.WithCancel(context.Background())
p.buffer = make(chan cid.Cid, p.ipniPublishChanBuffer)

tr := &http.Transport{}
tr.RegisterProtocol("libp2p", p2phttp.NewTransport(h, p2phttp.ProtocolOption("/fx.land/hub/0.0.1")))
p.client = &http.Client{Transport: tr}

a, err := peer.AddrInfoFromString("/dns/hub.dev.fx.land/tcp/40004/p2p/12D3KooWFmfEsXjWotvqJ6B3ASXx1w3p6udj8R9f344a9JTu2k4R")
if err != nil {
return nil, err
}
h.Peerstore().AddAddr(a.ID, a.Addrs[0], peerstore.PermanentAddrTTL)
return p, nil
}

func (p *hubPublisher) Start(_ context.Context) error {
go func() {
unpublished := make(map[cid.Cid]struct{})
var publishing atomic.Bool
maybePublish := func() {
remaining := len(unpublished)
if remaining == 0 {
log.Debug("No remaining entries to publish")
return
}
if publishing.Load() {
log.Debugw("IPNI publishing in progress", "remaining", remaining)
return
}
log.Debugw("Attempting to publish links to IPNI", "count", remaining)
mhs := make([]multihash.Multihash, 0, remaining)
for c := range unpublished {
mhs = append(mhs, c.Hash())
delete(unpublished, c)
}
publishing.Store(true)
go func(entries []multihash.Multihash) {
log.Debug("IPNI publish attempt in progress...")
defer func() {
publishing.Store(false)
log.Debug("Finished attempt to publish to IPNI.")
}()
if err := p.publish(entries); err != nil {
log.Errorw("Failed to publish to IPNI", "entriesCount", len(mhs), "err", err)
}
}(mhs)
}
for {
select {
case <-p.ctx.Done():
log.Infow("IPNI publisher stopped", "remainingLinks", len(unpublished))
return
case <-p.ipniPublishTicker.C:
maybePublish()
case c := <-p.buffer:
unpublished[c] = struct{}{}
maybePublish()
}
}
}()
return nil
}

func (p *hubPublisher) notifyReceivedLink(l ipld.Link) {
if l == nil {
return
}
link, ok := l.(cidlink.Link)
if ok &&
!cid.Undef.Equals(link.Cid) &&
link.Cid.Prefix().MhType != multihash.IDENTITY {
p.buffer <- link.Cid
}
}

func (p *hubPublisher) publish(mhs []multihash.Multihash) error {
r := &PutContentRequest{
Mutlihashes: mhs,
}
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(&r); err != nil {
return err
}
request, err := http.NewRequest(http.MethodPut, "libp2p://12D3KooWFmfEsXjWotvqJ6B3ASXx1w3p6udj8R9f344a9JTu2k4R/content", &buf)
if err != nil {
return err
}
resp, err := p.client.Do(request)
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("receievd non OK response from hub: %d", resp.StatusCode)
}
return nil
}

func (p *hubPublisher) shutdown() error {
p.cancel()
p.ipniPublishTicker.Stop()
return nil
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/joho/godotenv v1.5.1
github.com/libp2p/go-libp2p v0.32.1
github.com/libp2p/go-libp2p-gostream v0.6.0
github.com/libp2p/go-libp2p-http v0.5.0
github.com/libp2p/go-libp2p-kad-dht v0.25.0
github.com/libp2p/go-libp2p-pubsub v0.10.0
github.com/mdp/qrterminal v1.0.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,8 @@ github.com/libp2p/go-libp2p-core v0.2.4/go.mod h1:STh4fdfa5vDYr0/SzYYeqnt+E6KfEV
github.com/libp2p/go-libp2p-core v0.3.0/go.mod h1:ACp3DmS3/N64c2jDzcV429ukDpicbL6+TrrxANBjPGw=
github.com/libp2p/go-libp2p-gostream v0.6.0 h1:QfAiWeQRce6pqnYfmIVWJFXNdDyfiR/qkCnjyaZUPYU=
github.com/libp2p/go-libp2p-gostream v0.6.0/go.mod h1:Nywu0gYZwfj7Jc91PQvbGU8dIpqbQQkjWgDuOrFaRdA=
github.com/libp2p/go-libp2p-http v0.5.0 h1:+x0AbLaUuLBArHubbbNRTsgWz0RjNTy6DJLOxQ3/QBc=
github.com/libp2p/go-libp2p-http v0.5.0/go.mod h1:glh87nZ35XCQyFsdzZps6+F4HYI6DctVFY5u1fehwSg=
github.com/libp2p/go-libp2p-kad-dht v0.25.0 h1:T2SXQ/VlXTQVLChWY/+OyOsmGMRJvB5kiR+eJt7jtvI=
github.com/libp2p/go-libp2p-kad-dht v0.25.0/go.mod h1:P6fz+J+u4tPigvS5J0kxQ1isksqAhmXiS/pNaEw/nFI=
github.com/libp2p/go-libp2p-kbucket v0.3.1/go.mod h1:oyjT5O7tS9CQurok++ERgc46YLwEpuGoFq9ubvoUOio=
Expand Down

0 comments on commit fcafb93

Please sign in to comment.