You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Copy file name to clipboardExpand all lines: README.md
+48-8Lines changed: 48 additions & 8 deletions
Original file line number
Diff line number
Diff line change
@@ -10,7 +10,7 @@ These classes will typically be helpful in batch or queue consumers, not as much
10
10
11
11
# Example Usage Scenarios
12
12
13
-
Consider a typical processing loop without IterableMapper:
13
+
## Typical Processing Loop without `IterableMapper`
14
14
15
15
```typescript
16
16
const source =newSomeSource();
@@ -25,48 +25,88 @@ for (const sourceId of sourceIds) {
25
25
26
26
Each iteration takes 820ms total, but we waste time waiting for I/O. We could prefetch the next read (300ms) while processing (20ms) and writing (500ms), without changing the order of reads or writes.
27
27
28
-
Using IterableMapper as a prefetcher:
28
+
## Using `IterableMapper` as Prefetcher with Blocking Sequential Writes
29
+
30
+
`concurrency: 1` on the prefetcher preserves the order of the reads and and writes are sequential and blocking (unchanged).
29
31
30
32
```typescript
31
33
const source =newSomeSource();
32
34
const sourceIds = [1, 2,...1000];
33
35
// Pre-reads up to 8 items serially and releases in sequential order
forawait (const item ofsourcePrefetcher) {// may not block for fast sources
40
42
const outputItem =doSomeOperation(item); // takes 20 ms of CPU
41
43
awaitsink.write(outputItem); // takes 500 ms of I/O wait, no CPU
42
44
}
43
45
```
44
46
45
47
This reduces iteration time to 520ms by overlapping reads with processing/writing.
46
48
47
-
For maximum throughput, make the writes concurrent with IterableQueueMapper (to iterate results with backpressure when too many unread items) or IterableQueueMapperSimple (to handle errors at end without custom iteration or backpressure):
49
+
## Using `IterableMapper` as Prefetcher with Background Sequential Writes with `IterableQueueMapperSimple`
50
+
51
+
`concurrency: 1` on the prefetcher preserves the order of the reads.
52
+
`concurrency: 1` on the flusher preserves the order of the writes, but allows the loop to iterate while last write is completing.
forawait (const item ofsourcePrefetcher) { // may not block for fast sources
67
+
const outputItem =doSomeOperation(item); // takes 20 ms of CPU
68
+
awaitflusher.enqueue(outputItem); // will periodically block for portion of write time
69
+
}
70
+
// Wait for all writes to complete
71
+
awaitflusher.onIdle();
72
+
// Check for errors
73
+
if (flusher.errors.length>0) {
74
+
// ...
75
+
}
76
+
```
77
+
78
+
This reduces iteration time to about `max((max(readTime, writeTime) - cpuOpTime, cpuOpTime))`
79
+
by overlapping reads and writes with the CPU processing step.
80
+
In this contrived example, the loop time is reduced to 500ms - 20ms = 480ms.
81
+
In cases where the CPU usage time is higher, the impact can be greater.
82
+
83
+
## Using `IterableMapper` as Prefetcher with Out of Order Reads and Background Out of Order Writes with `IterableQueueMapperSimple`
84
+
85
+
For maximum throughput, allow out of order reads and writes with
86
+
`IterableQueueMapper` (to iterate results with backpressure when too many unread items) or
87
+
`IterableQueueMapperSimple` (to handle errors at end without custom iteration and applying backpressure to block further enqueues when `concurrency` items are in process):
Copy file name to clipboardExpand all lines: src/iterable-mapper.ts
+11-4Lines changed: 11 additions & 4 deletions
Original file line number
Diff line number
Diff line change
@@ -115,7 +115,7 @@ type NewElementOrError<NewElement = unknown> = {
115
115
*
116
116
* @example
117
117
*
118
-
* Consider a typical processing loop without IterableMapper:
118
+
* ### Typical Processing Loop without `IterableMapper`
119
119
*
120
120
* ```typescript
121
121
* const source = new SomeSource();
@@ -134,7 +134,9 @@ type NewElementOrError<NewElement = unknown> = {
134
134
*
135
135
* @example
136
136
*
137
-
* Using `IterableMapper` as a prefetcher and blocking writes, without changing the order of reads or writes:
137
+
* ### Using `IterableMapper` as Prefetcher with Blocking Sequential Writes
138
+
*
139
+
* `concurrency: 1` on the prefetcher preserves the order of the reads and and writes are sequential and blocking (unchanged).
138
140
*
139
141
* ```typescript
140
142
* const source = new SomeSource();
@@ -155,7 +157,10 @@ type NewElementOrError<NewElement = unknown> = {
155
157
*
156
158
* @example
157
159
*
158
-
* Using `IterableMapper` as a prefetcher with background writes, without changing the order of reads or writes:
160
+
* ### Using `IterableMapper` as Prefetcher with Background Sequential Writes with `IterableQueueMapperSimple`
161
+
*
162
+
* `concurrency: 1` on the prefetcher preserves the order of the reads.
163
+
* `concurrency: 1` on the flusher preserves the order of the writes, but allows the loop to iterate while last write is completing.
159
164
*
160
165
* ```typescript
161
166
* const source = new SomeSource();
@@ -181,13 +186,15 @@ type NewElementOrError<NewElement = unknown> = {
181
186
* }
182
187
* ```
183
188
*
184
-
* This reduces iteration time to about to `max((max(readTime, writeTime) - cpuOpTime, cpuOpTime))
189
+
* This reduces iteration time to about `max((max(readTime, writeTime) - cpuOpTime, cpuOpTime))`
185
190
* by overlapping reads and writes with the CPU processing step.
186
191
* In this contrived example, the loop time is reduced to 500ms - 20ms = 480ms.
187
192
* In cases where the CPU usage time is higher, the impact can be greater.
188
193
*
189
194
* @example
190
195
*
196
+
* ### Using `IterableMapper` as Prefetcher with Out of Order Reads and Background Out of Order Writes with `IterableQueueMapperSimple`
197
+
*
191
198
* For maximum throughput, allow out of order reads and writes with
192
199
* `IterableQueueMapper` (to iterate results with backpressure when too many unread items) or
193
200
* `IterableQueueMapperSimple` (to handle errors at end without custom iteration and applying backpressure to block further enqueues when `concurrency` items are in process):
0 commit comments