Skip to content

Commit 2089e0c

Browse files
authored
Merge pull request #1768 from pipecat-ai/mb/update-observers
Add DebugLogObserver
2 parents fb5438e + 9e0b4fe commit 2089e0c

File tree

5 files changed

+245
-8
lines changed

5 files changed

+245
-8
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
99

1010
### Added
1111

12+
- Added `DebugLogObserver` for detailed frame logging with configurable
13+
filtering by frame type and endpoint. This observer automatically extracts
14+
and formats all frame data fields for debug logging.
15+
1216
- `UserImageRequestFrame.video_source` field has been added to request an image
1317
from the desired video source.
1418

examples/foundational/30-observer.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,26 +14,34 @@
1414
from pipecat.frames.frames import (
1515
BotStartedSpeakingFrame,
1616
BotStoppedSpeakingFrame,
17+
EndFrame,
1718
StartInterruptionFrame,
19+
TTSTextFrame,
20+
UserStartedSpeakingFrame,
1821
)
1922
from pipecat.observers.base_observer import BaseObserver, FramePushed
23+
from pipecat.observers.loggers.debug_log_observer import DebugLogObserver, FrameEndpoint
2024
from pipecat.observers.loggers.llm_log_observer import LLMLogObserver
2125
from pipecat.pipeline.pipeline import Pipeline
2226
from pipecat.pipeline.runner import PipelineRunner
2327
from pipecat.pipeline.task import PipelineParams, PipelineTask
24-
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
28+
from pipecat.processors.aggregators.openai_llm_context import (
29+
OpenAILLMContext,
30+
)
2531
from pipecat.processors.frame_processor import FrameDirection
2632
from pipecat.services.cartesia.tts import CartesiaTTSService
2733
from pipecat.services.deepgram.stt import DeepgramSTTService
2834
from pipecat.services.openai.llm import OpenAILLMService
35+
from pipecat.transports.base_input import BaseInputTransport
36+
from pipecat.transports.base_output import BaseOutputTransport
2937
from pipecat.transports.base_transport import TransportParams
3038
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
3139
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
3240

3341
load_dotenv(override=True)
3442

3543

36-
class DebugObserver(BaseObserver):
44+
class CustomObserver(BaseObserver):
3745
"""Observer to log interruptions and bot speaking events to the console.
3846
3947
Logs all frame instances of:
@@ -58,7 +66,7 @@ async def on_push_frame(self, data: FramePushed):
5866
# Create direction arrow
5967
arrow = "→" if direction == FrameDirection.DOWNSTREAM else "←"
6068

61-
if isinstance(frame, StartInterruptionFrame):
69+
if isinstance(frame, StartInterruptionFrame) and isinstance(src, BaseOutputTransport):
6270
logger.info(f"⚡ INTERRUPTION START: {src} {arrow} {dst} at {time_sec:.2f}s")
6371
elif isinstance(frame, BotStartedSpeakingFrame):
6472
logger.info(f"🤖 BOT START SPEAKING: {src} {arrow} {dst} at {time_sec:.2f}s")
@@ -117,7 +125,17 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespac
117125
enable_usage_metrics=True,
118126
report_only_initial_ttfb=True,
119127
),
120-
observers=[DebugObserver(), LLMLogObserver()],
128+
observers=[
129+
CustomObserver(),
130+
LLMLogObserver(),
131+
DebugLogObserver(
132+
frame_types={
133+
TTSTextFrame: (BaseOutputTransport, FrameEndpoint.DESTINATION),
134+
UserStartedSpeakingFrame: (BaseInputTransport, FrameEndpoint.SOURCE),
135+
EndFrame: None,
136+
}
137+
),
138+
],
121139
)
122140

123141
@transport.event_handler("on_client_connected")
Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
#
2+
# Copyright (c) 2024–2025, Daily
3+
#
4+
# SPDX-License-Identifier: BSD 2-Clause License
5+
#
6+
7+
from dataclasses import fields, is_dataclass
8+
from enum import Enum, auto
9+
from typing import Dict, Optional, Set, Tuple, Type, Union
10+
11+
from loguru import logger
12+
13+
from pipecat.frames.frames import Frame
14+
from pipecat.observers.base_observer import BaseObserver, FramePushed
15+
from pipecat.processors.frame_processor import FrameDirection
16+
17+
18+
class FrameEndpoint(Enum):
19+
"""Specifies which endpoint (source or destination) to filter on."""
20+
21+
SOURCE = auto()
22+
DESTINATION = auto()
23+
24+
25+
class DebugLogObserver(BaseObserver):
26+
"""Observer that logs frame activity with detailed content to the console.
27+
28+
Automatically extracts and formats data from any frame type, making it useful
29+
for debugging pipeline behavior without needing frame-specific observers.
30+
31+
Args:
32+
frame_types: Optional tuple of frame types to log, or a dict with frame type
33+
filters. If None, logs all frame types.
34+
exclude_fields: Optional set of field names to exclude from logging.
35+
36+
Examples:
37+
Log all frames from all services:
38+
```python
39+
observers = DebugLogObserver()
40+
```
41+
42+
Log specific frame types from any source/destination:
43+
```python
44+
from pipecat.frames.frames import TranscriptionFrame, InterimTranscriptionFrame
45+
observers = DebugLogObserver(frame_types=(TranscriptionFrame, InterimTranscriptionFrame))
46+
```
47+
48+
Log frames with specific source/destination filters:
49+
```python
50+
from pipecat.frames.frames import StartInterruptionFrame, UserStartedSpeakingFrame, LLMTextFrame
51+
from pipecat.transports.base_output_transport import BaseOutputTransport
52+
from pipecat.services.stt_service import STTService
53+
54+
observers = DebugLogObserver(frame_types={
55+
# Only log StartInterruptionFrame when source is BaseOutputTransport
56+
StartInterruptionFrame: (BaseOutputTransport, FrameEndpoint.SOURCE),
57+
58+
# Only log UserStartedSpeakingFrame when destination is STTService
59+
UserStartedSpeakingFrame: (STTService, FrameEndpoint.DESTINATION),
60+
61+
# Log LLMTextFrame regardless of source or destination type
62+
LLMTextFrame: None
63+
})
64+
```
65+
"""
66+
67+
def __init__(
68+
self,
69+
frame_types: Optional[
70+
Union[Tuple[Type[Frame], ...], Dict[Type[Frame], Optional[Tuple[Type, FrameEndpoint]]]]
71+
] = None,
72+
exclude_fields: Optional[Set[str]] = None,
73+
):
74+
"""Initialize the debug log observer.
75+
76+
Args:
77+
frame_types: Tuple of frame types to log, or a dict mapping frame types to
78+
filter configurations. Filter configs can be:
79+
- None to log all instances of the frame type
80+
- A tuple of (service_type, endpoint) to filter on a specific service
81+
and endpoint (SOURCE or DESTINATION)
82+
If None is provided instead of a tuple/dict, log all frames.
83+
exclude_fields: Set of field names to exclude from logging. If None, only binary
84+
data fields are excluded.
85+
"""
86+
# Process frame filters
87+
self.frame_filters = {}
88+
89+
if frame_types is not None:
90+
if isinstance(frame_types, tuple):
91+
# Tuple of frame types - log all instances
92+
self.frame_filters = {frame_type: None for frame_type in frame_types}
93+
else:
94+
# Dict of frame types with filters
95+
self.frame_filters = frame_types
96+
97+
# By default, exclude binary data fields that would clutter logs
98+
self.exclude_fields = (
99+
exclude_fields
100+
if exclude_fields is not None
101+
else {
102+
"audio", # Skip binary audio data
103+
"image", # Skip binary image data
104+
"images", # Skip lists of images
105+
}
106+
)
107+
108+
def _format_value(self, value):
109+
"""Format a value for logging.
110+
111+
Args:
112+
value: The value to format.
113+
114+
Returns:
115+
str: A string representation of the value suitable for logging.
116+
"""
117+
if value is None:
118+
return "None"
119+
elif isinstance(value, str):
120+
return f"{value!r}"
121+
elif isinstance(value, (list, tuple)):
122+
if len(value) == 0:
123+
return "[]"
124+
if isinstance(value[0], dict) and len(value) > 3:
125+
# For message lists, just show count
126+
return f"{len(value)} items"
127+
return str(value)
128+
elif isinstance(value, (bytes, bytearray)):
129+
return f"{len(value)} bytes"
130+
elif hasattr(value, "get_messages_for_logging") and callable(
131+
getattr(value, "get_messages_for_logging")
132+
):
133+
# Special case for OpenAI context
134+
return f"{value.__class__.__name__} with messages: {value.get_messages_for_logging()}"
135+
else:
136+
return str(value)
137+
138+
def _should_log_frame(self, frame, src, dst):
139+
"""Determine if a frame should be logged based on filters.
140+
141+
Args:
142+
frame: The frame being processed
143+
src: The source component
144+
dst: The destination component
145+
146+
Returns:
147+
bool: True if the frame should be logged, False otherwise
148+
"""
149+
# If no filters, log all frames
150+
if not self.frame_filters:
151+
return True
152+
153+
# Check if this frame type is in our filters
154+
for frame_type, filter_config in self.frame_filters.items():
155+
if isinstance(frame, frame_type):
156+
# If filter is None, log all instances of this frame type
157+
if filter_config is None:
158+
return True
159+
160+
# Otherwise, check the specific filter
161+
service_type, endpoint = filter_config
162+
163+
if endpoint == FrameEndpoint.SOURCE:
164+
return isinstance(src, service_type)
165+
elif endpoint == FrameEndpoint.DESTINATION:
166+
return isinstance(dst, service_type)
167+
168+
return False
169+
170+
async def on_push_frame(self, data: FramePushed):
171+
"""Process a frame being pushed into the pipeline.
172+
173+
Logs frame details to the console with all relevant fields and values.
174+
175+
Args:
176+
data: Event data containing the frame, source, destination, direction, and timestamp.
177+
"""
178+
src = data.source
179+
dst = data.destination
180+
frame = data.frame
181+
direction = data.direction
182+
timestamp = data.timestamp
183+
184+
# Check if we should log this frame
185+
if not self._should_log_frame(frame, src, dst):
186+
return
187+
188+
# Format direction arrow
189+
arrow = "→" if direction == FrameDirection.DOWNSTREAM else "←"
190+
191+
time_sec = timestamp / 1_000_000_000
192+
class_name = frame.__class__.__name__
193+
194+
# Build frame representation
195+
frame_details = []
196+
197+
# If dataclass, extract fields
198+
if is_dataclass(frame):
199+
for field in fields(frame):
200+
if field.name in self.exclude_fields:
201+
continue
202+
203+
value = getattr(frame, field.name)
204+
if value is None:
205+
continue
206+
207+
formatted_value = self._format_value(value)
208+
frame_details.append(f"{field.name}: {formatted_value}")
209+
210+
# Format the message
211+
if frame_details:
212+
details = ", ".join(frame_details)
213+
message = f"{class_name} {details} at {time_sec:.2f}s"
214+
else:
215+
message = f"{class_name} at {time_sec:.2f}s"
216+
217+
# Log the message
218+
logger.debug(f"{src} {arrow} {dst}: {message}")

src/pipecat/observers/loggers/llm_log_observer.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
from loguru import logger
88

99
from pipecat.frames.frames import (
10-
Frame,
1110
FunctionCallInProgressFrame,
1211
FunctionCallResultFrame,
1312
LLMFullResponseEndFrame,
@@ -17,7 +16,7 @@
1716
)
1817
from pipecat.observers.base_observer import BaseObserver, FramePushed
1918
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame
20-
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
19+
from pipecat.processors.frame_processor import FrameDirection
2120
from pipecat.services.llm_service import LLMService
2221

2322

src/pipecat/observers/loggers/transcription_log_observer.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,10 @@
77
from loguru import logger
88

99
from pipecat.frames.frames import (
10-
Frame,
1110
InterimTranscriptionFrame,
1211
TranscriptionFrame,
1312
)
1413
from pipecat.observers.base_observer import BaseObserver, FramePushed
15-
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
1614
from pipecat.services.stt_service import STTService
1715

1816

0 commit comments

Comments
 (0)