Skip to content

Commit

Permalink
Fix traversal and link prototype issue
Browse files Browse the repository at this point in the history
  • Loading branch information
masih committed Dec 20, 2023
1 parent 7dcc193 commit 6bb803a
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 27 deletions.
2 changes: 1 addition & 1 deletion blox/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ var lp = cidlink.LinkPrototype{
Prefix: cid.Prefix{
Version: 1,
Codec: uint64(multicodec.DagCbor),
MhType: uint64(multicodec.Sha2_256),
MhType: uint64(multicodec.Blake3),
MhLength: -1,
},
}
Expand Down
86 changes: 60 additions & 26 deletions exchange/fx_exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,29 @@ import (
"fmt"
"io"
"net/http"
"path"
"strings"
"sync"

bsfetcher "github.com/ipfs/boxo/fetcher/impl/blockservice"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/codec/dagcbor"
_ "github.com/ipld/go-ipld-prime/codec/dagcbor"
_ "github.com/ipld/go-ipld-prime/codec/dagjson"
_ "github.com/ipld/go-ipld-prime/codec/raw"
"github.com/ipld/go-ipld-prime/datamodel"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
ipldmc "github.com/ipld/go-ipld-prime/multicodec"
"github.com/ipld/go-ipld-prime/node/basicnode"
"github.com/ipld/go-ipld-prime/traversal"
"github.com/ipld/go-ipld-prime/traversal/selector"
selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
gostream "github.com/libp2p/go-libp2p-gostream"
p2phttp "github.com/libp2p/go-libp2p-http"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multicodec"
"golang.org/x/sync/errgroup"
)

Expand All @@ -48,19 +49,16 @@ var (
log = logging.Logger("fula/exchange")
errUnauthorized = errors.New("not authorized")
exploreAllRecursivelySelector selector.Selector
pl = cidlink.LinkPrototype{
Prefix: cid.Prefix{
Version: 1,
Codec: uint64(multicodec.DagCbor),
MhType: uint64(multicodec.Sha2_256),
MhLength: -1,
},
}
)

func init() {
var err error
exploreAllRecursivelySelector, err = selector.ParseSelector(selectorparse.CommonSelector_ExploreAllRecursively)
ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
ss := ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreUnion(
ssb.Matcher(),
ssb.ExploreAll(ssb.ExploreRecursiveEdge()),
))
exploreAllRecursivelySelector, err = ss.Selector()
if err != nil {
panic("failed to parse IPLD built-in selector selectorparse.CommonSelector_ExploreAllRecursively: " + err.Error())
}
Expand Down Expand Up @@ -316,14 +314,19 @@ func (e *FxExchange) Push(ctx context.Context, to peer.ID, l ipld.Link) error {
Cfg: &traversal.Config{
Ctx: ctx,
LinkSystem: e.ls,
LinkTargetNodePrototypeChooser: basicnode.Chooser,
LinkTargetNodePrototypeChooser: bsfetcher.DefaultPrototypeChooser,
LinkVisitOnlyOnce: true,
},
}
var eg errgroup.Group
// Recursively traverse the node and push all its leaves.
err = progress.WalkAdv(node, exploreAllRecursivelySelector, func(progress traversal.Progress, node datamodel.Node, _ traversal.VisitReason) error {
err = progress.WalkMatching(node, exploreAllRecursivelySelector, func(progress traversal.Progress, node datamodel.Node) error {
eg.Go(func() error {
return e.pushOneNode(ctx, node, to)
link, err := e.ls.ComputeLink(l.Prototype(), node)
if err != nil {
return err
}
return e.pushOneNode(ctx, node, to, link)
})
return nil
})
Expand All @@ -334,13 +337,20 @@ func (e *FxExchange) Push(ctx context.Context, to peer.ID, l ipld.Link) error {
return eg.Wait()
}

func (e *FxExchange) pushOneNode(ctx context.Context, node ipld.Node, to peer.ID) error {
func (e *FxExchange) pushOneNode(ctx context.Context, node ipld.Node, to peer.ID, link datamodel.Link) error {
var buf bytes.Buffer
err := dagcbor.Encode(node, &buf)
c := link.(cidlink.Link).Cid
encoder, err := ipldmc.LookupEncoder(c.Prefix().Codec)
if err != nil {
log.Errorw("No encoder found to encode pushing node", "err", err)
return err
}

if err := encoder(node, &buf); err != nil {
log.Errorw("Failed encode node to be pushed", "err", err)
return err
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, "libp2p://"+to.String()+"/"+actionPush, &buf)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, "libp2p://"+to.String()+"/"+actionPush+"/"+c.String(), &buf)
if err != nil {
log.Errorw("Failed to instantiate push request", "err", err)
return err
Expand Down Expand Up @@ -372,7 +382,15 @@ func (e *FxExchange) serve(w http.ResponseWriter, r *http.Request) {
http.Error(w, "", http.StatusBadRequest)
return
}
action := path.Base(r.URL.Path)

segments := strings.Split(strings.TrimPrefix(r.URL.Path, "/"), "/")
var action, c string
if len(segments) > 0 {
action = segments[0]
}
if len(segments) > 1 {
c = segments[1]
}
if !e.authorized(from, action) {
log.Debugw("rejected unauthorized request", "from", from, "action", action)
http.Error(w, errUnauthorized.Error(), http.StatusUnauthorized)
Expand All @@ -382,30 +400,46 @@ func (e *FxExchange) serve(w http.ResponseWriter, r *http.Request) {
case actionPull:
e.handlePull(from, w, r)
case actionPush:
e.handlePush(from, w, r)
e.handlePush(from, w, r, c)
case actionAuth:
e.handleAuthorization(from, w, r)
default:
http.Error(w, "", http.StatusNotFound)
}
}

func (e *FxExchange) handlePush(from peer.ID, w http.ResponseWriter, r *http.Request) {
log := log.With("action", actionPush, "from", from)
func (e *FxExchange) handlePush(from peer.ID, w http.ResponseWriter, r *http.Request, c string) {
log := log.With("action", actionPush, "from", from, "cid", c)
if r.Method != http.MethodPost {
log.Errorw("Only POST is allowed on push", "got", r.Method)
http.Error(w, "", http.StatusMethodNotAllowed)
return
}
cidInPath, err := cid.Decode(c)
if err != nil {
log.Error("Invalid CID in path")
http.Error(w, "invalid cid", http.StatusBadRequest)
return
}
defer r.Body.Close()
na := basicnode.Prototype.Any.NewBuilder()
err := dagcbor.Decode(na, r.Body)

codec := cidInPath.Prefix().Codec
decoder, err := ipldmc.LookupDecoder(codec)
if err != nil {
log.Errorw("No decoder found", "codec", codec)
http.Error(w, "invalid cid", http.StatusBadRequest)
return
}
nb := basicnode.Prototype.Any.NewBuilder()
if err = decoder(nb, r.Body); err != nil {
log.Errorw("Failed to decode pushed node as dagcbor", "err", err)
http.Error(w, "failed to decode node", http.StatusBadRequest)
return
}
l, err := e.ls.Store(ipld.LinkContext{Ctx: r.Context()}, pl, na.Build())
l, err := e.ls.Store(
ipld.LinkContext{Ctx: r.Context()}, cidlink.LinkPrototype{
Prefix: cidInPath.Prefix(),
}, nb.Build())
if err != nil {
log.Errorw("Failed to store pushed node", "err", err)
http.Error(w, "failed to store node", http.StatusInternalServerError)
Expand Down

0 comments on commit 6bb803a

Please sign in to comment.