Skip to content

Commit 9a2f39f

Browse files
committed
Support max_queue=None like the legacy implementation.
Fix #1540.
1 parent 3034834 commit 9a2f39f

13 files changed

+200
-42
lines changed

docs/project/changelog.rst

+7
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,13 @@ notice.
3434

3535
.. _14.0:
3636

37+
Improvements
38+
............
39+
40+
* Supported ``max_queue=None`` in the :mod:`asyncio` and :mod:`threading`
41+
implementations for consistency with the legacy implementation, even though
42+
this is never a good idea.
43+
3744
Bug fixes
3845
.........
3946

src/websockets/asyncio/client.py

+4-3
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ def __init__(
6060
ping_interval: float | None = 20,
6161
ping_timeout: float | None = 20,
6262
close_timeout: float | None = 10,
63-
max_queue: int | tuple[int, int | None] = 16,
63+
max_queue: int | None | tuple[int | None, int | None] = 16,
6464
write_limit: int | tuple[int, int | None] = 2**15,
6565
) -> None:
6666
self.protocol: ClientProtocol
@@ -222,7 +222,8 @@ class connect:
222222
max_queue: High-water mark of the buffer where frames are received.
223223
It defaults to 16 frames. The low-water mark defaults to ``max_queue
224224
// 4``. You may pass a ``(high, low)`` tuple to set the high-water
225-
and low-water marks.
225+
and low-water marks. If you want to disable flow control entirely,
226+
you may set it to ``None``, although that's a bad idea.
226227
write_limit: High-water mark of write buffer in bytes. It is passed to
227228
:meth:`~asyncio.WriteTransport.set_write_buffer_limits`. It defaults
228229
to 32 KiB. You may pass a ``(high, low)`` tuple to set the
@@ -283,7 +284,7 @@ def __init__(
283284
close_timeout: float | None = 10,
284285
# Limits
285286
max_size: int | None = 2**20,
286-
max_queue: int | tuple[int, int | None] = 16,
287+
max_queue: int | None | tuple[int | None, int | None] = 16,
287288
write_limit: int | tuple[int, int | None] = 2**15,
288289
# Logging
289290
logger: LoggerLike | None = None,

src/websockets/asyncio/connection.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -56,14 +56,14 @@ def __init__(
5656
ping_interval: float | None = 20,
5757
ping_timeout: float | None = 20,
5858
close_timeout: float | None = 10,
59-
max_queue: int | tuple[int, int | None] = 16,
59+
max_queue: int | None | tuple[int | None, int | None] = 16,
6060
write_limit: int | tuple[int, int | None] = 2**15,
6161
) -> None:
6262
self.protocol = protocol
6363
self.ping_interval = ping_interval
6464
self.ping_timeout = ping_timeout
6565
self.close_timeout = close_timeout
66-
if isinstance(max_queue, int):
66+
if isinstance(max_queue, int) or max_queue is None:
6767
max_queue = (max_queue, None)
6868
self.max_queue = max_queue
6969
if isinstance(write_limit, int):

src/websockets/asyncio/messages.py

+17-6
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ class Assembler:
8484
# coverage reports incorrectly: "line NN didn't jump to the function exit"
8585
def __init__( # pragma: no cover
8686
self,
87-
high: int = 16,
87+
high: int | None = None,
8888
low: int | None = None,
8989
pause: Callable[[], Any] = lambda: None,
9090
resume: Callable[[], Any] = lambda: None,
@@ -96,12 +96,15 @@ def __init__( # pragma: no cover
9696
# call to Protocol.data_received() could produce thousands of frames,
9797
# which must be buffered. Instead, we pause reading when the buffer goes
9898
# above the high limit and we resume when it goes under the low limit.
99-
if low is None:
99+
if high is not None and low is None:
100100
low = high // 4
101-
if low < 0:
102-
raise ValueError("low must be positive or equal to zero")
103-
if high < low:
104-
raise ValueError("high must be greater than or equal to low")
101+
if high is None and low is not None:
102+
high = low * 4
103+
if high is not None and low is not None:
104+
if low < 0:
105+
raise ValueError("low must be positive or equal to zero")
106+
if high < low:
107+
raise ValueError("high must be greater than or equal to low")
105108
self.high, self.low = high, low
106109
self.pause = pause
107110
self.resume = resume
@@ -256,13 +259,21 @@ def put(self, frame: Frame) -> None:
256259

257260
def maybe_pause(self) -> None:
258261
"""Pause the writer if queue is above the high water mark."""
262+
# Skip if flow control is disabled
263+
if self.high is None:
264+
return
265+
259266
# Check for "> high" to support high = 0
260267
if len(self.frames) > self.high and not self.paused:
261268
self.paused = True
262269
self.pause()
263270

264271
def maybe_resume(self) -> None:
265272
"""Resume the writer if queue is below the low water mark."""
273+
# Skip if flow control is disabled
274+
if self.low is None:
275+
return
276+
266277
# Check for "<= low" to support low = 0
267278
if len(self.frames) <= self.low and self.paused:
268279
self.paused = False

src/websockets/asyncio/server.py

+4-3
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ def __init__(
7171
ping_interval: float | None = 20,
7272
ping_timeout: float | None = 20,
7373
close_timeout: float | None = 10,
74-
max_queue: int | tuple[int, int | None] = 16,
74+
max_queue: int | None | tuple[int | None, int | None] = 16,
7575
write_limit: int | tuple[int, int | None] = 2**15,
7676
) -> None:
7777
self.protocol: ServerProtocol
@@ -643,7 +643,8 @@ def handler(websocket):
643643
max_queue: High-water mark of the buffer where frames are received.
644644
It defaults to 16 frames. The low-water mark defaults to ``max_queue
645645
// 4``. You may pass a ``(high, low)`` tuple to set the high-water
646-
and low-water marks.
646+
and low-water marks. If you want to disable flow control entirely,
647+
you may set it to ``None``, although that's a bad idea.
647648
write_limit: High-water mark of write buffer in bytes. It is passed to
648649
:meth:`~asyncio.WriteTransport.set_write_buffer_limits`. It defaults
649650
to 32 KiB. You may pass a ``(high, low)`` tuple to set the
@@ -713,7 +714,7 @@ def __init__(
713714
close_timeout: float | None = 10,
714715
# Limits
715716
max_size: int | None = 2**20,
716-
max_queue: int | tuple[int, int | None] = 16,
717+
max_queue: int | None | tuple[int | None, int | None] = 16,
717718
write_limit: int | tuple[int, int | None] = 2**15,
718719
# Logging
719720
logger: LoggerLike | None = None,

src/websockets/sync/client.py

+4-3
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ def __init__(
5555
protocol: ClientProtocol,
5656
*,
5757
close_timeout: float | None = 10,
58-
max_queue: int | tuple[int, int | None] = 16,
58+
max_queue: int | None | tuple[int | None, int | None] = 16,
5959
) -> None:
6060
self.protocol: ClientProtocol
6161
self.response_rcvd = threading.Event()
@@ -139,7 +139,7 @@ def connect(
139139
close_timeout: float | None = 10,
140140
# Limits
141141
max_size: int | None = 2**20,
142-
max_queue: int | tuple[int, int | None] = 16,
142+
max_queue: int | None | tuple[int | None, int | None] = 16,
143143
# Logging
144144
logger: LoggerLike | None = None,
145145
# Escape hatch for advanced customization
@@ -191,7 +191,8 @@ def connect(
191191
max_queue: High-water mark of the buffer where frames are received.
192192
It defaults to 16 frames. The low-water mark defaults to ``max_queue
193193
// 4``. You may pass a ``(high, low)`` tuple to set the high-water
194-
and low-water marks.
194+
and low-water marks. If you want to disable flow control entirely,
195+
you may set it to ``None``, although that's a bad idea.
195196
logger: Logger for this client.
196197
It defaults to ``logging.getLogger("websockets.client")``.
197198
See the :doc:`logging guide <../../topics/logging>` for details.

src/websockets/sync/connection.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,12 @@ def __init__(
4949
protocol: Protocol,
5050
*,
5151
close_timeout: float | None = 10,
52-
max_queue: int | tuple[int, int | None] = 16,
52+
max_queue: int | None | tuple[int | None, int | None] = 16,
5353
) -> None:
5454
self.socket = socket
5555
self.protocol = protocol
5656
self.close_timeout = close_timeout
57-
if isinstance(max_queue, int):
57+
if isinstance(max_queue, int) or max_queue is None:
5858
max_queue = (max_queue, None)
5959
self.max_queue = max_queue
6060

src/websockets/sync/messages.py

+19-6
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class Assembler:
3333

3434
def __init__(
3535
self,
36-
high: int = 16,
36+
high: int | None = None,
3737
low: int | None = None,
3838
pause: Callable[[], Any] = lambda: None,
3939
resume: Callable[[], Any] = lambda: None,
@@ -49,12 +49,15 @@ def __init__(
4949
# call to Protocol.data_received() could produce thousands of frames,
5050
# which must be buffered. Instead, we pause reading when the buffer goes
5151
# above the high limit and we resume when it goes under the low limit.
52-
if low is None:
52+
if high is not None and low is None:
5353
low = high // 4
54-
if low < 0:
55-
raise ValueError("low must be positive or equal to zero")
56-
if high < low:
57-
raise ValueError("high must be greater than or equal to low")
54+
if high is None and low is not None:
55+
high = low * 4
56+
if high is not None and low is not None:
57+
if low < 0:
58+
raise ValueError("low must be positive or equal to zero")
59+
if high < low:
60+
raise ValueError("high must be greater than or equal to low")
5861
self.high, self.low = high, low
5962
self.pause = pause
6063
self.resume = resume
@@ -260,15 +263,25 @@ def put(self, frame: Frame) -> None:
260263

261264
def maybe_pause(self) -> None:
262265
"""Pause the writer if queue is above the high water mark."""
266+
# Skip if flow control is disabled
267+
if self.high is None:
268+
return
269+
263270
assert self.mutex.locked()
271+
264272
# Check for "> high" to support high = 0
265273
if self.frames.qsize() > self.high and not self.paused:
266274
self.paused = True
267275
self.pause()
268276

269277
def maybe_resume(self) -> None:
270278
"""Resume the writer if queue is below the low water mark."""
279+
# Skip if flow control is disabled
280+
if self.low is None:
281+
return
282+
271283
assert self.mutex.locked()
284+
272285
# Check for "<= low" to support low = 0
273286
if self.frames.qsize() <= self.low and self.paused:
274287
self.paused = False

src/websockets/sync/server.py

+4-3
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ def __init__(
6666
protocol: ServerProtocol,
6767
*,
6868
close_timeout: float | None = 10,
69-
max_queue: int | tuple[int, int | None] = 16,
69+
max_queue: int | None | tuple[int | None, int | None] = 16,
7070
) -> None:
7171
self.protocol: ServerProtocol
7272
self.request_rcvd = threading.Event()
@@ -356,7 +356,7 @@ def serve(
356356
close_timeout: float | None = 10,
357357
# Limits
358358
max_size: int | None = 2**20,
359-
max_queue: int | tuple[int, int | None] = 16,
359+
max_queue: int | None | tuple[int | None, int | None] = 16,
360360
# Logging
361361
logger: LoggerLike | None = None,
362362
# Escape hatch for advanced customization
@@ -438,7 +438,8 @@ def handler(websocket):
438438
max_queue: High-water mark of the buffer where frames are received.
439439
It defaults to 16 frames. The low-water mark defaults to ``max_queue
440440
// 4``. You may pass a ``(high, low)`` tuple to set the high-water
441-
and low-water marks.
441+
and low-water marks. If you want to disable flow control entirely,
442+
you may set it to ``None``, although that's a bad idea.
442443
logger: Logger for this server.
443444
It defaults to ``logging.getLogger("websockets.server")``. See the
444445
:doc:`logging guide <../../topics/logging>` for details.

tests/asyncio/test_connection.py

+10-2
Original file line numberDiff line numberDiff line change
@@ -1066,14 +1066,22 @@ async def test_close_timeout(self):
10661066
self.assertEqual(connection.close_timeout, 42 * MS)
10671067

10681068
async def test_max_queue(self):
1069-
"""max_queue parameter configures high-water mark of frames buffer."""
1069+
"""max_queue configures high-water mark of frames buffer."""
10701070
connection = Connection(Protocol(self.LOCAL), max_queue=4)
10711071
transport = Mock()
10721072
connection.connection_made(transport)
10731073
self.assertEqual(connection.recv_messages.high, 4)
10741074

1075+
async def test_max_queue_none(self):
1076+
"""max_queue disables high-water mark of frames buffer."""
1077+
connection = Connection(Protocol(self.LOCAL), max_queue=None)
1078+
transport = Mock()
1079+
connection.connection_made(transport)
1080+
self.assertEqual(connection.recv_messages.high, None)
1081+
self.assertEqual(connection.recv_messages.low, None)
1082+
10751083
async def test_max_queue_tuple(self):
1076-
"""max_queue parameter configures high-water mark of frames buffer."""
1084+
"""max_queue configures high-water and low-water marks of frames buffer."""
10771085
connection = Connection(
10781086
Protocol(self.LOCAL),
10791087
max_queue=(4, 2),

0 commit comments

Comments
 (0)