Skip to content

Commit 4ad227c

Browse files
authored
Merge pull request #1643 from pipecat-ai/mb/gladia-keepalive
Add a keepalive task to GladiaSTTService
2 parents db7d7a4 + 7d65132 commit 4ad227c

File tree

2 files changed

+40
-1
lines changed

2 files changed

+40
-1
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1313
the Telnyx call when an `EndFrame` or `CancelFrame` is received. It is
1414
enabled by default and is configurable via the `auto_hang_up` `InputParam`.
1515

16+
- Added a keepalive task to `GladiaSTTService` to prevent the websocket from
17+
disconnecting after 30 seconds of no audio input.
18+
1619
### Changed
1720

1821
- In `TwilioFrameSerializer`, `call_sid` is Optional so as to avoid a breaking

src/pipecat/services/gladia/stt.py

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
# SPDX-License-Identifier: BSD 2-Clause License
55
#
66

7+
import asyncio
78
import base64
89
import json
910
import warnings
@@ -224,6 +225,7 @@ def __init__(
224225
self._params = params
225226
self._websocket = None
226227
self._receive_task = None
228+
self._keepalive_task = None
227229

228230
def language_to_service_language(self, language: Language) -> Optional[str]:
229231
"""Convert pipecat Language enum to Gladia's language code."""
@@ -287,22 +289,38 @@ async def start(self, frame: StartFrame):
287289
self._websocket = await websockets.connect(response["url"])
288290
if self._websocket and not self._receive_task:
289291
self._receive_task = self.create_task(self._receive_task_handler())
292+
if self._websocket and not self._keepalive_task:
293+
self._keepalive_task = self.create_task(self._keepalive_task_handler())
290294

291295
async def stop(self, frame: EndFrame):
292296
"""Stop the Gladia STT websocket connection."""
293297
await super().stop(frame)
294298
await self._send_stop_recording()
299+
300+
if self._keepalive_task:
301+
await self.cancel_task(self._keepalive_task)
302+
self._keepalive_task = None
303+
295304
if self._websocket:
296305
await self._websocket.close()
297306
self._websocket = None
307+
298308
if self._receive_task:
299309
await self.wait_for_task(self._receive_task)
300310
self._receive_task = None
301311

302312
async def cancel(self, frame: CancelFrame):
303313
"""Cancel the Gladia STT websocket connection."""
304314
await super().cancel(frame)
305-
await self._websocket.close()
315+
316+
if self._keepalive_task:
317+
await self.cancel_task(self._keepalive_task)
318+
self._keepalive_task = None
319+
320+
if self._websocket:
321+
await self._websocket.close()
322+
self._websocket = None
323+
306324
if self._receive_task:
307325
await self.cancel_task(self._receive_task)
308326
self._receive_task = None
@@ -341,6 +359,24 @@ async def _send_stop_recording(self):
341359
if self._websocket and not self._websocket.closed:
342360
await self._websocket.send(json.dumps({"type": "stop_recording"}))
343361

362+
async def _keepalive_task_handler(self):
363+
"""Send periodic empty audio chunks to keep the connection alive."""
364+
try:
365+
while True:
366+
# Send keepalive every 20 seconds (Gladia times out after 30 seconds)
367+
await asyncio.sleep(20)
368+
if self._websocket and not self._websocket.closed:
369+
# Send an empty audio chunk as keepalive
370+
empty_audio = b""
371+
await self._send_audio(empty_audio)
372+
else:
373+
logger.debug("Websocket closed, stopping keepalive")
374+
break
375+
except websockets.exceptions.ConnectionClosed:
376+
logger.debug("Connection closed during keepalive")
377+
except Exception as e:
378+
logger.error(f"Error in Gladia keepalive task: {e}")
379+
344380
async def _receive_task_handler(self):
345381
try:
346382
async for message in self._websocket:

0 commit comments

Comments
 (0)