Commit c3f3f46
Merge pull request #1413 from pipecat-ai/aleix/llm-user-aggregator-emulate-fixes
LLMUserContextAggregator: fix emulated user started/stopped speaking issues
2 parents de8a831 + b20ce7d commit c3f3f46

9 files changed, +33 -50 lines changed

CHANGELOG.md

Lines changed: 13 additions & 4 deletions

```diff
@@ -50,6 +50,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Added new `sample_rate` constructor parameter to `TavusVideoService` to allow
   changing the output sample rate.
 
+- Added new `NeuphonicTTSService`.
+  (see https://neuphonic.com)
+
 - Added new `UltravoxSTTService`.
   (see https://github.com/fixie-ai/ultravox)
 
@@ -269,6 +272,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 - Add foundational example `07w-interruptible-fal.py`, showing `FalSTTService`.
 
+- Added a new Ultravox example
+  `examples/foundational/07u-interruptible-ultravox.py`.
+
+- Added new Neuphonic examples
+  `examples/foundational/07v-interruptible-neuphonic.py` and
+  `examples/foundational/07v-interruptible-neuphonic-http.py`.
+
 - Added a new example `examples/foundational/36-user-email-gathering.py` to show
   how to gather user emails. The example uses Cartesia's `<spell></spell>`
   tags and Rime's `spell()` function to spell out the emails for confirmation.
@@ -367,6 +377,9 @@ stt = DeepgramSTTService(..., live_options=LiveOptions(model="nova-2-general"))
 
 ### Fixed
 
+- Fixed an issue that would cause undesired interruptions via
+  `EmulateUserStartedSpeakingFrame`.
+
 - Fixed a `GoogleLLMService` that was causing an exception when sending inline
   audio in some cases.
 
@@ -383,10 +396,6 @@ stt = DeepgramSTTService(..., live_options=LiveOptions(model="nova-2-general"))
 
 - Fixed `match_endofsentence` support for ellipses.
 
-- Fixed an issue that would cause undesired interruptions via
-  `EmulateUserStartedSpeakingFrame` when only interim transcriptions (i.e. no
-  final transcriptions) were received.
-
 - Fixed an issue where `EndTaskFrame` was not triggering
   `on_client_disconnected` or closing the WebSocket in FastAPI.
```

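To make the new service entries concrete, here is a minimal, hypothetical wiring sketch. The import path and constructor parameters (`api_key`, `voice_id`) are assumptions modeled on Pipecat's other TTS services; the authoritative usage is in the `07v-interruptible-neuphonic*.py` examples listed above.

```python
# Hypothetical sketch only: parameter names and import path are assumed,
# not confirmed by this commit. See examples/foundational/07v-*.py for
# the real usage of NeuphonicTTSService.
import os

from pipecat.services.neuphonic import NeuphonicTTSService  # assumed path

tts = NeuphonicTTSService(
    api_key=os.getenv("NEUPHONIC_API_KEY"),  # assumed parameter
    voice_id="<your-voice-id>",              # assumed parameter
)

# In a typical Pipecat pipeline the TTS service sits between the LLM and
# the transport output, e.g.:
# pipeline = Pipeline([transport.input(), stt, llm, tts, transport.output()])
```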
pyproject.toml

Lines changed: 1 addition & 1 deletion

```diff
@@ -42,7 +42,7 @@ Website = "https://pipecat.ai"
 anthropic = [ "anthropic~=0.49.0" ]
 assemblyai = [ "assemblyai~=0.37.0" ]
 aws = [ "boto3~=1.37.16" ]
-azure = [ "azure-cognitiveservices-speech~=1.43.0"]
+azure = [ "azure-cognitiveservices-speech~=1.42.0"]
 canonical = [ "aiofiles~=24.1.0" ]
 cartesia = [ "cartesia~=1.4.0", "websockets~=13.1" ]
 neuphonic = [ "pyneuphonic~=1.5.13", "websockets~=13.1" ]
```

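The `~=` operator in these extras is a PEP 440 compatible-release specifier, so this one-character version change narrows the allowed Azure SDK range from `>=1.43.0,<1.44.0` to `>=1.42.0,<1.43.0`. A quick demonstration using the third-party `packaging` library (brought in here purely for illustration):

```python
# Shows what the updated ~=1.42.0 pin accepts under PEP 440 semantics.
from packaging.specifiers import SpecifierSet

spec = SpecifierSet("~=1.42.0")  # equivalent to >=1.42.0,<1.43.0
print("1.42.2" in spec)  # True: 1.42.x patch releases still match
print("1.43.0" in spec)  # False: the 1.43.x line is now excluded
```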
src/pipecat/processors/aggregators/llm_response.py

Lines changed: 16 additions & 30 deletions

```diff
@@ -5,7 +5,6 @@
 #
 
 import asyncio
-import time
 from abc import abstractmethod
 from typing import Dict, List
 
@@ -222,24 +221,23 @@ def __init__(
         self,
         context: OpenAILLMContext,
         aggregation_timeout: float = 1.0,
-        bot_interruption_timeout: float = 5.0,
         **kwargs,
     ):
         super().__init__(context=context, role="user", **kwargs)
         self._aggregation_timeout = aggregation_timeout
-        self._bot_interruption_timeout = bot_interruption_timeout
 
         self._seen_interim_results = False
         self._user_speaking = False
-        self._last_user_speaking_time = 0
         self._emulating_vad = False
+        self._waiting_for_aggregation = False
 
         self._aggregation_event = asyncio.Event()
         self._aggregation_task = None
 
     def reset(self):
         super().reset()
         self._seen_interim_results = False
+        self._waiting_for_aggregation = False
 
     async def handle_aggregation(self, aggregation: str):
         self._context.add_message({"role": self.role, "content": self._aggregation})
@@ -285,14 +283,11 @@ async def push_aggregation(self):
 
         # Reset the aggregation. Reset it before pushing it down, otherwise
         # if the task gets cancelled we won't be able to clear things up.
-        self._aggregation = ""
+        self.reset()
 
         frame = OpenAILLMContextFrame(self._context)
         await self.push_frame(frame)
 
-        # Reset our accumulator state.
-        self.reset()
-
     async def _start(self, frame: StartFrame):
         self._create_aggregation_task()
 
@@ -303,12 +298,14 @@ async def _cancel(self, frame: CancelFrame):
         await self._cancel_aggregation_task()
 
     async def _handle_user_started_speaking(self, _: UserStartedSpeakingFrame):
-        self._last_user_speaking_time = time.time()
         self._user_speaking = True
+        self._waiting_for_aggregation = True
 
     async def _handle_user_stopped_speaking(self, _: UserStoppedSpeakingFrame):
-        self._last_user_speaking_time = time.time()
         self._user_speaking = False
+        # We just stopped speaking. Let's see if there's some aggregation to
+        # push. If the last thing we saw is an interim transcription, let's
+        # wait before pushing the aggregation: a final one will probably come.
         if not self._seen_interim_results:
             await self.push_aggregation()
 
@@ -361,18 +358,13 @@ async def _maybe_push_bot_interruption(self):
         frame we might want to interrupt the bot.
 
         """
-        if not self._user_speaking:
-            diff_time = time.time() - self._last_user_speaking_time
-            if diff_time > self._bot_interruption_timeout:
-                # If we reach this case we received a transcription but VAD was
-                # not able to detect voice (e.g. when you whisper a short
-                # utterance). So, we need to emulate VAD (i.e. user
-                # start/stopped speaking).
-                await self.push_frame(EmulateUserStartedSpeakingFrame(), FrameDirection.UPSTREAM)
-                self._emulating_vad = True
-
-                # Reset time so we don't interrupt again right away.
-                self._last_user_speaking_time = time.time()
+        if not self._user_speaking and not self._waiting_for_aggregation:
+            # If we reach this case we received a transcription but VAD was not
+            # able to detect voice (e.g. when you whisper a short
+            # utterance). So, we need to emulate VAD (i.e. user started/stopped
+            # speaking).
+            await self.push_frame(EmulateUserStartedSpeakingFrame(), FrameDirection.UPSTREAM)
+            self._emulating_vad = True
 
 
 class LLMAssistantContextAggregator(LLMContextResponseAggregator):
@@ -554,14 +546,11 @@ async def push_aggregation(self):
 
         # Reset the aggregation. Reset it before pushing it down, otherwise
         # if the task gets cancelled we won't be able to clear things up.
-        self._aggregation = ""
+        self.reset()
 
         frame = LLMMessagesFrame(self._context.messages)
         await self.push_frame(frame)
 
-        # Reset our accumulator state.
-        self.reset()
-
 
 class LLMAssistantResponseAggregator(LLMAssistantContextAggregator):
     def __init__(self, messages: List[dict] = [], **kwargs):
@@ -573,10 +562,7 @@ async def push_aggregation(self):
 
         # Reset the aggregation. Reset it before pushing it down, otherwise
         # if the task gets cancelled we won't be able to clear things up.
-        self._aggregation = ""
+        self.reset()
 
         frame = LLMMessagesFrame(self._context.messages)
         await self.push_frame(frame)
-
-        # Reset our accumulator state.
-        self.reset()
```

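The core of the fix above is replacing the wall-clock `bot_interruption_timeout` heuristic with a `_waiting_for_aggregation` flag: VAD is emulated only while no user turn is open. The standalone sketch below mirrors that decision logic; it is a simplification for illustration, not the actual Pipecat class.

```python
# Simplified model of the aggregator's post-fix emulation gate.
class EmulatedVadGate:
    def __init__(self):
        self.user_speaking = False            # VAD currently detects speech
        self.waiting_for_aggregation = False  # a user turn is open, flush pending

    def on_user_started_speaking(self):
        self.user_speaking = True
        self.waiting_for_aggregation = True   # an open turn blocks emulation

    def on_user_stopped_speaking(self):
        self.user_speaking = False            # turn stays open until flushed

    def on_aggregation_pushed(self):
        self.waiting_for_aggregation = False  # reset() clears the flag

    def should_emulate_vad(self) -> bool:
        # A transcription arrived without VAD activity (e.g. a whispered
        # utterance). Emulate user started/stopped speaking only when fully
        # idle; the removed 5-second timer could fire here spuriously and
        # cause the undesired interruptions this PR fixes.
        return not self.user_speaking and not self.waiting_for_aggregation
```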
src/pipecat/transports/base_input.py

Lines changed: 2 additions & 2 deletions

```diff
@@ -152,6 +152,7 @@ async def _handle_bot_interruption(self, frame: BotInterruptionFrame):
     async def _handle_user_interruption(self, frame: Frame):
         if isinstance(frame, UserStartedSpeakingFrame):
             logger.debug("User started speaking")
+            await self.push_frame(frame)
             # Make sure we notify about interruptions quickly out-of-band.
             if self.interruptions_allowed:
                 await self._start_interruption()
@@ -161,12 +162,11 @@ async def _handle_user_interruption(self, frame: Frame):
                 await self.push_frame(StartInterruptionFrame())
         elif isinstance(frame, UserStoppedSpeakingFrame):
             logger.debug("User stopped speaking")
+            await self.push_frame(frame)
             if self.interruptions_allowed:
                 await self._stop_interruption()
                 await self.push_frame(StopInterruptionFrame())
 
-        await self.push_frame(frame)
-
     #
     # Audio input
     #
```

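Note the behavioral consequence of this reordering: `UserStartedSpeakingFrame` and `UserStoppedSpeakingFrame` are now pushed downstream before interruption handling runs, rather than once at the end of the handler, so downstream processors such as the user aggregator observe the speaking frame ahead of the corresponding interruption frame. A sketch of the resulting order (illustrative helper, not part of the Pipecat API):

```python
# Illustrative only: the downstream frame order after the reordering above.
def downstream_order(user_started: bool, interruptions_allowed: bool) -> list:
    speaking = "UserStartedSpeakingFrame" if user_started else "UserStoppedSpeakingFrame"
    interruption = "StartInterruptionFrame" if user_started else "StopInterruptionFrame"
    order = [speaking]              # the speaking frame goes first now
    if interruptions_allowed:
        order.append(interruption)  # interruption frames follow it
    return order
```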
src/pipecat/transports/services/livekit.py

Lines changed: 0 additions & 7 deletions

```diff
@@ -599,13 +599,6 @@ async def send_message_urgent(self, message: str, participant_id: Optional[str]
         )
         await self._output.send_message(frame)
 
-    async def cleanup(self):
-        if self._input:
-            await self._input.cleanup()
-        if self._output:
-            await self._output.cleanup()
-        await self._client.disconnect()
-
     async def on_room_event(self, event):
         # Handle room events
         pass
```

tests/test_context_aggregators.py

Lines changed: 1 addition & 6 deletions

```diff
@@ -44,8 +44,6 @@
 
 AGGREGATION_TIMEOUT = 0.1
 AGGREGATION_SLEEP = 0.15
-BOT_INTERRUPTION_TIMEOUT = 0.2
-BOT_INTERRUPTION_SLEEP = 0.25
 
 
 class BaseTestUserContextAggregator:
@@ -388,14 +386,13 @@ async def test_sie_delay_it(self):
         aggregator = self.AGGREGATOR_CLASS(
             context,
             aggregation_timeout=AGGREGATION_TIMEOUT,
-            bot_interruption_timeout=BOT_INTERRUPTION_TIMEOUT,
         )
         frames_to_send = [
            UserStartedSpeakingFrame(),
            InterimTranscriptionFrame(text="How ", user_id="cat", timestamp=""),
            SleepFrame(),
            UserStoppedSpeakingFrame(),
-           SleepFrame(BOT_INTERRUPTION_SLEEP),
+           SleepFrame(AGGREGATION_SLEEP),
            InterimTranscriptionFrame(text="are you?", user_id="cat", timestamp=""),
            TranscriptionFrame(text="How are you?", user_id="cat", timestamp=""),
            SleepFrame(sleep=AGGREGATION_SLEEP),
@@ -405,12 +402,10 @@ async def test_sie_delay_it(self):
             UserStoppedSpeakingFrame,
             *self.EXPECTED_CONTEXT_FRAMES,
         ]
-        expected_up_frames = [EmulateUserStartedSpeakingFrame, EmulateUserStoppedSpeakingFrame]
         await run_test(
             aggregator,
             frames_to_send=frames_to_send,
             expected_down_frames=expected_down_frames,
-            expected_up_frames=expected_up_frames,
         )
         self.check_message_content(context, 0, "How are you?")
```

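With the timeout-based emulation gone, the test no longer expects `EmulateUserStartedSpeakingFrame`/`EmulateUserStoppedSpeakingFrame` upstream, and its mid-test sleep only needs to outlast the aggregation window. The relationship it appears to rely on (constants copied from the diff):

```python
# Timing the updated test appears to depend on: the sleep after
# UserStoppedSpeakingFrame must exceed the aggregation timeout so the
# aggregator's flush timer gets a chance to run before more frames arrive.
AGGREGATION_TIMEOUT = 0.1  # seconds the aggregator waits for more transcript
AGGREGATION_SLEEP = 0.15   # deliberately longer than the timeout

assert AGGREGATION_SLEEP > AGGREGATION_TIMEOUT
```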