From 929ab51d52abefd98fdbfd76d91270e9b287dfc9 Mon Sep 17 00:00:00 2001 From: Harold Hunt Date: Wed, 12 Feb 2025 22:21:39 -0500 Subject: [PATCH 1/8] Update IterableMapper jsdocs --- src/iterable-mapper.ts | 137 +++++++++++++++++++++++++++++++++-------- 1 file changed, 110 insertions(+), 27 deletions(-) diff --git a/src/iterable-mapper.ts b/src/iterable-mapper.ts index ca19a48..4f238cb 100644 --- a/src/iterable-mapper.ts +++ b/src/iterable-mapper.ts @@ -10,25 +10,50 @@ import { IterableQueue } from './iterable-queue'; */ export interface IterableMapperOptions { /** - * Number of concurrently pending promises returned by `mapper`. + * Maximum number of concurrent invocations of `mapper` to run at once. * - * Must be an integer from 1 and up or `Infinity`, must be <= `maxUnread`. + * The number of concurrent invocations is dynamically adjusted based on the `maxUnread` limit: + * - If there are no unread items and `maxUnread` is 10 with `concurrency` of 4, all 4 mappers can run. + * - If there are already 8 unread items in the queue, only 2 mappers will run to avoid exceeding + * the `maxUnread` limit of 10. + * - If there are 10 unread items, no mappers will run until an item is consumed from the queue. + * + * This ensures efficient processing while maintaining backpressure through the `maxUnread` limit. + * + * Setting `concurrency` to 1 enables serial processing, preserving the order of items + * while still benefiting from the backpressure mechanism. + * + * Must be an integer from 1 and up or `Infinity`, and must be <= `maxUnread`. * * @default 4 */ readonly concurrency?: number; /** - * Number of pending unread iterable items. + * Maximum number of unread items allowed to accumulate before applying backpressure. + * + * This parameter is crucial for controlling memory usage and system load by: + * 1. Limiting the number of processed but unread items in the queue + * 2. Automatically pausing mapper execution when the limit is reached + * 3. 
Resuming processing when items are consumed, maintaining optimal throughput * - * Must be an integer from 1 and up or `Infinity`, must be >= `concurrency`. + * For example, when reading from a slow database: + * - With maxUnread=10, only 10 items will be fetched before the consumer reads them + * - Additional items won't be fetched until the consumer reads existing items + * - This prevents runaway memory usage for items that cannot be processed quickly enough + * + * Must be an integer from 1 and up or `Infinity`, and must be >= `concurrency`. + * It is not typical to set this value to `Infinity`, but rather to a value such as 1 to 10. * * @default 8 */ readonly maxUnread?: number; /** - * When set to `false`, instead of stopping when a promise rejects, it will wait for all the promises to settle and then reject with an [aggregated error](https://github.com/sindresorhus/aggregate-error) containing all the errors from the rejected promises. + * When set to `false`, instead of stopping when a promise rejects, it will wait for all + * the promises to settle and then reject with an + * [aggregated error](https://github.com/sindresorhus/aggregate-error) containing all the + * errors from the rejected promises. * * @default true */ @@ -64,13 +89,70 @@ type NewElementOrError = { * * @remarks * - * This allows performing a concurrent mapping with - * back pressure (won't iterate all source items if the consumer is - * not reading). + * Optimized for I/O-bound operations (e.g., fetching data, reading files) rather than + * CPU-intensive tasks. The concurrent processing with backpressure ensures efficient + * resource utilization without overwhelming memory or system resources. + * + * Consider a typical processing loop without IterableMapper: + * ```typescript + * const source = new SomeSource(); + * const sourceIds = [1, 2,... 
1000]; + * const sink = new SomeSink(); + * for (const sourceId of sourceIds) { + * const item = await source.read(sourceId); // takes 300 ms of I/O wait, no CPU + * const outputItem = doSomeOperation(item); // takes 20 ms of CPU + * await sink.write(outputItem); // takes 500 ms of I/O wait, no CPU + * } + * ``` + * + * Each iteration takes 820ms total, but we waste time waiting for I/O. + * We could prefetch the next read (300ms) while processing (20ms) and writing (500ms), + * without changing the order of reads or writes. * - * Typical use case is for a `prefetcher` that ensures that items - * are always ready for the consumer but that large numbers of items - * are not processed before the consumer is ready for them. + * Using IterableMapper as a prefetcher: + * ```typescript + * const source = new SomeSource(); + * const sourceIds = [1, 2,... 1000]; + * // Pre-reads up to 8 items serially and releases in sequential order + * const sourcePrefetcher = new IterableMapper(sourceIds, + * async (sourceId) => source.read(sourceId), + * { concurrency: 1 } + * ); + * const sink = new SomeSink(); + * for await (const item of sourcePrefetcher) { + * const outputItem = doSomeOperation(item); // takes 20 ms of CPU + * await sink.write(outputItem); // takes 500 ms of I/O wait, no CPU + * } + * ``` + * + * This reduces iteration time to 520ms by overlapping reads with processing/writing. + * + * For maximum throughput, combine with IterableQueueMapperSimple for concurrent writes: + * ```typescript + * const source = new SomeSource(); + * const sourceIds = [1, 2,... 
1000]; + * const sourcePrefetcher = new IterableMapper(sourceIds, + * async (sourceId) => source.read(sourceId), + * { concurrency: 1 } + * ); + * const sink = new SomeSink(); + * const flusher = new IterableQueueMapperSimple( + * async (outputItem) => sink.write(outputItem), + * { concurrency: 10 } + * ); + * for await (const item of sourcePrefetcher) { + * const outputItem = doSomeOperation(item); // takes 20 ms of CPU + * await flusher.enqueue(outputItem); // usually takes no time + * } + * // Wait for all writes to complete + * await flusher.onIdle(); + * // Check for errors + * if (flusher.errors.length > 0) { + * // ... + * } + * ``` + * + * This reduces iteration time to about 20ms by overlapping reads and writes. * * @category Iterable Input */ @@ -93,23 +175,24 @@ export class IterableMapper implements AsyncIterable | Iterable, @@ -323,7 +406,7 @@ export class IterableMapper implements AsyncIterable Date: Wed, 12 Feb 2025 23:58:30 -0500 Subject: [PATCH 2/8] Update IterableQueueMapperSimple example --- examples/iterable-queue-mapper-simple.ts | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/examples/iterable-queue-mapper-simple.ts b/examples/iterable-queue-mapper-simple.ts index 7a6ae79..dfb4d36 100644 --- a/examples/iterable-queue-mapper-simple.ts +++ b/examples/iterable-queue-mapper-simple.ts @@ -31,7 +31,7 @@ async function main() { let callCount = 0; // Create an item processor with IterableQueueMapperSimple - const prefetcher = new IterableQueueMapperSimple( + const backgroundFlusher = new IterableQueueMapperSimple( // mapper function async (value: number): Promise => { const myCallCount = callCount++; @@ -39,9 +39,7 @@ async function main() { console.log(`Mapper Call Start ${myCallCount}, Value: ${value}, Total: ${total}`); - // Simulate fetching an async item with varied delays - // await sleep(10 * (callCount % 7)); - // Wait short random time + // Simulate flushing an async item with varied delays await 
sleep(Math.random() * 10000); if (value % 5 === 0) { @@ -53,26 +51,27 @@ async function main() { { concurrency: 3 }, ); - // Add items to the queue in the background + // Add items to be flushed to the queue in the background + // This will pause when the queue is full and resume when there is capacity const jobAdder = (async () => { for await (const item of iterator) { console.log(`Enqueue Start ${item}`); - await prefetcher.enqueue(item); + await backgroundFlusher.enqueue(item); console.log(`Enqueue Done ${item}`); } })(); // Wait for the job adder to finish adding the jobs - // (it's throughput is constrained by the prefetcher's concurrency) + // (its throughput is constrained by the flusher's concurrency) await jobAdder; - // Wait for the prefetcher to finish processing all items - await prefetcher.onIdle(); + // Wait for the async flusher to finish flushing all items + await backgroundFlusher.onIdle(); // Check for errors - if (prefetcher.errors.length > 0) { + if (backgroundFlusher.errors.length > 0) { console.error('Errors:'); - prefetcher.errors.forEach(({ error, item }) => + backgroundFlusher.errors.forEach(({ error, item }) => console.error( `${item} had error: ${(error as Error).message ? 
(error as Error).message : error}`, ), @@ -80,6 +79,7 @@ async function main() { } console.log(`Total: ${total}`); + console.log('Note - It is intended that there are errors in this example'); } void main(); From 077effd1bbfb4cdf8f8a5ba1b7a9a024e13d8810 Mon Sep 17 00:00:00 2001 From: Harold Hunt Date: Wed, 12 Feb 2025 23:58:39 -0500 Subject: [PATCH 3/8] Add comment on errors --- examples/iterable-queue-mapper.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/iterable-queue-mapper.ts b/examples/iterable-queue-mapper.ts index bbbae08..966027d 100644 --- a/examples/iterable-queue-mapper.ts +++ b/examples/iterable-queue-mapper.ts @@ -135,6 +135,7 @@ async function main() { await jobAdder; console.log(`QueuedButUnreadFileSizeGB: ${queuedButUnreadFileSizeGB}`); + console.log('Note - It is intended that there are errors in this example'); } void main(); From ab6bf6b9ec87cc38c2c49b7fb8f79e271dd0a0ce Mon Sep 17 00:00:00 2001 From: Harold Hunt Date: Thu, 13 Feb 2025 00:24:24 -0500 Subject: [PATCH 4/8] Add order preservation test --- src/iterable-mapper.test.ts | 38 +++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/src/iterable-mapper.test.ts b/src/iterable-mapper.test.ts index 5049877..3266c23 100644 --- a/src/iterable-mapper.test.ts +++ b/src/iterable-mapper.test.ts @@ -419,6 +419,44 @@ describe('IterableMapper', () => { }); }); + describe('order preservation tests', () => { + it('preserves exact sequential order under high load with concurrency 1, maxUnread 8', async () => { + const size = 1000; + const input = Array.from({ length: size }, (_, i) => ({ + value: i + 1, + ms: Math.floor(Math.random() * 20), // Random delay 0-19ms + })); + + const mappedOrder: number[] = []; + const iteratedOrder: number[] = []; + + const prefetcher = new IterableMapper( + input, + async ({ value, ms }): Promise => { + await sleep(ms); + mappedOrder.push(value); + return value; + }, + { concurrency: 1, maxUnread: 8 }, + ); + + for await (const 
value of prefetcher) { + iteratedOrder.push(value); + // Add some random delay in consuming to create backpressure + await sleep(Math.floor(Math.random() * 20)); + } + + // Verify exact sequential order preservation + expect(iteratedOrder).toHaveLength(size); + expect(mappedOrder).toHaveLength(size); + + // Verify that both mapped and iterated orders match the input sequence + const expectedOrder = Array.from({ length: size }, (_, i) => i + 1); + expect(mappedOrder).toEqual(expectedOrder); + expect(iteratedOrder).toEqual(expectedOrder); + }, 30000); + }); + describe('concurrency 1, maxUnread 1', () => { const concurrency = 1; const maxUnread = 1; From d3ba8507b012e8c8a85f1aa83558637b06e75eb2 Mon Sep 17 00:00:00 2001 From: Harold Hunt Date: Thu, 13 Feb 2025 09:42:31 -0500 Subject: [PATCH 5/8] Replace readme example --- README.md | 75 +++++++++++++++++++++++++++++++++++------- src/iterable-mapper.ts | 9 +++-- 2 files changed, 70 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 144cefc..c0e4b2c 100644 --- a/README.md +++ b/README.md @@ -8,18 +8,69 @@ A common use case for `@shutterstock/p-map-iterable` is as a "prefetcher" that w These classes will typically be helpful in batch or queue consumers, not as much in request/response services. 
-# Example Usage Scenario - -- AWS Lambda function for Kafka or Kinesis stream processing -- Each invocation has a batch of, say, 100 records -- Each record points to a file on S3 that needs to be parsed, processed, then written back to S3 -- The average file size is 50 MB, so these will take a few seconds to fetch and write -- The Lambda function has 1,769 MB of RAM which gives it 1 vCPU, allowing the JS thread to run at roughly 100% speed of 1 core -- If the code never waits to fetch or write files then the Lambda function will be able to use 100% of the paid-for CPU time -- If there is no back pressure then reading 100 * 50 MB files would fill up the default Lambda temp disk space of 512 MB OR would consume 5 GB of memory, which would cause the Lambda function to fail -- We can use `IterableMapper` to fetch up to 5 of the next files while the current file is being processed, then pause until the current file is processed and the next file is consumed -- We can also use `IterableQueueMapper.enqueue()` to write the files back to S3 without waiting for them to finish, unless we get more than, say, 3-4 files being uploaded at once, at which point we can pause until the current file is uploaded before we allow queuing another file for upload -- In the rare case that 3-4 files are uploading, but not yet finished, we would block on `.enqueue()` and not consume much CPU while waiting for at least 1 upload to finish +# Example Usage Scenarios + +Consider a typical processing loop without IterableMapper: + +```typescript +const source = new SomeSource(); +const sourceIds = [1, 2,... 1000]; +const sink = new SomeSink(); +for (const sourceId of sourceIds) { + const item = await source.read(sourceId); // takes 300 ms of I/O wait, no CPU + const outputItem = doSomeOperation(item); // takes 20 ms of CPU + await sink.write(outputItem); // takes 500 ms of I/O wait, no CPU +} +``` + +Each iteration takes 820ms total, but we waste time waiting for I/O. 
We could prefetch the next read (300ms) while processing (20ms) and writing (500ms), without changing the order of reads or writes. + +Using IterableMapper as a prefetcher: + +```typescript +const source = new SomeSource(); +const sourceIds = [1, 2,... 1000]; +// Pre-reads up to 8 items serially and releases in sequential order +const sourcePrefetcher = new IterableMapper(sourceIds, + async (sourceId) => source.read(sourceId), + { concurrency: 1 } +); +const sink = new SomeSink(); +for await (const item of sourcePrefetcher) { + const outputItem = doSomeOperation(item); // takes 20 ms of CPU + await sink.write(outputItem); // takes 500 ms of I/O wait, no CPU +} +``` + +This reduces iteration time to 520ms by overlapping reads with processing/writing. + +For maximum throughput, make the writes concurrent with IterableQueueMapper (to iterate results with backpressure when too many unread items) or IterableQueueMapperSimple (to handle errors at end without custom iteration or backpressure): + +```typescript +const source = new SomeSource(); +const sourceIds = [1, 2,... 1000]; +const sourcePrefetcher = new IterableMapper(sourceIds, + async (sourceId) => source.read(sourceId), + { concurrency: 1 } +); +const sink = new SomeSink(); +const flusher = new IterableQueueMapperSimple( + async (outputItem) => sink.write(outputItem), + { concurrency: 10 } +); +for await (const item of sourcePrefetcher) { + const outputItem = doSomeOperation(item); // takes 20 ms of CPU + await flusher.enqueue(outputItem); // usually takes no time +} +// Wait for all writes to complete +await flusher.onIdle(); +// Check for errors +if (flusher.errors.length > 0) { + // ... +} +``` + +This reduces iteration time to about 20ms by overlapping reads and writes with the CPU processing step. In this contrived (but common) example we would get a 41x improvement in throughput, removing 97.5% of the time to process each item and fully utilizing the CPU time available in the JS event loop. 
# Getting Started diff --git a/src/iterable-mapper.ts b/src/iterable-mapper.ts index 4f238cb..251a4e3 100644 --- a/src/iterable-mapper.ts +++ b/src/iterable-mapper.ts @@ -127,7 +127,10 @@ type NewElementOrError = { * * This reduces iteration time to 520ms by overlapping reads with processing/writing. * - * For maximum throughput, combine with IterableQueueMapperSimple for concurrent writes: + * For maximum throughput, make the writes concurrent with + * IterableQueueMapper (to iterate results with backpressure when too many unread items) or + * IterableQueueMapperSimple (to handle errors at end without custom iteration or backpressure): + * * ```typescript * const source = new SomeSource(); * const sourceIds = [1, 2,... 1000]; @@ -152,7 +155,9 @@ type NewElementOrError = { * } * ``` * - * This reduces iteration time to about 20ms by overlapping reads and writes. + * This reduces iteration time to about 20ms by overlapping reads and writes with the CPU processing step. + * In this contrived (but common) example we would get a 41x improvement in throughput, removing 97.5% of + * the time to process each item and fully utilizing the CPU time available in the JS event loop. * * @category Iterable Input */ From b754d323ddf52e72ed32a9876c1d4717dc84fd21 Mon Sep 17 00:00:00 2001 From: Harold Hunt Date: Thu, 13 Feb 2025 13:04:52 -0500 Subject: [PATCH 6/8] Update usage of backpressure in readme --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index c0e4b2c..21f6d02 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ # Overview -`@shutterstock/p-map-iterable` provides several classes that allow processing results of `p-map`-style mapper functions by iterating the results as they are completed, with back pressure to limit the number of items that are processed ahead of the consumer. 
+`@shutterstock/p-map-iterable` provides several classes that allow processing results of `p-map`-style mapper functions by iterating the results as they are completed, with backpressure to limit the number of items that are processed ahead of the consumer. A common use case for `@shutterstock/p-map-iterable` is as a "prefetcher" that will fetch, for example, AWS S3 files in an AWS Lambda function. By prefetching large files the consumer is able to use 100% of the paid-for Lambda CPU time for the JS thread, rather than waiting idle while the next file is fetched. The backpressure (set by `maxUnread`) prevents the prefetcher from consuming unlimited memory or disk space by racing ahead of the consumer. @@ -117,7 +117,7 @@ These diagrams illustrate the differences in operation betweeen `p-map`, `p-queu - User supplied sync or async mapper function - Exposes an async iterable interface for consuming mapped items - Allows a maximum queue depth of mapped items - if the consumer stops consuming, the queue will fill up, at which point the mapper will stop being invoked until an item is consumed from the queue - - This allows mapping with back pressure so that the mapper does not consume unlimited resources (e.g. memory, disk, network, event loop time) by racing ahead of the consumer + - This allows mapping with backpressure so that the mapper does not consume unlimited resources (e.g. memory, disk, network, event loop time) by racing ahead of the consumer - [IterableQueueMapper](https://tech.shutterstock.com/p-map-iterable/classes/IterableQueueMapper.html) - Wraps `IterableMapper` - Adds items to the queue via the `enqueue` method @@ -142,7 +142,7 @@ These diagrams illustrate the differences in operation betweeen `p-map`, `p-queu See [p-map](https://github.com/sindresorhus/p-map) docs for a good start in understanding what this does. 
-The key difference between `IterableMapper` and `pMap` are that `IterableMapper` does not return when the entire mapping is done, rather it exposes an iterable that the caller loops through. This enables results to be processed while the mapping is still happening, while optionally allowing for back pressure to slow or stop the mapping if the caller is not consuming items fast enough. Common use cases include `prefetching` items from a remote service - the next set of requests are dispatched asyncronously while the current responses are processed and the prefetch requests will pause when the unread queue fills up. +The key difference between `IterableMapper` and `pMap` is that `IterableMapper` does not return when the entire mapping is done, rather it exposes an iterable that the caller loops through. This enables results to be processed while the mapping is still happening, while optionally allowing for backpressure to slow or stop the mapping if the caller is not consuming items fast enough. Common use cases include `prefetching` items from a remote service - the next set of requests are dispatched asynchronously while the current responses are processed and the prefetch requests will pause when the unread queue fills up. See [examples/iterable-mapper.ts](./examples/iterable-mapper.ts) for an example. 
From c8583f8a0868d0adf83e2d7ee387ff6c69ee2777 Mon Sep 17 00:00:00 2001 From: Harold Hunt Date: Thu, 13 Feb 2025 13:05:24 -0500 Subject: [PATCH 7/8] Complete normalization of class jsdocs --- src/iterable-mapper.ts | 100 ++++++++++++++++++++-------- src/iterable-queue-mapper-simple.ts | 77 ++++++++++++--------- src/iterable-queue-mapper.ts | 98 +++++++++++---------------- 3 files changed, 157 insertions(+), 118 deletions(-) diff --git a/src/iterable-mapper.ts b/src/iterable-mapper.ts index 251a4e3..15a8848 100644 --- a/src/iterable-mapper.ts +++ b/src/iterable-mapper.ts @@ -82,18 +82,41 @@ type NewElementOrError = { }; /** - * Iterates over a source iterable with specified concurrency, + * Iterates over a source iterable / generator with specified `concurrency`, * calling the `mapper` on each iterated item, and storing the - * `mapper` result in a queue of specified max size, before + * `mapper` result in a queue of `maxUnread` size, before * being iterated / read by the caller. * * @remarks * - * Optimized for I/O-bound operations (e.g., fetching data, reading files) rather than - * CPU-intensive tasks. The concurrent processing with backpressure ensures efficient - * resource utilization without overwhelming memory or system resources. + * ### Typical Use Case + * - Prefetching items from an async I/O source + * - In the simple sequential (`concurrency: 1`) case, allows items to be prefetched async, preserving order, while caller processes an item + * - Can allow parallel prefetches for sources that allow for out of order reads (`concurrency: 2+`) + * - Prevents the producer from racing ahead of the consumer if `maxUnread` is reached + * + * ### Error Handling + * The mapper should ideally handle all errors internally to enable error handling + * closest to where they occur. 
However, if errors do escape the mapper: + * + * When `stopOnMapperError` is true (default): + * - First error immediately stops processing + * - Error is thrown from the `AsyncIterator`'s next() call + * + * When `stopOnMapperError` is false: + * - Processing continues despite errors + * - All errors are collected and thrown together + * - Errors are thrown as `AggregateError` after all items complete + * + * ### Usage + * - Items are exposed to the `mapper` via an iterator or async iterator (this includes generator and async generator functions) + * - IMPORTANT: `mapper` method will not be invoked when `maxUnread` is reached, until items are consumed + * - The iterable will set `done` when the `input` has indicated `done` and all `mapper` promises have resolved + * + * @example * * Consider a typical processing loop without IterableMapper: + * * ```typescript * const source = new SomeSource(); * const sourceIds = [1, 2,... 1000]; @@ -109,17 +132,20 @@ type NewElementOrError = { * We could prefetch the next read (300ms) while processing (20ms) and writing (500ms), * without changing the order of reads or writes. * - * Using IterableMapper as a prefetcher: + * @example + * + * Using `IterableMapper` as a prefetcher and blocking writes, without changing the order of reads or writes: + * * ```typescript * const source = new SomeSource(); * const sourceIds = [1, 2,... 
1000]; * // Pre-reads up to 8 items serially and releases in sequential order * const sourcePrefetcher = new IterableMapper(sourceIds, * async (sourceId) => source.read(sourceId), - * { concurrency: 1 } + * { concurrency: 1, maxUnread: 10 } * ); * const sink = new SomeSink(); - * for await (const item of sourcePrefetcher) { + * for await (const item of sourcePrefetcher) { // may not block for fast sources * const outputItem = doSomeOperation(item); // takes 20 ms of CPU * await sink.write(outputItem); // takes 500 ms of I/O wait, no CPU * } @@ -127,25 +153,60 @@ type NewElementOrError = { * * This reduces iteration time to 520ms by overlapping reads with processing/writing. * - * For maximum throughput, make the writes concurrent with - * IterableQueueMapper (to iterate results with backpressure when too many unread items) or - * IterableQueueMapperSimple (to handle errors at end without custom iteration or backpressure): + * @example + * + * Using `IterableMapper` as a prefetcher with background writes, without changing the order of reads or writes: * * ```typescript * const source = new SomeSource(); * const sourceIds = [1, 2,... 1000]; * const sourcePrefetcher = new IterableMapper(sourceIds, * async (sourceId) => source.read(sourceId), + * { concurrency: 1, maxUnread: 10 } + * ); + * const sink = new SomeSink(); + * const flusher = new IterableQueueMapperSimple( + * async (outputItem) => sink.write(outputItem), * { concurrency: 1 } * ); + * for await (const item of sourcePrefetcher) { // may not block for fast sources + * const outputItem = doSomeOperation(item); // takes 20 ms of CPU + * await flusher.enqueue(outputItem); // will periodically block for portion of write time + * } + * // Wait for all writes to complete + * await flusher.onIdle(); + * // Check for errors + * if (flusher.errors.length > 0) { + * // ... 
+ * } + * ``` + * + * This reduces iteration time to about to `max((max(readTime, writeTime) - cpuOpTime, cpuOpTime)) + * by overlapping reads and writes with the CPU processing step. + * In this contrived example, the loop time is reduced to 500ms - 20ms = 480ms. + * In cases where the CPU usage time is higher, the impact can be greater. + * + * @example + * + * For maximum throughput, allow out of order reads and writes with + * `IterableQueueMapper` (to iterate results with backpressure when too many unread items) or + * `IterableQueueMapperSimple` (to handle errors at end without custom iteration and applying backpressure to block further enqueues when `concurrency` items are in process): + * + * ```typescript + * const source = new SomeSource(); + * const sourceIds = [1, 2,... 1000]; + * const sourcePrefetcher = new IterableMapper(sourceIds, + * async (sourceId) => source.read(sourceId), + * { concurrency: 10, maxUnread: 20 } + * ); * const sink = new SomeSink(); * const flusher = new IterableQueueMapperSimple( * async (outputItem) => sink.write(outputItem), * { concurrency: 10 } * ); - * for await (const item of sourcePrefetcher) { + * for await (const item of sourcePrefetcher) { // typically will not block * const outputItem = doSomeOperation(item); // takes 20 ms of CPU - * await flusher.enqueue(outputItem); // usually takes no time + * await flusher.enqueue(outputItem); // typically will not block * } * // Wait for all writes to complete * await flusher.onIdle(); @@ -184,19 +245,6 @@ export class IterableMapper implements AsyncIterable = { item: T; error: string | { [key: string]: any } | Error }[]; const NoResult = Symbol('noresult'); +/** + * Options for IterableQueueMapperSimple + */ +export type IterableQueueMapperSimpleOptions = Pick; + /** * Accepts queue items via `enqueue` and calls the `mapper` on them - * with specified concurrency, storing the - * `mapper` result in a queue of specified max size, before - * being iterated / read by the caller. 
The `enqueue` method will block if - * the queue is full, until an item is read. + * with specified `concurrency`, discards the results, and accumulates + * exceptions in the `errors` property. When empty, `await enqueue()` + * will return immediately, but when `concurrency` items are in progress, + * `await enqueue()` will block until a slot is available to accept the item. * * @remarks * - * Note: the name is somewhat of a misnomer as this wraps `IterableQueueMapper` - * but is not itself an `Iterable`. + * ### Typical Use Case + * - Pushing items to an async I/O destination + * - In the simple sequential (`concurrency: 1`) case, allows 1 item to be flushed async while caller prepares next item + * - Results of the flushed items are not needed in a subsequent step (if they are, use `IterableQueueMapper`) + * + * ### Error Handling + * The mapper should ideally handle all errors internally to enable error handling + * closest to where they occur. However, if errors do escape the mapper: + * - Processing continues despite errors + * - All errors are collected in the `errors` property + * - Errors can be checked/handled during processing via the `errors` property * - * Accepts items for mapping in the background, discards the results, - * but accumulates exceptions in the `errors` property. + * Key Differences from `IterableQueueMapper`: + * - `maxUnread` defaults to equal `concurrency` (simplifying queue management) + * - Results are automatically iterated and discarded (all work should happen in mapper) + * - Errors are collected rather than thrown (available via errors property) * - * Allows up to `concurrency` mappers to be in progress before - * `enqueue` will block until a mapper completes. 
+ * ### Usage + * - Items are added to the queue via the `await enqueue()` method + * - Check `errors` property to see if any errors occurred, stop if desired + * - IMPORTANT: `await enqueue()` method will block until a slot is available, if queue is full + * - IMPORTANT: Always `await onIdle()` to ensure all items are processed + * + * Note: the name is somewhat of a misnomer as this wraps `IterableQueueMapper` + * but is not itself an `Iterable`. * * @category Enqueue Input + * + * @see {@link IterableQueueMapper} for related class with more configuration options + * @see {@link IterableMapper} for underlying mapper implementation and examples of combined usage */ export class IterableQueueMapperSimple { private readonly _writer: IterableQueueMapper; @@ -34,29 +59,17 @@ export class IterableQueueMapperSimple { private _isIdle = false; /** - * Create a new `IterableQueueMapperSimple` + * Create a new `IterableQueueMapperSimple`, which uses `IterableQueueMapper` underneath, but + * automatically iterates and discards results as they complete. * - * @param mapper Function which is called for every item in `input`. - * Expected to return a `Promise` or value. - * - * The `mapper` *should* handle all errors and not allow an error to be thrown - * out of the `mapper` function as this enables the best handling of errors - * closest to the time that they occur. - * - * If the `mapper` function does allow an error to be thrown then the - * errors will be accumulated in the `errors` property. + * @param mapper Function called for every enqueued item. Returns a `Promise` or value. 
* @param options IterableQueueMapperSimple options + * + * @see {@link IterableQueueMapperSimple} for full class documentation + * @see {@link IterableQueueMapper} for related class with more configuration options + * @see {@link IterableMapper} for underlying mapper implementation and examples of combined usage */ - constructor( - mapper: Mapper, - options: { - /** - * Number of items to accept for mapping before requiring the caller to wait for one to complete. - * @default 4 - */ - concurrency?: number; - } = {}, - ) { + constructor(mapper: Mapper, options: IterableQueueMapperSimpleOptions = {}) { const { concurrency = 4 } = options; this._mapper = mapper; @@ -106,7 +119,7 @@ export class IterableQueueMapperSimple { /** * Accept a request for sending in the background if a concurrency slot is available. * Else, do not return until a concurrency slot is freed up. - * This provides concurrency background writes with back pressure to prevent + * This provides concurrency background writes with backpressure to prevent * the caller from getting too far ahead. * * MUST await `onIdle` for background `mappers`s to finish diff --git a/src/iterable-queue-mapper.ts b/src/iterable-queue-mapper.ts index 1e38918..0c4bf4a 100644 --- a/src/iterable-queue-mapper.ts +++ b/src/iterable-queue-mapper.ts @@ -1,62 +1,50 @@ // // 2021-08-25 - Initially based on: https://raw.githubusercontent.com/sindresorhus/p-map/main/index.js // -import { IterableMapper, Mapper } from './iterable-mapper'; +import { IterableMapper, IterableMapperOptions, Mapper } from './iterable-mapper'; import { IterableQueue } from './iterable-queue'; -export interface IterableQueueMapperOptions { - /** - * Number of concurrently pending promises returned by `mapper`. - * - * Must be an integer from 1 and up or `Infinity`, must be <= `maxUnread`. - * - * @default 4 - */ - readonly concurrency?: number; - - /** - * Number of pending unread iterable items. 
- * - * Must be an integer from 1 and up or `Infinity`, must be >= `concurrency`. - * - * @default 8 - */ - readonly maxUnread?: number; - - /** - * When set to `false`, instead of stopping when a promise rejects, it will wait for all the promises to settle and then reject with an [aggregated error](https://github.com/sindresorhus/aggregate-error) containing all the errors from the rejected promises. - * - * @default true - */ - readonly stopOnMapperError?: boolean; -} +/** + * Options for IterableQueueMapper + */ +export type IterableQueueMapperOptions = IterableMapperOptions; /** * Accepts queue items via `enqueue` and calls the `mapper` on them - * with specified concurrency, storing the - * `mapper` result in a queue of specified max size, before - * being iterated / read by the caller. The `enqueue` method will block if - * the queue is full, until an item is read. + * with specified `concurrency`, storing the `mapper` result in a queue + * of `maxUnread` size, before being iterated / read by the caller. + * The `enqueue` method will block if the queue is full, until an item is read. * * @remarks * - * This allows performing a concurrent mapping with - * back pressure for items added after queue creation - * via a method call. + * ### Typical Use Case + * - Pushing items to an async I/O destination + * - In the simple sequential (`concurrency: 1`) case, allows 1 item to be flushed async while caller prepares next item + * - Results of the flushed items are needed in a subsequent step (if they are not, use `IterableQueueMapperSimple`) + * - Prevents the producer from racing ahead of the consumer if `maxUnread` is reached + * + * ### Error Handling + * The mapper should ideally handle all errors internally to enable error handling + * closest to where they occur. 
However, if errors do escape the mapper:
 *
- * Because items are added via a method call it is possible to
- * chain an `IterableMapper` that prefetches files and processes them,
- * with an `IterableQueueMapper` that processes the results of the
- * `mapper` function of the `IterableMapper`.
+ * When `stopOnMapperError` is true (default):
+ * - First error immediately stops processing
+ * - Error is thrown from the `AsyncIterator`'s next() call
 *
- * Typical use case is for a `background uploader` that prevents
- * the producer from racing ahead of the upload process, consuming
- * too much memory or disk space. As items are ready for upload
- * they are added to the queue with the `enqueue` method, which is
- * `await`ed by the caller. If the queue has room then `enqueue`
- * will return immediately, otherwise it will block until there is room.
+ * When `stopOnMapperError` is false:
+ * - Processing continues despite errors
+ * - All errors are collected and thrown together
+ * - Errors are thrown as `AggregateError` after all items complete
+ *
+ * ### Usage
+ * - Items are added to the queue via the `await enqueue()` method
+ * - IMPORTANT: `await enqueue()` method will block until a slot is available, if queue is full
+ * - Call `done()` when no more items will be enqueued
+ * - IMPORTANT: Always `await onIdle()` to ensure all items are processed
 *
 * @category Enqueue Input
+ *
+ * @see {@link IterableMapper} for underlying mapper implementation and examples of combined usage
 */
export class IterableQueueMapper<Element, NewElement> implements AsyncIterable<NewElement> {
  private _iterableMapper: IterableMapper<Element, NewElement>;
@@ -64,24 +52,14 @@ export class IterableQueueMapper<Element, NewElement> implements AsyncIterable<NewElement> {
  private _sourceIterable: IterableQueue<Element>;
  /**
-   * Create a new `IterableQueueMapper`
-   *
-   * @param mapper Function which is called for every item in `input`.
-   * Expected to return a `Promise` or value.
+ * Create a new `IterableQueueMapper`, which uses `IterableMapper` underneath, and exposes a
+ * queue interface for adding items that are not exposed via an iterator.
 *
- * The `mapper` *should* handle all errors and not allow an error to be thrown
- * out of the `mapper` function as this enables the best handling of errors
- * closest to the time that they occur.
- *
- * If the `mapper` function does allow an error to be thrown then the
- * `stopOnMapperError` option controls the behavior:
- * - `stopOnMapperError`: `true` - will throw the error
- * out of `next` or the `AsyncIterator` returned from `[Symbol.asyncIterator]`
- * and stop processing.
- * - `stopOnMapperError`: `false` - will continue processing
- * and accumulate the errors to be thrown from `next` or the `AsyncIterator`
- * returned from `[Symbol.asyncIterator]` when all items have been processed.
+ * @param mapper Function called for every enqueued item. Returns a `Promise` or value.
 * @param options IterableQueueMapper options
+ *
+ * @see {@link IterableQueueMapper} for full class documentation
+ * @see {@link IterableMapper} for underlying mapper implementation and examples of combined usage
 */
constructor(mapper: Mapper<Element, NewElement>, options: IterableQueueMapperOptions = {}) {
    this._sourceIterable = new IterableQueue<Element>({
From b94e00d3a0736f7ae0b3dcbe976b27249b2d6ba5 Mon Sep 17 00:00:00 2001
From: Harold Hunt
Date: Thu, 13 Feb 2025 13:05:53 -0500
Subject: [PATCH 8/8] Update package description

---
 package.json | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/package.json b/package.json
index 0917899..ec3f6c5 100644
--- a/package.json
+++ b/package.json
@@ -1,7 +1,7 @@
 {
   "name": "@shutterstock/p-map-iterable",
   "version": "0.0.0",
-  "description": "Set of classes that allow you to call mappers with controlled concurrency on iterables or on queues",
+  "description": "Set of classes used for async prefetching with backpressure (IterableMapper) and async flushing with backpressure 
(IterableQueueMapper, IterableQueueMapperSimple)", "main": "dist/src/index.js", "types": "dist/src/index.d.ts", "publishConfig": {