-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathmodel_openai.py
81 lines (67 loc) · 2.52 KB
/
model_openai.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import base64
import contextlib
from typing import AsyncGenerator, AsyncIterator
from openai import AsyncOpenAI
from av import AudioFrame, AudioResampler
from PIL.Image import Image
import io
from model import Model, Input, Output
# Audio sample rate in Hz: used as the resampler's output rate in
# OpenAI.__init__ and stamped onto every frame built in OpenAI.recv.
SAMPLE_RATE = 24000
# Duration of one resampled audio frame in seconds; multiplied by SAMPLE_RATE
# to derive the resampler's frame size in samples.
AUDIO_PTIME = 0.02
class OpenAI(Model):
    """Adapter exposing a realtime session through the ``Model`` interface.

    Inputs are forwarded to the wrapped session according to their runtime
    type (text, audio frame or image); audio produced by the session is
    yielded back as ``AudioFrame`` objects.
    """

    def __init__(self, session):
        """Wrap *session* and prepare the resampler used for outgoing audio.

        Args:
            session: The realtime connection object. ``connect_openai`` in
                this module passes the value yielded by
                ``client.beta.realtime.connect``.
        """
        self.session = session
        # Incoming audio is normalised to signed 16-bit mono at SAMPLE_RATE,
        # chunked into frames of SAMPLE_RATE * AUDIO_PTIME samples.
        self.resampler = AudioResampler(
            format="s16",
            layout="mono",
            rate=SAMPLE_RATE,
            frame_size=int(SAMPLE_RATE * AUDIO_PTIME),
        )

    async def send(self, input: Input):
        """Forward one input item to the session.

        Args:
            input: One of
                * ``str`` - sent as a user text message, followed by a
                  request for a response;
                * ``AudioFrame`` - resampled and appended to the session's
                  input audio buffer as base64-encoded PCM;
                * ``PIL.Image.Image`` - JPEG-encoded and sent as a user
                  message carrying a base64 data URL.
                Any other type is silently ignored.
        """
        if isinstance(input, str):
            await self.session.conversation.item.create(
                item={
                    "type": "message",
                    "role": "user",
                    # Send the caller's text; this used to be a hard-coded
                    # placeholder string that ignored ``input`` entirely.
                    "content": [{"type": "input_text", "text": input}],
                }
            )
            await self.session.response.create()
        elif isinstance(input, AudioFrame):
            # The resampler may emit zero or more fixed-size frames per call.
            for frame in self.resampler.resample(input):
                pcm = frame.to_ndarray().tobytes()
                encoded = base64.b64encode(pcm).decode("utf-8")
                await self.session.input_audio_buffer.append(audio=encoded)
        elif isinstance(input, Image):
            buffer = io.BytesIO()
            input.save(buffer, format="JPEG")
            encoded = base64.b64encode(buffer.getvalue()).decode("utf-8")
            # NOTE(review): confirm that the target API accepts the
            # "image_url" content type on realtime conversation items.
            await self.session.conversation.item.create(
                item={
                    "type": "message",
                    "role": "user",
                    "content": [
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/jpeg;base64,{encoded}"
                            },
                        }
                    ],
                }
            )

    async def recv(self) -> AsyncIterator[Output]:
        """Yield audio frames decoded from the session's event stream.

        Only ``response.audio.delta`` events are handled; every other event
        type is skipped.

        Yields:
            ``AudioFrame`` objects in signed 16-bit mono format with
            ``sample_rate`` set to ``SAMPLE_RATE``.
        """
        async for event in self.session:
            if event.type != "response.audio.delta":
                continue
            data = base64.b64decode(event.delta)
            if not data:
                # Nothing to play back; avoid building a zero-sample frame.
                continue
            # s16 mono uses two bytes per sample. The sample count must be an
            # integer, so use floor division rather than true division.
            frame = AudioFrame(format="s16", layout="mono", samples=len(data) // 2)
            frame.sample_rate = SAMPLE_RATE
            frame.planes[0].update(data)
            yield frame

    async def close(self):
        """Close the underlying session."""
        await self.session.close()
# Module-level client shared by connect_openai(). It is constructed at import
# time with no explicit arguments.
# NOTE(review): credentials/configuration presumably come from the
# environment - confirm before importing this module where they are unset.
client = AsyncOpenAI()
@contextlib.asynccontextmanager
async def connect_openai(
    model: str = "gpt-4o-realtime-preview-2024-10-01",
) -> AsyncGenerator[OpenAI, None]:
    """Open a realtime connection and yield it wrapped in an ``OpenAI`` model.

    Intended for use as ``async with connect_openai() as model: ...``; the
    underlying ``async with`` on the connection is exited when the caller's
    block finishes.

    Args:
        model: Name of the realtime model to connect to. Defaults to the
            snapshot this module was originally pinned to, so existing
            zero-argument callers behave as before.

    Yields:
        An ``OpenAI`` instance bound to the open connection.
    """
    async with client.beta.realtime.connect(model=model) as conn:
        yield OpenAI(conn)