Skip to content

Commit 070ff1a

Browse files
committed
Add dedicated ConcurrencyError exception.
Previously, a generic RuntimeError was used. Fix #1499.
1 parent f9cea9c commit 070ff1a

13 files changed

+140
-87
lines changed

docs/project/changelog.rst

+4
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ Improvements
6464

6565
* Improved reporting of errors during the opening handshake.
6666

67+
* Raised :exc:`~exceptions.ConcurrencyError` on unsupported concurrent calls.
68+
Previously, :exc:`RuntimeError` was raised. For backwards compatibility,
69+
:exc:`~exceptions.ConcurrencyError` is a subclass of :exc:`RuntimeError`.
70+
6771
13.0.1
6872
------
6973

docs/reference/exceptions.rst

+5
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,11 @@ translated to :exc:`ConnectionClosedError` in the other implementations.
6464

6565
.. autoexception:: InvalidState
6666

67+
Miscellaneous exceptions
68+
------------------------
69+
70+
.. autoexception:: ConcurrencyError
71+
6772
Legacy exceptions
6873
-----------------
6974

src/websockets/__init__.py

+3
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
"HeadersLike",
1515
"MultipleValuesError",
1616
# .exceptions
17+
"ConcurrencyError",
1718
"ConnectionClosed",
1819
"ConnectionClosedError",
1920
"ConnectionClosedOK",
@@ -72,6 +73,7 @@
7273
from .client import ClientProtocol
7374
from .datastructures import Headers, HeadersLike, MultipleValuesError
7475
from .exceptions import (
76+
ConcurrencyError,
7577
ConnectionClosed,
7678
ConnectionClosedError,
7779
ConnectionClosedOK,
@@ -134,6 +136,7 @@
134136
"HeadersLike": ".datastructures",
135137
"MultipleValuesError": ".datastructures",
136138
# .exceptions
139+
"ConcurrencyError": ".exceptions",
137140
"ConnectionClosed": ".exceptions",
138141
"ConnectionClosedError": ".exceptions",
139142
"ConnectionClosedOK": ".exceptions",

src/websockets/asyncio/connection.py

+22-14
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,12 @@
1919
cast,
2020
)
2121

22-
from ..exceptions import ConnectionClosed, ConnectionClosedOK, ProtocolError
22+
from ..exceptions import (
23+
ConcurrencyError,
24+
ConnectionClosed,
25+
ConnectionClosedOK,
26+
ProtocolError,
27+
)
2328
from ..frames import DATA_OPCODES, BytesLike, CloseCode, Frame, Opcode
2429
from ..http11 import Request, Response
2530
from ..protocol import CLOSED, OPEN, Event, Protocol, State
@@ -262,16 +267,16 @@ async def recv(self, decode: bool | None = None) -> Data:
262267
263268
Raises:
264269
ConnectionClosed: When the connection is closed.
265-
RuntimeError: If two coroutines call :meth:`recv` or
270+
ConcurrencyError: If two coroutines call :meth:`recv` or
266271
:meth:`recv_streaming` concurrently.
267272
268273
"""
269274
try:
270275
return await self.recv_messages.get(decode)
271276
except EOFError:
272277
raise self.protocol.close_exc from self.recv_exc
273-
except RuntimeError:
274-
raise RuntimeError(
278+
except ConcurrencyError:
279+
raise ConcurrencyError(
275280
"cannot call recv while another coroutine "
276281
"is already running recv or recv_streaming"
277282
) from None
@@ -283,8 +288,9 @@ async def recv_streaming(self, decode: bool | None = None) -> AsyncIterator[Data
283288
This method is designed for receiving fragmented messages. It returns an
284289
asynchronous iterator that yields each fragment as it is received. This
285290
iterator must be fully consumed. Else, future calls to :meth:`recv` or
286-
:meth:`recv_streaming` will raise :exc:`RuntimeError`, making the
287-
connection unusable.
291+
:meth:`recv_streaming` will raise
292+
:exc:`~websockets.exceptions.ConcurrencyError`, making the connection
293+
unusable.
288294
289295
:meth:`recv_streaming` raises the same exceptions as :meth:`recv`.
290296
@@ -315,7 +321,7 @@ async def recv_streaming(self, decode: bool | None = None) -> AsyncIterator[Data
315321
316322
Raises:
317323
ConnectionClosed: When the connection is closed.
318-
RuntimeError: If two coroutines call :meth:`recv` or
324+
ConcurrencyError: If two coroutines call :meth:`recv` or
319325
:meth:`recv_streaming` concurrently.
320326
321327
"""
@@ -324,8 +330,8 @@ async def recv_streaming(self, decode: bool | None = None) -> AsyncIterator[Data
324330
yield frame
325331
except EOFError:
326332
raise self.protocol.close_exc from self.recv_exc
327-
except RuntimeError:
328-
raise RuntimeError(
333+
except ConcurrencyError:
334+
raise ConcurrencyError(
329335
"cannot call recv_streaming while another coroutine "
330336
"is already running recv or recv_streaming"
331337
) from None
@@ -593,7 +599,7 @@ async def ping(self, data: Data | None = None) -> Awaitable[float]:
593599
594600
Raises:
595601
ConnectionClosed: When the connection is closed.
596-
RuntimeError: If another ping was sent with the same data and
602+
ConcurrencyError: If another ping was sent with the same data and
597603
the corresponding pong wasn't received yet.
598604
599605
"""
@@ -607,7 +613,7 @@ async def ping(self, data: Data | None = None) -> Awaitable[float]:
607613
async with self.send_context():
608614
# Protect against duplicates if a payload is explicitly set.
609615
if data in self.pong_waiters:
610-
raise RuntimeError("already waiting for a pong with the same data")
616+
raise ConcurrencyError("already waiting for a pong with the same data")
611617

612618
# Generate a unique random payload otherwise.
613619
while data is None or data in self.pong_waiters:
@@ -793,7 +799,7 @@ async def send_context(
793799
# Let the caller interact with the protocol.
794800
try:
795801
yield
796-
except (ProtocolError, RuntimeError):
802+
except (ProtocolError, ConcurrencyError):
797803
# The protocol state wasn't changed. Exit immediately.
798804
raise
799805
except Exception as exc:
@@ -1092,15 +1098,17 @@ def broadcast(
10921098
if raise_exceptions:
10931099
if sys.version_info[:2] < (3, 11): # pragma: no cover
10941100
raise ValueError("raise_exceptions requires at least Python 3.11")
1095-
exceptions = []
1101+
exceptions: list[Exception] = []
10961102

10971103
for connection in connections:
1104+
exception: Exception
1105+
10981106
if connection.protocol.state is not OPEN:
10991107
continue
11001108

11011109
if connection.fragmented_send_waiter is not None:
11021110
if raise_exceptions:
1103-
exception = RuntimeError("sending a fragmented message")
1111+
exception = ConcurrencyError("sending a fragmented message")
11041112
exceptions.append(exception)
11051113
else:
11061114
connection.logger.warning(

src/websockets/asyncio/messages.py

+10-9
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
TypeVar,
1313
)
1414

15+
from ..exceptions import ConcurrencyError
1516
from ..frames import OP_BINARY, OP_CONT, OP_TEXT, Frame
1617
from ..typing import Data
1718

@@ -49,7 +50,7 @@ async def get(self) -> T:
4950
"""Remove and return an item from the queue, waiting if necessary."""
5051
if not self.queue:
5152
if self.get_waiter is not None:
52-
raise RuntimeError("get is already running")
53+
raise ConcurrencyError("get is already running")
5354
self.get_waiter = self.loop.create_future()
5455
try:
5556
await self.get_waiter
@@ -135,15 +136,15 @@ async def get(self, decode: bool | None = None) -> Data:
135136
136137
Raises:
137138
EOFError: If the stream of frames has ended.
138-
RuntimeError: If two coroutines run :meth:`get` or :meth:`get_iter`
139-
concurrently.
139+
ConcurrencyError: If two coroutines run :meth:`get` or
140+
:meth:`get_iter` concurrently.
140141
141142
"""
142143
if self.closed:
143144
raise EOFError("stream of frames ended")
144145

145146
if self.get_in_progress:
146-
raise RuntimeError("get() or get_iter() is already running")
147+
raise ConcurrencyError("get() or get_iter() is already running")
147148

148149
# Locking with get_in_progress ensures only one coroutine can get here.
149150
self.get_in_progress = True
@@ -190,7 +191,7 @@ async def get_iter(self, decode: bool | None = None) -> AsyncIterator[Data]:
190191
:class:`str` or :class:`bytes` for each frame in the message.
191192
192193
The iterator must be fully consumed before calling :meth:`get_iter` or
193-
:meth:`get` again. Else, :exc:`RuntimeError` is raised.
194+
:meth:`get` again. Else, :exc:`ConcurrencyError` is raised.
194195
195196
This method only makes sense for fragmented messages. If messages aren't
196197
fragmented, use :meth:`get` instead.
@@ -202,15 +203,15 @@ async def get_iter(self, decode: bool | None = None) -> AsyncIterator[Data]:
202203
203204
Raises:
204205
EOFError: If the stream of frames has ended.
205-
RuntimeError: If two coroutines run :meth:`get` or :meth:`get_iter`
206-
concurrently.
206+
ConcurrencyError: If two coroutines run :meth:`get` or
207+
:meth:`get_iter` concurrently.
207208
208209
"""
209210
if self.closed:
210211
raise EOFError("stream of frames ended")
211212

212213
if self.get_in_progress:
213-
raise RuntimeError("get() or get_iter() is already running")
214+
raise ConcurrencyError("get() or get_iter() is already running")
214215

215216
# Locking with get_in_progress ensures only one coroutine can get here.
216217
self.get_in_progress = True
@@ -236,7 +237,7 @@ async def get_iter(self, decode: bool | None = None) -> AsyncIterator[Data]:
236237
# We cannot handle asyncio.CancelledError because we don't buffer
237238
# previous fragments — we're streaming them. Canceling get_iter()
238239
# here will leave the assembler in a stuck state. Future calls to
239-
# get() or get_iter() will raise RuntimeError.
240+
# get() or get_iter() will raise ConcurrencyError.
240241
frame = await self.frames.get()
241242
self.maybe_resume()
242243
assert frame.opcode is OP_CONT

src/websockets/exceptions.py

+12
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
* :exc:`ProtocolError` (Sans-I/O)
2626
* :exc:`PayloadTooBig` (Sans-I/O)
2727
* :exc:`InvalidState` (Sans-I/O)
28+
* :exc:`ConcurrencyError`
2829
2930
"""
3031

@@ -62,6 +63,7 @@
6263
"WebSocketProtocolError",
6364
"PayloadTooBig",
6465
"InvalidState",
66+
"ConcurrencyError",
6567
]
6668

6769

@@ -354,6 +356,16 @@ class InvalidState(WebSocketException, AssertionError):
354356
"""
355357

356358

359+
class ConcurrencyError(WebSocketException, RuntimeError):
360+
"""
361+
Raised when receiving or sending messages concurrently.
362+
363+
WebSocket is a connection-oriented protocol. Reads must be serialized; so
364+
must be writes. However, reading and writing concurrently is possible.
365+
366+
"""
367+
368+
357369
# When type checking, import non-deprecated aliases eagerly. Else, import on demand.
358370
if typing.TYPE_CHECKING:
359371
from .legacy.exceptions import (

0 commit comments

Comments
 (0)