Skip to content

Commit

Permalink
Add configurable rate limiting to Fx Exchange push rate
Browse files Browse the repository at this point in the history
Add a configurable rate limiter to the parallel CID push mechanism in FX
exchange.

Add the ability to override the default value of 5 per second via CLI
arguments or blox YAML config.
  • Loading branch information
masih committed Dec 21, 2023
1 parent cdd25a5 commit c3d9fb6
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 0 deletions.
9 changes: 9 additions & 0 deletions cmd/blox/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ var (
StaticRelays []string `yaml:"staticRelays"`
ForceReachabilityPrivate bool `yaml:"forceReachabilityPrivate"`
AllowTransientConnection bool `yaml:"allowTransientConnection"`
MaxCIDPushRate int `yaml:"maxCIDPushRate"`
IpniPublishDisabled bool `yaml:"ipniPublishDisabled"`
IpniPublishInterval time.Duration `yaml:"ipniPublishInterval"`
IpniPublishDirectAnnounce []string `yaml:"IpniPublishDirectAnnounce"`
Expand Down Expand Up @@ -319,6 +320,13 @@ func init() {
Destination: &app.config.AllowTransientConnection,
Value: true,
}),
altsrc.NewIntFlag(&cli.IntFlag{
Name: "maxCIDPushRate",
Aliases: []string{"mcidpr"},
Usage: "Maximum number of CIDs pushed per second.",
Destination: &app.config.MaxCIDPushRate,
Value: 5,
}),
altsrc.NewBoolFlag(&cli.BoolFlag{
Name: "ipniPublisherDisabled",
Usage: "Weather to disable IPNI publisher.",
Expand Down Expand Up @@ -758,6 +766,7 @@ func action(ctx *cli.Context) error {
exchange.WithAuthorizer(authorizer),
exchange.WithAuthorizedPeers(authorizedPeers),
exchange.WithAllowTransientConnection(app.config.AllowTransientConnection),
exchange.WithMaxPushRate(app.config.MaxCIDPushRate),
exchange.WithIpniPublishDisabled(app.config.IpniPublishDisabled),
exchange.WithIpniPublishInterval(app.config.IpniPublishInterval),
exchange.WithIpniGetEndPoint("https://cid.contact/cid/"),
Expand Down
1 change: 1 addition & 0 deletions exchange/fx_exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ func (e *FxExchange) Push(ctx context.Context, to peer.ID, l ipld.Link) error {
// Recursively traverse the node and push all its leaves.
err = progress.WalkMatching(node, exploreAllRecursivelySelector, func(progress traversal.Progress, node datamodel.Node) error {
eg.Go(func() error {
e.pushRateLimiter.Take()
link, err := e.ls.ComputeLink(l.Prototype(), node)
if err != nil {
return err
Expand Down
12 changes: 12 additions & 0 deletions exchange/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/ipni/index-provider/engine"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/peer"
"go.uber.org/ratelimit"
)

type (
Expand All @@ -27,6 +28,7 @@ type (
updateConfig ConfigUpdater
wg *sync.WaitGroup
ipfsApi iface.CoreAPI
pushRateLimiter ratelimit.Limiter
}
)

Expand All @@ -35,6 +37,7 @@ func newOptions(o ...Option) (*options, error) {
ipniPublishMaxBatchSize: 16 << 10,
ipniPublishChanBuffer: 1,
wg: nil,
pushRateLimiter: ratelimit.New(5), // Default of 5 per second
}
for _, apply := range o {
if err := apply(&opts); err != nil {
Expand Down Expand Up @@ -145,3 +148,12 @@ func WithIPFS(ipfsApi iface.CoreAPI) Option {
return nil
}
}

// WithMaxPushRate sets the maximum number of CIDs pushed per second.
// Defaults to 5 per second.
func WithMaxPushRate(r int) Option {
return func(o *options) error {
o.pushRateLimiter = ratelimit.New(r)
return nil
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ require (
github.com/multiformats/go-multihash v0.2.3
github.com/multiformats/go-varint v0.0.7
github.com/urfave/cli/v2 v2.26.0
go.uber.org/ratelimit v0.3.0
golang.org/x/crypto v0.16.0
golang.org/x/sync v0.5.0
gopkg.in/ini.v1 v1.67.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -960,6 +960,8 @@ go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKY
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/ratelimit v0.3.0 h1:IdZd9wqvFXnvLvSEBo0KPcGfkoBGNkpTHlrE3Rcjkjw=
go.uber.org/ratelimit v0.3.0/go.mod h1:So5LG7CV1zWpY1sHe+DXTJqQvOx+FFPFaAs2SnoyBaI=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.14.1/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc=
Expand Down

0 comments on commit c3d9fb6

Please sign in to comment.