Running multiple callbacks asynchronously for a subscriber #555
-
Hi, I have the following code:
If I publish 5 messages onto a channel I was hoping for 5 interleaved `cb` calls running together, but they each run sequentially, waiting for the previous one to finish. Does the library support running multiple callbacks concurrently? I found this being asked in the past for STAN in nats-io/stan.py#16, but I'm not sure whether it's possible in NATS Python. I also tried running without manual ack, but the result was the same. I replicated the same logic in Go with goroutines and it works there.
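To illustrate the behaviour, here is a minimal core-NATS sketch of this kind of set-up (this is not the original snippet, which is omitted above; the subject name, the sleep and the shutdown delay are placeholders):

```python
import asyncio

import nats


async def main() -> None:
    nc = await nats.connect("nats://localhost:4222")

    # A callback that simulates slow, awaitable work.
    async def cb(msg) -> None:
        print(f"start {msg.data!r}")
        await asyncio.sleep(1)  # stand-in for real processing
        print(f"done  {msg.data!r}")

    await nc.subscribe("demo", cb=cb)

    # Publish 5 messages: the callback runs once per message, one after
    # another, because the subscription dispatches messages sequentially.
    for i in range(5):
        await nc.publish("demo", str(i).encode())
    await nc.flush()

    await asyncio.sleep(6)  # give the callbacks time to run
    await nc.close()


asyncio.run(main())
```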
-
You'll be better off with either a library such as anyio or the standard library starting from Python 3.11. Basically:
At any time, you can cancel your subscriptions and wait upon your task group. Here is a quick code example, not tested:

```python
from asyncio import TaskGroup, Semaphore
from contextlib import AsyncExitStack
from typing import Callable, Coroutine

from nats import connect
from nats.aio.msg import Msg

MsgCallback = Callable[[Msg], Coroutine[None, None, None]]
"""A type alias for NATS message callbacks."""


class TaskExecutor:
    """A class to execute tasks with a limited concurrency."""

    def __init__(self, max_concurrency: int) -> None:
        self.max_concurrency = max_concurrency
        self.semaphore = Semaphore(max_concurrency)
        self.task_group = TaskGroup()
        self.stack = AsyncExitStack()

    async def start(self) -> None:
        await self.stack.__aenter__()
        await self.stack.enter_async_context(self.task_group)

    async def stop(self) -> None:
        await self.stack.aclose()

    def make_concurrent(self, cb: MsgCallback) -> MsgCallback:
        async def callback(msg: Msg) -> None:
            # Acquire semaphore to limit concurrency
            await self.semaphore.acquire()
            # Start task in task group
            task = self.task_group.create_task(cb(msg))
            # Release semaphore when task is done
            task.add_done_callback(lambda _: self.semaphore.release())

        return callback


async def my_app() -> None:
    # Create your executor
    my_executor = TaskExecutor(max_concurrency=10)
    # Start the executor
    await my_executor.start()

    # Define a NATS message callback
    async def my_callback(msg: Msg) -> None:
        print(f"Received a message: {msg}")

    # Create a NATS connection
    nc = await connect("localhost:4222")
    # Subscribe to a subject with concurrency limit
    await nc.subscribe("my_subject", cb=my_executor.make_concurrent(my_callback))
    # Subscribe to another subject with shared concurrency limit
    await nc.subscribe("another_subject", cb=my_executor.make_concurrent(my_callback))
    # Do things
    ...
    # Close the NATS connection
    await nc.close()
    # Stop the executor
    await my_executor.stop()
```
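If you prefer anyio (mentioned above) over the 3.11+ `TaskGroup`, the same idea translates roughly to the sketch below. This is untested; the `AnyioTaskExecutor` name is mine, not something provided by anyio or nats-py, and it assumes `start()` and `stop()` are awaited from the same task:

```python
import anyio
from contextlib import AsyncExitStack
from typing import Callable, Coroutine

from nats.aio.msg import Msg

MsgCallback = Callable[[Msg], Coroutine[None, None, None]]


class AnyioTaskExecutor:
    """Same idea as TaskExecutor above, built on anyio primitives."""

    def __init__(self, max_concurrency: int) -> None:
        self.semaphore = anyio.Semaphore(max_concurrency)
        self.stack = AsyncExitStack()
        self.task_group = None

    async def start(self) -> None:
        await self.stack.__aenter__()
        # anyio.create_task_group() returns an async context manager
        self.task_group = await self.stack.enter_async_context(anyio.create_task_group())

    async def stop(self) -> None:
        # Waits for every task started in the group to finish
        await self.stack.aclose()

    def make_concurrent(self, cb: MsgCallback) -> MsgCallback:
        async def callback(msg: Msg) -> None:
            # Limit the number of in-flight handlers
            await self.semaphore.acquire()

            async def run() -> None:
                try:
                    await cb(msg)
                finally:
                    self.semaphore.release()

            # Schedule the handler without awaiting its completion
            self.task_group.start_soon(run)

        return callback
```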
-
BTW, it's not much different from Go. If you don't want to make sure that all your tasks (goroutines in Go) are done before exiting your program, you could just do this within your callback: `asyncio.create_task(some_coroutine_function())`. The task group in the example above is used to wait for tasks, just like a WaitGroup can be used to wait for a collection of goroutines to finish. IMO, it's great that the NATS client handles messages sequentially: developers can rely on the language at hand to achieve concurrency. Go has goroutines, while asyncio has tasks and, more recently, higher-level abstractions such as the task group.
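For completeness, a fire-and-forget version of the callback (no task group, nothing to wait on at shutdown) could look like the sketch below; the `process` coroutine and the sleep are placeholders. Note that you should keep a strong reference to the tasks somewhere, since the event loop only holds weak references to tasks created with `asyncio.create_task()`:

```python
import asyncio

from nats.aio.msg import Msg

# Keep strong references so pending tasks are not garbage-collected.
background_tasks: set[asyncio.Task] = set()


async def process(msg: Msg) -> None:
    # Placeholder for the real, slow, awaitable work done per message.
    await asyncio.sleep(1)
    print(f"processed {msg.data!r}")


async def cb(msg: Msg) -> None:
    # Return immediately; the actual work runs concurrently in its own task.
    task = asyncio.create_task(process(msg))
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
```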
-
Last comment 😅 Did you check out https://natsbyexample.com/examples/messaging/concurrent/python ?
-
Hi @charbonnierg, thanks a lot for your help. I forgot to mention some important details:
And this is your updated task group example for JetStream:
So, as you can see in the callback method, I tried three things, both in core NATS and in NATS JetStream:
Any idea how to manage concurrency without multiple subjects? Replying to your other messages: I prefer using a task group instead of just
Thanks again for your help.
-
I'm not sure I understood your problem, but I think that you're looking for the `loop.run_in_executor()` function to submit a blocking function from within a coroutine function. Also, you need to have `max_ack_pending` greater than 1 if you want several callbacks to run at the same time.

```python
import time
import random
import asyncio
from concurrent.futures import ThreadPoolExecutor
from contextlib import AsyncExitStack
from typing import Callable, Coroutine

from nats.aio.client import Client as NATS
from nats.js import api as jetstream
from nats.js import errors as jetstream_errors
from nats.aio.msg import Msg

MsgCallback = Callable[[Msg], Coroutine[None, None, None]]
"""A type alias for NATS message callbacks."""


class TaskExecutor:
    """A class to execute tasks with a limited concurrency."""

    def __init__(self, max_concurrency: int) -> None:
        self.max_concurrency = max_concurrency
        self.semaphore = asyncio.Semaphore(max_concurrency)
        self.task_group = asyncio.TaskGroup()
        self.stack = AsyncExitStack()

    async def start(self) -> None:
        await self.stack.__aenter__()
        await self.stack.enter_async_context(self.task_group)

    async def stop(self) -> None:
        await self.stack.aclose()

    def make_concurrent(self, cb: MsgCallback) -> MsgCallback:
        async def callback(msg: Msg) -> None:
            # Acquire semaphore to limit concurrency
            await self.semaphore.acquire()
            # Start task in task group
            task = self.task_group.create_task(cb(msg))
            # Release semaphore when task is done
            task.add_done_callback(lambda _: self.semaphore.release())

        return callback


def blocking_task(idx: int) -> None:
    print(f"Start blocking task {idx}")
    time.sleep(random.random())
    print(f"End blocking task {idx}")


async def main() -> None:
    # Create and start executor
    task_executor = TaskExecutor(max_concurrency=5)
    thread_pool_executor = ThreadPoolExecutor(max_workers=5)
    await task_executor.start()
    # Create and connect a NATS client
    nc = NATS()
    await nc.connect()
    # Get a jetstream context
    js = nc.jetstream()
    # Make sure stream does not exist (for test purpose only)
    try:
        await js.delete_stream("DEMO")
    except jetstream_errors.NotFoundError:
        pass
    # Create a stream
    await js.add_stream(jetstream.StreamConfig(name="DEMO", subjects=["demo.>"]))
    # Publish 5 messages to the stream
    for idx in range(5):
        await js.publish("demo.foo", f"{idx}".encode(), stream="DEMO")
    # Define a deliver subject
    deliver_subject = nc.new_inbox()
    # Create an ephemeral consumer on the stream allowing 5 pending acks
    consumer = await js.add_consumer(
        stream="DEMO",
        config=jetstream.ConsumerConfig(
            name="DEMO-CONSUMER",
            deliver_subject=deliver_subject,
            ack_policy=jetstream.AckPolicy.EXPLICIT,
            replay_policy=jetstream.ReplayPolicy.INSTANT,
            deliver_policy=jetstream.DeliverPolicy.ALL,
            max_ack_pending=5,
        ),
    )
    # Define an event to be set when all events are received
    event = asyncio.Event()
    # Define a variable to count the number of messages received
    received = 0

    # Define a function to increment the received count and set the event
    def increment() -> None:
        nonlocal received
        received += 1
        if received == 5:
            event.set()

    # Get a reference to the event loop
    loop = asyncio.get_running_loop()

    # Define a callback to process messages for the consumer
    async def cb(msg: Msg) -> None:
        print(f"Received message: {msg.data}")
        # Process the message within the thread pool executor
        await loop.run_in_executor(
            # The thread pool executor that is used to run the blocking function
            thread_pool_executor,
            # The blocking function that is executed in the thread pool
            blocking_task,
            # The arguments of the blocking function
            int(msg.data.decode()),
        )
        # Ack the message
        await msg.ack()
        # Increment the counter
        increment()

    # Create a subscription for the consumer using the callback
    await js.subscribe_bind(
        "DEMO",
        consumer.config,
        consumer.name,
        cb=task_executor.make_concurrent(cb),
        manual_ack=True,
    )
    # Wait for messages to be processed
    await event.wait()
    # Close the client
    await nc.close()
    # Stop the executor
    await task_executor.stop()


if __name__ == "__main__":
    asyncio.run(main())
```

When I run the example I get:
As you can see, messages are not processed in order. If this still does not help you, I'm afraid you will have to get help from others in the community, as I won't have time to dig deeper into this.
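As a side note, on Python 3.9+ the `loop.run_in_executor()` call in the callback above can usually be replaced with `asyncio.to_thread()`, which runs the function in the loop's default thread pool, so there is no executor or loop object to manage. A minimal sketch (the function and its argument are placeholders):

```python
import asyncio
import time


def blocking_task(idx: int) -> None:
    # Stand-in for real blocking work.
    time.sleep(1)
    print(f"done {idx}")


async def handle(idx: int) -> None:
    # Runs blocking_task in the default ThreadPoolExecutor.
    await asyncio.to_thread(blocking_task, idx)
```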