8
8
from types import TracebackType
9
9
from typing import (
10
10
Any ,
11
+ AsyncIterable ,
12
+ AsyncIterator ,
11
13
Dict ,
12
14
Iterable ,
13
15
Iterator ,
14
16
Mapping ,
15
17
Optional ,
18
+ Tuple ,
16
19
Type ,
17
- Union ,
18
20
)
19
21
20
22
from ..asyncio .deadline import Deadline
21
23
from ..exceptions import ConnectionClosed , ConnectionClosedOK
22
- from ..frames import DATA_OPCODES , BytesLike , Frame , Opcode , prepare_ctrl
24
+ from ..frames import DATA_OPCODES , BytesLike , CloseCode , Frame , Opcode , prepare_ctrl
23
25
from ..http11 import Request , Response
24
26
from ..protocol import Event , Protocol
25
- from ..typing import AsyncIterator , Data , LoggerLike , Subprotocol
27
+ from ..typing import Data , LoggerLike , Subprotocol
26
28
from .messages import Assembler
27
29
28
30
29
31
__all__ = ["Connection" ]
30
32
31
- logger = logging .getLogger (__name__ )
32
-
33
33
34
34
class Connection (asyncio .Protocol ):
35
35
"""
@@ -75,14 +75,14 @@ def __init__(
75
75
# Assembler turning frames into messages and serializing reads.
76
76
self .recv_messages = Assembler ()
77
77
78
- # Whether we are busy sending a fragmented message.
79
- self .send_in_progress = False
80
-
81
78
# Deadline for the closing handshake.
82
79
self .close_deadline : Optional [Deadline ] = None
83
80
81
+ # Protect sending fragmented messages.
82
+ self .fragmented_send_waiter : Optional [asyncio .Future [None ]] = None
83
+
84
84
# Mapping of ping IDs to pong waiters, in chronological order.
85
- self .pings : Dict [bytes , asyncio .Future ] = {}
85
+ self .pings : Dict [bytes , Tuple [ asyncio .Future [ float ], float ] ] = {}
86
86
87
87
# Exception raised in recv_events, to be chained to ConnectionClosed
88
88
# in the user thread in order to show why the TCP connection dropped.
@@ -137,13 +137,17 @@ async def __aexit__(
137
137
exc_value : Optional [BaseException ],
138
138
traceback : Optional [TracebackType ],
139
139
) -> None :
140
- await self .close (1000 if exc_type is None else 1011 )
140
+ if exc_type is None :
141
+ await self .close ()
142
+ else :
143
+ await self .close (CloseCode .INTERNAL_ERROR )
141
144
142
145
async def __aiter__ (self ) -> Iterator [Data ]:
143
146
"""
144
147
Iterate on incoming messages.
145
148
146
- The iterator calls :meth:`recv` and yields messages in an infinite loop.
149
+ The iterator calls :meth:`recv` and yields messages asynchronously in an
150
+ infinite loop.
147
151
148
152
It exits when the connection is closed normally. It raises a
149
153
:exc:`~websockets.exceptions.ConnectionClosedError` exception after a
@@ -203,6 +207,8 @@ async def recv_streaming(self) -> AsyncIterator[Data]:
203
207
"""
204
208
Receive the next message frame by frame.
205
209
210
+ Canceling :meth:`send` is discouraged. TODO is it recoverable?
211
+
206
212
If the message is fragmented, yield each fragment as it is received.
207
213
The iterator must be fully consumed, or else the connection will become
208
214
unusable.
@@ -233,7 +239,7 @@ async def recv_streaming(self) -> AsyncIterator[Data]:
233
239
"is already running recv or recv_streaming"
234
240
) from None
235
241
236
- async def send (self , message : Union [ Data , Iterable [Data ]]) -> None :
242
+ async def send (self , message : Data | Iterable [Data ] | AsyncIterable [ Data ]) -> None :
237
243
"""
238
244
Send a message.
239
245
@@ -244,18 +250,31 @@ async def send(self, message: Union[Data, Iterable[Data]]) -> None:
244
250
.. _Text: https://www.rfc-editor.org/rfc/rfc6455.html#section-5.6
245
251
.. _Binary: https://www.rfc-editor.org/rfc/rfc6455.html#section-5.6
246
252
247
- :meth:`send` also accepts an iterable of strings, bytestrings, or
248
- bytes-like objects to enable fragmentation_. Each item is treated as a
249
- message fragment and sent in its own frame. All items must be of the
250
- same type, or else :meth:`send` will raise a :exc:`TypeError` and the
251
- connection will be closed.
253
+ :meth:`send` also accepts an iterable or an asynchronous iterable of
254
+ strings, bytestrings, or bytes-like objects to enable fragmentation_.
255
+ Each item is treated as a message fragment and sent in its own frame.
256
+ All items must be of the same type, or else :meth:`send` will raise a
257
+ :exc:`TypeError` and the connection will be closed.
252
258
253
259
.. _fragmentation: https://www.rfc-editor.org/rfc/rfc6455.html#section-5.4
254
260
255
261
:meth:`send` rejects dict-like objects because this is often an error.
256
262
(If you really want to send the keys of a dict-like object as fragments,
257
263
call its :meth:`~dict.keys` method and pass the result to :meth:`send`.)
258
264
265
+ Canceling :meth:`send` is discouraged. Instead, you should close the
266
+ connection with :meth:`close`. Indeed, there are only two situations
267
+ where :meth:`send` may yield control to the event loop and then get
268
+ canceled; in both cases, :meth:`close` has the same effect and is
269
+ more clear:
270
+
271
+ 1. The write buffer is full. If you don't want to wait until enough
272
+ data is sent, your only alternative is to close the connection.
273
+ :meth:`close` will likely time out then abort the TCP connection.
274
+ 2. ``message`` is an asynchronous iterator that yields control.
275
+ Stopping in the middle of a fragmented message will cause a
276
+ protocol error and the connection will be closed.
277
+
259
278
When the connection is closed, :meth:`send` raises
260
279
:exc:`~websockets.exceptions.ConnectionClosed`. Specifically, it
261
280
raises :exc:`~websockets.exceptions.ConnectionClosedOK` after a normal
@@ -272,6 +291,11 @@ async def send(self, message: Union[Data, Iterable[Data]]) -> None:
272
291
TypeError: If ``message`` doesn't have a supported type.
273
292
274
293
"""
294
+ # While sending a fragmented message, prevent sending other messages
295
+ # until all fragments are sent.
296
+ while self .fragmented_send_waiter is not None :
297
+ await asyncio .shield (self .fragmented_send_waiter )
298
+
275
299
# Unfragmented message -- this case must be handled first because
276
300
# strings and bytes-like objects are iterable.
277
301
0 commit comments