-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathclient.go
325 lines (269 loc) · 9.65 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
package web3protocol
import (
"time"
"sync"
// "fmt"
"io"
"context"
"github.com/sirupsen/logrus"
"github.com/ethereum/go-ethereum/ethclient"
golanglru2 "github.com/hashicorp/golang-lru/v2/expirable"
)
type Client struct {
Config *Config
Logger *logrus.Logger
// The list of RPCs per chain. They are filled with the content of the config,
// and contains their state (available, too many requests, unauthorized)
Rpcs map[int][]*Rpc
RpcsMutex sync.RWMutex
// The request queue, and their channel to notify the caller with the response
// Used to aggregate requests to the same URL
RequestQueue map[RequestQueueKey][]chan *RequestQueueResponse
RequestQueueMutex sync.Mutex
// One worker per request, limited to X workers
// RequestWorkerSemaphone chan struct{}
// Cache for domain name resolution
DomainNameResolutionCache *localCache
// Cache for resolve mode determination
ResolveModeCache *golanglru2.LRU[ResolveModeCacheKey, ResolveMode]
// ERC-7774: Resource request mode : Cache invalidation tracking
ResourceRequestCachingTracker ResourceRequestCachingTracker
}
// Requests are grouped by URL and some specific request headers
type RequestQueueKey struct {
// The URL of the request
URL string
// Some specific request headers, which may alter the response
HttpHeaderIfNoneMatch string
HttpHeaderIfModifiedSince string
HttpHeaderAcceptEncoding string
}
// The response to a request queue entry
type RequestQueueResponse struct {
// The response itself (includes plenty of information of the request processing)
fetchedUrl *FetchedWeb3URL
// The error
// Should usually be of Web3ProtocolError type, which include the HTTP status code to return to
// the client, and additional information about the issue. But it can be a generic error.
err error
}
// A RPC, containing its URL and state
type Rpc struct {
// The RPC config
Config ChainRPCConfig
// The state of the RPC
State RpcState
// We authorize X parralel requests to the RPC
RequestSemaphone chan struct{}
}
type RpcState string
const (
RpcStateAvailable RpcState = "available"
RpcStateTooManyRequests RpcState = "tooManyRequests"
RpcStateUnauthorized RpcState = "unauthorized"
)
/**
* You'll need to instantiate a client to make calls.
*/
func NewClient(config *Config) (client *Client) {
client = &Client{
Config: config,
Logger: logrus.New(),
RequestQueue: make(map[RequestQueueKey][]chan *RequestQueueResponse),
// RequestWorkerSemaphone: make(chan struct{}, 5),
DomainNameResolutionCache: newLocalCache(time.Duration(config.NameAddrCacheDurationInMinutes)*time.Minute, 10*time.Minute),
ResolveModeCache: golanglru2.NewLRU[ResolveModeCacheKey, ResolveMode](1000, nil, time.Duration(0)),
}
client.ResourceRequestCachingTracker = NewResourceRequestCachingTracker(client)
// Initialize the RPCs
client.Rpcs = make(map[int][]*Rpc)
for chainId, chainConfig := range config.Chains {
client.Rpcs[chainId] = make([]*Rpc, 0)
// Max number of concurrent requests : default to 5
maxNumberOfConcurrentRequests := chainConfig.RPC.MaxConcurrentRequests
if maxNumberOfConcurrentRequests == 0 {
maxNumberOfConcurrentRequests = 5
}
client.Rpcs[chainId] = append(client.Rpcs[chainId], &Rpc{
Config: chainConfig.RPC,
State: RpcStateAvailable,
RequestSemaphone: make(chan struct{}, maxNumberOfConcurrentRequests),
})
}
return
}
/**
* The main function of the package.
* For a given full web3:// url ("web3://xxxx"), returns a structure containing
* the bytes output and the HTTP code and headers, as well as plenty of informations on
* how the processing was done.
*/
func (client *Client) FetchUrl(url string, httpHeaders map[string]string) (fetchedUrl *FetchedWeb3URL, err error) {
// Prepare the request queue key
requestQueueKey := RequestQueueKey{
URL: url,
HttpHeaderIfNoneMatch: httpHeaders["If-None-Match"],
HttpHeaderIfModifiedSince: httpHeaders["If-Modified-Since"],
HttpHeaderAcceptEncoding: httpHeaders["Accept-Encoding"],
}
// Prepare the request queue response channel
requestQueueResponseChannel := make(chan *RequestQueueResponse)
// Add the request to the queue
var requestAlreadyInQueue bool
client.RequestQueueMutex.Lock()
if _, requestAlreadyInQueue = client.RequestQueue[requestQueueKey]; !requestAlreadyInQueue {
client.RequestQueue[requestQueueKey] = make([]chan *RequestQueueResponse, 0)
}
client.RequestQueue[requestQueueKey] = append(client.RequestQueue[requestQueueKey], requestQueueResponseChannel)
client.RequestQueueMutex.Unlock()
// If the request was not already in the queue, start a worker to process it
if !requestAlreadyInQueue {
// client.RequestWorkerSemaphone <- struct{}{}
go client.requestWorker(requestQueueKey)
}
// Wait for the response
requestQueueResponse := <-requestQueueResponseChannel
// Return the response
return requestQueueResponse.fetchedUrl, requestQueueResponse.err
}
func (client *Client) requestWorker(requestQueueKey RequestQueueKey) {
defer func() {
// // Release the worker semaphore
// <-client.RequestWorkerSemaphone
}()
client.Logger.WithFields(logrus.Fields{
"worker": "requestWorker",
"url": requestQueueKey.URL,
}).Debug("Starting request worker")
// Fetch the URL
headers := map[string]string{}
if requestQueueKey.HttpHeaderIfNoneMatch != "" {
headers["If-None-Match"] = requestQueueKey.HttpHeaderIfNoneMatch
}
if requestQueueKey.HttpHeaderIfModifiedSince != "" {
headers["If-Modified-Since"] = requestQueueKey.HttpHeaderIfModifiedSince
}
if requestQueueKey.HttpHeaderAcceptEncoding != "" {
headers["Accept-Encoding"] = requestQueueKey.HttpHeaderAcceptEncoding
}
fetchedUrl, err := client.FetchUrlDirect(requestQueueKey.URL, headers)
// Send the result to all the requesters
// We need to duplicate the FetchedWeb3URL response, and update its Output field
// (each receiver will read the Output field, and we can't have multiple readers on the same io.Reader)
client.RequestQueueMutex.Lock()
// Create the shared output
sharedOutput := &SharedOutput{
OriginalOutput: fetchedUrl.Output,
FetchedBytes: make([]byte, 0),
IsEnded: false,
Mutex: sync.Mutex{},
}
// For each requester, send the response
for _, requestQueueResponseChannel := range client.RequestQueue[requestQueueKey] {
// Duplicate the response
fetchedUrlCopy := fetchedUrl
fetchedUrlCopy.Output = &SharedOutputReader{
SharedOutput: sharedOutput,
Position: 0,
}
// Prepare the response
requestQueueResponse := &RequestQueueResponse{
fetchedUrl: &fetchedUrlCopy,
err: err,
}
requestQueueResponseChannel <- requestQueueResponse
}
// Delete the request from the queue
delete(client.RequestQueue, requestQueueKey)
client.RequestQueueMutex.Unlock()
}
// When a request is shared between multiple receivers, we need to duplicate the response
// This contains the original output, and ongoing fetched bytes
type SharedOutput struct {
// The original output
OriginalOutput io.Reader
// The fetched byte
FetchedBytes []byte
// Is ended?
IsEnded bool
Mutex sync.Mutex
}
// This is a io.Reader that reads from a SharedOutput
type SharedOutputReader struct {
// The shared output
SharedOutput *SharedOutput
// The current position in the output
Position int
}
func (r *SharedOutputReader) Read(p []byte) (n int, err error) {
r.SharedOutput.Mutex.Lock()
defer r.SharedOutput.Mutex.Unlock()
// If trying to read after the end of the already fetched bytes, try to fetch more
if r.Position + len(p) > len(r.SharedOutput.FetchedBytes) && !r.SharedOutput.IsEnded {
// Make a buffer the same size as p
buffer := make([]byte, len(p))
// Read from the original output
n, oerr := r.SharedOutput.OriginalOutput.Read(buffer)
// Copy the read bytes to the fetched bytes
r.SharedOutput.FetchedBytes = append(r.SharedOutput.FetchedBytes, buffer[:n]...)
// If we reached the end of the original output, set the flag
if oerr == io.EOF {
r.SharedOutput.IsEnded = true
}
}
// Copy as much bytes as possible
n = copy(p, r.SharedOutput.FetchedBytes[r.Position:])
r.Position += n
// If we reached the end of the fetched bytes, and the original output is ended, return EOF
if r.Position == len(r.SharedOutput.FetchedBytes) && r.SharedOutput.IsEnded {
err = io.EOF
}
return
}
// When a RPC is returning 429, we start a worker to check if it is available again
func (client *Client) CheckTooManyRequestsStateWorker(rpc *Rpc) {
client.RpcsMutex.RLock()
rpcState := rpc.State
client.RpcsMutex.RUnlock()
// If we are not in too many requests state, return (we should enter this function only if we are in too many requests state)
if rpcState != RpcStateTooManyRequests {
return
}
client.Logger.WithFields(logrus.Fields{
"worker": "rpcStateWorker",
"url": rpc.Config.Url,
}).Info("RPC is returning 429, starting worker to check if it is available again")
for {
// Sleep for a while
time.Sleep(4 * time.Second)
// Create connection
ethClient, err := ethclient.Dial(rpc.Config.Url)
if err != nil {
client.Logger.WithFields(logrus.Fields{
"worker": "rpcStateWorker",
"url": rpc.Config.Url,
}).Error("Failed to connect to RPC: " + err.Error())
continue;
}
defer ethClient.Close()
// Attempt to fetch the block number
_, err = ethClient.BlockNumber(context.Background())
if err != nil {
client.Logger.WithFields(logrus.Fields{
"worker": "rpcStateWorker",
"url": rpc.Config.Url,
}).Error("Failed to fetch block number: " + err.Error())
continue;
}
// If we reached this point, the RPC is available again
client.RpcsMutex.Lock()
rpc.State = RpcStateAvailable
client.RpcsMutex.Unlock()
client.Logger.WithFields(logrus.Fields{
"worker": "rpcStateWorker",
"url": rpc.Config.Url,
}).Info("RPC is available again")
// Exit the loop
break
}
}