Skip to content

Commit

Permalink
Implement fx exchange fully over HTTP/libp2p
Browse files Browse the repository at this point in the history
Implement DAG pull and push mechanism fully over HTTP/libp2p and remove graphsync dependencies.
  • Loading branch information
masih committed Dec 17, 2023
1 parent b33c9fc commit cf70f17
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 134 deletions.
262 changes: 129 additions & 133 deletions exchange/fx_exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,27 @@ import (
"sync"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-graphsync"
gs "github.com/ipfs/go-graphsync/impl"
gsnet "github.com/ipfs/go-graphsync/network"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime"
_ "github.com/ipld/go-ipld-prime/codec/dagcbor"
"github.com/ipld/go-ipld-prime/codec/dagcbor"
_ "github.com/ipld/go-ipld-prime/codec/dagjson"
_ "github.com/ipld/go-ipld-prime/codec/raw"
"github.com/ipld/go-ipld-prime/datamodel"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/node/basicnode"
"github.com/ipld/go-ipld-prime/traversal"
"github.com/ipld/go-ipld-prime/traversal/selector"
selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"
gostream "github.com/libp2p/go-libp2p-gostream"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multicodec"
)

const (
FxExchangeProtocolID = "/fx.land/exchange/0.0.1"
FxExchangeProtocolID = "/fx.land/exchange/0.0.2"

actionPull = "pull"
actionPush = "push"
Expand All @@ -43,15 +45,31 @@ const (
var (
_ Exchange = (*FxExchange)(nil)

log = logging.Logger("fula/exchange")
errUnauthorized = errors.New("not authorized")
log = logging.Logger("fula/exchange")
errUnauthorized = errors.New("not authorized")
exploreAllRecursivelySelector selector.Selector
pl = cidlink.LinkPrototype{
Prefix: cid.Prefix{
Version: 1,
Codec: uint64(multicodec.DagCbor),
MhType: uint64(multicodec.Sha2_256),
MhLength: -1,
},
}
)

func init() {
var err error
exploreAllRecursivelySelector, err = selector.ParseSelector(selectorparse.CommonSelector_ExploreAllRecursively)
if err != nil {
panic("failed to parse IPLD built-in selector selectorparse.CommonSelector_ExploreAllRecursively: " + err.Error())
}
}

type (
FxExchange struct {
*options
h host.Host
gx graphsync.GraphExchange
ls ipld.LinkSystem
s *http.Server
c *http.Client
Expand All @@ -61,7 +79,7 @@ type (
pub *hubPublisher
dht *fulaDht
}
pushRequest struct {
pullRequest struct {
Link cid.Cid `json:"link"`
}
authorizationRequest struct {
Expand Down Expand Up @@ -240,33 +258,10 @@ func (e *FxExchange) IpniNotifyLink(link ipld.Link) {
}

func (e *FxExchange) Start(ctx context.Context) error {
gsn := gsnet.NewFromLibp2pHost(e.h)
e.gx = gs.New(ctx, gsn, e.ls)

if err := e.pub.Start(ctx); err != nil {
return err
}
if !e.ipniPublishDisabled {
e.gx.RegisterIncomingBlockHook(func(p peer.ID, responseData graphsync.ResponseData, blockData graphsync.BlockData, hookActions graphsync.IncomingBlockHookActions) {
go func(link ipld.Link) {
log.Debugw("Notifying link to IPNI publisher...", "link", link)
e.pub.notifyReceivedLink(link)
log.Debugw("Successfully notified link to IPNI publisher", "link", link)
}(blockData.Link())
})
}

e.gx.RegisterIncomingRequestHook(
func(p peer.ID, r graphsync.RequestData, ha graphsync.IncomingRequestHookActions) {
log.Debugw("Receiving Graphsync Request...", "p", p)
if e.authorized(p, actionPull) {
log.Debugw("Receiving Graphsync Request authorized...", "p", p)
ha.ValidateRequest()
} else {
log.Errorw("Receiving Graphsync Request not authorized...", "p", p)
ha.TerminateWithError(errUnauthorized)
}
})
listen, err := gostream.Listen(e.h, FxExchangeProtocolID)
if err != nil {
return err
Expand All @@ -280,122 +275,86 @@ func (e *FxExchange) Pull(ctx context.Context, from peer.ID, l ipld.Link) error
if e.allowTransientConnection {
ctx = network.WithUseTransient(ctx, "fx.exchange")
}

/*if e.wg != nil {
e.wg.Add(1)
}
var cidLink cidlink.Link
var ok bool
go func() {
if e.wg != nil {
defer e.wg.Done()
}
if e.ipfsApi != nil && l != nil {
log.Info("ipfsAPI: Starting Pinning")
if cidLink, ok = l.(cidlink.Link); !ok {
// Handle error: couldn't fetch the node from IPFS
log.Warnw("ipfsAPI: Failed to create cidLink", "link", l)
}
log.Debugw("cidLink created in ipfsApi", "cidLink", cidLink)
log.Info("ipfsAPI: Pinned")
}
// Call Provide for the last block link of each response
if l != nil {
if err := e.dht.Provide(l); err != nil {
log.Warnw("Failed to provide link via DHT", "link", l, "err", err)
// Handle the error
} else {
log.Debug("Success provide link via DHT")
}
}
}()*/
resps, errs := e.gx.Request(ctx, from, l, selectorparse.CommonSelector_ExploreAllRecursively)
for {
select {
case <-ctx.Done():
return ctx.Err()
case resp, ok := <-resps:
if !ok {
return nil
}
log.Infow("synced node", "node", resp.Node)
/*if e.wg != nil {
e.wg.Add(1)
}
go func() {
if e.wg != nil {
defer e.wg.Done()
}
if e.ipfsApi != nil && resp.LastBlock.Link != nil {
log.Info("ipfsAPI: Starting Pinning")
if cidLink, ok = resp.LastBlock.Link.(cidlink.Link); !ok {
// Handle error: couldn't fetch the node from IPFS
log.Warnw("ipfsAPI: Failed to create cidLikn", "link", resp.LastBlock.Link)
}
node, err := e.ipfsApi.Dag().Get(ctx, cidLink.Cid)
if err != nil {
// Handle error: couldn't fetch the node from IPFS
log.Warnw("ipfsAPI: Failed to create node", "link", resp.LastBlock.Link, "err", err)
}
err = e.ipfsApi.Dag().Pinning().Add(ctx, node)
if err != nil {
// Handle error: couldn't fetch the node from IPFS
log.Warnw("ipfsAPI: Failed to pin add in ipfs", "link", resp.LastBlock.Link, "err", err)
}
log.Info("ipfsAPI: Pinned")
}
// Call Provide for the last block link of each response
if resp.LastBlock.Link != nil {
if err := e.dht.Provide(resp.LastBlock.Link); err != nil {
log.Warnw("Failed to provide link via DHT2", "link", resp.LastBlock.Link, "err", err)
// Decide how to handle the error, e.g., continue, return, etc.
} else {
log.Debug("Success provide link via DHT")
}
}
}()*/

case err, ok := <-errs:
if !ok {
return nil
}
log.Warnw("sync failed", "err", err)
}
}
}

func (e *FxExchange) Push(ctx context.Context, to peer.ID, l ipld.Link) error {
if e.allowTransientConnection {
ctx = network.WithUseTransient(ctx, "fx.exchange")
}
r := pushRequest{Link: l.(cidlink.Link).Cid}
r := pullRequest{Link: l.(cidlink.Link).Cid}
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(r); err != nil {
log.Errorw("Failed to encode pull request", "err", err)
return err
}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://"+to.String()+".invalid/"+actionPush, &buf)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, "http://"+from.String()+".invalid/"+actionPull, &buf)
if err != nil {
log.Errorw("Failed to instantiate pull request", "err", err)
return err
}
resp, err := e.c.Do(req)
if err != nil {
log.Errorw("Failed to send pull request", "err", err)
return err
}
defer resp.Body.Close()
b, err := io.ReadAll(resp.Body)
switch {
case err != nil:
log.Errorw("Failed to read pull response", "err", err)
return err
case resp.StatusCode != http.StatusAccepted:
log.Errorw("Expected status accepted on pull response", "got", resp.StatusCode)
return fmt.Errorf("unexpected response: %d %s", resp.StatusCode, string(b))
default:
log.Infow("Successfully sent push request")
return nil
}
}

func (e *FxExchange) Push(ctx context.Context, to peer.ID, l ipld.Link) error {
if e.allowTransientConnection {
ctx = network.WithUseTransient(ctx, "fx.exchange")
}
log := log.With("cid", l.(cidlink.Link).Cid)
node, err := e.ls.Load(ipld.LinkContext{Ctx: ctx}, l, basicnode.Prototype.Any)
if err != nil {
log.Errorw("Failed to load link prior to push ", "err", err)
return err
}
// Recursively traverse the node and push all its leaves.
err = traversal.WalkAdv(node, exploreAllRecursivelySelector, func(progress traversal.Progress, node datamodel.Node, _ traversal.VisitReason) error {
log := log.With("t-cid", progress.LastBlock.Link.(cidlink.Link).Cid)
var buf bytes.Buffer
err := dagcbor.Encode(node, &buf)
if err != nil {
log.Errorw("Failed encode node to be pushed", "err", err)
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, "http://"+to.String()+".invalid/"+actionPush, &buf)
if err != nil {
log.Errorw("Failed to instantiate push request", "err", err)
return err
}
resp, err := e.c.Do(req)
if err != nil {
log.Errorw("Failed to send push request", "err", err)
return err
}
defer resp.Body.Close()
b, err := io.ReadAll(resp.Body)
switch {
case err != nil:
log.Errorw("Failed to read the response from push", "err", err)
return err
case resp.StatusCode != http.StatusOK:
log.Errorw("Received non-OK response from push", "err", err)
return fmt.Errorf("unexpected response: %d %s", resp.StatusCode, string(b))
default:
log.Infow("Successfully pushed traversed node")
return nil
}
})
if err != nil {
log.Errorw("Failed to traverse link during push", "err", err)
return err
}
return nil
}

func (e *FxExchange) serve(w http.ResponseWriter, r *http.Request) {
from, err := peer.Decode(r.RemoteAddr)
if err != nil {
Expand All @@ -406,10 +365,12 @@ func (e *FxExchange) serve(w http.ResponseWriter, r *http.Request) {
action := path.Base(r.URL.Path)
if !e.authorized(from, action) {
log.Debugw("rejected unauthorized request", "from", from, "action", action)
http.Error(w, "", http.StatusUnauthorized)
http.Error(w, errUnauthorized.Error(), http.StatusUnauthorized)
return
}
switch action {
case actionPull:
e.handlePull(from, w, r)
case actionPush:
e.handlePush(from, w, r)
case actionAuth:
Expand All @@ -421,31 +382,66 @@ func (e *FxExchange) serve(w http.ResponseWriter, r *http.Request) {

func (e *FxExchange) handlePush(from peer.ID, w http.ResponseWriter, r *http.Request) {
log := log.With("action", actionPush, "from", from)
if r.Method != http.MethodPost {
log.Errorw("Only POST is allowed on push", "got", r.Method)
http.Error(w, "", http.StatusMethodNotAllowed)
return
}
defer r.Body.Close()
na := basicnode.Prototype.Any.NewBuilder()
err := dagcbor.Decode(na, r.Body)
if err != nil {
log.Errorw("Failed to decode pushed node as dagcbor", "err", err)
http.Error(w, "failed to decode node", http.StatusBadRequest)
return
}
l, err := e.ls.Store(ipld.LinkContext{Ctx: r.Context()}, pl, na.Build())
if err != nil {
log.Errorw("Failed to store pushed node", "err", err)
http.Error(w, "failed to store node", http.StatusInternalServerError)
return
}
log.Infow("Successfully stored pushed node", "cid", l.(cidlink.Link).Cid)
log.Debugw("Notifying stored pushed link to IPNI publisher", "link", l)
e.pub.notifyReceivedLink(l)
log.Debugw("Successfully notified stored pushed link to IPNI publisher", "link", l)
}

func (e *FxExchange) handlePull(from peer.ID, w http.ResponseWriter, r *http.Request) {
log := log.With("action", actionPull, "from", from)
if r.Method != http.MethodPost {
log.Errorw("Only POST is allowed on pull", "got", r.Method)
http.Error(w, "", http.StatusMethodNotAllowed)
return
}
defer r.Body.Close()
b, err := io.ReadAll(r.Body)
if err != nil {
log.Errorw("failed to read request body", "err", err)
http.Error(w, "", http.StatusInternalServerError)
log.Errorw("failed to read pull request body", "err", err)
http.Error(w, "", http.StatusBadRequest)
return
}
var p pushRequest
var p pullRequest
if err := json.Unmarshal(b, &p); err != nil {
log.Debugw("cannot parse request body", "err", err)
log.Errorw("Cannot parse pull request body", "err", err)
http.Error(w, "", http.StatusBadRequest)
return
}
log = log.With("link", p.Link)
w.WriteHeader(http.StatusAccepted)
log.Info("Accepted pull request")
go func() {
log.Info("Instantiating background push in response to pull request")
ctx := context.TODO()
if e.allowTransientConnection {
ctx = network.WithUseTransient(ctx, "fx.exchange")
}
if err := e.Pull(ctx, from, cidlink.Link{Cid: p.Link}); err != nil {
log.Warnw("failed to fetch in response to push", "err", err)
log.Errorw("Failed to fetch in response to push", "err", err)
} else {
log.Debugw("successfully fetched in response to push", "from", from, "link", p.Link)
log.Infow("Successfully finished background push in response to pull request")
}
}()
w.WriteHeader(http.StatusAccepted)
}

func (e *FxExchange) handleAuthorization(from peer.ID, w http.ResponseWriter, r *http.Request) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ require (
github.com/ipfs/go-cid v0.4.1
github.com/ipfs/go-datastore v0.6.0
github.com/ipfs/go-ds-badger v0.3.0
github.com/ipfs/go-graphsync v0.16.0
github.com/ipfs/go-ipld-format v0.6.0
github.com/ipfs/go-log/v2 v2.5.1
github.com/ipfs/kubo v0.25.0
Expand Down Expand Up @@ -120,6 +119,7 @@ require (
github.com/ipfs/go-ds-leveldb v0.5.0 // indirect
github.com/ipfs/go-ds-measure v0.2.0 // indirect
github.com/ipfs/go-fs-lock v0.0.7 // indirect
github.com/ipfs/go-graphsync v0.16.0 // indirect
github.com/ipfs/go-ipfs-delay v0.0.1 // indirect
github.com/ipfs/go-ipfs-ds-help v1.1.1 // indirect
github.com/ipfs/go-ipfs-pq v0.0.3 // indirect
Expand Down

0 comments on commit cf70f17

Please sign in to comment.