diff --git a/README.md b/README.md index 21f6d02..44e388b 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ These classes will typically be helpful in batch or queue consumers, not as much # Example Usage Scenarios -Consider a typical processing loop without IterableMapper: +## Typical Processing Loop without `IterableMapper` ```typescript const source = new SomeSource(); @@ -25,7 +25,9 @@ for (const sourceId of sourceIds) { Each iteration takes 820ms total, but we waste time waiting for I/O. We could prefetch the next read (300ms) while processing (20ms) and writing (500ms), without changing the order of reads or writes. -Using IterableMapper as a prefetcher: +## Using `IterableMapper` as Prefetcher with Blocking Sequential Writes + +`concurrency: 1` on the prefetcher preserves the order of the reads and and writes are sequential and blocking (unchanged). ```typescript const source = new SomeSource(); @@ -33,10 +35,10 @@ const sourceIds = [1, 2,... 1000]; // Pre-reads up to 8 items serially and releases in sequential order const sourcePrefetcher = new IterableMapper(sourceIds, async (sourceId) => source.read(sourceId), - { concurrency: 1 } + { concurrency: 1, maxUnread: 10 } ); const sink = new SomeSink(); -for await (const item of sourcePrefetcher) { +for await (const item of sourcePrefetcher) { // may not block for fast sources const outputItem = doSomeOperation(item); // takes 20 ms of CPU await sink.write(outputItem); // takes 500 ms of I/O wait, no CPU } @@ -44,29 +46,67 @@ for await (const item of sourcePrefetcher) { This reduces iteration time to 520ms by overlapping reads with processing/writing. -For maximum throughput, make the writes concurrent with IterableQueueMapper (to iterate results with backpressure when too many unread items) or IterableQueueMapperSimple (to handle errors at end without custom iteration or backpressure): +## Using `IterableMapper` as Prefetcher with Background Sequential Writes with `IterableQueueMapperSimple` + +`concurrency: 1` on the prefetcher preserves the order of the reads. +`concurrency: 1` on the flusher preserves the order of the writes, but allows the loop to iterate while last write is completing. ```typescript const source = new SomeSource(); const sourceIds = [1, 2,... 1000]; const sourcePrefetcher = new IterableMapper(sourceIds, async (sourceId) => source.read(sourceId), + { concurrency: 1, maxUnread: 10 } +); +const sink = new SomeSink(); +const flusher = new IterableQueueMapperSimple( + async (outputItem) => sink.write(outputItem), { concurrency: 1 } ); +for await (const item of sourcePrefetcher) { // may not block for fast sources + const outputItem = doSomeOperation(item); // takes 20 ms of CPU + await flusher.enqueue(outputItem); // will periodically block for portion of write time +} +// Wait for all writes to complete +await flusher.onIdle(); +// Check for errors +if (flusher.errors.length > 0) { +// ... +} +``` + +This reduces iteration time to about `max((max(readTime, writeTime) - cpuOpTime, cpuOpTime))` +by overlapping reads and writes with the CPU processing step. +In this contrived example, the loop time is reduced to 500ms - 20ms = 480ms. +In cases where the CPU usage time is higher, the impact can be greater. + +## Using `IterableMapper` as Prefetcher with Out of Order Reads and Background Out of Order Writes with `IterableQueueMapperSimple` + +For maximum throughput, allow out of order reads and writes with +`IterableQueueMapper` (to iterate results with backpressure when too many unread items) or +`IterableQueueMapperSimple` (to handle errors at end without custom iteration and applying backpressure to block further enqueues when `concurrency` items are in process): + +```typescript +const source = new SomeSource(); +const sourceIds = [1, 2,... 1000]; +const sourcePrefetcher = new IterableMapper(sourceIds, + async (sourceId) => source.read(sourceId), + { concurrency: 10, maxUnread: 20 } +); const sink = new SomeSink(); const flusher = new IterableQueueMapperSimple( async (outputItem) => sink.write(outputItem), { concurrency: 10 } ); -for await (const item of sourcePrefetcher) { +for await (const item of sourcePrefetcher) { // typically will not block const outputItem = doSomeOperation(item); // takes 20 ms of CPU - await flusher.enqueue(outputItem); // usually takes no time + await flusher.enqueue(outputItem); // typically will not block } // Wait for all writes to complete await flusher.onIdle(); // Check for errors if (flusher.errors.length > 0) { - // ... + // ... } ``` diff --git a/src/iterable-mapper.ts b/src/iterable-mapper.ts index 15a8848..1e2f7b4 100644 --- a/src/iterable-mapper.ts +++ b/src/iterable-mapper.ts @@ -115,7 +115,7 @@ type NewElementOrError = { * * @example * - * Consider a typical processing loop without IterableMapper: + * ### Typical Processing Loop without `IterableMapper` * * ```typescript * const source = new SomeSource(); @@ -134,7 +134,9 @@ type NewElementOrError = { * * @example * - * Using `IterableMapper` as a prefetcher and blocking writes, without changing the order of reads or writes: + * ### Using `IterableMapper` as Prefetcher with Blocking Sequential Writes + * + * `concurrency: 1` on the prefetcher preserves the order of the reads and and writes are sequential and blocking (unchanged). * * ```typescript * const source = new SomeSource(); @@ -155,7 +157,10 @@ type NewElementOrError = { * * @example * - * Using `IterableMapper` as a prefetcher with background writes, without changing the order of reads or writes: + * ### Using `IterableMapper` as Prefetcher with Background Sequential Writes with `IterableQueueMapperSimple` + * + * `concurrency: 1` on the prefetcher preserves the order of the reads. + * `concurrency: 1` on the flusher preserves the order of the writes, but allows the loop to iterate while last write is completing. * * ```typescript * const source = new SomeSource(); @@ -181,13 +186,15 @@ type NewElementOrError = { * } * ``` * - * This reduces iteration time to about to `max((max(readTime, writeTime) - cpuOpTime, cpuOpTime)) + * This reduces iteration time to about `max((max(readTime, writeTime) - cpuOpTime, cpuOpTime))` * by overlapping reads and writes with the CPU processing step. * In this contrived example, the loop time is reduced to 500ms - 20ms = 480ms. * In cases where the CPU usage time is higher, the impact can be greater. * * @example * + * ### Using `IterableMapper` as Prefetcher with Out of Order Reads and Background Out of Order Writes with `IterableQueueMapperSimple` + * * For maximum throughput, allow out of order reads and writes with * `IterableQueueMapper` (to iterate results with backpressure when too many unread items) or * `IterableQueueMapperSimple` (to handle errors at end without custom iteration and applying backpressure to block further enqueues when `concurrency` items are in process):