Skip to content

Commit ba76165

Browse files
feat(node): dynamic windows for feeder http requests (#2691)
* Implement dynamic timeouts * lint * Fix unit tests * lint * Sorting of values * lint * Modify the testing structure * Modifications to the feeder * renamings * refactor * remove debug log * Address comments on the PR * Decouple changes into two PRs * Bug fix * Bug fix * Fix tests, address PR comments, exclude the 429 Too many requests from increasing the http timeout * lint * add test for String() method * debug * fix unit tests * Add parsing of the string array flags * updates, changes in the functionality * generate docs * add mutex for current timeout * fix unit tests * address comments * change docs * address comments, add unit test * address comments * Update clients/feeder/timeouts.go Co-authored-by: Rodrigo <rodrodpino@gmail.com> Signed-off-by: MaksymMalicki <81577596+MaksymMalicki@users.noreply.github.com> --------- Signed-off-by: MaksymMalicki <81577596+MaksymMalicki@users.noreply.github.com> Co-authored-by: Rodrigo <rodrodpino@gmail.com>
1 parent 14f4f93 commit ba76165

File tree

10 files changed

+459
-32
lines changed

10 files changed

+459
-32
lines changed

clients/feeder/feeder.go

+27-4
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ type Client struct {
3636
userAgent string
3737
apiKey string
3838
listener EventListener
39+
timeouts Timeouts
3940
}
4041

4142
func (c *Client) WithListener(l EventListener) *Client {
@@ -73,8 +74,12 @@ func (c *Client) WithUserAgent(ua string) *Client {
7374
return c
7475
}
7576

76-
func (c *Client) WithTimeout(t time.Duration) *Client {
77-
c.client.Timeout = t
77+
func (c *Client) WithTimeouts(timeouts []time.Duration, fixed bool) *Client {
78+
if len(timeouts) > 1 || fixed {
79+
c.timeouts = getFixedTimeouts(timeouts)
80+
} else {
81+
c.timeouts = getDynamicTimeouts(timeouts[0])
82+
}
7883
return c
7984
}
8085

@@ -206,6 +211,7 @@ func NewClient(clientURL string) *Client {
206211
minWait: time.Second,
207212
log: utils.NewNopZapLogger(),
208213
listener: &SelectiveListener{},
214+
timeouts: getDefaultFixedTimeouts(),
209215
}
210216
}
211217

@@ -249,11 +255,15 @@ func (c *Client) get(ctx context.Context, queryURL string) (io.ReadCloser, error
249255
req.Header.Set("X-Throttling-Bypass", c.apiKey)
250256
}
251257

258+
c.client.Timeout = c.timeouts.GetCurrentTimeout()
252259
reqTimer := time.Now()
253260
res, err = c.client.Do(req)
261+
tooManyRequests := false
254262
if err == nil {
255263
c.listener.OnResponse(req.URL.Path, res.StatusCode, time.Since(reqTimer))
264+
tooManyRequests = res.StatusCode == http.StatusTooManyRequests
256265
if res.StatusCode == http.StatusOK {
266+
c.timeouts.DecreaseTimeout()
257267
return res.Body, nil
258268
} else {
259269
err = errors.New(res.Status)
@@ -262,11 +272,24 @@ func (c *Client) get(ctx context.Context, queryURL string) (io.ReadCloser, error
262272
res.Body.Close()
263273
}
264274

275+
if !tooManyRequests {
276+
c.timeouts.IncreaseTimeout()
277+
}
278+
265279
if wait < c.minWait {
266280
wait = c.minWait
267281
}
268-
wait = min(c.backoff(wait), c.maxWait)
269-
c.log.Debugw("Failed query to feeder, retrying...", "req", req.URL.String(), "retryAfter", wait.String(), "err", err)
282+
wait = c.backoff(wait)
283+
if wait > c.maxWait {
284+
wait = c.maxWait
285+
}
286+
287+
c.log.Debugw("Failed query to feeder, retrying...",
288+
"req", req.URL.String(),
289+
"retryAfter", wait.String(),
290+
"err", err,
291+
"newHTTPTimeout", c.timeouts.GetCurrentTimeout().String(),
292+
)
270293
}
271294
}
272295
return nil, err

clients/feeder/feeder_test.go

+77
Original file line numberDiff line numberDiff line change
@@ -689,3 +689,80 @@ func TestEventListener(t *testing.T) {
689689
require.NoError(t, err)
690690
require.True(t, isCalled)
691691
}
692+
693+
func TestClientRetryBehavior(t *testing.T) {
694+
t.Run("succeeds after retrying with increased timeout", func(t *testing.T) {
695+
requestCount := 0
696+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
697+
requestCount++
698+
699+
if requestCount == 2 || requestCount == 1 {
700+
time.Sleep(800 * time.Millisecond)
701+
w.WriteHeader(http.StatusGatewayTimeout)
702+
return
703+
}
704+
705+
_, err := w.Write([]byte(`{"block_hash": "0x123", "block_number": 1}`))
706+
if err != nil {
707+
panic("TestClientRetryBehavior: write error")
708+
}
709+
}))
710+
defer srv.Close()
711+
712+
client := feeder.NewClient(srv.URL).
713+
WithTimeouts([]time.Duration{250 * time.Millisecond, 750 * time.Millisecond, 2 * time.Second}, false).
714+
WithMaxRetries(2).
715+
WithBackoff(feeder.NopBackoff)
716+
717+
block, err := client.Block(t.Context(), "1")
718+
require.NoError(t, err)
719+
require.NotNil(t, block)
720+
require.Equal(t, 3, requestCount)
721+
})
722+
723+
t.Run("fails when max retries exceeded", func(t *testing.T) {
724+
requestCount := 0
725+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
726+
requestCount++
727+
time.Sleep(300 * time.Millisecond)
728+
w.WriteHeader(http.StatusGatewayTimeout)
729+
}))
730+
defer srv.Close()
731+
732+
client := feeder.NewClient(srv.URL).
733+
WithTimeouts([]time.Duration{250 * time.Millisecond}, false).
734+
WithMaxRetries(2).
735+
WithBackoff(feeder.NopBackoff)
736+
737+
_, err := client.Block(t.Context(), "1")
738+
require.Error(t, err)
739+
require.Equal(t, 3, requestCount)
740+
})
741+
742+
t.Run("stops retrying on success", func(t *testing.T) {
743+
requestCount := 0
744+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
745+
requestCount++
746+
if requestCount == 1 {
747+
time.Sleep(300 * time.Millisecond)
748+
w.WriteHeader(http.StatusGatewayTimeout)
749+
return
750+
}
751+
_, err := w.Write([]byte(`{"block_hash": "0x123", "block_number": 1}`))
752+
if err != nil {
753+
panic("TestClientRetryBehavior: write error")
754+
}
755+
}))
756+
defer srv.Close()
757+
758+
client := feeder.NewClient(srv.URL).
759+
WithTimeouts([]time.Duration{250 * time.Millisecond, 750 * time.Millisecond}, false).
760+
WithMaxRetries(1).
761+
WithBackoff(feeder.NopBackoff)
762+
763+
block, err := client.Block(t.Context(), "1")
764+
require.NoError(t, err)
765+
require.NotNil(t, block)
766+
require.Equal(t, 2, requestCount)
767+
})
768+
}

clients/feeder/timeouts.go

+156
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
// timeouts.go implements adaptive timeout management for HTTP requests to Starknet nodes.
2+
// This file handles dynamic timeout adjustments based on request performance, automatically
3+
// scaling timeouts up or down depending on success/failure rates.
4+
5+
package feeder
6+
7+
import (
8+
"fmt"
9+
"math"
10+
"strings"
11+
"sync"
12+
"time"
13+
)
14+
15+
const (
16+
growthFactorFast = 2
17+
growthFactorMedium = 1.5
18+
growthFactorSlow = 1.2
19+
fastGrowThreshold = 1 * time.Minute
20+
mediumGrowThreshold = 2 * time.Minute
21+
timeoutsCount = 30
22+
DefaultTimeouts = "5s"
23+
)
24+
25+
type Timeouts struct {
26+
timeouts []time.Duration
27+
curTimeout int
28+
mu sync.RWMutex
29+
}
30+
31+
func (t *Timeouts) GetCurrentTimeout() time.Duration {
32+
t.mu.RLock()
33+
defer t.mu.RUnlock()
34+
return t.timeouts[t.curTimeout]
35+
}
36+
37+
func (t *Timeouts) DecreaseTimeout() {
38+
t.mu.Lock()
39+
defer t.mu.Unlock()
40+
if t.curTimeout > 0 {
41+
t.curTimeout--
42+
}
43+
}
44+
45+
func (t *Timeouts) IncreaseTimeout() {
46+
t.mu.Lock()
47+
defer t.mu.Unlock()
48+
t.curTimeout++
49+
if t.curTimeout >= len(t.timeouts) {
50+
t.curTimeout = len(t.timeouts) - 1
51+
}
52+
}
53+
54+
func (t *Timeouts) String() string {
55+
t.mu.RLock()
56+
defer t.mu.RUnlock()
57+
timeouts := make([]string, len(t.timeouts))
58+
for i, t := range t.timeouts {
59+
timeouts[i] = t.String()
60+
}
61+
return strings.Join(timeouts, ",")
62+
}
63+
64+
// timeoutsListFromNumber generates a list of timeouts based on the initial timeout and the number of retries.
65+
// The list is generated using a geometric progression with a growth factor of 2 for the first 1 minute,
66+
// 1.5 for the next 1 minute, and 1.2 for the rest.
67+
func timeoutsListFromNumber(initial time.Duration) []time.Duration {
68+
timeouts := make([]time.Duration, timeoutsCount)
69+
timeouts[0] = initial
70+
71+
for i := 1; i < timeoutsCount; i++ {
72+
prev := timeouts[i-1]
73+
next := increaseDuration(prev)
74+
timeouts[i] = next
75+
}
76+
77+
return timeouts
78+
}
79+
80+
func increaseDuration(prev time.Duration) time.Duration {
81+
var next time.Duration
82+
if prev < fastGrowThreshold {
83+
seconds := math.Ceil(float64(prev.Seconds()) * growthFactorFast)
84+
return time.Duration(seconds) * time.Second
85+
} else if prev < mediumGrowThreshold {
86+
seconds := math.Ceil(float64(prev.Seconds()) * growthFactorMedium)
87+
return time.Duration(seconds) * time.Second
88+
} else {
89+
seconds := math.Ceil(float64(prev.Seconds()) * growthFactorSlow)
90+
next = time.Duration(seconds) * time.Second
91+
}
92+
return next
93+
}
94+
95+
func getDynamicTimeouts(timeouts time.Duration) Timeouts {
96+
return Timeouts{
97+
curTimeout: 0,
98+
timeouts: timeoutsListFromNumber(timeouts),
99+
}
100+
}
101+
102+
func getFixedTimeouts(timeouts []time.Duration) Timeouts {
103+
return Timeouts{
104+
curTimeout: 0,
105+
timeouts: timeouts,
106+
}
107+
}
108+
109+
func getDefaultFixedTimeouts() Timeouts {
110+
timeouts, _, _ := ParseTimeouts(DefaultTimeouts)
111+
return getFixedTimeouts(timeouts)
112+
}
113+
114+
// ParseTimeouts parses a comma-separated string of duration values into a slice of time.Duration.
115+
// Returns:
116+
// - the parsed timeout values
117+
// - if a fixed or dynamic timeouts should be used
118+
// - an error in case the string cannot be parsed
119+
func ParseTimeouts(value string) ([]time.Duration, bool, error) {
120+
if value == "" {
121+
return nil, true, fmt.Errorf("timeouts are not set")
122+
}
123+
124+
values := strings.Split(value, ",")
125+
for i := range values {
126+
values[i] = strings.TrimSpace(values[i])
127+
}
128+
129+
hasTrailingComma := len(values) > 0 && values[len(values)-1] == ""
130+
if hasTrailingComma {
131+
values = values[:len(values)-1]
132+
}
133+
134+
timeouts := make([]time.Duration, 0, len(values))
135+
for i, v := range values {
136+
d, err := time.ParseDuration(v)
137+
if err != nil {
138+
return nil, false, fmt.Errorf("parsing timeout parameter number %d: %v", i+1, err)
139+
}
140+
timeouts = append(timeouts, d)
141+
}
142+
if len(timeouts) == 1 && hasTrailingComma {
143+
return timeouts, true, nil
144+
}
145+
146+
for i := 1; i < len(timeouts); i++ {
147+
if timeouts[i] <= timeouts[i-1] {
148+
return nil, false, fmt.Errorf("timeout values must be in ascending order, got %v <= %v", timeouts[i], timeouts[i-1])
149+
}
150+
}
151+
152+
if len(timeouts) > timeoutsCount {
153+
return nil, false, fmt.Errorf("exceeded max amount of allowed timeout parameters. Set %d but max is %d", len(timeouts), timeoutsCount)
154+
}
155+
return timeouts, false, nil
156+
}

0 commit comments

Comments
 (0)