Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add configurable rate limiting to Fx Exchange push rate #199

Merged
merged 1 commit into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions cmd/blox/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ var (
StaticRelays []string `yaml:"staticRelays"`
ForceReachabilityPrivate bool `yaml:"forceReachabilityPrivate"`
AllowTransientConnection bool `yaml:"allowTransientConnection"`
MaxCIDPushRate int `yaml:"maxCIDPushRate"`
IpniPublishDisabled bool `yaml:"ipniPublishDisabled"`
IpniPublishInterval time.Duration `yaml:"ipniPublishInterval"`
IpniPublishDirectAnnounce []string `yaml:"IpniPublishDirectAnnounce"`
Expand Down Expand Up @@ -319,6 +320,13 @@ func init() {
Destination: &app.config.AllowTransientConnection,
Value: true,
}),
altsrc.NewIntFlag(&cli.IntFlag{
Name: "maxCIDPushRate",
Aliases: []string{"mcidpr"},
Usage: "Maximum number of CIDs pushed per second.",
Destination: &app.config.MaxCIDPushRate,
Value: 5,
}),
altsrc.NewBoolFlag(&cli.BoolFlag{
Name: "ipniPublisherDisabled",
Usage: "Weather to disable IPNI publisher.",
Expand Down Expand Up @@ -758,6 +766,7 @@ func action(ctx *cli.Context) error {
exchange.WithAuthorizer(authorizer),
exchange.WithAuthorizedPeers(authorizedPeers),
exchange.WithAllowTransientConnection(app.config.AllowTransientConnection),
exchange.WithMaxPushRate(app.config.MaxCIDPushRate),
exchange.WithIpniPublishDisabled(app.config.IpniPublishDisabled),
exchange.WithIpniPublishInterval(app.config.IpniPublishInterval),
exchange.WithIpniGetEndPoint("https://cid.contact/cid/"),
Expand Down
1 change: 1 addition & 0 deletions exchange/fx_exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ func (e *FxExchange) Push(ctx context.Context, to peer.ID, l ipld.Link) error {
// Recursively traverse the node and push all its leaves.
err = progress.WalkMatching(node, exploreAllRecursivelySelector, func(progress traversal.Progress, node datamodel.Node) error {
eg.Go(func() error {
e.pushRateLimiter.Take()
link, err := e.ls.ComputeLink(l.Prototype(), node)
if err != nil {
return err
Expand Down
12 changes: 12 additions & 0 deletions exchange/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/ipni/index-provider/engine"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/peer"
"go.uber.org/ratelimit"
)

type (
Expand All @@ -27,6 +28,7 @@ type (
updateConfig ConfigUpdater
wg *sync.WaitGroup
ipfsApi iface.CoreAPI
pushRateLimiter ratelimit.Limiter
}
)

Expand All @@ -35,6 +37,7 @@ func newOptions(o ...Option) (*options, error) {
ipniPublishMaxBatchSize: 16 << 10,
ipniPublishChanBuffer: 1,
wg: nil,
pushRateLimiter: ratelimit.New(5), // Default of 5 per second
}
for _, apply := range o {
if err := apply(&opts); err != nil {
Expand Down Expand Up @@ -145,3 +148,12 @@ func WithIPFS(ipfsApi iface.CoreAPI) Option {
return nil
}
}

// WithMaxPushRate sets the maximum number of CIDs pushed per second.
// Defaults to 5 per second.
func WithMaxPushRate(r int) Option {
return func(o *options) error {
o.pushRateLimiter = ratelimit.New(r)
return nil
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ require (
github.com/multiformats/go-multihash v0.2.3
github.com/multiformats/go-varint v0.0.7
github.com/urfave/cli/v2 v2.26.0
go.uber.org/ratelimit v0.3.0
golang.org/x/crypto v0.16.0
golang.org/x/sync v0.5.0
gopkg.in/ini.v1 v1.67.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -960,6 +960,8 @@ go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKY
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/ratelimit v0.3.0 h1:IdZd9wqvFXnvLvSEBo0KPcGfkoBGNkpTHlrE3Rcjkjw=
go.uber.org/ratelimit v0.3.0/go.mod h1:So5LG7CV1zWpY1sHe+DXTJqQvOx+FFPFaAs2SnoyBaI=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.14.1/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc=
Expand Down
Loading