@@ -49,10 +49,14 @@ def __init__(
49
49
protocol : Protocol ,
50
50
* ,
51
51
close_timeout : float | None = 10 ,
52
+ max_queue : int | tuple [int , int | None ] = 16 ,
52
53
) -> None :
53
54
self .socket = socket
54
55
self .protocol = protocol
55
56
self .close_timeout = close_timeout
57
+ if isinstance (max_queue , int ):
58
+ max_queue = (max_queue , None )
59
+ self .max_queue = max_queue
56
60
57
61
# Inject reference to this instance in the protocol's logger.
58
62
self .protocol .logger = logging .LoggerAdapter (
@@ -76,8 +80,15 @@ def __init__(
76
80
# Mutex serializing interactions with the protocol.
77
81
self .protocol_mutex = threading .Lock ()
78
82
83
+ # Lock stopping reads when the assembler buffer is full.
84
+ self .recv_flow_control = threading .Lock ()
85
+
79
86
# Assembler turning frames into messages and serializing reads.
80
- self .recv_messages = Assembler ()
87
+ self .recv_messages = Assembler (
88
+ * self .max_queue ,
89
+ pause = self .recv_flow_control .acquire ,
90
+ resume = self .recv_flow_control .release ,
91
+ )
81
92
82
93
# Whether we are busy sending a fragmented message.
83
94
self .send_in_progress = False
@@ -88,6 +99,10 @@ def __init__(
88
99
# Mapping of ping IDs to pong waiters, in chronological order.
89
100
self .ping_waiters : dict [bytes , threading .Event ] = {}
90
101
102
+ # Exception raised in recv_events, to be chained to ConnectionClosed
103
+ # in the user thread in order to show why the TCP connection dropped.
104
+ self .recv_exc : BaseException | None = None
105
+
91
106
# Receiving events from the socket. This thread is marked as daemon to
92
107
# allow creating a connection in a non-daemon thread and using it in a
93
108
# daemon thread. This mustn't prevent the interpreter from exiting.
@@ -97,10 +112,6 @@ def __init__(
97
112
)
98
113
self .recv_events_thread .start ()
99
114
100
- # Exception raised in recv_events, to be chained to ConnectionClosed
101
- # in the user thread in order to show why the TCP connection dropped.
102
- self .recv_exc : BaseException | None = None
103
-
104
115
# Public attributes
105
116
106
117
@property
@@ -172,7 +183,7 @@ def __iter__(self) -> Iterator[Data]:
172
183
except ConnectionClosedOK :
173
184
return
174
185
175
- def recv (self , timeout : float | None = None ) -> Data :
186
+ def recv (self , timeout : float | None = None , decode : bool | None = None ) -> Data :
176
187
"""
177
188
Receive the next message.
178
189
@@ -191,21 +202,36 @@ def recv(self, timeout: float | None = None) -> Data:
191
202
If the message is fragmented, wait until all fragments are received,
192
203
reassemble them, and return the whole message.
193
204
205
+ Args:
206
+ timeout: Timeout for receiving a message in seconds.
207
+ decode: Set this flag to override the default behavior of returning
208
+ :class:`str` or :class:`bytes`. See below for details.
209
+
194
210
Returns:
195
211
A string (:class:`str`) for a Text_ frame or a bytestring
196
212
(:class:`bytes`) for a Binary_ frame.
197
213
198
214
.. _Text: https://datatracker.ietf.org/doc/html/rfc6455#section-5.6
199
215
.. _Binary: https://datatracker.ietf.org/doc/html/rfc6455#section-5.6
200
216
217
+ You may override this behavior with the ``decode`` argument:
218
+
219
+ * Set ``decode=False`` to disable UTF-8 decoding of Text_ frames and
220
+ return a bytestring (:class:`bytes`). This improves performance
221
+ when decoding isn't needed, for example if the message contains
222
+ JSON and you're using a JSON library that expects a bytestring.
223
+ * Set ``decode=True`` to force UTF-8 decoding of Binary_ frames
224
+ and return a string (:class:`str`). This may be useful for
225
+ servers that send binary frames instead of text frames.
226
+
201
227
Raises:
202
228
ConnectionClosed: When the connection is closed.
203
229
ConcurrencyError: If two threads call :meth:`recv` or
204
230
:meth:`recv_streaming` concurrently.
205
231
206
232
"""
207
233
try :
208
- return self .recv_messages .get (timeout )
234
+ return self .recv_messages .get (timeout , decode )
209
235
except EOFError :
210
236
# Wait for the protocol state to be CLOSED before accessing close_exc.
211
237
self .recv_events_thread .join ()
@@ -216,31 +242,47 @@ def recv(self, timeout: float | None = None) -> Data:
216
242
"is already running recv or recv_streaming"
217
243
) from None
218
244
219
- def recv_streaming (self ) -> Iterator [Data ]:
245
+ def recv_streaming (self , decode : bool | None = None ) -> Iterator [Data ]:
220
246
"""
221
247
Receive the next message frame by frame.
222
248
223
- If the message is fragmented, yield each fragment as it is received.
224
- The iterator must be fully consumed, or else the connection will become
249
+ This method is designed for receiving fragmented messages. It returns an
250
+ iterator that yields each fragment as it is received. This iterator must
251
+ be fully consumed. Otherwise, future calls to :meth:`recv` or
252
+ :meth:`recv_streaming` will raise
253
+ :exc:`~websockets.exceptions.ConcurrencyError`, making the connection
225
254
unusable.
226
255
227
256
:meth:`recv_streaming` raises the same exceptions as :meth:`recv`.
228
257
258
+ Args:
259
+ decode: Set this flag to override the default behavior of returning
260
+ :class:`str` or :class:`bytes`. See below for details.
261
+
229
262
Returns:
230
263
An iterator of strings (:class:`str`) for a Text_ frame or
231
264
bytestrings (:class:`bytes`) for a Binary_ frame.
232
265
233
266
.. _Text: https://datatracker.ietf.org/doc/html/rfc6455#section-5.6
234
267
.. _Binary: https://datatracker.ietf.org/doc/html/rfc6455#section-5.6
235
268
269
+ You may override this behavior with the ``decode`` argument:
270
+
271
+ * Set ``decode=False`` to disable UTF-8 decoding of Text_ frames
272
+ and return bytestrings (:class:`bytes`). This may be useful to
273
+ optimize performance when decoding isn't needed.
274
+ * Set ``decode=True`` to force UTF-8 decoding of Binary_ frames
275
+ and return strings (:class:`str`). This may be useful for servers
276
+ that send binary frames instead of text frames.
277
+
236
278
Raises:
237
279
ConnectionClosed: When the connection is closed.
238
280
ConcurrencyError: If two threads call :meth:`recv` or
239
281
:meth:`recv_streaming` concurrently.
240
282
241
283
"""
242
284
try :
243
- yield from self .recv_messages .get_iter ()
285
+ yield from self .recv_messages .get_iter (decode )
244
286
except EOFError :
245
287
# Wait for the protocol state to be CLOSED before accessing close_exc.
246
288
self .recv_events_thread .join ()
@@ -571,8 +613,9 @@ def recv_events(self) -> None:
571
613
try :
572
614
while True :
573
615
try :
574
- if self .close_deadline is not None :
575
- self .socket .settimeout (self .close_deadline .timeout ())
616
+ with self .recv_flow_control :
617
+ if self .close_deadline is not None :
618
+ self .socket .settimeout (self .close_deadline .timeout ())
576
619
data = self .socket .recv (self .recv_bufsize )
577
620
except Exception as exc :
578
621
if self .debug :
@@ -622,13 +665,9 @@ def recv_events(self) -> None:
622
665
# Given that automatic responses write small amounts of data,
623
666
# this should be uncommon, so we don't handle the edge case.
624
667
625
- try :
626
- for event in events :
627
- # This may raise EOFError if the closing handshake
628
- # times out while a message is waiting to be read.
629
- self .process_event (event )
630
- except EOFError :
631
- break
668
+ for event in events :
669
+ # This isn't expected to raise an exception.
670
+ self .process_event (event )
632
671
633
672
# Breaking out of the while True: ... loop means that we believe
634
673
# that the socket doesn't work anymore.
0 commit comments