From c3d9fb65208e1638fe957cb5ec7a6916cbee0ced Mon Sep 17 00:00:00 2001 From: "Masih H. Derkani" Date: Thu, 21 Dec 2023 21:02:54 +0000 Subject: [PATCH] Add configurable rate limiting to Fx Exchange push rate Add a configurable rate limiter to the parallel CID push mechanism in FX exchange. Add the ability to override the default value of 5 per second via CLI arguments or blox YAML config. --- cmd/blox/main.go | 9 +++++++++ exchange/fx_exchange.go | 1 + exchange/options.go | 12 ++++++++++++ go.mod | 1 + go.sum | 2 ++ 5 files changed, 25 insertions(+) diff --git a/cmd/blox/main.go b/cmd/blox/main.go index 27f54d29..a72d173d 100644 --- a/cmd/blox/main.go +++ b/cmd/blox/main.go @@ -87,6 +87,7 @@ var ( StaticRelays []string `yaml:"staticRelays"` ForceReachabilityPrivate bool `yaml:"forceReachabilityPrivate"` AllowTransientConnection bool `yaml:"allowTransientConnection"` + MaxCIDPushRate int `yaml:"maxCIDPushRate"` IpniPublishDisabled bool `yaml:"ipniPublishDisabled"` IpniPublishInterval time.Duration `yaml:"ipniPublishInterval"` IpniPublishDirectAnnounce []string `yaml:"IpniPublishDirectAnnounce"` @@ -319,6 +320,13 @@ func init() { Destination: &app.config.AllowTransientConnection, Value: true, }), + altsrc.NewIntFlag(&cli.IntFlag{ + Name: "maxCIDPushRate", + Aliases: []string{"mcidpr"}, + Usage: "Maximum number of CIDs pushed per second.", + Destination: &app.config.MaxCIDPushRate, + Value: 5, + }), altsrc.NewBoolFlag(&cli.BoolFlag{ Name: "ipniPublisherDisabled", Usage: "Weather to disable IPNI publisher.", @@ -758,6 +766,7 @@ func action(ctx *cli.Context) error { exchange.WithAuthorizer(authorizer), exchange.WithAuthorizedPeers(authorizedPeers), exchange.WithAllowTransientConnection(app.config.AllowTransientConnection), + exchange.WithMaxPushRate(app.config.MaxCIDPushRate), exchange.WithIpniPublishDisabled(app.config.IpniPublishDisabled), exchange.WithIpniPublishInterval(app.config.IpniPublishInterval), exchange.WithIpniGetEndPoint("https://cid.contact/cid/"), diff --git a/exchange/fx_exchange.go b/exchange/fx_exchange.go index d5ba462e..6ad26cde 100644 --- a/exchange/fx_exchange.go +++ b/exchange/fx_exchange.go @@ -322,6 +322,7 @@ func (e *FxExchange) Push(ctx context.Context, to peer.ID, l ipld.Link) error { // Recursively traverse the node and push all its leaves. err = progress.WalkMatching(node, exploreAllRecursivelySelector, func(progress traversal.Progress, node datamodel.Node) error { eg.Go(func() error { + e.pushRateLimiter.Take() link, err := e.ls.ComputeLink(l.Prototype(), node) if err != nil { return err diff --git a/exchange/options.go b/exchange/options.go index d4b65432..7dcce072 100644 --- a/exchange/options.go +++ b/exchange/options.go @@ -8,6 +8,7 @@ import ( "github.com/ipni/index-provider/engine" dht "github.com/libp2p/go-libp2p-kad-dht" "github.com/libp2p/go-libp2p/core/peer" + "go.uber.org/ratelimit" ) type ( @@ -27,6 +28,7 @@ type ( updateConfig ConfigUpdater wg *sync.WaitGroup ipfsApi iface.CoreAPI + pushRateLimiter ratelimit.Limiter } ) @@ -35,6 +37,7 @@ func newOptions(o ...Option) (*options, error) { ipniPublishMaxBatchSize: 16 << 10, ipniPublishChanBuffer: 1, wg: nil, + pushRateLimiter: ratelimit.New(5), // Default of 5 per second } for _, apply := range o { if err := apply(&opts); err != nil { @@ -145,3 +148,12 @@ func WithIPFS(ipfsApi iface.CoreAPI) Option { return nil } } + +// WithMaxPushRate sets the maximum number of CIDs pushed per second. +// Defaults to 5 per second. +func WithMaxPushRate(r int) Option { + return func(o *options) error { + o.pushRateLimiter = ratelimit.New(r) + return nil + } +} diff --git a/go.mod b/go.mod index 386bdafa..ee66b300 100644 --- a/go.mod +++ b/go.mod @@ -28,6 +28,7 @@ require ( github.com/multiformats/go-multihash v0.2.3 github.com/multiformats/go-varint v0.0.7 github.com/urfave/cli/v2 v2.26.0 + go.uber.org/ratelimit v0.3.0 golang.org/x/crypto v0.16.0 golang.org/x/sync v0.5.0 gopkg.in/ini.v1 v1.67.0 diff --git a/go.sum b/go.sum index eb0afe20..90864322 100644 --- a/go.sum +++ b/go.sum @@ -960,6 +960,8 @@ go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKY go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/ratelimit v0.3.0 h1:IdZd9wqvFXnvLvSEBo0KPcGfkoBGNkpTHlrE3Rcjkjw= +go.uber.org/ratelimit v0.3.0/go.mod h1:So5LG7CV1zWpY1sHe+DXTJqQvOx+FFPFaAs2SnoyBaI= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.14.1/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc=