
Sync version of broadcast #1466

Open
Phlogistique opened this issue May 14, 2024 · 5 comments
Comments

@Phlogistique

Phlogistique commented May 14, 2024

The library includes a broadcast utility, with detailed documentation and motivation here.

However, today that utility only works with the asyncio API of WebSockets.

That leaves users of the sync API with two options if they want to broadcast a message without risking blocking the thread when some client's buffer is full:

  • Implement broadcast by accessing the underlying sockets directly via the Sans-I/O API, setting them to non-blocking for the duration of the broadcast (tricky!)
  • Implement broadcast with application-level queues and a per-client thread consuming from each queue (runtime and implementation overhead)

It would be nice if the sync API of this library included a ready-made broadcast utility.

@aaugustin
Member

Yes, that would be nice.

As long as it doesn't exist in the library, I'd recommend against your first option.

Your second option can be implemented quite easily with a ThreadPoolExecutor, which handles the queuing and consuming for you. Given enough threads and a short enough timeout on each send, you can guarantee that the operation completes within a known time frame. For example, with 50 connections, 25 threads, and a 5-second timeout, the broadcast finishes within 10 seconds.

If you have 5000 connections you shouldn't be using the thread-based implementation in the first place :-)
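A minimal sketch of that bound, using only the standard library's concurrent.futures. The connection objects are stand-ins for the library's sync connections, and `broadcast`, `max_workers`, and `send_timeout` are illustrative names, not library API:

```python
import concurrent.futures
import math

def broadcast(connections, message, max_workers=25, send_timeout=5.0):
    # Worst case: sends run in batches of max_workers, each batch taking up
    # to send_timeout, so the whole broadcast is bounded by
    # ceil(len(connections) / max_workers) * send_timeout.
    deadline = send_timeout * math.ceil(len(connections) / max_workers)
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
    try:
        futures = [executor.submit(conn.send, message) for conn in connections]
        done, not_done = concurrent.futures.wait(futures, timeout=deadline)
        return len(done), len(not_done)
    finally:
        # Don't block on stuck sends; drop anything still queued (Python 3.9+).
        # A send already running in a thread cannot be interrupted, though:
        # its thread lingers until the underlying socket write unblocks.
        executor.shutdown(wait=False, cancel_futures=True)
```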

@aaugustin
Member

Marking as a potential improvement.

@aaugustin
Member

I'm still somewhat skeptical that I want to provide this API at all.

For a small number of connections, this will do the job:

for connection in connections:
    connection.send(message)

For a larger number of connections, the sync implementation isn't the right choice anyway.

@aaugustin
Member

To avoid the risk of blocking on one connection — a risk that exists only in high-throughput scenarios where asyncio is clearly a better choice — one can do this:

import concurrent.futures

with concurrent.futures.ThreadPoolExecutor() as executor:
    for connection in connections:
        executor.submit(connection.send, message)

(This is the concrete implementation of my earlier suggestion to use ThreadPoolExecutor.)

If I implement broadcast, this will probably be the core of the implementation, with error handling added.
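As a hedged sketch of what that core plus error handling might look like. A generic Exception is caught here so the example stays self-contained; real code would catch websockets.exceptions.ConnectionClosed, and `broadcast` is an illustrative name, not the library's API:

```python
import concurrent.futures

def broadcast(connections, message):
    """Best-effort broadcast: one failing connection doesn't abort the rest."""
    def send(connection):
        try:
            connection.send(message)
        except Exception:
            # Real code would catch websockets.exceptions.ConnectionClosed;
            # the per-connection handler notices the closed socket and cleans up.
            pass

    with concurrent.futures.ThreadPoolExecutor() as executor:
        # list() drains the iterator so every send is submitted and completed.
        list(executor.map(send, connections))
```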

@Phlogistique
Author

Blocking on one connection can happen even with a handful of connections. In my case, I typically have a frozen or crashed client slowing down the server. Since I don't want to create a thread every time I send a message, I settled on creating a thread and an application-level queue per client.
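That pattern (one bounded queue and one writer thread per client, dropping messages when a slow client's queue fills up) can be sketched roughly like this; `ClientWriter` and `broadcast` are illustrative names, not library API:

```python
import queue
import threading

class ClientWriter:
    """One bounded queue and one writer thread per client. broadcast() never
    blocks on a slow client: its queue just fills up and messages are dropped."""

    def __init__(self, connection, maxsize=32):
        self.connection = connection
        self.queue = queue.Queue(maxsize=maxsize)
        self.thread = threading.Thread(target=self._run, daemon=True)
        self.thread.start()

    def _run(self):
        while True:
            message = self.queue.get()
            if message is None:  # sentinel: stop the writer thread
                break
            self.connection.send(message)

    def enqueue(self, message):
        try:
            self.queue.put_nowait(message)
        except queue.Full:
            pass  # frozen/crashed client: drop rather than block the broadcaster

    def close(self):
        self.queue.put(None)
        self.thread.join()

def broadcast(writers, message):
    for writer in writers:
        writer.enqueue(message)
```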
