Doc updates #7

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged 8 commits on Feb 13, 2025
79 changes: 65 additions & 14 deletions README.md
@@ -2,24 +2,75 @@

# Overview

`@shutterstock/p-map-iterable` provides several classes that allow processing results of `p-map`-style mapper functions by iterating the results as they are completed, with backpressure to limit the number of items that are processed ahead of the consumer.

A common use case for `@shutterstock/p-map-iterable` is as a "prefetcher" that will fetch, for example, AWS S3 files in an AWS Lambda function. By prefetching large files the consumer is able to use 100% of the paid-for Lambda CPU time for the JS thread, rather than waiting idle while the next file is fetched. The backpressure (set by `maxUnread`) prevents the prefetcher from consuming unlimited memory or disk space by racing ahead of the consumer.

These classes will typically be helpful in batch or queue consumers, not as much in request/response services.

# Example Usage Scenarios

- AWS Lambda function for Kafka or Kinesis stream processing
- Each invocation has a batch of, say, 100 records
- Each record points to a file on S3 that needs to be parsed, processed, then written back to S3
- The average file size is 50 MB, so these will take a few seconds to fetch and write
- The Lambda function has 1,769 MB of RAM which gives it 1 vCPU, allowing the JS thread to run at roughly 100% speed of 1 core
- If the code never waits to fetch or write files then the Lambda function will be able to use 100% of the paid-for CPU time
- If there is no backpressure then reading 100 * 50 MB files would fill up the default Lambda temp disk space of 512 MB or would consume 5 GB of memory, either of which would cause the Lambda function to fail
- We can use `IterableMapper` to fetch up to 5 of the next files while the current file is being processed, then pause until the current file is processed and the next file is consumed
- We can also use `IterableQueueMapper.enqueue()` to write the files back to S3 without waiting for them to finish, unless we get more than, say, 3-4 files being uploaded at once, at which point we can pause until the current file is uploaded before we allow queuing another file for upload
- In the rare case that 3-4 files are uploading, but not yet finished, we would block on `.enqueue()` and not consume much CPU while waiting for at least 1 upload to finish
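The backpressure arithmetic behind that scenario can be sketched in a few lines (using the hypothetical batch size and file sizes from the bullets above):

```typescript
// Hypothetical numbers from the Lambda scenario above
const batchSize = 100; // records per invocation
const avgFileMB = 50; // average S3 file size
const lambdaTmpMB = 512; // default Lambda temp disk space

// Without backpressure, an unconstrained prefetcher could race ahead of the
// consumer and buffer the entire batch at once
const unboundedMB = batchSize * avgFileMB; // 5000 MB - far over the 512 MB disk

// With maxUnread = 5, at most 5 unconsumed files are ever buffered
const maxUnread = 5;
const boundedMB = maxUnread * avgFileMB; // 250 MB - fits in temp disk

console.log(unboundedMB > lambdaTmpMB); // true - would fail without backpressure
console.log(boundedMB < lambdaTmpMB); // true - safe with maxUnread = 5
```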
Consider a typical processing loop without `IterableMapper`:

```typescript
const source = new SomeSource();
const sourceIds = Array.from({ length: 1000 }, (_, i) => i + 1);
const sink = new SomeSink();
for (const sourceId of sourceIds) {
const item = await source.read(sourceId); // takes 300 ms of I/O wait, no CPU
const outputItem = doSomeOperation(item); // takes 20 ms of CPU
await sink.write(outputItem); // takes 500 ms of I/O wait, no CPU
}
```

Each iteration takes 820ms total, but we waste time waiting for I/O. We could prefetch the next read (300ms) while processing (20ms) and writing (500ms), without changing the order of reads or writes.

Using `IterableMapper` as a prefetcher:

```typescript
const source = new SomeSource();
const sourceIds = Array.from({ length: 1000 }, (_, i) => i + 1);
// Pre-reads up to 8 items serially and releases in sequential order
const sourcePrefetcher = new IterableMapper(sourceIds,
async (sourceId) => source.read(sourceId),
{ concurrency: 1 }
);
const sink = new SomeSink();
for await (const item of sourcePrefetcher) {
const outputItem = doSomeOperation(item); // takes 20 ms of CPU
await sink.write(outputItem); // takes 500 ms of I/O wait, no CPU
}
```

This reduces iteration time to 520ms by overlapping reads with processing/writing.

For maximum throughput, make the writes concurrent as well, using `IterableQueueMapper` (to iterate results with backpressure when there are too many unread items) or `IterableQueueMapperSimple` (to handle errors at the end, without custom iteration or backpressure):

```typescript
const source = new SomeSource();
const sourceIds = Array.from({ length: 1000 }, (_, i) => i + 1);
const sourcePrefetcher = new IterableMapper(sourceIds,
async (sourceId) => source.read(sourceId),
{ concurrency: 1 }
);
const sink = new SomeSink();
const flusher = new IterableQueueMapperSimple(
async (outputItem) => sink.write(outputItem),
{ concurrency: 10 }
);
for await (const item of sourcePrefetcher) {
const outputItem = doSomeOperation(item); // takes 20 ms of CPU
await flusher.enqueue(outputItem); // usually takes no time
}
// Wait for all writes to complete
await flusher.onIdle();
// Check for errors
if (flusher.errors.length > 0) {
// ...
}
```

This reduces iteration time to about 20 ms by overlapping reads and writes with the CPU processing step. In this contrived (but common) example that is a 41x improvement in throughput, removing roughly 97.5% of the per-item processing time and fully utilizing the CPU time available in the JS event loop.
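The throughput figures above follow directly from the per-item timings; a quick check using the hypothetical 300/20/500 ms costs from the examples:

```typescript
// Hypothetical per-item costs from the walkthrough above (ms)
const readMs = 300; // source.read: I/O wait
const cpuMs = 20; // doSomeOperation: CPU
const writeMs = 500; // sink.write: I/O wait

const serialMs = readMs + cpuMs + writeMs; // 820 ms: no overlap
const prefetchMs = cpuMs + writeMs; // 520 ms: reads overlapped
const overlappedMs = cpuMs; // ~20 ms: reads and writes both overlapped

const speedup = serialMs / overlappedMs; // 41x
const fractionRemoved = 1 - overlappedMs / serialMs; // ~0.976 of per-item time
```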

# Getting Started

@@ -66,7 +117,7 @@ These diagrams illustrate the differences in operation between `p-map`, `p-queu
- User supplied sync or async mapper function
- Exposes an async iterable interface for consuming mapped items
- Allows a maximum queue depth of mapped items - if the consumer stops consuming, the queue will fill up, at which point the mapper will stop being invoked until an item is consumed from the queue
- This allows mapping with backpressure so that the mapper does not consume unlimited resources (e.g. memory, disk, network, event loop time) by racing ahead of the consumer
- [IterableQueueMapper](https://tech.shutterstock.com/p-map-iterable/classes/IterableQueueMapper.html)
- Wraps `IterableMapper`
- Adds items to the queue via the `enqueue` method
@@ -91,7 +142,7 @@

See [p-map](https://github.com/sindresorhus/p-map) docs for a good start in understanding what this does.

The key difference between `IterableMapper` and `pMap` is that `IterableMapper` does not return when the entire mapping is done; instead it exposes an iterable that the caller loops through. This enables results to be processed while the mapping is still happening, while optionally applying backpressure to slow or stop the mapping if the caller is not consuming items fast enough. A common use case is `prefetching` items from a remote service: the next set of requests is dispatched asynchronously while the current responses are processed, and the prefetch requests pause when the unread queue fills up.
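Conceptually, the prefetch-with-backpressure pattern resembles an async generator that keeps a bounded FIFO of in-flight mapper results, starting another mapper call only when the consumer frees a slot. The following is a simplified sketch, not the library's actual implementation (here the effective concurrency equals `maxUnread`, whereas the real class controls the two independently):

```typescript
// Simplified sketch of prefetching with backpressure (illustrative only)
async function* prefetch<T, R>(
  input: Iterable<T>,
  mapper: (item: T) => Promise<R>,
  maxUnread: number,
): AsyncGenerator<R> {
  const buffer: Promise<R>[] = []; // FIFO of pending/unread mapper results
  const it = input[Symbol.iterator]();
  let next = it.next();
  // Race ahead of the consumer, but only up to maxUnread items
  while (!next.done && buffer.length < maxUnread) {
    buffer.push(mapper(next.value));
    next = it.next();
  }
  while (buffer.length > 0) {
    // Yield the oldest result; FIFO order preserves input order
    yield await buffer.shift()!;
    // Backpressure: start another mapper call only after a read frees a slot
    if (!next.done) {
      buffer.push(mapper(next.value));
      next = it.next();
    }
  }
}
```

For example, iterating `prefetch([1, 2, 3], async (x) => x * 2, 2)` with `for await` yields 2, 4, 6 in input order while keeping at most two results in flight.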

See [examples/iterable-mapper.ts](./examples/iterable-mapper.ts) for an example.

22 changes: 11 additions & 11 deletions examples/iterable-queue-mapper-simple.ts
@@ -31,17 +31,15 @@ async function main() {
let callCount = 0;

// Create an item processor with IterableQueueMapperSimple
const backgroundFlusher = new IterableQueueMapperSimple(
// mapper function
async (value: number): Promise<void> => {
const myCallCount = callCount++;
total += value;

console.log(`Mapper Call Start ${myCallCount}, Value: ${value}, Total: ${total}`);

// Simulate flushing an async item with varied delays
await sleep(Math.random() * 10000);

if (value % 5 === 0) {
@@ -53,33 +51,35 @@
{ concurrency: 3 },
);

// Add items to be flushed to the queue in the background
// This will pause when the queue is full and resume when there is capacity
const jobAdder = (async () => {
for await (const item of iterator) {
console.log(`Enqueue Start ${item}`);
await backgroundFlusher.enqueue(item);
console.log(`Enqueue Done ${item}`);
}
})();

// Wait for the job adder to finish adding the jobs
// (its throughput is constrained by the flusher's concurrency)
await jobAdder;

// Wait for the async flusher to finish flushing all items
await backgroundFlusher.onIdle();

// Check for errors
if (backgroundFlusher.errors.length > 0) {
console.error('Errors:');
backgroundFlusher.errors.forEach(({ error, item }) =>
console.error(
`${item} had error: ${(error as Error).message ? (error as Error).message : error}`,
),
);
}

console.log(`Total: ${total}`);
console.log('Note: this example intentionally produces errors');
}

void main();
1 change: 1 addition & 0 deletions examples/iterable-queue-mapper.ts
@@ -135,6 +135,7 @@ async function main() {
await jobAdder;

console.log(`QueuedButUnreadFileSizeGB: ${queuedButUnreadFileSizeGB}`);
console.log('Note: this example intentionally produces errors');
}

void main();
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@shutterstock/p-map-iterable",
"version": "0.0.0",
"description": "Set of classes used for async prefetching with backpressure (IterableMapper) and async flushing with backpressure (IterableQueueMapper, IterableQueueMapperSimple)",
"main": "dist/src/index.js",
"types": "dist/src/index.d.ts",
"publishConfig": {
38 changes: 38 additions & 0 deletions src/iterable-mapper.test.ts
@@ -419,6 +419,44 @@
});
});

describe('order preservation tests', () => {
it('preserves exact sequential order under high load with concurrency 1, maxUnread 8', async () => {
const size = 1000;
const input = Array.from({ length: size }, (_, i) => ({
value: i + 1,
ms: Math.floor(Math.random() * 20), // Random delay 0-19ms
}));

const mappedOrder: number[] = [];
const iteratedOrder: number[] = [];

const prefetcher = new IterableMapper(
input,
async ({ value, ms }): Promise<number> => {
await sleep(ms);
mappedOrder.push(value);
return value;
},
{ concurrency: 1, maxUnread: 8 },
);

for await (const value of prefetcher) {
iteratedOrder.push(value);
// Add some random delay in consuming to create backpressure
await sleep(Math.floor(Math.random() * 20));
}

// Verify exact sequential order preservation
expect(iteratedOrder).toHaveLength(size);
expect(mappedOrder).toHaveLength(size);

// Verify that both mapped and iterated orders match the input sequence
const expectedOrder = Array.from({ length: size }, (_, i) => i + 1);
expect(mappedOrder).toEqual(expectedOrder);
expect(iteratedOrder).toEqual(expectedOrder);
}, 30000);
});

describe('concurrency 1, maxUnread 1', () => {
const concurrency = 1;
const maxUnread = 1;