From 1b5b50f8e3540416c930336d042509d70b385bd2 Mon Sep 17 00:00:00 2001 From: Daniel Rochetti Date: Mon, 25 Nov 2024 18:07:21 -0800 Subject: [PATCH] feat(client): request abort signal support (#112) * feat(client): request abort signal support * fix: lint errors --- libs/client/package.json | 2 +- libs/client/src/client.ts | 3 +++ libs/client/src/queue.ts | 24 +++++++++++++++++++++--- libs/client/src/storage.ts | 22 +++++++++------------- libs/client/src/streaming.ts | 17 +++++++++++++++++ libs/client/src/types/common.ts | 5 +++++ 6 files changed, 56 insertions(+), 17 deletions(-) diff --git a/libs/client/package.json b/libs/client/package.json index b197974..d13b58b 100644 --- a/libs/client/package.json +++ b/libs/client/package.json @@ -1,7 +1,7 @@ { "name": "@fal-ai/client", "description": "The fal.ai client for JavaScript and TypeScript", - "version": "1.2.0-alpha.5", + "version": "1.2.0-alpha.6", "license": "MIT", "repository": { "type": "git", diff --git a/libs/client/src/client.ts b/libs/client/src/client.ts index 18bd329..3cfdfbb 100644 --- a/libs/client/src/client.ts +++ b/libs/client/src/client.ts @@ -106,6 +106,9 @@ export function createFalClient(userConfig: Config = {}): FalClient { ...config, responseHandler: resultResponseHandler, }, + options: { + signal: options.abortSignal, + }, }); }, subscribe: async (endpointId, options) => { diff --git a/libs/client/src/queue.ts b/libs/client/src/queue.ts index a663b70..b7a634a 100644 --- a/libs/client/src/queue.ts +++ b/libs/client/src/queue.ts @@ -123,6 +123,11 @@ type BaseQueueOptions = { * The unique identifier for the enqueued request. */ requestId: string; + + /** + * The signal to abort the request. + */ + abortSignal?: AbortSignal; }; export type QueueStatusOptions = BaseQueueOptions & { @@ -246,11 +251,14 @@ export const createQueueClient = ({ }, input: input as Input, config, + options: { + signal: options.abortSignal, + }, }); }, async status( endpointId: string, - { requestId, logs = false }: QueueStatusOptions, + { requestId, logs = false, abortSignal }: QueueStatusOptions, ): Promise { const appId = parseEndpointId(endpointId); const prefix = appId.namespace ? `${appId.namespace}/` : ""; @@ -262,6 +270,9 @@ export const createQueueClient = ({ path: `/requests/${requestId}/status`, }), config, + options: { + signal: abortSignal, + }, }); }, @@ -379,6 +390,7 @@ export const createQueueClient = ({ const requestStatus = await ref.status(endpointId, { requestId, logs: options.logs ?? false, + abortSignal: options.abortSignal, }); if (options.onQueueUpdate) { options.onQueueUpdate(requestStatus); @@ -400,7 +412,7 @@ export const createQueueClient = ({ async result( endpointId: string, - { requestId }: BaseQueueOptions, + { requestId, abortSignal }: BaseQueueOptions, ): Promise> { const appId = parseEndpointId(endpointId); const prefix = appId.namespace ? `${appId.namespace}/` : ""; @@ -414,12 +426,15 @@ export const createQueueClient = ({ ...config, responseHandler: resultResponseHandler, }, + options: { + signal: abortSignal, + }, }); }, async cancel( endpointId: string, - { requestId }: BaseQueueOptions, + { requestId, abortSignal }: BaseQueueOptions, ): Promise { const appId = parseEndpointId(endpointId); const prefix = appId.namespace ? `${appId.namespace}/` : ""; @@ -430,6 +445,9 @@ export const createQueueClient = ({ path: `/requests/${requestId}/cancel`, }), config, + options: { + signal: abortSignal, + }, }); }, }; diff --git a/libs/client/src/storage.ts b/libs/client/src/storage.ts index 5bd86a7..59c5d70 100644 --- a/libs/client/src/storage.ts +++ b/libs/client/src/storage.ts @@ -105,7 +105,7 @@ async function partUploadRetries( uploadUrl: string, chunk: Blob, config: RequiredConfig, - tries: number = 3, + tries = 3, ): Promise { if (tries === 0) { throw new Error("Part upload failed, retries exhausted"); @@ -142,21 +142,17 @@ async function multipartUpload( const responses: MultipartObject[] = []; - try { - for (let i = 0; i < chunks; i++) { - const start = i * chunkSize; - const end = Math.min(start + chunkSize, file.size); + for (let i = 0; i < chunks; i++) { + const start = i * chunkSize; + const end = Math.min(start + chunkSize, file.size); - const chunk = file.slice(start, end); + const chunk = file.slice(start, end); - const partNumber = i + 1; - // {uploadUrl}/{part_number}?uploadUrlParams=... - const partUploadUrl = `${parsedUrl.origin}${parsedUrl.pathname}/${partNumber}${parsedUrl.search}`; + const partNumber = i + 1; + // {uploadUrl}/{part_number}?uploadUrlParams=... + const partUploadUrl = `${parsedUrl.origin}${parsedUrl.pathname}/${partNumber}${parsedUrl.search}`; - responses.push(await partUploadRetries(partUploadUrl, chunk, config)); - } - } catch (error) { - throw error; + responses.push(await partUploadRetries(partUploadUrl, chunk, config)); } // Complete the upload diff --git a/libs/client/src/streaming.ts b/libs/client/src/streaming.ts index bf32a95..c618ccf 100644 --- a/libs/client/src/streaming.ts +++ b/libs/client/src/streaming.ts @@ -63,6 +63,11 @@ export type StreamOptions = { * support streaming. */ readonly connectionMode?: StreamingConnectionMode; + + /** + * The signal to abort the request. + */ + readonly signal?: AbortSignal; }; const EVENT_STREAM_TIMEOUT = 15 * 1000; @@ -129,6 +134,14 @@ export class FalStream { reject(error); }); }); + // if a abort signal was passed, sync it with the internal one + if (options.signal) { + options.signal.addEventListener("abort", () => { + this.abortController.abort(); + }); + } + + // start the streaming request this.start().catch(this.handleError); } @@ -345,6 +358,10 @@ export class FalStream { /** * Gets the `AbortSignal` instance that can be used to listen for abort events. + * + * **Note:** this signal is internal to the `FalStream` instance. If you pass your + * own abort signal, the `FalStream` will listen to it and abort it appropriately. + * * @returns the `AbortSignal` instance. * @see https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal */ diff --git a/libs/client/src/types/common.ts b/libs/client/src/types/common.ts index d51b4e8..a7efb77 100644 --- a/libs/client/src/types/common.ts +++ b/libs/client/src/types/common.ts @@ -22,6 +22,11 @@ export type RunOptions = { * The HTTP method, defaults to `post`; */ readonly method?: "get" | "post" | "put" | "delete" | string; + + /** + * The abort signal to cancel the request. + */ + readonly abortSignal?: AbortSignal; }; export type UrlOptions = {