From fd12d001a5f0e472842258a200888578f7886015 Mon Sep 17 00:00:00 2001 From: Daniel Rochetti Date: Wed, 3 Apr 2024 11:28:43 -0700 Subject: [PATCH 1/4] feat: support app id with namespaces --- .../app/whisper/realtime/page.tsx | 299 ++++++++++++++++++ libs/client/package.json | 2 +- libs/client/src/auth.ts | 6 +- libs/client/src/function.ts | 10 +- libs/client/src/realtime.ts | 6 +- libs/client/src/utils.spec.ts | 40 ++- libs/client/src/utils.ts | 29 ++ 7 files changed, 379 insertions(+), 13 deletions(-) create mode 100644 apps/demo-nextjs-app-router/app/whisper/realtime/page.tsx diff --git a/apps/demo-nextjs-app-router/app/whisper/realtime/page.tsx b/apps/demo-nextjs-app-router/app/whisper/realtime/page.tsx new file mode 100644 index 0000000..3bffa3c --- /dev/null +++ b/apps/demo-nextjs-app-router/app/whisper/realtime/page.tsx @@ -0,0 +1,299 @@ +'use client'; + +import * as fal from '@fal-ai/serverless-client'; +import { useCallback, useMemo, useRef, useState } from 'react'; // Add useRef here + +fal.config({ + // credentials: 'FAL_KEY_ID:FAL_KEY_SECRET', + proxyUrl: '/api/fal/proxy', +}); + +type ErrorProps = { + error: any; +}; + +function Error(props: ErrorProps) { + if (!props.error) { + return null; + } + return ( +
+ Error {props.error.message} +
+ ); +} + +type RecorderOptions = { + maxDuration?: number; + onChunk?: (chunk: Blob) => void; + sendInterval?: number; // Add this line +}; + +function useMediaRecorder({ + maxDuration = 20000, + onChunk, + sendInterval = 1000, // Add this line +}: RecorderOptions = {}) { + const [isRecording, setIsRecording] = useState(false); + const [mediaRecorder, setMediaRecorder] = useState( + null + ); + const accumulatedChunks = useRef([]); // Use a ref to accumulate chunks + + const sendAccumulatedData = useCallback(() => { + // Convert accumulated chunks to a single blob + const audioBlob = new Blob(accumulatedChunks.current, { + type: 'audio/wav', + }); + // Optionally, here you can slice the Blob if you want to send data in smaller pieces + // For example: audioBlob.slice(startByte, endByte) + if (onChunk) { + onChunk(audioBlob); + } + }, [onChunk]); + + const record = useCallback((): Promise => { + setIsRecording(true); + accumulatedChunks.current = []; // Reset accumulated chunks + return new Promise(async (resolve, reject) => { + // Explicitly type the Promise here + try { + const stream = await navigator.mediaDevices.getUserMedia({ + audio: true, + }); + const recorder = new MediaRecorder(stream); + setMediaRecorder(recorder); + + recorder.addEventListener('dataavailable', (event) => { + accumulatedChunks.current.push(event.data); + }); + + recorder.addEventListener('stop', () => { + const audioBlob = new Blob(accumulatedChunks.current, { + type: 'audio/wav', + }); + const audioFile = new File( + [audioBlob], + `recording_${Date.now()}.wav`, + { type: 'audio/wav' } + ); + + sendAccumulatedData(); // Ensure final data is sent + setIsRecording(false); + resolve(audioFile); // Resolve the promise with the audio file + }); + + recorder.start(1000); // Configure how often you get 'dataavailable' events + + // Periodically send accumulated data + const intervalId = setInterval(() => { + sendAccumulatedData(); + }, sendInterval); + + setTimeout(() => { + clearInterval(intervalId); // Stop the interval when recording stops + recorder.stop(); + recorder.stream.getTracks().forEach((track) => track.stop()); + }, maxDuration); + } catch (error) { + reject(error); + } + }); + }, [maxDuration, sendAccumulatedData, sendInterval]); + + // Stop recording logic remains the same + const stopRecording = useCallback(() => { + setIsRecording(false); + mediaRecorder?.stop(); + mediaRecorder?.stream.getTracks().forEach((track) => track.stop()); + }, [mediaRecorder]); + + return { record, stopRecording, isRecording }; +} + +interface RealTimeOutput { + // Define the structure of your real-time output + // Example: + message: string; +} + +export default function WhisperDemo() { + const [realTimeOutputs, setRealTimeOutputs] = useState([]); + const [loading, setLoading] = useState(false); + const [error, setError] = useState(null); + const [logs, setLogs] = useState([]); + const [audioFile, setAudioFile] = useState(null); + const [result, setResult] = useState(null); // eslint-disable-line @typescript-eslint/no-explicit-any + const [elapsedTime, setElapsedTime] = useState(0); + const { send } = fal.realtime.connect('20128202/rtw', { + connectionKey: 'realtime-demo', + throttleInterval: 128, + onResult(result) { + console.log('result', result); + handleNewOutput({ message: result.output }); + }, + }); + const accumulatedChunksRef = useRef(new Uint8Array()); // To store accumulated chunks + + const { record, stopRecording, isRecording } = useMediaRecorder({ + onChunk: async (chunk) => { + const buffer = await chunk.arrayBuffer(); + const newChunk = new Uint8Array(buffer); + + // Accumulate chunks + const accumulatedChunks = new Uint8Array( + accumulatedChunksRef.current.length + newChunk.length + ); + accumulatedChunks.set(accumulatedChunksRef.current, 0); + accumulatedChunks.set(newChunk, accumulatedChunksRef.current.length); + accumulatedChunksRef.current = accumulatedChunks; + + console.log('Accumulated chunk', accumulatedChunks); + // Send the accumulated chunks using your `send` method + send({ content: accumulatedChunks }); + }, + }); + + const handleNewOutput = (newOutput: RealTimeOutput) => { + // Use `any` if you don't have a specific type + // This could be a transcription chunk, analysis result, etc. + setRealTimeOutputs((prevOutputs) => [...prevOutputs, newOutput]); + }; + + const reset = () => { + setLoading(false); + setError(null); + setLogs([]); + setElapsedTime(0); + setResult(null); + }; + + const audioFileLocalUrl = useMemo(() => { + if (!audioFile) { + return null; + } + return URL.createObjectURL(audioFile); + }, [audioFile]); + + const transcribeAudio = async (audioFile: File) => { + reset(); + setLoading(true); + const start = Date.now(); + try { + const result = await fal.subscribe('fal-ai/whisper', { + input: { + file_name: 'recording.wav', + audio_url: audioFile, + }, + pollInterval: 1000, + logs: true, + onQueueUpdate(update) { + setElapsedTime(Date.now() - start); + if ( + update.status === 'IN_PROGRESS' || + update.status === 'COMPLETED' + ) { + setLogs((update.logs || []).map((log) => log.message)); + } + }, + }); + setResult(result); + } catch (error: any) { + setError(error); + } finally { + setLoading(false); + setElapsedTime(Date.now() - start); + } + }; + return ( +
+
+

+ Hello fal and{' '} + whisper +

+ +
+ + +
+ + {audioFileLocalUrl && ( +
+
+ )} + + + + {/* Real-Time Outputs Section */} +
+

Real-Time Outputs

+
+ {realTimeOutputs.map((output, index) => ( +

{output.message}

// Ensure 'message' is the correct property + ))} +
+
+ + {/* JSON Result Section */} +
+
+

JSON Result

+

+ {`Elapsed Time (seconds): ${(elapsedTime / 1000).toFixed(2)}`} +

+
+              {result
+                ? JSON.stringify(result, null, 2)
+                : '// result pending...'}
+            
+
+ +
+

Logs

+
+              {logs.filter(Boolean).join('\n')}
+            
+
+
+
+
+ ); +} diff --git a/libs/client/package.json b/libs/client/package.json index 364dfeb..ccd43e1 100644 --- a/libs/client/package.json +++ b/libs/client/package.json @@ -1,7 +1,7 @@ { "name": "@fal-ai/serverless-client", "description": "The fal serverless JS/TS client", - "version": "0.9.0", + "version": "0.9.1-alpha.0", "license": "MIT", "repository": { "type": "git", diff --git a/libs/client/src/auth.ts b/libs/client/src/auth.ts index 6c748b3..403f27c 100644 --- a/libs/client/src/auth.ts +++ b/libs/client/src/auth.ts @@ -1,6 +1,6 @@ import { getRestApiUrl } from './config'; import { dispatchRequest } from './request'; -import { ensureAppIdFormat } from './utils'; +import { parseAppId } from './utils'; export const TOKEN_EXPIRATION_SECONDS = 120; @@ -8,12 +8,12 @@ export const TOKEN_EXPIRATION_SECONDS = 120; * Get a token to connect to the realtime endpoint. */ export async function getTemporaryAuthToken(app: string): Promise { - const [, appAlias] = ensureAppIdFormat(app).split('/'); + const appId = parseAppId(app); const token: string | object = await dispatchRequest( 'POST', `${getRestApiUrl()}/tokens/`, { - allowed_apps: [appAlias], + allowed_apps: [appId.alias], token_expiration: TOKEN_EXPIRATION_SECONDS, } ); diff --git a/libs/client/src/function.ts b/libs/client/src/function.ts index 1be73c5..7a4ff55 100644 --- a/libs/client/src/function.ts +++ b/libs/client/src/function.ts @@ -1,7 +1,7 @@ import { dispatchRequest } from './request'; import { storageImpl } from './storage'; import { EnqueueResult, QueueStatus } from './types'; -import { ensureAppIdFormat, isUUIDv4, isValidUrl } from './utils'; +import { ensureAppIdFormat, isUUIDv4, isValidUrl, parseAppId } from './utils'; /** * The function input and other configuration when running @@ -284,8 +284,8 @@ export const queue: Queue = { id: string, { requestId, logs = false }: QueueStatusOptions ): Promise { - const [appOwner, appAlias] = ensureAppIdFormat(id).split('/'); - return send(`${appOwner}/${appAlias}`, { + const appId = parseAppId(id); + return send(`${appId.owner}/${appId.alias}`, { subdomain: 'queue', method: 'get', path: `/requests/${requestId}/status`, @@ -298,8 +298,8 @@ export const queue: Queue = { id: string, { requestId }: BaseQueueOptions ): Promise { - const [appOwner, appAlias] = ensureAppIdFormat(id).split('/'); - return send(`${appOwner}/${appAlias}`, { + const appId = parseAppId(id); + return send(`${appId.owner}/${appId.alias}`, { subdomain: 'queue', method: 'get', path: `/requests/${requestId}`, diff --git a/libs/client/src/realtime.ts b/libs/client/src/realtime.ts index 9b66690..3bb65bd 100644 --- a/libs/client/src/realtime.ts +++ b/libs/client/src/realtime.ts @@ -16,7 +16,7 @@ import uuid from 'uuid-random'; import { TOKEN_EXPIRATION_SECONDS, getTemporaryAuthToken } from './auth'; import { ApiError } from './response'; import { isBrowser } from './runtime'; -import { ensureAppIdFormat, isReact, throttle } from './utils'; +import { ensureAppIdFormat, isReact, parseAppId, throttle } from './utils'; // Define the context interface Context { @@ -273,9 +273,9 @@ function buildRealtimeUrl( queryParams.set('max_buffering', maxBuffering.toFixed(0)); } const appId = ensureAppIdFormat(app); - const [, appAlias] = ensureAppIdFormat(app).split('/'); + const { alias } = parseAppId(appId); const suffix = - LEGACY_APPS.includes(appAlias) || !app.includes('/') ? 'ws' : 'realtime'; + LEGACY_APPS.includes(alias) || !app.includes('/') ? 'ws' : 'realtime'; return `wss://fal.run/${appId}/${suffix}?${queryParams.toString()}`; } diff --git a/libs/client/src/utils.spec.ts b/libs/client/src/utils.spec.ts index 443a11e..dcd6d58 100644 --- a/libs/client/src/utils.spec.ts +++ b/libs/client/src/utils.spec.ts @@ -1,5 +1,5 @@ import uuid from 'uuid-random'; -import { ensureAppIdFormat, isUUIDv4 } from './utils'; +import { ensureAppIdFormat, isUUIDv4, parseAppId } from './utils'; describe('The utils test suite', () => { it('should match a valid v4 uuid', () => { @@ -31,4 +31,42 @@ describe('The utils test suite', () => { const id = 'just-an-id'; expect(() => ensureAppIdFormat(id)).toThrowError(); }); + + it('should parse a legacy app id', () => { + const id = '12345-abcde-fgh'; + const parsed = parseAppId(id); + expect(parsed).toEqual({ + owner: '12345', + alias: 'abcde-fgh', + }); + }); + + it('should parse a current app id', () => { + const id = 'fal-ai/fast-sdxl'; + const parsed = parseAppId(id); + expect(parsed).toEqual({ + owner: 'fal-ai', + alias: 'fast-sdxl', + }); + }); + + it('should parse a current app id with path', () => { + const id = 'fal-ai/fast-sdxl/image-to-image'; + const parsed = parseAppId(id); + expect(parsed).toEqual({ + owner: 'fal-ai', + alias: 'fast-sdxl', + path: 'image-to-image', + }); + }); + + it('should parse a current app id with namespace', () => { + const id = 'workflows/fal-ai/fast-sdxl'; + const parsed = parseAppId(id); + expect(parsed).toEqual({ + owner: 'fal-ai', + alias: 'fast-sdxl', + namespace: 'workflows', + }); + }); }); diff --git a/libs/client/src/utils.ts b/libs/client/src/utils.ts index 5de1198..8f6fd22 100644 --- a/libs/client/src/utils.ts +++ b/libs/client/src/utils.ts @@ -21,6 +21,35 @@ export function ensureAppIdFormat(id: string): string { ); } +const APP_NAMESPACES = ['workflows'] as const; + +type AppNamespace = (typeof APP_NAMESPACES)[number]; + +export type AppId = { + readonly owner: string; + readonly alias: string; + readonly path?: string; + readonly namespace?: AppNamespace; +}; + +export function parseAppId(id: string): AppId { + const normalizedId = ensureAppIdFormat(id); + const parts = normalizedId.split('/'); + if (APP_NAMESPACES.includes(parts[0] as any)) { + return { + owner: parts[1], + alias: parts[2], + path: parts.slice(3).join('/') || undefined, + namespace: parts[0] as AppNamespace, + }; + } + return { + owner: parts[0], + alias: parts[1], + path: parts.slice(2).join('/') || undefined, + }; +} + export function isValidUrl(url: string) { try { const { host } = new URL(url); From 0bc2f6476e443f1d5a7806f52055d7f667ed89f2 Mon Sep 17 00:00:00 2001 From: Daniel Rochetti Date: Wed, 3 Apr 2024 11:30:29 -0700 Subject: [PATCH 2/4] chore: remove unwanted page --- .../app/whisper/realtime/page.tsx | 299 ------------------ 1 file changed, 299 deletions(-) delete mode 100644 apps/demo-nextjs-app-router/app/whisper/realtime/page.tsx diff --git a/apps/demo-nextjs-app-router/app/whisper/realtime/page.tsx b/apps/demo-nextjs-app-router/app/whisper/realtime/page.tsx deleted file mode 100644 index 3bffa3c..0000000 --- a/apps/demo-nextjs-app-router/app/whisper/realtime/page.tsx +++ /dev/null @@ -1,299 +0,0 @@ -'use client'; - -import * as fal from '@fal-ai/serverless-client'; -import { useCallback, useMemo, useRef, useState } from 'react'; // Add useRef here - -fal.config({ - // credentials: 'FAL_KEY_ID:FAL_KEY_SECRET', - proxyUrl: '/api/fal/proxy', -}); - -type ErrorProps = { - error: any; -}; - -function Error(props: ErrorProps) { - if (!props.error) { - return null; - } - return ( -
- Error {props.error.message} -
- ); -} - -type RecorderOptions = { - maxDuration?: number; - onChunk?: (chunk: Blob) => void; - sendInterval?: number; // Add this line -}; - -function useMediaRecorder({ - maxDuration = 20000, - onChunk, - sendInterval = 1000, // Add this line -}: RecorderOptions = {}) { - const [isRecording, setIsRecording] = useState(false); - const [mediaRecorder, setMediaRecorder] = useState( - null - ); - const accumulatedChunks = useRef([]); // Use a ref to accumulate chunks - - const sendAccumulatedData = useCallback(() => { - // Convert accumulated chunks to a single blob - const audioBlob = new Blob(accumulatedChunks.current, { - type: 'audio/wav', - }); - // Optionally, here you can slice the Blob if you want to send data in smaller pieces - // For example: audioBlob.slice(startByte, endByte) - if (onChunk) { - onChunk(audioBlob); - } - }, [onChunk]); - - const record = useCallback((): Promise => { - setIsRecording(true); - accumulatedChunks.current = []; // Reset accumulated chunks - return new Promise(async (resolve, reject) => { - // Explicitly type the Promise here - try { - const stream = await navigator.mediaDevices.getUserMedia({ - audio: true, - }); - const recorder = new MediaRecorder(stream); - setMediaRecorder(recorder); - - recorder.addEventListener('dataavailable', (event) => { - accumulatedChunks.current.push(event.data); - }); - - recorder.addEventListener('stop', () => { - const audioBlob = new Blob(accumulatedChunks.current, { - type: 'audio/wav', - }); - const audioFile = new File( - [audioBlob], - `recording_${Date.now()}.wav`, - { type: 'audio/wav' } - ); - - sendAccumulatedData(); // Ensure final data is sent - setIsRecording(false); - resolve(audioFile); // Resolve the promise with the audio file - }); - - recorder.start(1000); // Configure how often you get 'dataavailable' events - - // Periodically send accumulated data - const intervalId = setInterval(() => { - sendAccumulatedData(); - }, sendInterval); - - setTimeout(() => { - clearInterval(intervalId); // Stop the interval when recording stops - recorder.stop(); - recorder.stream.getTracks().forEach((track) => track.stop()); - }, maxDuration); - } catch (error) { - reject(error); - } - }); - }, [maxDuration, sendAccumulatedData, sendInterval]); - - // Stop recording logic remains the same - const stopRecording = useCallback(() => { - setIsRecording(false); - mediaRecorder?.stop(); - mediaRecorder?.stream.getTracks().forEach((track) => track.stop()); - }, [mediaRecorder]); - - return { record, stopRecording, isRecording }; -} - -interface RealTimeOutput { - // Define the structure of your real-time output - // Example: - message: string; -} - -export default function WhisperDemo() { - const [realTimeOutputs, setRealTimeOutputs] = useState([]); - const [loading, setLoading] = useState(false); - const [error, setError] = useState(null); - const [logs, setLogs] = useState([]); - const [audioFile, setAudioFile] = useState(null); - const [result, setResult] = useState(null); // eslint-disable-line @typescript-eslint/no-explicit-any - const [elapsedTime, setElapsedTime] = useState(0); - const { send } = fal.realtime.connect('20128202/rtw', { - connectionKey: 'realtime-demo', - throttleInterval: 128, - onResult(result) { - console.log('result', result); - handleNewOutput({ message: result.output }); - }, - }); - const accumulatedChunksRef = useRef(new Uint8Array()); // To store accumulated chunks - - const { record, stopRecording, isRecording } = useMediaRecorder({ - onChunk: async (chunk) => { - const buffer = await chunk.arrayBuffer(); - const newChunk = new Uint8Array(buffer); - - // Accumulate chunks - const accumulatedChunks = new Uint8Array( - accumulatedChunksRef.current.length + newChunk.length - ); - accumulatedChunks.set(accumulatedChunksRef.current, 0); - accumulatedChunks.set(newChunk, accumulatedChunksRef.current.length); - accumulatedChunksRef.current = accumulatedChunks; - - console.log('Accumulated chunk', accumulatedChunks); - // Send the accumulated chunks using your `send` method - send({ content: accumulatedChunks }); - }, - }); - - const handleNewOutput = (newOutput: RealTimeOutput) => { - // Use `any` if you don't have a specific type - // This could be a transcription chunk, analysis result, etc. - setRealTimeOutputs((prevOutputs) => [...prevOutputs, newOutput]); - }; - - const reset = () => { - setLoading(false); - setError(null); - setLogs([]); - setElapsedTime(0); - setResult(null); - }; - - const audioFileLocalUrl = useMemo(() => { - if (!audioFile) { - return null; - } - return URL.createObjectURL(audioFile); - }, [audioFile]); - - const transcribeAudio = async (audioFile: File) => { - reset(); - setLoading(true); - const start = Date.now(); - try { - const result = await fal.subscribe('fal-ai/whisper', { - input: { - file_name: 'recording.wav', - audio_url: audioFile, - }, - pollInterval: 1000, - logs: true, - onQueueUpdate(update) { - setElapsedTime(Date.now() - start); - if ( - update.status === 'IN_PROGRESS' || - update.status === 'COMPLETED' - ) { - setLogs((update.logs || []).map((log) => log.message)); - } - }, - }); - setResult(result); - } catch (error: any) { - setError(error); - } finally { - setLoading(false); - setElapsedTime(Date.now() - start); - } - }; - return ( -
-
-

- Hello fal and{' '} - whisper -

- -
- - -
- - {audioFileLocalUrl && ( -
-
- )} - - - - {/* Real-Time Outputs Section */} -
-

Real-Time Outputs

-
- {realTimeOutputs.map((output, index) => ( -

{output.message}

// Ensure 'message' is the correct property - ))} -
-
- - {/* JSON Result Section */} -
-
-

JSON Result

-

- {`Elapsed Time (seconds): ${(elapsedTime / 1000).toFixed(2)}`} -

-
-              {result
-                ? JSON.stringify(result, null, 2)
-                : '// result pending...'}
-            
-
- -
-

Logs

-
-              {logs.filter(Boolean).join('\n')}
-            
-
-
-
-
- ); -} From 2df0931e1feb6034e5c38b75ef83c3837a5c9f12 Mon Sep 17 00:00:00 2001 From: Daniel Rochetti Date: Wed, 3 Apr 2024 12:18:43 -0700 Subject: [PATCH 3/4] fix: queue status url --- libs/client/package.json | 2 +- libs/client/src/function.ts | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/libs/client/package.json b/libs/client/package.json index ccd43e1..0680c6c 100644 --- a/libs/client/package.json +++ b/libs/client/package.json @@ -1,7 +1,7 @@ { "name": "@fal-ai/serverless-client", "description": "The fal serverless JS/TS client", - "version": "0.9.1-alpha.0", + "version": "0.9.1-alpha.1", "license": "MIT", "repository": { "type": "git", diff --git a/libs/client/src/function.ts b/libs/client/src/function.ts index 7a4ff55..51ee3fe 100644 --- a/libs/client/src/function.ts +++ b/libs/client/src/function.ts @@ -285,7 +285,8 @@ export const queue: Queue = { { requestId, logs = false }: QueueStatusOptions ): Promise { const appId = parseAppId(id); - return send(`${appId.owner}/${appId.alias}`, { + const prefix = appId.namespace ? `${appId.namespace}/` : ''; + return send(`${prefix}${appId.owner}/${appId.alias}`, { subdomain: 'queue', method: 'get', path: `/requests/${requestId}/status`, @@ -299,7 +300,8 @@ export const queue: Queue = { { requestId }: BaseQueueOptions ): Promise { const appId = parseAppId(id); - return send(`${appId.owner}/${appId.alias}`, { + const prefix = appId.namespace ? `${appId.namespace}/` : ''; + return send(`${prefix}${appId.owner}/${appId.alias}`, { subdomain: 'queue', method: 'get', path: `/requests/${requestId}`, From e73fd10a65eed8891a1be4a292119169863e287e Mon Sep 17 00:00:00 2001 From: Daniel Rochetti Date: Sat, 6 Apr 2024 01:17:53 -0700 Subject: [PATCH 4/4] chore: bump version for release --- libs/client/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/client/package.json b/libs/client/package.json index 0680c6c..35d4379 100644 --- a/libs/client/package.json +++ b/libs/client/package.json @@ -1,7 +1,7 @@ { "name": "@fal-ai/serverless-client", "description": "The fal serverless JS/TS client", - "version": "0.9.1-alpha.1", + "version": "0.9.1", "license": "MIT", "repository": { "type": "git",