src/iterable-mapper.ts (74 additions & 26 deletions)
Original file line number
Diff line number
Diff line change
@@ -82,18 +82,41 @@ type NewElementOrError<NewElement = unknown> = {
};

/**
 * Iterates over a source iterable / generator with specified `concurrency`,
 * calling the `mapper` on each iterated item, and storing the
 * `mapper` result in a queue of `maxUnread` size, before
 * being iterated / read by the caller.
 *
 * @remarks
 *
 * ### Typical Use Case
 * - Prefetching items from an async I/O source
 * - In the simple sequential (`concurrency: 1`) case, allows items to be prefetched async, preserving order, while the caller processes an item
 * - Can allow parallel prefetches for sources that allow out-of-order reads (`concurrency: 2+`)
 * - Prevents the producer from racing ahead of the consumer if `maxUnread` is reached
 *
 * ### Error Handling
 * The mapper should ideally handle all errors internally to enable error handling
 * closest to where they occur. However, if errors do escape the mapper:
 *
 * When `stopOnMapperError` is true (default):
 * - The first error immediately stops processing
 * - The error is thrown from the `AsyncIterator`'s `next()` call
 *
 * When `stopOnMapperError` is false:
 * - Processing continues despite errors
 * - All errors are collected and thrown together
 * - Errors are thrown as an `AggregateError` after all items complete
 *
 * ### Usage
 * - Items are exposed to the `mapper` via an iterator or async iterator (this includes generator and async generator functions)
 * - IMPORTANT: the `mapper` will not be invoked when `maxUnread` is reached, until items are consumed
 * - The iterable will set `done` when the `input` has indicated `done` and all `mapper` promises have resolved
 *
 * @example
 *
 * Consider a typical processing loop without IterableMapper:
 *
 * ```typescript
 * const source = new SomeSource();
 * const sourceIds = [1, 2,... 1000];
@@ -109,43 +132,81 @@ type NewElementOrError<NewElement = unknown> = {
 * We could prefetch the next read (300ms) while processing (20ms) and writing (500ms),
 * without changing the order of reads or writes.
 *
 * @example
 *
 * Using `IterableMapper` as a prefetcher with blocking writes, without changing the order of reads or writes:
 *
 * ```typescript
 * const source = new SomeSource();
 * const sourceIds = [1, 2,... 1000];
 * // Pre-reads up to 10 items serially and releases in sequential order
 * const sourcePrefetcher = new IterableMapper(sourceIds,
 *   async (sourceId) => source.read(sourceId),
 *   { concurrency: 1, maxUnread: 10 }
 * );
 * const sink = new SomeSink();
 * for await (const item of sourcePrefetcher) { // may not block for fast sources
 *   const outputItem = doSomeOperation(item); // takes 20 ms of CPU
 *   await sink.write(outputItem); // takes 500 ms of I/O wait, no CPU
 * }
 * ```
 *
 * This reduces iteration time to 520ms by overlapping reads with processing/writing.
 *
 * @example
 *
 * Using `IterableMapper` as a prefetcher with background writes, without changing the order of reads or writes:
 *
 * ```typescript
 * const source = new SomeSource();
 * const sourceIds = [1, 2,... 1000];
 * const sourcePrefetcher = new IterableMapper(sourceIds,
 *   async (sourceId) => source.read(sourceId),
 *   { concurrency: 1, maxUnread: 10 }
 * );
 * const sink = new SomeSink();
 * const flusher = new IterableQueueMapperSimple(
 *   async (outputItem) => sink.write(outputItem),
 *   { concurrency: 1 }
 * );
 * for await (const item of sourcePrefetcher) { // may not block for fast sources
 *   const outputItem = doSomeOperation(item); // takes 20 ms of CPU
 *   await flusher.enqueue(outputItem); // will periodically block for a portion of the write time
 * }
 * // Wait for all writes to complete
 * await flusher.onIdle();
 * // Check for errors
 * if (flusher.errors.length > 0) {
 *   // ...
 * }
 * ```
 *
 * This reduces iteration time to about `max(max(readTime, writeTime) - cpuOpTime, cpuOpTime)`
 * by overlapping reads and writes with the CPU processing step.
 * In this contrived example, the loop time is reduced to 500ms - 20ms = 480ms.
 * In cases where the CPU usage time is higher, the impact can be greater.
 *
 * @example
 *
 * For maximum throughput, allow out-of-order reads and writes with
 * `IterableQueueMapper` (to iterate results with backpressure when there are too many unread items) or
 * `IterableQueueMapperSimple` (to handle errors at the end without custom iteration, applying backpressure that blocks further enqueues when `concurrency` items are in process):
 *
 * ```typescript
 * const source = new SomeSource();
 * const sourceIds = [1, 2,... 1000];
 * const sourcePrefetcher = new IterableMapper(sourceIds,
 *   async (sourceId) => source.read(sourceId),
 *   { concurrency: 10, maxUnread: 20 }
 * );
 * const sink = new SomeSink();
 * const flusher = new IterableQueueMapperSimple(
 *   async (outputItem) => sink.write(outputItem),
 *   { concurrency: 10 }
 * );
 * for await (const item of sourcePrefetcher) { // typically will not block
 *   const outputItem = doSomeOperation(item); // takes 20 ms of CPU
 *   await flusher.enqueue(outputItem); // typically will not block
 * }
 * // Wait for all writes to complete
 * await flusher.onIdle();
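The prefetch-with-backpressure behavior these examples rely on can be sketched independently of the library. Below is a minimal, hypothetical `prefetch` helper (a stand-in for `IterableMapper`, not its implementation) showing how a bounded buffer of in-flight reads preserves input order while capping how far the producer runs ahead of the consumer:

```typescript
// Minimal sketch of prefetching with backpressure; a hypothetical stand-in
// for IterableMapper, not the library's implementation. The number of
// in-flight mapper calls is bounded by maxUnread.
async function* prefetch<T, U>(
  source: Iterable<T>,
  mapper: (item: T) => Promise<U>,
  maxUnread: number,
): AsyncGenerator<U> {
  const unread: Promise<U>[] = [];
  for (const item of source) {
    unread.push(mapper(item)); // start the read immediately
    if (unread.length >= maxUnread) {
      // Backpressure: do not start more reads until the caller consumes one
      yield await unread.shift()!;
    }
  }
  // Drain the remaining in-flight reads, preserving input order
  while (unread.length > 0) {
    yield await unread.shift()!;
  }
}

// Usage: results come back in input order even though reads run ahead
async function demo(): Promise<number[]> {
  const results: number[] = [];
  for await (const v of prefetch([1, 2, 3, 4], async (n) => n * 10, 2)) {
    results.push(v);
  }
  return results; // [10, 20, 30, 40]
}
```

Because an async generator's body only advances between `next()` calls, the `yield` doubles as the blocking point: a slow consumer naturally pauses the producer once `maxUnread` promises are outstanding.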
@@ -184,19 +245,6 @@ export class IterableMapper<Element, NewElement> implements AsyncIterable<NewEle
 * @param mapper Function called for every item in `input`. Returns a `Promise` or value.
 * @param options IterableMapper options
 *
 * @see {@link IterableQueueMapper} for full class documentation
 * Number of concurrently pending promises returned by `mapper`.
 *
 * Must be an integer from 1 and up or `Infinity`, and must be <= `maxUnread`.
 *
 * @default 4
 */
readonly concurrency?: number;

/**
 * Number of pending unread iterable items.
 *
 * Must be an integer from 1 and up or `Infinity`, and must be >= `concurrency`.
 *
 * @default 8
 */
readonly maxUnread?: number;

/**
 * When set to `false`, instead of stopping when a promise rejects, it will wait for all the promises to settle and then reject with an [aggregated error](https://github.com/sindresorhus/aggregate-error) containing all the errors from the rejected promises.