Skip to content

Commit 5cd8df1

Browse files
committed
Add keepalive to the threading implementation.
1 parent 9f345ee commit 5cd8df1

File tree

12 files changed

+257
-28
lines changed

12 files changed

+257
-28
lines changed

docs/project/changelog.rst

+2-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ notice.
3535
New features
3636
............
3737

38-
* Added latency measurement to the :mod:`threading` implementation.
38+
* Added :doc:`keepalive and latency measurement <../topics/keepalive>` to the
39+
:mod:`threading` implementation.
3940

4041
.. _14.2:
4142

docs/reference/features.rst

+2-2
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,9 @@ Both sides
6161
+------------------------------------+--------+--------+--------+--------+
6262
| Send a pong |||||
6363
+------------------------------------+--------+--------+--------+--------+
64-
| Keepalive || |||
64+
| Keepalive || |||
6565
+------------------------------------+--------+--------+--------+--------+
66-
| Heartbeat || |||
66+
| Heartbeat || |||
6767
+------------------------------------+--------+--------+--------+--------+
6868
| Measure latency |||||
6969
+------------------------------------+--------+--------+--------+--------+

docs/topics/keepalive.rst

-5
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,6 @@
11
Keepalive and latency
22
=====================
33

4-
.. admonition:: This guide applies only to the :mod:`asyncio` implementation.
5-
:class: tip
6-
7-
The :mod:`threading` implementation doesn't provide keepalive yet.
8-
94
.. currentmodule:: websockets
105

116
Long-lived connections

src/websockets/asyncio/connection.py

+11-7
Original file line numberDiff line numberDiff line change
@@ -686,8 +686,7 @@ async def ping(self, data: Data | None = None) -> Awaitable[float]:
686686
pong_waiter = self.loop.create_future()
687687
# The event loop's default clock is time.monotonic(). Its resolution
688688
# is a bit low on Windows (~16ms). This is improved in Python 3.13.
689-
ping_timestamp = self.loop.time()
690-
self.pong_waiters[data] = (pong_waiter, ping_timestamp)
689+
self.pong_waiters[data] = (pong_waiter, self.loop.time())
691690
self.protocol.send_ping(data)
692691
return pong_waiter
693692

@@ -792,13 +791,18 @@ async def keepalive(self) -> None:
792791
latency = 0.0
793792
try:
794793
while True:
795-
# If self.ping_timeout > latency > self.ping_interval, pings
796-
# will be sent immediately after receiving pongs. The period
797-
# will be longer than self.ping_interval.
794+
# If self.ping_timeout > latency > self.ping_interval,
795+
# pings will be sent immediately after receiving pongs.
796+
# The period will be longer than self.ping_interval.
798797
await asyncio.sleep(self.ping_interval - latency)
799798

800-
self.logger.debug("% sending keepalive ping")
801-
pong_waiter = await self.ping()
799+
try:
800+
pong_waiter = await self.ping()
801+
except ConnectionClosed:
802+
# TODO: prove that this can happen (I think it can)
803+
break
804+
if self.debug:
805+
self.logger.debug("% sent keepalive ping")
802806

803807
if self.ping_timeout is not None:
804808
try:

src/websockets/sync/client.py

+15-2
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ class ClientConnection(Connection):
4040
:exc:`~websockets.exceptions.ConnectionClosedError` when the connection is
4141
closed with any other code.
4242
43-
The ``close_timeout`` and ``max_queue`` arguments have the same meaning as
44-
in :func:`connect`.
43+
The ``ping_interval``, ``ping_timeout``, ``close_timeout``, and
44+
``max_queue`` arguments have the same meaning as in :func:`connect`.
4545
4646
Args:
4747
socket: Socket connected to a WebSocket server.
@@ -54,6 +54,8 @@ def __init__(
5454
socket: socket.socket,
5555
protocol: ClientProtocol,
5656
*,
57+
ping_interval: float | None = 20,
58+
ping_timeout: float | None = 20,
5759
close_timeout: float | None = 10,
5860
max_queue: int | None | tuple[int | None, int | None] = 16,
5961
) -> None:
@@ -62,6 +64,8 @@ def __init__(
6264
super().__init__(
6365
socket,
6466
protocol,
67+
ping_interval=ping_interval,
68+
ping_timeout=ping_timeout,
6569
close_timeout=close_timeout,
6670
max_queue=max_queue,
6771
)
@@ -136,6 +140,8 @@ def connect(
136140
compression: str | None = "deflate",
137141
# Timeouts
138142
open_timeout: float | None = 10,
143+
ping_interval: float | None = 20,
144+
ping_timeout: float | None = 20,
139145
close_timeout: float | None = 10,
140146
# Limits
141147
max_size: int | None = 2**20,
@@ -184,6 +190,10 @@ def connect(
184190
:doc:`compression guide <../../topics/compression>` for details.
185191
open_timeout: Timeout for opening the connection in seconds.
186192
:obj:`None` disables the timeout.
193+
ping_interval: Interval between keepalive pings in seconds.
194+
:obj:`None` disables keepalive.
195+
ping_timeout: Timeout for keepalive pings in seconds.
196+
:obj:`None` disables timeouts.
187197
close_timeout: Timeout for closing the connection in seconds.
188198
:obj:`None` disables the timeout.
189199
max_size: Maximum size of incoming messages in bytes.
@@ -296,6 +306,8 @@ def connect(
296306
connection = create_connection(
297307
sock,
298308
protocol,
309+
ping_interval=ping_interval,
310+
ping_timeout=ping_timeout,
299311
close_timeout=close_timeout,
300312
max_queue=max_queue,
301313
)
@@ -315,6 +327,7 @@ def connect(
315327
connection.recv_events_thread.join()
316328
raise
317329

330+
connection.start_keepalive()
318331
return connection
319332

320333

src/websockets/sync/connection.py

+64
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,15 @@ def __init__(
4949
socket: socket.socket,
5050
protocol: Protocol,
5151
*,
52+
ping_interval: float | None = 20,
53+
ping_timeout: float | None = 20,
5254
close_timeout: float | None = 10,
5355
max_queue: int | None | tuple[int | None, int | None] = 16,
5456
) -> None:
5557
self.socket = socket
5658
self.protocol = protocol
59+
self.ping_interval = ping_interval
60+
self.ping_timeout = ping_timeout
5761
self.close_timeout = close_timeout
5862
if isinstance(max_queue, int) or max_queue is None:
5963
max_queue = (max_queue, None)
@@ -120,8 +124,15 @@ def __init__(
120124
Latency is defined as the round-trip time of the connection. It is
121125
measured by sending a Ping frame and waiting for a matching Pong frame.
122126
Before the first measurement, :attr:`latency` is ``0``.
127+
128+
By default, websockets enables a :ref:`keepalive <keepalive>` mechanism
129+
that sends Ping frames automatically at regular intervals. You can also
130+
send Ping frames and measure latency with :meth:`ping`.
123131
"""
124132

133+
# Thread that sends keepalive pings. None when ping_interval is None.
134+
self.keepalive_thread: threading.Thread | None = None
135+
125136
# Public attributes
126137

127138
@property
@@ -700,6 +711,59 @@ def acknowledge_pending_pings(self) -> None:
700711

701712
self.pong_waiters.clear()
702713

714+
def keepalive(self) -> None:
715+
"""
716+
Send a Ping frame and wait for a Pong frame at regular intervals.
717+
718+
"""
719+
assert self.ping_interval is not None
720+
try:
721+
while True:
722+
# If self.ping_timeout > self.latency > self.ping_interval,
723+
# pings will be sent immediately after receiving pongs.
724+
# The period will be longer than self.ping_interval.
725+
self.recv_events_thread.join(self.ping_interval - self.latency)
726+
if not self.recv_events_thread.is_alive():
727+
break
728+
729+
try:
730+
pong_waiter = self.ping(ack_on_close=True)
731+
except ConnectionClosed:
732+
# TODO: cover this race condition
733+
break
734+
if self.debug:
735+
self.logger.debug("% sent keepalive ping")
736+
737+
if self.ping_timeout is not None:
738+
#
739+
if pong_waiter.wait(self.ping_timeout):
740+
if self.debug:
741+
self.logger.debug("% received keepalive pong")
742+
else:
743+
if self.debug:
744+
self.logger.debug("- timed out waiting for keepalive pong")
745+
with self.send_context():
746+
self.protocol.fail(
747+
CloseCode.INTERNAL_ERROR,
748+
"keepalive ping timeout",
749+
)
750+
break
751+
except Exception:
752+
self.logger.error("keepalive ping failed", exc_info=True)
753+
754+
def start_keepalive(self) -> None:
755+
"""
756+
Run :meth:`keepalive` in a thread, unless keepalive is disabled.
757+
758+
"""
759+
if self.ping_interval is not None:
760+
# This thread is marked as daemon like self.recv_events_thread.
761+
self.keepalive_thread = threading.Thread(
762+
target=self.keepalive,
763+
daemon=True,
764+
)
765+
self.keepalive_thread.start()
766+
703767
def recv_events(self) -> None:
704768
"""
705769
Read incoming data from the socket and process events.

src/websockets/sync/server.py

+15-2
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ class ServerConnection(Connection):
5252
:exc:`~websockets.exceptions.ConnectionClosedError` when the connection is
5353
closed with any other code.
5454
55-
The ``close_timeout`` and ``max_queue`` arguments have the same meaning as
56-
in :func:`serve`.
55+
The ``ping_interval``, ``ping_timeout``, ``close_timeout``, and
56+
``max_queue`` arguments have the same meaning as in :func:`serve`.
5757
5858
Args:
5959
socket: Socket connected to a WebSocket client.
@@ -66,6 +66,8 @@ def __init__(
6666
socket: socket.socket,
6767
protocol: ServerProtocol,
6868
*,
69+
ping_interval: float | None = 20,
70+
ping_timeout: float | None = 20,
6971
close_timeout: float | None = 10,
7072
max_queue: int | None | tuple[int | None, int | None] = 16,
7173
) -> None:
@@ -74,6 +76,8 @@ def __init__(
7476
super().__init__(
7577
socket,
7678
protocol,
79+
ping_interval=ping_interval,
80+
ping_timeout=ping_timeout,
7781
close_timeout=close_timeout,
7882
max_queue=max_queue,
7983
)
@@ -354,6 +358,8 @@ def serve(
354358
compression: str | None = "deflate",
355359
# Timeouts
356360
open_timeout: float | None = 10,
361+
ping_interval: float | None = 20,
362+
ping_timeout: float | None = 20,
357363
close_timeout: float | None = 10,
358364
# Limits
359365
max_size: int | None = 2**20,
@@ -434,6 +440,10 @@ def handler(websocket):
434440
:doc:`compression guide <../../topics/compression>` for details.
435441
open_timeout: Timeout for opening connections in seconds.
436442
:obj:`None` disables the timeout.
443+
ping_interval: Interval between keepalive pings in seconds.
444+
:obj:`None` disables keepalive.
445+
ping_timeout: Timeout for keepalive pings in seconds.
446+
:obj:`None` disables timeouts.
437447
close_timeout: Timeout for closing connections in seconds.
438448
:obj:`None` disables the timeout.
439449
max_size: Maximum size of incoming messages in bytes.
@@ -563,6 +573,8 @@ def protocol_select_subprotocol(
563573
connection = create_connection(
564574
sock,
565575
protocol,
576+
ping_interval=ping_interval,
577+
ping_timeout=ping_timeout,
566578
close_timeout=close_timeout,
567579
max_queue=max_queue,
568580
)
@@ -590,6 +602,7 @@ def protocol_select_subprotocol(
590602

591603
assert connection.protocol.state is OPEN
592604
try:
605+
connection.start_keepalive()
593606
handler(connection)
594607
except Exception:
595608
connection.logger.error("connection handler failed", exc_info=True)

tests/asyncio/test_connection.py

+8-8
Original file line numberDiff line numberDiff line change
@@ -1036,23 +1036,23 @@ async def test_keepalive_ignores_timeout(self, getrandbits):
10361036

10371037
async def test_keepalive_terminates_while_sleeping(self):
10381038
"""keepalive task terminates while waiting to send a ping."""
1039-
self.connection.ping_interval = 2 * MS
1039+
self.connection.ping_interval = 3 * MS
10401040
self.connection.start_keepalive()
10411041
await asyncio.sleep(MS)
10421042
await self.connection.close()
10431043
self.assertTrue(self.connection.keepalive_task.done())
10441044

10451045
async def test_keepalive_terminates_while_waiting_for_pong(self):
10461046
"""keepalive task terminates while waiting to receive a pong."""
1047-
self.connection.ping_interval = 2 * MS
1048-
self.connection.ping_timeout = 2 * MS
1047+
self.connection.ping_interval = MS
1048+
self.connection.ping_timeout = 3 * MS
10491049
async with self.drop_frames_rcvd():
10501050
self.connection.start_keepalive()
1051-
# 2 ms: keepalive() sends a ping frame.
1052-
await asyncio.sleep(2 * MS)
1051+
# 1 ms: keepalive() sends a ping frame.
1052+
# 1.x ms: a pong frame is dropped.
1053+
await asyncio.sleep(MS)
10531054
# Exiting the context manager sleeps for MS.
1054-
# 2.x ms: a pong frame is dropped.
1055-
# 3 ms: close the connection before ping_timeout elapses.
1055+
# 2 ms: close the connection before ping_timeout elapses.
10561056
await self.connection.close()
10571057
self.assertTrue(self.connection.keepalive_task.done())
10581058

@@ -1062,9 +1062,9 @@ async def test_keepalive_reports_errors(self):
10621062
async with self.drop_frames_rcvd():
10631063
self.connection.start_keepalive()
10641064
# 2 ms: keepalive() sends a ping frame.
1065+
# 2.x ms: a pong frame is dropped.
10651066
await asyncio.sleep(2 * MS)
10661067
# Exiting the context manager sleeps for MS.
1067-
# 2.x ms: a pong frame is dropped.
10681068
# 3 ms: inject a fault: raise an exception in the pending pong waiter.
10691069
pong_waiter = next(iter(self.connection.pong_waiters.values()))[0]
10701070
with self.assertLogs("websockets", logging.ERROR) as logs:

tests/sync/test_client.py

+15
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,21 @@ def test_disable_compression(self):
7676
with connect(get_uri(server), compression=None) as client:
7777
self.assertEqual(client.protocol.extensions, [])
7878

79+
def test_keepalive_is_enabled(self):
80+
"""Client enables keepalive and measures latency by default."""
81+
with run_server() as server:
82+
with connect(get_uri(server), ping_interval=MS) as client:
83+
self.assertEqual(client.latency, 0)
84+
time.sleep(2 * MS)
85+
self.assertGreater(client.latency, 0)
86+
87+
def test_disable_keepalive(self):
88+
"""Client disables keepalive."""
89+
with run_server() as server:
90+
with connect(get_uri(server), ping_interval=None) as client:
91+
time.sleep(2 * MS)
92+
self.assertEqual(client.latency, 0)
93+
7994
def test_logger(self):
8095
"""Client accepts a logger argument."""
8196
logger = logging.getLogger("test")

0 commit comments

Comments
 (0)