17
17
package enode
18
18
19
19
import (
20
+ "context"
20
21
"sync"
21
22
"time"
22
23
)
@@ -59,6 +60,11 @@ func (it sourceIter) NodeSource() string {
59
60
return it .name
60
61
}
61
62
63
+ type iteratorItem struct {
64
+ n * Node
65
+ source string
66
+ }
67
+
62
68
// ReadNodes reads at most n nodes from the given iterator. The return value contains no
63
69
// duplicates and no nil values. To prevent looping indefinitely for small repeating node
64
70
// sequences, this function calls Next at most n times.
@@ -152,6 +158,149 @@ func (f *filterIter) Next() bool {
152
158
return false
153
159
}
154
160
161
+ // asyncFilterIter wraps an iterator such that Next only returns nodes for which
162
+ // the 'check' function returns a (possibly modified) node.
163
+ type asyncFilterIter struct {
164
+ it SourceIterator // the iterator to filter
165
+ slots chan struct {} // the slots for parallel checking
166
+ passed chan iteratorItem // channel to collect passed nodes
167
+ cur iteratorItem // buffer to serve the Node call
168
+ cancel context.CancelFunc
169
+ closeOnce sync.Once
170
+ }
171
+
172
+ type AsyncFilterFunc func (context.Context , * Node ) * Node
173
+
174
+ // AsyncFilter creates an iterator which checks nodes in parallel.
175
+ // The 'check' function is called on multiple goroutines to filter each node
176
+ // from the upstream iterator. When check returns nil, the node will be skipped.
177
+ // It can also return a new node to be returned by the iterator instead of the .
178
+ func AsyncFilter (it Iterator , check AsyncFilterFunc , workers int ) Iterator {
179
+ f := & asyncFilterIter {
180
+ it : ensureSourceIter (it ),
181
+ slots : make (chan struct {}, workers + 1 ),
182
+ passed : make (chan iteratorItem ),
183
+ }
184
+ for range cap (f .slots ) {
185
+ f .slots <- struct {}{}
186
+ }
187
+ ctx , cancel := context .WithCancel (context .Background ())
188
+ f .cancel = cancel
189
+
190
+ go func () {
191
+ select {
192
+ case <- ctx .Done ():
193
+ return
194
+ case <- f .slots :
195
+ }
196
+ // read from the iterator and start checking nodes in parallel
197
+ // when a node is checked, it will be sent to the passed channel
198
+ // and the slot will be released
199
+ for f .it .Next () {
200
+ node := f .it .Node ()
201
+ nodeSource := f .it .NodeSource ()
202
+
203
+ // check the node async, in a separate goroutine
204
+ <- f .slots
205
+ go func () {
206
+ if nn := check (ctx , node ); nn != nil {
207
+ item := iteratorItem {nn , nodeSource }
208
+ select {
209
+ case f .passed <- item :
210
+ case <- ctx .Done (): // bale out if downstream is already closed and not calling Next
211
+ }
212
+ }
213
+ f .slots <- struct {}{}
214
+ }()
215
+ }
216
+ // the iterator has ended
217
+ f .slots <- struct {}{}
218
+ }()
219
+
220
+ return f
221
+ }
222
+
223
+ // Next blocks until a node is available or the iterator is closed.
224
+ func (f * asyncFilterIter ) Next () bool {
225
+ var ok bool
226
+ f .cur , ok = <- f .passed
227
+ return ok
228
+ }
229
+
230
+ // Node returns the current node.
231
+ func (f * asyncFilterIter ) Node () * Node {
232
+ return f .cur .n
233
+ }
234
+
235
+ // NodeSource implements IteratorSource.
236
+ func (f * asyncFilterIter ) NodeSource () string {
237
+ return f .cur .source
238
+ }
239
+
240
+ // Close ends the iterator, also closing the wrapped iterator.
241
+ func (f * asyncFilterIter ) Close () {
242
+ f .closeOnce .Do (func () {
243
+ f .it .Close ()
244
+ f .cancel ()
245
+ for range cap (f .slots ) {
246
+ <- f .slots
247
+ }
248
+ close (f .slots )
249
+ close (f .passed )
250
+ })
251
+ }
252
+
253
+ // bufferIter wraps an iterator and buffers the nodes it returns.
254
+ // The buffer is pre-filled with the given size from the wrapped iterator.
255
+ type bufferIter struct {
256
+ it SourceIterator
257
+ buffer chan iteratorItem
258
+ head iteratorItem
259
+ closeOnce sync.Once
260
+ }
261
+
262
+ // NewBufferIter creates a new pre-fetch buffer of a given size.
263
+ func NewBufferIter (it Iterator , size int ) Iterator {
264
+ b := bufferIter {
265
+ it : ensureSourceIter (it ),
266
+ buffer : make (chan iteratorItem , size ),
267
+ }
268
+
269
+ go func () {
270
+ // if the wrapped iterator ends, the buffer content will still be served.
271
+ defer close (b .buffer )
272
+ // If instead the bufferIterator is closed, we bail out of the loop.
273
+ for b .it .Next () {
274
+ item := iteratorItem {b .it .Node (), b .it .NodeSource ()}
275
+ b .buffer <- item
276
+ }
277
+ }()
278
+ return & b
279
+ }
280
+
281
+ func (b * bufferIter ) Next () bool {
282
+ var ok bool
283
+ b .head , ok = <- b .buffer
284
+ return ok
285
+ }
286
+
287
+ func (b * bufferIter ) Node () * Node {
288
+ return b .head .n
289
+ }
290
+
291
+ func (b * bufferIter ) NodeSource () string {
292
+ return b .head .source
293
+ }
294
+
295
+ func (b * bufferIter ) Close () {
296
+ b .closeOnce .Do (func () {
297
+ b .it .Close ()
298
+ // Drain buffer and wait for the goroutine to end.
299
+ for range b .buffer {
300
+ }
301
+ })
302
+ }
303
+
155
304
// FairMix aggregates multiple node iterators. The mixer itself is an iterator which ends
156
305
// only when Close is called. Source iterators added via AddSource are removed from the
157
306
// mix when they end.
@@ -164,9 +313,9 @@ func (f *filterIter) Next() bool {
164
313
// It's safe to call AddSource and Close concurrently with Next.
165
314
type FairMix struct {
166
315
wg sync.WaitGroup
167
- fromAny chan mixItem
316
+ fromAny chan iteratorItem
168
317
timeout time.Duration
169
- cur mixItem
318
+ cur iteratorItem
170
319
171
320
mu sync.Mutex
172
321
closed chan struct {}
@@ -176,15 +325,10 @@ type FairMix struct {
176
325
177
326
type mixSource struct {
178
327
it SourceIterator
179
- next chan mixItem
328
+ next chan iteratorItem
180
329
timeout time.Duration
181
330
}
182
331
183
- type mixItem struct {
184
- n * Node
185
- source string
186
- }
187
-
188
332
// NewFairMix creates a mixer.
189
333
//
190
334
// The timeout specifies how long the mixer will wait for the next fairly-chosen source
@@ -193,7 +337,7 @@ type mixItem struct {
193
337
// timeout makes the mixer completely fair.
194
338
func NewFairMix (timeout time.Duration ) * FairMix {
195
339
m := & FairMix {
196
- fromAny : make (chan mixItem ),
340
+ fromAny : make (chan iteratorItem ),
197
341
closed : make (chan struct {}),
198
342
timeout : timeout ,
199
343
}
@@ -211,7 +355,7 @@ func (m *FairMix) AddSource(it Iterator) {
211
355
m .wg .Add (1 )
212
356
source := & mixSource {
213
357
it : ensureSourceIter (it ),
214
- next : make (chan mixItem ),
358
+ next : make (chan iteratorItem ),
215
359
timeout : m .timeout ,
216
360
}
217
361
m .sources = append (m .sources , source )
@@ -239,7 +383,7 @@ func (m *FairMix) Close() {
239
383
240
384
// Next returns a node from a random source.
241
385
func (m * FairMix ) Next () bool {
242
- m .cur = mixItem {}
386
+ m .cur = iteratorItem {}
243
387
244
388
for {
245
389
source := m .pickSource ()
@@ -327,7 +471,7 @@ func (m *FairMix) runSource(closed chan struct{}, s *mixSource) {
327
471
defer m .wg .Done ()
328
472
defer close (s .next )
329
473
for s .it .Next () {
330
- item := mixItem {s .it .Node (), s .it .NodeSource ()}
474
+ item := iteratorItem {s .it .Node (), s .it .NodeSource ()}
331
475
select {
332
476
case s .next <- item :
333
477
case m .fromAny <- item :
0 commit comments