From cf70f1788c48f9a7565e20c1cb2c51f7ffd84870 Mon Sep 17 00:00:00 2001 From: "Masih H. Derkani" Date: Sun, 17 Dec 2023 20:13:32 +0000 Subject: [PATCH] Implement fx exchange fully over HTTP/libp2p Implement DAG pull and push mechanism fully over HTTP/libp2p and remove graphsync dependencies. --- exchange/fx_exchange.go | 262 ++++++++++++++++++++-------------------- go.mod | 2 +- 2 files changed, 130 insertions(+), 134 deletions(-) diff --git a/exchange/fx_exchange.go b/exchange/fx_exchange.go index b5c6635d..fd076dde 100644 --- a/exchange/fx_exchange.go +++ b/exchange/fx_exchange.go @@ -15,25 +15,27 @@ import ( "sync" "github.com/ipfs/go-cid" - "github.com/ipfs/go-graphsync" - gs "github.com/ipfs/go-graphsync/impl" - gsnet "github.com/ipfs/go-graphsync/network" logging "github.com/ipfs/go-log/v2" "github.com/ipld/go-ipld-prime" - _ "github.com/ipld/go-ipld-prime/codec/dagcbor" + "github.com/ipld/go-ipld-prime/codec/dagcbor" _ "github.com/ipld/go-ipld-prime/codec/dagjson" _ "github.com/ipld/go-ipld-prime/codec/raw" + "github.com/ipld/go-ipld-prime/datamodel" cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/ipld/go-ipld-prime/node/basicnode" + "github.com/ipld/go-ipld-prime/traversal" + "github.com/ipld/go-ipld-prime/traversal/selector" selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse" gostream "github.com/libp2p/go-libp2p-gostream" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" + "github.com/multiformats/go-multicodec" ) const ( - FxExchangeProtocolID = "/fx.land/exchange/0.0.1" + FxExchangeProtocolID = "/fx.land/exchange/0.0.2" actionPull = "pull" actionPush = "push" @@ -43,15 +45,31 @@ const ( var ( _ Exchange = (*FxExchange)(nil) - log = logging.Logger("fula/exchange") - errUnauthorized = errors.New("not authorized") + log = logging.Logger("fula/exchange") + errUnauthorized = errors.New("not authorized") + exploreAllRecursivelySelector selector.Selector + pl = cidlink.LinkPrototype{ + Prefix: cid.Prefix{ + Version: 1, + Codec: uint64(multicodec.DagCbor), + MhType: uint64(multicodec.Sha2_256), + MhLength: -1, + }, + } ) +func init() { + var err error + exploreAllRecursivelySelector, err = selector.ParseSelector(selectorparse.CommonSelector_ExploreAllRecursively) + if err != nil { + panic("failed to parse IPLD built-in selector selectorparse.CommonSelector_ExploreAllRecursively: " + err.Error()) + } +} + type ( FxExchange struct { *options h host.Host - gx graphsync.GraphExchange ls ipld.LinkSystem s *http.Server c *http.Client @@ -61,7 +79,7 @@ type ( pub *hubPublisher dht *fulaDht } - pushRequest struct { + pullRequest struct { Link cid.Cid `json:"link"` } authorizationRequest struct { @@ -240,33 +258,10 @@ func (e *FxExchange) IpniNotifyLink(link ipld.Link) { } func (e *FxExchange) Start(ctx context.Context) error { - gsn := gsnet.NewFromLibp2pHost(e.h) - e.gx = gs.New(ctx, gsn, e.ls) if err := e.pub.Start(ctx); err != nil { return err } - if !e.ipniPublishDisabled { - e.gx.RegisterIncomingBlockHook(func(p peer.ID, responseData graphsync.ResponseData, blockData graphsync.BlockData, hookActions graphsync.IncomingBlockHookActions) { - go func(link ipld.Link) { - log.Debugw("Notifying link to IPNI publisher...", "link", link) - e.pub.notifyReceivedLink(link) - log.Debugw("Successfully notified link to IPNI publisher", "link", link) - }(blockData.Link()) - }) - } - - e.gx.RegisterIncomingRequestHook( - func(p peer.ID, r graphsync.RequestData, ha graphsync.IncomingRequestHookActions) { - log.Debugw("Receiving Graphsync Request...", "p", p) - if e.authorized(p, actionPull) { - log.Debugw("Receiving Graphsync Request authorized...", "p", p) - ha.ValidateRequest() - } else { - log.Errorw("Receiving Graphsync Request not authorized...", "p", p) - ha.TerminateWithError(errUnauthorized) - } - }) listen, err := gostream.Listen(e.h, FxExchangeProtocolID) if err != nil { return err @@ -280,122 +275,86 @@ func (e *FxExchange) Pull(ctx context.Context, from peer.ID, l ipld.Link) error if e.allowTransientConnection { ctx = network.WithUseTransient(ctx, "fx.exchange") } - - /*if e.wg != nil { - e.wg.Add(1) - } - var cidLink cidlink.Link - var ok bool - go func() { - if e.wg != nil { - defer e.wg.Done() - } - - if e.ipfsApi != nil && l != nil { - log.Info("ipfsAPI: Starting Pinning") - - if cidLink, ok = l.(cidlink.Link); !ok { - // Handle error: couldn't fetch the node from IPFS - log.Warnw("ipfsAPI: Failed to create cidLink", "link", l) - } - log.Debugw("cidLink created in ipfsApi", "cidLink", cidLink) - - log.Info("ipfsAPI: Pinned") - } - // Call Provide for the last block link of each response - if l != nil { - if err := e.dht.Provide(l); err != nil { - log.Warnw("Failed to provide link via DHT", "link", l, "err", err) - // Handle the error - } else { - log.Debug("Success provide link via DHT") - } - } - }()*/ - resps, errs := e.gx.Request(ctx, from, l, selectorparse.CommonSelector_ExploreAllRecursively) - for { - select { - case <-ctx.Done(): - return ctx.Err() - case resp, ok := <-resps: - if !ok { - return nil - } - log.Infow("synced node", "node", resp.Node) - /*if e.wg != nil { - e.wg.Add(1) - } - go func() { - if e.wg != nil { - defer e.wg.Done() - } - if e.ipfsApi != nil && resp.LastBlock.Link != nil { - log.Info("ipfsAPI: Starting Pinning") - if cidLink, ok = resp.LastBlock.Link.(cidlink.Link); !ok { - // Handle error: couldn't fetch the node from IPFS - log.Warnw("ipfsAPI: Failed to create cidLikn", "link", resp.LastBlock.Link) - } - node, err := e.ipfsApi.Dag().Get(ctx, cidLink.Cid) - if err != nil { - // Handle error: couldn't fetch the node from IPFS - log.Warnw("ipfsAPI: Failed to create node", "link", resp.LastBlock.Link, "err", err) - } - err = e.ipfsApi.Dag().Pinning().Add(ctx, node) - if err != nil { - // Handle error: couldn't fetch the node from IPFS - log.Warnw("ipfsAPI: Failed to pin add in ipfs", "link", resp.LastBlock.Link, "err", err) - } - log.Info("ipfsAPI: Pinned") - } - // Call Provide for the last block link of each response - if resp.LastBlock.Link != nil { - if err := e.dht.Provide(resp.LastBlock.Link); err != nil { - log.Warnw("Failed to provide link via DHT2", "link", resp.LastBlock.Link, "err", err) - // Decide how to handle the error, e.g., continue, return, etc. - } else { - log.Debug("Success provide link via DHT") - } - } - }()*/ - - case err, ok := <-errs: - if !ok { - return nil - } - log.Warnw("sync failed", "err", err) - } - } -} - -func (e *FxExchange) Push(ctx context.Context, to peer.ID, l ipld.Link) error { - if e.allowTransientConnection { - ctx = network.WithUseTransient(ctx, "fx.exchange") - } - r := pushRequest{Link: l.(cidlink.Link).Cid} + r := pullRequest{Link: l.(cidlink.Link).Cid} var buf bytes.Buffer if err := json.NewEncoder(&buf).Encode(r); err != nil { + log.Errorw("Failed to encode pull request", "err", err) return err } - req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://"+to.String()+".invalid/"+actionPush, &buf) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, "http://"+from.String()+".invalid/"+actionPull, &buf) if err != nil { + log.Errorw("Failed to instantiate pull request", "err", err) return err } resp, err := e.c.Do(req) if err != nil { + log.Errorw("Failed to send pull request", "err", err) return err } defer resp.Body.Close() b, err := io.ReadAll(resp.Body) switch { case err != nil: + log.Errorw("Failed to read pull response", "err", err) return err case resp.StatusCode != http.StatusAccepted: + log.Errorw("Expected status accepted on pull response", "got", resp.StatusCode) return fmt.Errorf("unexpected response: %d %s", resp.StatusCode, string(b)) default: + log.Infow("Successfully sent push request") return nil } } +func (e *FxExchange) Push(ctx context.Context, to peer.ID, l ipld.Link) error { + if e.allowTransientConnection { + ctx = network.WithUseTransient(ctx, "fx.exchange") + } + log := log.With("cid", l.(cidlink.Link).Cid) + node, err := e.ls.Load(ipld.LinkContext{Ctx: ctx}, l, basicnode.Prototype.Any) + if err != nil { + log.Errorw("Failed to load link prior to push ", "err", err) + return err + } + // Recursively traverse the node and push all its leaves. + err = traversal.WalkAdv(node, exploreAllRecursivelySelector, func(progress traversal.Progress, node datamodel.Node, _ traversal.VisitReason) error { + log := log.With("t-cid", progress.LastBlock.Link.(cidlink.Link).Cid) + var buf bytes.Buffer + err := dagcbor.Encode(node, &buf) + if err != nil { + log.Errorw("Failed encode node to be pushed", "err", err) + } + req, err := http.NewRequestWithContext(ctx, http.MethodPost, "http://"+to.String()+".invalid/"+actionPush, &buf) + if err != nil { + log.Errorw("Failed to instantiate push request", "err", err) + return err + } + resp, err := e.c.Do(req) + if err != nil { + log.Errorw("Failed to send push request", "err", err) + return err + } + defer resp.Body.Close() + b, err := io.ReadAll(resp.Body) + switch { + case err != nil: + log.Errorw("Failed to read the response from push", "err", err) + return err + case resp.StatusCode != http.StatusOK: + log.Errorw("Received non-OK response from push", "err", err) + return fmt.Errorf("unexpected response: %d %s", resp.StatusCode, string(b)) + default: + log.Infow("Successfully pushed traversed node") + return nil + } + }) + if err != nil { + log.Errorw("Failed to traverse link during push", "err", err) + return err + } + return nil +} + func (e *FxExchange) serve(w http.ResponseWriter, r *http.Request) { from, err := peer.Decode(r.RemoteAddr) if err != nil { @@ -406,10 +365,12 @@ func (e *FxExchange) serve(w http.ResponseWriter, r *http.Request) { action := path.Base(r.URL.Path) if !e.authorized(from, action) { log.Debugw("rejected unauthorized request", "from", from, "action", action) - http.Error(w, "", http.StatusUnauthorized) + http.Error(w, errUnauthorized.Error(), http.StatusUnauthorized) return } switch action { + case actionPull: + e.handlePull(from, w, r) case actionPush: e.handlePush(from, w, r) case actionAuth: @@ -421,31 +382,66 @@ func (e *FxExchange) serve(w http.ResponseWriter, r *http.Request) { func (e *FxExchange) handlePush(from peer.ID, w http.ResponseWriter, r *http.Request) { log := log.With("action", actionPush, "from", from) + if r.Method != http.MethodPost { + log.Errorw("Only POST is allowed on push", "got", r.Method) + http.Error(w, "", http.StatusMethodNotAllowed) + return + } + defer r.Body.Close() + na := basicnode.Prototype.Any.NewBuilder() + err := dagcbor.Decode(na, r.Body) + if err != nil { + log.Errorw("Failed to decode pushed node as dagcbor", "err", err) + http.Error(w, "failed to decode node", http.StatusBadRequest) + return + } + l, err := e.ls.Store(ipld.LinkContext{Ctx: r.Context()}, pl, na.Build()) + if err != nil { + log.Errorw("Failed to store pushed node", "err", err) + http.Error(w, "failed to store node", http.StatusInternalServerError) + return + } + log.Infow("Successfully stored pushed node", "cid", l.(cidlink.Link).Cid) + log.Debugw("Notifying stored pushed link to IPNI publisher", "link", l) + e.pub.notifyReceivedLink(l) + log.Debugw("Successfully notified stored pushed link to IPNI publisher", "link", l) +} + +func (e *FxExchange) handlePull(from peer.ID, w http.ResponseWriter, r *http.Request) { + log := log.With("action", actionPull, "from", from) + if r.Method != http.MethodPost { + log.Errorw("Only POST is allowed on pull", "got", r.Method) + http.Error(w, "", http.StatusMethodNotAllowed) + return + } defer r.Body.Close() b, err := io.ReadAll(r.Body) if err != nil { - log.Errorw("failed to read request body", "err", err) - http.Error(w, "", http.StatusInternalServerError) + log.Errorw("failed to read pull request body", "err", err) + http.Error(w, "", http.StatusBadRequest) return } - var p pushRequest + var p pullRequest if err := json.Unmarshal(b, &p); err != nil { - log.Debugw("cannot parse request body", "err", err) + log.Errorw("Cannot parse pull request body", "err", err) http.Error(w, "", http.StatusBadRequest) return } + log = log.With("link", p.Link) + w.WriteHeader(http.StatusAccepted) + log.Info("Accepted pull request") go func() { + log.Info("Instantiating background push in response to pull request") ctx := context.TODO() if e.allowTransientConnection { ctx = network.WithUseTransient(ctx, "fx.exchange") } if err := e.Pull(ctx, from, cidlink.Link{Cid: p.Link}); err != nil { - log.Warnw("failed to fetch in response to push", "err", err) + log.Errorw("Failed to fetch in response to push", "err", err) } else { - log.Debugw("successfully fetched in response to push", "from", from, "link", p.Link) + log.Infow("Successfully finished background push in response to pull request") } }() - w.WriteHeader(http.StatusAccepted) } func (e *FxExchange) handleAuthorization(from peer.ID, w http.ResponseWriter, r *http.Request) { diff --git a/go.mod b/go.mod index de592008..61e171ed 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,6 @@ require ( github.com/ipfs/go-cid v0.4.1 github.com/ipfs/go-datastore v0.6.0 github.com/ipfs/go-ds-badger v0.3.0 - github.com/ipfs/go-graphsync v0.16.0 github.com/ipfs/go-ipld-format v0.6.0 github.com/ipfs/go-log/v2 v2.5.1 github.com/ipfs/kubo v0.25.0 @@ -120,6 +119,7 @@ require ( github.com/ipfs/go-ds-leveldb v0.5.0 // indirect github.com/ipfs/go-ds-measure v0.2.0 // indirect github.com/ipfs/go-fs-lock v0.0.7 // indirect + github.com/ipfs/go-graphsync v0.16.0 // indirect github.com/ipfs/go-ipfs-delay v0.0.1 // indirect github.com/ipfs/go-ipfs-ds-help v1.1.1 // indirect github.com/ipfs/go-ipfs-pq v0.0.3 // indirect