Skip to content

Update docs with headers #12

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 48 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ These classes will typically be helpful in batch or queue consumers, not as much

# Example Usage Scenarios

Consider a typical processing loop without IterableMapper:
## Typical Processing Loop without `IterableMapper`

```typescript
const source = new SomeSource();
Expand All @@ -25,48 +25,88 @@ for (const sourceId of sourceIds) {

Each iteration takes 820ms total, but we waste time waiting for I/O. We could prefetch the next read (300ms) while processing (20ms) and writing (500ms), without changing the order of reads or writes.

Using IterableMapper as a prefetcher:
## Using `IterableMapper` as Prefetcher with Blocking Sequential Writes

`concurrency: 1` on the prefetcher preserves the order of the reads and and writes are sequential and blocking (unchanged).

```typescript
const source = new SomeSource();
const sourceIds = [1, 2,... 1000];
// Pre-reads up to 8 items serially and releases in sequential order
const sourcePrefetcher = new IterableMapper(sourceIds,
async (sourceId) => source.read(sourceId),
{ concurrency: 1 }
{ concurrency: 1, maxUnread: 10 }
);
const sink = new SomeSink();
for await (const item of sourcePrefetcher) {
for await (const item of sourcePrefetcher) { // may not block for fast sources
const outputItem = doSomeOperation(item); // takes 20 ms of CPU
await sink.write(outputItem); // takes 500 ms of I/O wait, no CPU
}
```

This reduces iteration time to 520ms by overlapping reads with processing/writing.

For maximum throughput, make the writes concurrent with IterableQueueMapper (to iterate results with backpressure when too many unread items) or IterableQueueMapperSimple (to handle errors at end without custom iteration or backpressure):
## Using `IterableMapper` as Prefetcher with Background Sequential Writes with `IterableQueueMapperSimple`

`concurrency: 1` on the prefetcher preserves the order of the reads.
`concurrency: 1` on the flusher preserves the order of the writes, but allows the loop to iterate while last write is completing.

```typescript
const source = new SomeSource();
const sourceIds = [1, 2,... 1000];
const sourcePrefetcher = new IterableMapper(sourceIds,
async (sourceId) => source.read(sourceId),
{ concurrency: 1, maxUnread: 10 }
);
const sink = new SomeSink();
const flusher = new IterableQueueMapperSimple(
async (outputItem) => sink.write(outputItem),
{ concurrency: 1 }
);
for await (const item of sourcePrefetcher) { // may not block for fast sources
const outputItem = doSomeOperation(item); // takes 20 ms of CPU
await flusher.enqueue(outputItem); // will periodically block for portion of write time
}
// Wait for all writes to complete
await flusher.onIdle();
// Check for errors
if (flusher.errors.length > 0) {
// ...
}
```

This reduces iteration time to about `max((max(readTime, writeTime) - cpuOpTime, cpuOpTime))`
by overlapping reads and writes with the CPU processing step.
In this contrived example, the loop time is reduced to 500ms - 20ms = 480ms.
In cases where the CPU usage time is higher, the impact can be greater.

## Using `IterableMapper` as Prefetcher with Out of Order Reads and Background Out of Order Writes with `IterableQueueMapperSimple`

For maximum throughput, allow out of order reads and writes with
`IterableQueueMapper` (to iterate results with backpressure when too many unread items) or
`IterableQueueMapperSimple` (to handle errors at end without custom iteration and applying backpressure to block further enqueues when `concurrency` items are in process):

```typescript
const source = new SomeSource();
const sourceIds = [1, 2,... 1000];
const sourcePrefetcher = new IterableMapper(sourceIds,
async (sourceId) => source.read(sourceId),
{ concurrency: 10, maxUnread: 20 }
);
const sink = new SomeSink();
const flusher = new IterableQueueMapperSimple(
async (outputItem) => sink.write(outputItem),
{ concurrency: 10 }
);
for await (const item of sourcePrefetcher) {
for await (const item of sourcePrefetcher) { // typically will not block
const outputItem = doSomeOperation(item); // takes 20 ms of CPU
await flusher.enqueue(outputItem); // usually takes no time
await flusher.enqueue(outputItem); // typically will not block
}
// Wait for all writes to complete
await flusher.onIdle();
// Check for errors
if (flusher.errors.length > 0) {
// ...
// ...
}
```

Expand Down
15 changes: 11 additions & 4 deletions src/iterable-mapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ type NewElementOrError<NewElement = unknown> = {
*
* @example
*
* Consider a typical processing loop without IterableMapper:
* ### Typical Processing Loop without `IterableMapper`
*
* ```typescript
* const source = new SomeSource();
Expand All @@ -134,7 +134,9 @@ type NewElementOrError<NewElement = unknown> = {
*
* @example
*
* Using `IterableMapper` as a prefetcher and blocking writes, without changing the order of reads or writes:
* ### Using `IterableMapper` as Prefetcher with Blocking Sequential Writes
*
* `concurrency: 1` on the prefetcher preserves the order of the reads and and writes are sequential and blocking (unchanged).
*
* ```typescript
* const source = new SomeSource();
Expand All @@ -155,7 +157,10 @@ type NewElementOrError<NewElement = unknown> = {
*
* @example
*
* Using `IterableMapper` as a prefetcher with background writes, without changing the order of reads or writes:
* ### Using `IterableMapper` as Prefetcher with Background Sequential Writes with `IterableQueueMapperSimple`
*
* `concurrency: 1` on the prefetcher preserves the order of the reads.
* `concurrency: 1` on the flusher preserves the order of the writes, but allows the loop to iterate while last write is completing.
*
* ```typescript
* const source = new SomeSource();
Expand All @@ -181,13 +186,15 @@ type NewElementOrError<NewElement = unknown> = {
* }
* ```
*
* This reduces iteration time to about to `max((max(readTime, writeTime) - cpuOpTime, cpuOpTime))
* This reduces iteration time to about `max((max(readTime, writeTime) - cpuOpTime, cpuOpTime))`
* by overlapping reads and writes with the CPU processing step.
* In this contrived example, the loop time is reduced to 500ms - 20ms = 480ms.
* In cases where the CPU usage time is higher, the impact can be greater.
*
* @example
*
* ### Using `IterableMapper` as Prefetcher with Out of Order Reads and Background Out of Order Writes with `IterableQueueMapperSimple`
*
* For maximum throughput, allow out of order reads and writes with
* `IterableQueueMapper` (to iterate results with backpressure when too many unread items) or
* `IterableQueueMapperSimple` (to handle errors at end without custom iteration and applying backpressure to block further enqueues when `concurrency` items are in process):
Expand Down