From 3542c9227dd197d04017dba86ebfb5693bbf1fd2 Mon Sep 17 00:00:00 2001 From: David Corbitt Date: Tue, 19 Dec 2023 21:59:00 -0800 Subject: [PATCH 1/3] Add support for streaming --- src/adapters/node-http/core.ts | 37 +++++++++++++++++++++++++++++----- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/src/adapters/node-http/core.ts b/src/adapters/node-http/core.ts index 060c8d2a..c8c2f020 100644 --- a/src/adapters/node-http/core.ts +++ b/src/adapters/node-http/core.ts @@ -64,13 +64,40 @@ export const createOpenApiNodeHttpHandler = < body: OpenApiResponse | undefined, ) => { res.statusCode = statusCode; - res.setHeader('Content-Type', 'application/json'); - for (const [key, value] of Object.entries(headers)) { - if (typeof value !== 'undefined') { - res.setHeader(key, value); + + if (body instanceof ReadableStream) { + const reader = body.getReader(); + + res.setHeader('Connection', 'keep-alive'); + res.setHeader('Content-Type', 'text/event-stream'); + res.setHeader('Cache-Control', 'no-cache'); + + const processStream = async (reader: ReadableStreamDefaultReader, res: TResponse) => { + try { + let done, value; + do { + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment + ({ done, value } = await reader.read()); + if (!done) res.write(value); + } while (!done); + } catch (error) { + console.error('Error while reading from stream', error); + } finally { + reader.releaseLock(); + res.end(); + } + }; + + void processStream(reader, res); + } else { + res.setHeader('Content-Type', 'application/json'); + for (const [key, value] of Object.entries(headers)) { + if (typeof value !== 'undefined') { + res.setHeader(key, value); + } } + res.end(JSON.stringify(body)); } - res.end(JSON.stringify(body)); }; const method = req.method! as OpenApiMethod & 'HEAD'; From 907beb3a04b328ab1acc02a1f947c85e43b303bf Mon Sep 17 00:00:00 2001 From: David Corbitt Date: Thu, 21 Dec 2023 01:57:34 -0800 Subject: [PATCH 2/3] Support more types of SSE streams --- src/adapters/node-http/core.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/adapters/node-http/core.ts b/src/adapters/node-http/core.ts index c8c2f020..ceef3211 100644 --- a/src/adapters/node-http/core.ts +++ b/src/adapters/node-http/core.ts @@ -65,7 +65,8 @@ export const createOpenApiNodeHttpHandler = < ) => { res.statusCode = statusCode; - if (body instanceof ReadableStream) { + // Support sending SSE streams + if (body && typeof body.getReader === 'function') { const reader = body.getReader(); res.setHeader('Connection', 'keep-alive'); @@ -78,7 +79,7 @@ export const createOpenApiNodeHttpHandler = < do { // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment ({ done, value } = await reader.read()); - if (!done) res.write(value); + if (!done) res.write(`data: ${Buffer.from(value.buffer).toString()}\n\n`); } while (!done); } catch (error) { console.error('Error while reading from stream', error); From 5e27b1e1af3b63630a82f3ea6178cebcc13b3bc4 Mon Sep 17 00:00:00 2001 From: David Corbitt Date: Thu, 22 Feb 2024 18:23:26 -0800 Subject: [PATCH 3/3] Flush data and headers for streamed requests --- src/adapters/node-http/core.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/adapters/node-http/core.ts b/src/adapters/node-http/core.ts index ceef3211..618c9b25 100644 --- a/src/adapters/node-http/core.ts +++ b/src/adapters/node-http/core.ts @@ -72,6 +72,7 @@ export const createOpenApiNodeHttpHandler = < res.setHeader('Connection', 'keep-alive'); res.setHeader('Content-Type', 'text/event-stream'); res.setHeader('Cache-Control', 'no-cache'); + res.flushHeaders(); const processStream = async (reader: ReadableStreamDefaultReader, res: TResponse) => { try { @@ -79,7 +80,11 @@ export const createOpenApiNodeHttpHandler = < do { // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment ({ done, value } = await reader.read()); - if (!done) res.write(`data: ${Buffer.from(value.buffer).toString()}\n\n`); + if (!done) { + res.write(`data: ${Buffer.from(value.buffer).toString()}\n\n`); + // @ts-expect-error flush is not in the types + res.flush(); + } } while (!done); } catch (error) { console.error('Error while reading from stream', error);