-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfastapi_helpers.py
33 lines (30 loc) · 1.24 KB
/
fastapi_helpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from fastapi.responses import StreamingResponse
from fastapi.encoders import jsonable_encoder
import typing, json
# Helper that takes a stream of objects and sends it to the client using
# "Server-sent events" (one JSON-encoded event per object).

# A synchronous stream of objects (each is JSON-encoded by StreamingJSONResponse).
SyncJsonStream = typing.Iterator[typing.Any]
# An asynchronous stream of objects (each is JSON-encoded by StreamingJSONResponse).
AsyncJsonStream = typing.AsyncIterable[typing.Any]
# Either kind of stream; this is the type StreamingJSONResponse accepts as content.
JsonStream = typing.Union[AsyncJsonStream, SyncJsonStream]
class StreamingJSONResponse(StreamingResponse):
    """Streaming response that sends each item of a stream as a Server-Sent Event.

    Every item produced by ``content`` is passed through ``jsonable_encoder``,
    serialized as compact UTF-8 JSON and emitted as one ``data: <json>`` event.
    Once the stream is exhausted, a final ``data: [DONE]`` event is emitted.
    The response is created with media type ``text/event-stream``.
    """

    def __init__(
        self,
        content: JsonStream,
        **kw: typing.Any
    ) -> None:
        """Wrap ``content`` in a byte stream of Server-Sent Events.

        Args:
            content: An async iterable, or a synchronous iterator, of objects
                to send. Anything that is not an ``AsyncIterable`` is handed
                to starlette's ``iterate_in_threadpool``.
            **kw: Extra keyword arguments forwarded unchanged to
                ``StreamingResponse.__init__``. ``media_type`` is already
                supplied here, so passing it again raises ``TypeError``.

        Note:
            Serialization happens lazily while the body is being produced.
            Because ``allow_nan=False`` is used, ``json.dumps`` raises
            ``ValueError`` from the generator if an encoded item contains
            NaN or an infinity.
        """

        async def json_iterator():
            # Normalize both kinds of input to something usable with
            # ``async for``.
            if isinstance(content, typing.AsyncIterable):
                chunks = content
            else:
                # Imported lazily: only needed for synchronous streams.
                from starlette.concurrency import iterate_in_threadpool
                chunks = iterate_in_threadpool(content)

            async for chunk in chunks:
                text = json.dumps(
                    jsonable_encoder(chunk),
                    ensure_ascii=False,
                    allow_nan=False,
                    indent=None,
                    separators=(",", ":"),
                )
                # Compact JSON contains no raw newlines, so a single "data:"
                # field is enough; the blank line dispatches the event.
                yield ("data: " + text + "\n\n").encode("utf-8")

            # The sentinel must also end with a blank line: an event that is
            # still pending when the stream ends is discarded by SSE clients.
            yield "data: [DONE]\n\n".encode("utf-8")

        super().__init__(content=json_iterator(), media_type="text/event-stream", **kw)