Skip to content

Commit

Permalink
Fix traversal config
Browse files Browse the repository at this point in the history
  • Loading branch information
masih committed Dec 18, 2023
1 parent b0570ad commit 2d9585a
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 66 deletions.
4 changes: 2 additions & 2 deletions exchange/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,7 @@ func Example_poolExchangeDagBetweenPoolNodes() {
if exists, err := n2.Has(ctx, n1RootLink); err != nil {
panic(err)
} else if !exists {
panic("expected n2 to have fetched the entire sample DAG")
panic("expected n2 to have fetched the entire sample DAG, link: " + n1RootLink.String())
} else {
fmt.Printf("%s successfully fetched:\n link: %s\n from %s\n", h2.ID(), n1RootLink, h1.ID())
n, err := n2.Load(ctx, n1RootLink, basicnode.Prototype.Any)
Expand All @@ -777,7 +777,7 @@ func Example_poolExchangeDagBetweenPoolNodes() {
if exists, err := n2.Has(ctx, n1leafLink); err != nil {
panic(err)
} else if !exists {
panic("expected n2 to have fetched the entire sample DAG")
panic("expected n2 to have fetched the entire sample DAG, link: " + n1leafLink.String())
} else {
fmt.Printf("%s successfully fetched:\n link: %s\n from %s\n", h2.ID(), n1leafLink, h1.ID())
n, err := n2.Load(ctx, n1leafLink, basicnode.Prototype.Any)
Expand Down
129 changes: 65 additions & 64 deletions exchange/fx_exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@ import (
"errors"
"fmt"
"io"
"net"
"net/http"
"path"
"strings"
"sync"

"github.com/ipfs/go-cid"
Expand All @@ -27,6 +25,7 @@ import (
"github.com/ipld/go-ipld-prime/traversal/selector"
selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"
gostream "github.com/libp2p/go-libp2p-gostream"
p2phttp "github.com/libp2p/go-libp2p-http"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -93,22 +92,16 @@ func NewFxExchange(h host.Host, ls ipld.LinkSystem, o ...Option) (*FxExchange, e
if err != nil {
return nil, err
}
tr := &http.Transport{}
tr.RegisterProtocol("libp2p", p2phttp.NewTransport(h, p2phttp.ProtocolOption(FxExchangeProtocolID)))
client := &http.Client{Transport: tr}

e := &FxExchange{
options: opts,
h: h,
ls: ls,
s: &http.Server{},
c: &http.Client{
Transport: &http.Transport{
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
pid, err := peer.Decode(strings.TrimSuffix(addr, ".invalid:80"))
if err != nil {
return nil, err
}
return gostream.Dial(ctx, h, pid, FxExchangeProtocolID)
},
},
},
options: opts,
h: h,
ls: ls,
s: &http.Server{},
c: client,
authorizedPeers: make(map[peer.ID]struct{}),
}
if e.authorizer != "" {
Expand Down Expand Up @@ -281,7 +274,7 @@ func (e *FxExchange) Pull(ctx context.Context, from peer.ID, l ipld.Link) error
log.Errorw("Failed to encode pull request", "err", err)
return err
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, "http://"+from.String()+".invalid/"+actionPull, &buf)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, "libp2p://"+from.String()+"/"+actionPull, &buf)
if err != nil {
log.Errorw("Failed to instantiate pull request", "err", err)
return err
Expand Down Expand Up @@ -310,46 +303,25 @@ func (e *FxExchange) Push(ctx context.Context, to peer.ID, l ipld.Link) error {
if e.allowTransientConnection {
ctx = network.WithUseTransient(ctx, "fx.exchange")
}

log := log.With("cid", l.(cidlink.Link).Cid)
node, err := e.ls.Load(ipld.LinkContext{Ctx: ctx}, l, basicnode.Prototype.Any)
if err != nil {
log.Errorw("Failed to load link prior to push ", "err", err)
return err
}

progress := traversal.Progress{
Cfg: &traversal.Config{
Ctx: ctx,
LinkSystem: e.ls,
LinkTargetNodePrototypeChooser: basicnode.Chooser,
},
}

// Recursively traverse the node and push all its leaves.
err = traversal.WalkAdv(node, exploreAllRecursivelySelector, func(progress traversal.Progress, node datamodel.Node, _ traversal.VisitReason) error {
log := log
if progress.LastBlock.Link != nil {
log = log.With("t-cid", progress.LastBlock.Link.(cidlink.Link).Cid)
}
var buf bytes.Buffer
err := dagcbor.Encode(node, &buf)
if err != nil {
log.Errorw("Failed encode node to be pushed", "err", err)
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, "http://"+to.String()+".invalid/"+actionPush, &buf)
if err != nil {
log.Errorw("Failed to instantiate push request", "err", err)
return err
}
resp, err := e.c.Do(req)
if err != nil {
log.Errorw("Failed to send push request", "err", err)
return err
}
defer resp.Body.Close()
b, err := io.ReadAll(resp.Body)
switch {
case err != nil:
log.Errorw("Failed to read the response from push", "err", err)
return err
case resp.StatusCode != http.StatusOK:
log.Errorw("Received non-OK response from push", "err", err)
return fmt.Errorf("unexpected response: %d %s", resp.StatusCode, string(b))
default:
log.Infow("Successfully pushed traversed node")
return nil
}
err = progress.WalkAdv(node, exploreAllRecursivelySelector, func(progress traversal.Progress, node datamodel.Node, _ traversal.VisitReason) error {
return e.pushOneNode(ctx, node, to)
})
if err != nil {
log.Errorw("Failed to traverse link during push", "err", err)
Expand All @@ -358,6 +330,37 @@ func (e *FxExchange) Push(ctx context.Context, to peer.ID, l ipld.Link) error {
return nil
}

func (e *FxExchange) pushOneNode(ctx context.Context, node ipld.Node, to peer.ID) error {
var buf bytes.Buffer
err := dagcbor.Encode(node, &buf)
if err != nil {
log.Errorw("Failed encode node to be pushed", "err", err)
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, "libp2p://"+to.String()+"/"+actionPush, &buf)
if err != nil {
log.Errorw("Failed to instantiate push request", "err", err)
return err
}
resp, err := e.c.Do(req)
if err != nil {
log.Errorw("Failed to send push request", "err", err)
return err
}
defer resp.Body.Close()
b, err := io.ReadAll(resp.Body)
switch {
case err != nil:
log.Errorw("Failed to read the response from push", "err", err)
return err
case resp.StatusCode != http.StatusOK:
log.Errorw("Received non-OK response from push", "err", err)
return fmt.Errorf("unexpected response: %d %s", resp.StatusCode, string(b))
default:
log.Infow("Successfully pushed traversed node")
return nil
}
}

func (e *FxExchange) serve(w http.ResponseWriter, r *http.Request) {
from, err := peer.Decode(r.RemoteAddr)
if err != nil {
Expand Down Expand Up @@ -433,18 +436,16 @@ func (e *FxExchange) handlePull(from peer.ID, w http.ResponseWriter, r *http.Req
log = log.With("link", p.Link)
w.WriteHeader(http.StatusAccepted)
log.Info("Accepted pull request")
go func() {
log.Info("Instantiating background push in response to pull request")
ctx := context.TODO()
if e.allowTransientConnection {
ctx = network.WithUseTransient(ctx, "fx.exchange")
}
if err := e.Pull(ctx, from, cidlink.Link{Cid: p.Link}); err != nil {
log.Errorw("Failed to fetch in response to push", "err", err)
} else {
log.Infow("Successfully finished background push in response to pull request")
}
}()
log.Info("Instantiating background push in response to pull request")
ctx := context.TODO()
if e.allowTransientConnection {
ctx = network.WithUseTransient(ctx, "fx.exchange")
}
if err := e.Push(ctx, from, cidlink.Link{Cid: p.Link}); err != nil {
log.Errorw("Failed to fetch in response to push", "err", err)
} else {
log.Infow("Successfully finished background push in response to pull request")
}
}

func (e *FxExchange) handleAuthorization(from peer.ID, w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -524,7 +525,7 @@ func (e *FxExchange) SetAuth(ctx context.Context, on peer.ID, subject peer.ID, a
if err := json.NewEncoder(&buf).Encode(r); err != nil {
return err
}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://"+on.String()+".invalid/"+actionAuth, &buf)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, "libp2p://"+on.String()+"/"+actionAuth, &buf)
if err != nil {
return err
}
Expand Down

0 comments on commit 2d9585a

Please sign in to comment.