Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add keepalive & latency measurement to the threading implementation. #1577

Merged
merged 4 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/project/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ notice.

*In development*

New features
............

* Added :doc:`keepalive and latency measurement <../topics/keepalive>` to the
:mod:`threading` implementation.

.. _14.2:

14.2
Expand Down
7 changes: 3 additions & 4 deletions docs/reference/features.rst
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,13 @@ Both sides
+------------------------------------+--------+--------+--------+--------+
| Send a pong | ✅ | ✅ | ✅ | ✅ |
+------------------------------------+--------+--------+--------+--------+
| Keepalive | ✅ | | — | ✅ |
| Keepalive | ✅ | | — | ✅ |
+------------------------------------+--------+--------+--------+--------+
| Heartbeat | ✅ | | — | ✅ |
| Heartbeat | ✅ | | — | ✅ |
+------------------------------------+--------+--------+--------+--------+
| Measure latency | ✅ | | — | ✅ |
| Measure latency | ✅ | | — | ✅ |
+------------------------------------+--------+--------+--------+--------+
| Perform the closing handshake | ✅ | ✅ | ✅ | ✅ |
+------------------------------------+--------+--------+--------+--------+
| Enforce closing timeout | ✅ | ✅ | — | ✅ |
+------------------------------------+--------+--------+--------+--------+
| Report close codes and reasons | ✅ | ✅ | ✅ | ❌ |
Expand Down
2 changes: 2 additions & 0 deletions docs/reference/sync/client.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ Using a connection

.. autoproperty:: remote_address

.. autoproperty:: latency

.. autoproperty:: state

The following attributes are available after the opening handshake,
Expand Down
2 changes: 2 additions & 0 deletions docs/reference/sync/common.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ Both sides (:mod:`threading`)

.. autoproperty:: remote_address

.. autoattribute:: latency

.. autoproperty:: state

The following attributes are available after the opening handshake,
Expand Down
2 changes: 2 additions & 0 deletions docs/reference/sync/server.rst
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ Using a connection

.. autoproperty:: remote_address

.. autoproperty:: latency

.. autoproperty:: state

The following attributes are available after the opening handshake,
Expand Down
5 changes: 0 additions & 5 deletions docs/topics/keepalive.rst
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
Keepalive and latency
=====================

.. admonition:: This guide applies only to the :mod:`asyncio` implementation.
:class: tip

The :mod:`threading` implementation doesn't provide keepalive yet.

.. currentmodule:: websockets

Long-lived connections
Expand Down
33 changes: 19 additions & 14 deletions src/websockets/asyncio/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,14 @@ def __init__(
# Protect sending fragmented messages.
self.fragmented_send_waiter: asyncio.Future[None] | None = None

# Exception raised while reading from the connection, to be chained to
# ConnectionClosed in order to show why the TCP connection dropped.
self.recv_exc: BaseException | None = None

# Completed when the TCP connection is closed and the WebSocket
# connection state becomes CLOSED.
self.connection_lost_waiter: asyncio.Future[None] = self.loop.create_future()

# Mapping of ping IDs to pong waiters, in chronological order.
self.pong_waiters: dict[bytes, tuple[asyncio.Future[float], float]] = {}

Expand All @@ -120,14 +128,6 @@ def __init__(
# Task that sends keepalive pings. None when ping_interval is None.
self.keepalive_task: asyncio.Task[None] | None = None

# Exception raised while reading from the connection, to be chained to
# ConnectionClosed in order to show why the TCP connection dropped.
self.recv_exc: BaseException | None = None

# Completed when the TCP connection is closed and the WebSocket
# connection state becomes CLOSED.
self.connection_lost_waiter: asyncio.Future[None] = self.loop.create_future()

# Adapted from asyncio.FlowControlMixin
self.paused: bool = False
self.drain_waiters: collections.deque[asyncio.Future[None]] = (
Expand Down Expand Up @@ -686,8 +686,7 @@ async def ping(self, data: Data | None = None) -> Awaitable[float]:
pong_waiter = self.loop.create_future()
# The event loop's default clock is time.monotonic(). Its resolution
# is a bit low on Windows (~16ms). This is improved in Python 3.13.
ping_timestamp = self.loop.time()
self.pong_waiters[data] = (pong_waiter, ping_timestamp)
self.pong_waiters[data] = (pong_waiter, self.loop.time())
self.protocol.send_ping(data)
return pong_waiter

Expand Down Expand Up @@ -792,13 +791,19 @@ async def keepalive(self) -> None:
latency = 0.0
try:
while True:
# If self.ping_timeout > latency > self.ping_interval, pings
# will be sent immediately after receiving pongs. The period
# will be longer than self.ping_interval.
# If self.ping_timeout > latency > self.ping_interval,
# pings will be sent immediately after receiving pongs.
# The period will be longer than self.ping_interval.
await asyncio.sleep(self.ping_interval - latency)

self.logger.debug("% sending keepalive ping")
# This cannot raise ConnectionClosed when the connection is
# closing because ping(), via send_context(), waits for the
# connection to be closed before raising ConnectionClosed.
# However, connection_lost() cancels keepalive_task before
# it gets a chance to resume excuting.
pong_waiter = await self.ping()
if self.debug:
self.logger.debug("% sent keepalive ping")

if self.ping_timeout is not None:
try:
Expand Down
17 changes: 15 additions & 2 deletions src/websockets/sync/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ class ClientConnection(Connection):
:exc:`~websockets.exceptions.ConnectionClosedError` when the connection is
closed with any other code.

The ``close_timeout`` and ``max_queue`` arguments have the same meaning as
in :func:`connect`.
The ``ping_interval``, ``ping_timeout``, ``close_timeout``, and
``max_queue`` arguments have the same meaning as in :func:`connect`.

Args:
socket: Socket connected to a WebSocket server.
Expand All @@ -54,6 +54,8 @@ def __init__(
socket: socket.socket,
protocol: ClientProtocol,
*,
ping_interval: float | None = 20,
ping_timeout: float | None = 20,
close_timeout: float | None = 10,
max_queue: int | None | tuple[int | None, int | None] = 16,
) -> None:
Expand All @@ -62,6 +64,8 @@ def __init__(
super().__init__(
socket,
protocol,
ping_interval=ping_interval,
ping_timeout=ping_timeout,
close_timeout=close_timeout,
max_queue=max_queue,
)
Expand Down Expand Up @@ -136,6 +140,8 @@ def connect(
compression: str | None = "deflate",
# Timeouts
open_timeout: float | None = 10,
ping_interval: float | None = 20,
ping_timeout: float | None = 20,
close_timeout: float | None = 10,
# Limits
max_size: int | None = 2**20,
Expand Down Expand Up @@ -184,6 +190,10 @@ def connect(
:doc:`compression guide <../../topics/compression>` for details.
open_timeout: Timeout for opening the connection in seconds.
:obj:`None` disables the timeout.
ping_interval: Interval between keepalive pings in seconds.
:obj:`None` disables keepalive.
ping_timeout: Timeout for keepalive pings in seconds.
:obj:`None` disables timeouts.
close_timeout: Timeout for closing the connection in seconds.
:obj:`None` disables the timeout.
max_size: Maximum size of incoming messages in bytes.
Expand Down Expand Up @@ -296,6 +306,8 @@ def connect(
connection = create_connection(
sock,
protocol,
ping_interval=ping_interval,
ping_timeout=ping_timeout,
close_timeout=close_timeout,
max_queue=max_queue,
)
Expand All @@ -315,6 +327,7 @@ def connect(
connection.recv_events_thread.join()
raise

connection.start_keepalive()
return connection


Expand Down
Loading
Loading