-
Notifications
You must be signed in to change notification settings - Fork 78
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
More than one subscription, routing it to a specific handler #261
Comments
Hi there Stefan, I'll respond to your question of handling incoming messages concurrently. We can also happily discuss how to make aiomqtt more elegant if you have specific ideas! The second admonition in the "Message queue" section of our docs shows a small example of how to handle each message in a separate coroutine. However, as far as I understand your pseudocode, you'd like to have one coroutine per subscription instead of per message. We'll need to dive a bit into asyncio for this 😋 The idea is to implement a "distributor" that sorts the incoming messages into different asyncio queues, which are then processed concurrently (but sequentially inside a subscription). Here's a minimal working example: import asyncio
import aiomqtt
async def fast_producer(client: aiomqtt.Client):
while True:
await asyncio.sleep(0.2)
await client.publish("fast", "fast")
async def fast_consumer():
while True:
message = await fast_queue.get()
print(f"Fast consumer received: {message.payload}")
async def slow_producer(client: aiomqtt.Client):
while True:
await asyncio.sleep(2)
await client.publish("slow", "slow")
async def slow_consumer():
while True:
message = await slow_queue.get()
print(f"Slow consumer received: {message.payload}")
fast_queue = asyncio.Queue()
slow_queue = asyncio.Queue()
async def distributor(client: aiomqtt.Client):
async with client.messages() as messages:
await client.subscribe("fast")
await client.subscribe("slow")
# Sort messages into the appropriate queues
async for message in messages:
if message.topic.matches("fast"):
fast_queue.put_nowait(message)
elif message.topic.matches("slow"):
slow_queue.put_nowait(message)
async def main():
async with aiomqtt.Client("test.mosquitto.org") as client:
# Use a task group to manage and await all tasks
async with asyncio.TaskGroup() as tg:
tg.create_task(fast_producer(client))
tg.create_task(fast_consumer())
tg.create_task(slow_producer(client))
tg.create_task(slow_consumer())
tg.create_task(distributor(client))
asyncio.run(main()) Does that make sense? 🙂 Issue #250 is similar and could have some more context. |
@empicano what I would love to avoid is the |
We had I agree with you however that aiomqtt is not yet as elegant as it could be. |
@empicano was there a change? |
Hi Stefan, I should have commented something before closing, sorry about that. I'm currently weeding out issues to get an overview of what to work on. Do you have specific changes to aiomqtt that you still want to discuss? |
If you say "just solve it with if-then-else" that is fine for me. But I wish some way of routing would exists based on on the semantics that is allowed making subscriptions (the + #). Ideally a bit bigger than that so a single subscription could be used as well, with matching happening inside of the application. Might even be more elegant that the actual subscription would not be required at all, and the annotations figuring out the 'best' subscription(s). |
You probably know this, but the I'm happy to discuss possible improvements to the interface. What matters most to me that it's intuitive. Could you provide some (pseudo-)code of how your ideal interface would look like? |
Did not know this :-) I literally redo all those things manually so it would be already an improvement.
With the above it may become obvious that creating two subscription would be the best towards the server, and a subscription on '/#' wasteful. |
Interesting, this seems similar to how fastapi-mqtt is designed, and in general many web frameworks. Some thoughts on this:
I guess what could work is to pass a handler when calling subscribe and assigning the messages to the correct handler under the hood instead of letting the user do that with async def handle_temperature(message):
print(message.payload)
async def handle_humidity(message, room):
print(message.payload)
async with Client("test.mosquitto.org") as client:
await client.subscribe("temperature/#", handle_temperature)
await client.subscribe("humidity/{room}/#", handle_humidity)
# Block here or do something else such that we don't exit the context manager However, this would be a big change to the interface that I'm not sure is worth it. Some more intricate ways of prioritizing and handling messages concurrently would for example be more difficult to achieve. I believe that one of aiomqtt's strengths is that it's very flexible. The @frederikaalund, what do you think about this? 😊 |
my PR #302 has code that does this. async def get_temperature(client):
async with client.subscription("temperature/#") as sub:
async for msg in sub:
await handle_temperature(msg)
print(message.payload)
async with Client("test.mosquitto.org") as client, anyio.create_task_group() as tg:
tg.start_soon(get_temperature, client)
tg.start_soon(get_humidity, client) The "old" |
@smurfix how does it work? Does it make a new connection for every subscription? Does it do any deduplication work, for example if a subscription to # is made, and a subscription to something/#, is only one upstream subscription created? |
The MQTT protocol has a nice feature where you can send a code along with each subscription; the server adds the codes of all matching subscriptions to the PUBLISH packet that it sends to you. My patch uses a unique code for each This of course assumes that no user of aiomqtt uses this subscription feature directly … which is moderately unlikely, since the server might not support it and the Connect Ack handler didn't save whether it does or not. If the server doesn't support these codes I have a fallback implementation that dispatches on the message topic in the client. |
But if I have two 'processes' that use the data from the same subscription, that subscription is not 'reused' from the MQTT server perspective, there is a subscription for each time |
That part doesn't work because you can't ask the MQTT server for two (non-shared) subscriptions on the same topic. That's per the MQTT protocol specification. I thought about implementing this (simply attach more than one queue to the topic) but if you try you run into interesting problems (does the second task want retained messages? does the server even send them when you re-SUBSCRIBE? if so, shall the existing task get these too? which QoS should be used and what about the other properties you can attach to a subscription?) which I don't have a good answer for, thus for now the code doesn't allow duplicate subscriptions. What you can do is for one task to ask for Assuming, of course, that you use MQTT5 and that the server supports shared subscriptions. |
I just wonder how it will go with the more interesting cases. I think the variant 'temperature/#' (storage in database) and 'temperature/site_no_1' (streaming to a client) needs the explanation. Will there be one subscription or two? |
@skinkie Well if one task calls A reasonable server will send us messages to "temperature/somewhere exactly once, tagged with both subscription IDs. |
@smurfix I was not aware this subscription id could be duplicate on a message level as well. In that case this is a very clean implementation. My rationale was that the subscribing client was responsible for the deduplication of the inter process communication. Now you basically have pushed this to the server by using the subscription IDs. |
Or rather I have asked the server to please tell us about the result of its dispatching effort. There's no deduplication involved – at least unless you also consider shared subscriptions. Those are separate from unshared subscriptions, thus when you have both shared and unshared subscriptions that match a message, the server is free to send the same message twice — mosquitto does that. You can't handle the resulting ambiguity without using subscription IDs. |
The example denotes the following.
If a user wants to also test for temperature:
Is there any way to do this more elegantly? Maybe even in a concurrent way?
Pseudo:
The text was updated successfully, but these errors were encountered: