Skip to content

Commit fa78d82

Browse files
committed
Rewrite sync Assembler to improve performance.
Previously, a latch was used to synchronize the user thread reading messages and the background thread reading from the network. This required two thread switches per message. Now, the background thread writes messages to queue, from which the user thread reads. This allows passing several frames at each thread switch, reducing the overhead. With this server code:: async def test(websocket): for i in range(int(await websocket.recv())): await websocket.send(f"{{\"iteration\": {i}}}") and this client code:: with connect("ws://localhost:8765", compression=None) as websocket: websocket.send("1_000_000") for message in websocket: pass an unscientific benchmark (running it on my laptop) shows a 2.5x speedup, going from 11 seconds to 4.4 seconds. Setting a very large recv_bufsize and max_size doesn't yield significant further improvement. The new implementation mirrors the asyncio implementation and gains the option to prevent or force decoding of frames. Refs #1376.
1 parent e5182c9 commit fa78d82

12 files changed

+586
-467
lines changed

docs/project/changelog.rst

+15-1
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,21 @@ Backwards-incompatible changes
7070
If you wrote an :class:`extension <extensions.Extension>` that relies on
7171
methods not provided by these new types, you may need to update your code.
7272

73+
New features
74+
............
75+
76+
* Added an option to receive text frames as :class:`bytes`, without decoding,
77+
in the :mod:`threading` implementation; also binary frames as :class:`str`.
78+
79+
* Added an option to send :class:`bytes` as a text frame in the :mod:`asyncio`
80+
and :mod:`threading` implementations, as well as :class:`str` a binary frame.
81+
7382
Improvements
7483
............
7584

76-
* Sending or receiving large compressed frames is now faster.
85+
* Sending or receiving large compressed messages is now faster.
86+
87+
* The :mod:`threading` implementation receives messages faster.
7788

7889
.. _13.1:
7990

@@ -198,6 +209,9 @@ New features
198209

199210
* Validated compatibility with Python 3.12 and 3.13.
200211

212+
* Added an option to receive text frames as :class:`bytes`, without decoding,
213+
in the :mod:`asyncio` implementation; also binary frames as :class:`str`.
214+
201215
* Added :doc:`environment variables <../reference/variables>` to configure debug
202216
logs, the ``Server`` and ``User-Agent`` headers, as well as security limits.
203217

src/websockets/asyncio/client.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ class ClientConnection(Connection):
4545
closed with any other code.
4646
4747
The ``ping_interval``, ``ping_timeout``, ``close_timeout``, ``max_queue``,
48-
and ``write_limit`` arguments the same meaning as in :func:`connect`.
48+
and ``write_limit`` arguments have the same meaning as in :func:`connect`.
4949
5050
Args:
5151
protocol: Sans-I/O connection.

src/websockets/asyncio/messages.py

+33-27
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ def reset(self, items: Iterable[T]) -> None:
6060
self.queue.extend(items)
6161

6262
def abort(self) -> None:
63+
"""Close the queue, raising EOFError in get() if necessary."""
6364
if self.get_waiter is not None and not self.get_waiter.done():
6465
self.get_waiter.set_exception(EOFError("stream of frames ended"))
6566
# Clear the queue to avoid storing unnecessary data in memory.
@@ -89,7 +90,7 @@ def __init__( # pragma: no cover
8990
pause: Callable[[], Any] = lambda: None,
9091
resume: Callable[[], Any] = lambda: None,
9192
) -> None:
92-
# Queue of incoming messages. Each item is a queue of frames.
93+
# Queue of incoming frames.
9394
self.frames: SimpleQueue[Frame] = SimpleQueue()
9495

9596
# We cannot put a hard limit on the size of the queue because a single
@@ -140,36 +141,35 @@ async def get(self, decode: bool | None = None) -> Data:
140141
if self.get_in_progress:
141142
raise ConcurrencyError("get() or get_iter() is already running")
142143

143-
# Locking with get_in_progress ensures only one coroutine can get here.
144144
self.get_in_progress = True
145145

146-
# First frame
146+
# Locking with get_in_progress prevents concurrent execution until
147+
# get() fetches a complete message or is cancelled.
148+
147149
try:
150+
# First frame
148151
frame = await self.frames.get()
149-
except asyncio.CancelledError:
150-
self.get_in_progress = False
151-
raise
152-
self.maybe_resume()
153-
assert frame.opcode is OP_TEXT or frame.opcode is OP_BINARY
154-
if decode is None:
155-
decode = frame.opcode is OP_TEXT
156-
frames = [frame]
157-
158-
# Following frames, for fragmented messages
159-
while not frame.fin:
160-
try:
161-
frame = await self.frames.get()
162-
except asyncio.CancelledError:
163-
# Put frames already received back into the queue
164-
# so that future calls to get() can return them.
165-
self.frames.reset(frames)
166-
self.get_in_progress = False
167-
raise
168152
self.maybe_resume()
169-
assert frame.opcode is OP_CONT
170-
frames.append(frame)
171-
172-
self.get_in_progress = False
153+
assert frame.opcode is OP_TEXT or frame.opcode is OP_BINARY
154+
if decode is None:
155+
decode = frame.opcode is OP_TEXT
156+
frames = [frame]
157+
158+
# Following frames, for fragmented messages
159+
while not frame.fin:
160+
try:
161+
frame = await self.frames.get()
162+
except asyncio.CancelledError:
163+
# Put frames already received back into the queue
164+
# so that future calls to get() can return them.
165+
self.frames.reset(frames)
166+
raise
167+
self.maybe_resume()
168+
assert frame.opcode is OP_CONT
169+
frames.append(frame)
170+
171+
finally:
172+
self.get_in_progress = False
173173

174174
data = b"".join(frame.data for frame in frames)
175175
if decode:
@@ -207,9 +207,14 @@ async def get_iter(self, decode: bool | None = None) -> AsyncIterator[Data]:
207207
if self.get_in_progress:
208208
raise ConcurrencyError("get() or get_iter() is already running")
209209

210-
# Locking with get_in_progress ensures only one coroutine can get here.
211210
self.get_in_progress = True
212211

212+
# Locking with get_in_progress prevents concurrent execution until
213+
# get_iter() fetches a complete message or is cancelled.
214+
215+
# If get_iter() raises an exception e.g. in decoder.decode(),
216+
# get_in_progress remains set and the connection becomes unusable.
217+
213218
# First frame
214219
try:
215220
frame = await self.frames.get()
@@ -233,6 +238,7 @@ async def get_iter(self, decode: bool | None = None) -> AsyncIterator[Data]:
233238
# here will leave the assembler in a stuck state. Future calls to
234239
# get() or get_iter() will raise ConcurrencyError.
235240
frame = await self.frames.get()
241+
236242
self.maybe_resume()
237243
assert frame.opcode is OP_CONT
238244
if decode:

src/websockets/asyncio/server.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ class ServerConnection(Connection):
5555
closed with any other code.
5656
5757
The ``ping_interval``, ``ping_timeout``, ``close_timeout``, ``max_queue``,
58-
and ``write_limit`` arguments the same meaning as in :func:`serve`.
58+
and ``write_limit`` arguments have the same meaning as in :func:`serve`.
5959
6060
Args:
6161
protocol: Sans-I/O connection.

src/websockets/sync/client.py

+11-1
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,12 @@ class ClientConnection(Connection):
4040
:exc:`~websockets.exceptions.ConnectionClosedError` when the connection is
4141
closed with any other code.
4242
43+
The ``close_timeout`` and ``max_queue`` arguments have the same meaning as
44+
in :func:`connect`.
45+
4346
Args:
4447
socket: Socket connected to a WebSocket server.
4548
protocol: Sans-I/O connection.
46-
close_timeout: Timeout for closing the connection in seconds.
4749
4850
"""
4951

@@ -53,13 +55,15 @@ def __init__(
5355
protocol: ClientProtocol,
5456
*,
5557
close_timeout: float | None = 10,
58+
max_queue: int | tuple[int, int | None] = 16,
5659
) -> None:
5760
self.protocol: ClientProtocol
5861
self.response_rcvd = threading.Event()
5962
super().__init__(
6063
socket,
6164
protocol,
6265
close_timeout=close_timeout,
66+
max_queue=max_queue,
6367
)
6468

6569
def handshake(
@@ -135,6 +139,7 @@ def connect(
135139
close_timeout: float | None = 10,
136140
# Limits
137141
max_size: int | None = 2**20,
142+
max_queue: int | tuple[int, int | None] = 16,
138143
# Logging
139144
logger: LoggerLike | None = None,
140145
# Escape hatch for advanced customization
@@ -183,6 +188,10 @@ def connect(
183188
:obj:`None` disables the timeout.
184189
max_size: Maximum size of incoming messages in bytes.
185190
:obj:`None` disables the limit.
191+
max_queue: High-water mark of the buffer where frames are received.
192+
It defaults to 16 frames. The low-water mark defaults to ``max_queue
193+
// 4``. You may pass a ``(high, low)`` tuple to set the high-water
194+
and low-water marks.
186195
logger: Logger for this client.
187196
It defaults to ``logging.getLogger("websockets.client")``.
188197
See the :doc:`logging guide <../../topics/logging>` for details.
@@ -287,6 +296,7 @@ def connect(
287296
sock,
288297
protocol,
289298
close_timeout=close_timeout,
299+
max_queue=max_queue,
290300
)
291301
except Exception:
292302
if sock is not None:

src/websockets/sync/connection.py

+59-20
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,14 @@ def __init__(
4949
protocol: Protocol,
5050
*,
5151
close_timeout: float | None = 10,
52+
max_queue: int | tuple[int, int | None] = 16,
5253
) -> None:
5354
self.socket = socket
5455
self.protocol = protocol
5556
self.close_timeout = close_timeout
57+
if isinstance(max_queue, int):
58+
max_queue = (max_queue, None)
59+
self.max_queue = max_queue
5660

5761
# Inject reference to this instance in the protocol's logger.
5862
self.protocol.logger = logging.LoggerAdapter(
@@ -76,8 +80,15 @@ def __init__(
7680
# Mutex serializing interactions with the protocol.
7781
self.protocol_mutex = threading.Lock()
7882

83+
# Lock stopping reads when the assembler buffer is full.
84+
self.recv_flow_control = threading.Lock()
85+
7986
# Assembler turning frames into messages and serializing reads.
80-
self.recv_messages = Assembler()
87+
self.recv_messages = Assembler(
88+
*self.max_queue,
89+
pause=self.recv_flow_control.acquire,
90+
resume=self.recv_flow_control.release,
91+
)
8192

8293
# Whether we are busy sending a fragmented message.
8394
self.send_in_progress = False
@@ -88,6 +99,10 @@ def __init__(
8899
# Mapping of ping IDs to pong waiters, in chronological order.
89100
self.ping_waiters: dict[bytes, threading.Event] = {}
90101

102+
# Exception raised in recv_events, to be chained to ConnectionClosed
103+
# in the user thread in order to show why the TCP connection dropped.
104+
self.recv_exc: BaseException | None = None
105+
91106
# Receiving events from the socket. This thread is marked as daemon to
92107
# allow creating a connection in a non-daemon thread and using it in a
93108
# daemon thread. This mustn't prevent the interpreter from exiting.
@@ -97,10 +112,6 @@ def __init__(
97112
)
98113
self.recv_events_thread.start()
99114

100-
# Exception raised in recv_events, to be chained to ConnectionClosed
101-
# in the user thread in order to show why the TCP connection dropped.
102-
self.recv_exc: BaseException | None = None
103-
104115
# Public attributes
105116

106117
@property
@@ -172,7 +183,7 @@ def __iter__(self) -> Iterator[Data]:
172183
except ConnectionClosedOK:
173184
return
174185

175-
def recv(self, timeout: float | None = None) -> Data:
186+
def recv(self, timeout: float | None = None, decode: bool | None = None) -> Data:
176187
"""
177188
Receive the next message.
178189
@@ -191,21 +202,36 @@ def recv(self, timeout: float | None = None) -> Data:
191202
If the message is fragmented, wait until all fragments are received,
192203
reassemble them, and return the whole message.
193204
205+
Args:
206+
timeout: Timeout for receiving a message in seconds.
207+
decode: Set this flag to override the default behavior of returning
208+
:class:`str` or :class:`bytes`. See below for details.
209+
194210
Returns:
195211
A string (:class:`str`) for a Text_ frame or a bytestring
196212
(:class:`bytes`) for a Binary_ frame.
197213
198214
.. _Text: https://datatracker.ietf.org/doc/html/rfc6455#section-5.6
199215
.. _Binary: https://datatracker.ietf.org/doc/html/rfc6455#section-5.6
200216
217+
You may override this behavior with the ``decode`` argument:
218+
219+
* Set ``decode=False`` to disable UTF-8 decoding of Text_ frames and
220+
return a bytestring (:class:`bytes`). This improves performance
221+
when decoding isn't needed, for example if the message contains
222+
JSON and you're using a JSON library that expects a bytestring.
223+
* Set ``decode=True`` to force UTF-8 decoding of Binary_ frames
224+
and return a string (:class:`str`). This may be useful for
225+
servers that send binary frames instead of text frames.
226+
201227
Raises:
202228
ConnectionClosed: When the connection is closed.
203229
ConcurrencyError: If two threads call :meth:`recv` or
204230
:meth:`recv_streaming` concurrently.
205231
206232
"""
207233
try:
208-
return self.recv_messages.get(timeout)
234+
return self.recv_messages.get(timeout, decode)
209235
except EOFError:
210236
# Wait for the protocol state to be CLOSED before accessing close_exc.
211237
self.recv_events_thread.join()
@@ -216,31 +242,47 @@ def recv(self, timeout: float | None = None) -> Data:
216242
"is already running recv or recv_streaming"
217243
) from None
218244

219-
def recv_streaming(self) -> Iterator[Data]:
245+
def recv_streaming(self, decode: bool | None = None) -> Iterator[Data]:
220246
"""
221247
Receive the next message frame by frame.
222248
223-
If the message is fragmented, yield each fragment as it is received.
224-
The iterator must be fully consumed, or else the connection will become
249+
This method is designed for receiving fragmented messages. It returns an
250+
iterator that yields each fragment as it is received. This iterator must
251+
be fully consumed. Else, future calls to :meth:`recv` or
252+
:meth:`recv_streaming` will raise
253+
:exc:`~websockets.exceptions.ConcurrencyError`, making the connection
225254
unusable.
226255
227256
:meth:`recv_streaming` raises the same exceptions as :meth:`recv`.
228257
258+
Args:
259+
decode: Set this flag to override the default behavior of returning
260+
:class:`str` or :class:`bytes`. See below for details.
261+
229262
Returns:
230263
An iterator of strings (:class:`str`) for a Text_ frame or
231264
bytestrings (:class:`bytes`) for a Binary_ frame.
232265
233266
.. _Text: https://datatracker.ietf.org/doc/html/rfc6455#section-5.6
234267
.. _Binary: https://datatracker.ietf.org/doc/html/rfc6455#section-5.6
235268
269+
You may override this behavior with the ``decode`` argument:
270+
271+
* Set ``decode=False`` to disable UTF-8 decoding of Text_ frames
272+
and return bytestrings (:class:`bytes`). This may be useful to
273+
optimize performance when decoding isn't needed.
274+
* Set ``decode=True`` to force UTF-8 decoding of Binary_ frames
275+
and return strings (:class:`str`). This is useful for servers
276+
that send binary frames instead of text frames.
277+
236278
Raises:
237279
ConnectionClosed: When the connection is closed.
238280
ConcurrencyError: If two threads call :meth:`recv` or
239281
:meth:`recv_streaming` concurrently.
240282
241283
"""
242284
try:
243-
yield from self.recv_messages.get_iter()
285+
yield from self.recv_messages.get_iter(decode)
244286
except EOFError:
245287
# Wait for the protocol state to be CLOSED before accessing close_exc.
246288
self.recv_events_thread.join()
@@ -571,8 +613,9 @@ def recv_events(self) -> None:
571613
try:
572614
while True:
573615
try:
574-
if self.close_deadline is not None:
575-
self.socket.settimeout(self.close_deadline.timeout())
616+
with self.recv_flow_control:
617+
if self.close_deadline is not None:
618+
self.socket.settimeout(self.close_deadline.timeout())
576619
data = self.socket.recv(self.recv_bufsize)
577620
except Exception as exc:
578621
if self.debug:
@@ -622,13 +665,9 @@ def recv_events(self) -> None:
622665
# Given that automatic responses write small amounts of data,
623666
# this should be uncommon, so we don't handle the edge case.
624667

625-
try:
626-
for event in events:
627-
# This may raise EOFError if the closing handshake
628-
# times out while a message is waiting to be read.
629-
self.process_event(event)
630-
except EOFError:
631-
break
668+
for event in events:
669+
# This isn't expected to raise an exception.
670+
self.process_event(event)
632671

633672
# Breaking out of the while True: ... loop means that we believe
634673
# that the socket doesn't work anymore.

0 commit comments

Comments
 (0)