
Rust Concurrency With Async Examples

Roberto Fronteddu edited this page Feb 3, 2025 · 1 revision

In many cases, the APIs for working with concurrency using async are very similar to those for using threads. In other cases, they end up being shaped quite differently. Even when the APIs look similar between threads and async, they often have different behavior—and they nearly always have different performance characteristics.

Counting

Let’s count up on two separate threads using async. The trpl crate supplies a spawn_task function which looks very similar to the thread::spawn API, and a sleep function which is an async version of the thread::sleep API. We can use these together to implement the same counting example as with threads:

use std::time::Duration;

fn main() {
    trpl::run(async {
        trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }
    });
}

As our starting point, we set up our main function with trpl::run, so that our top-level function can be async. Every example following will include this exact same wrapping code with trpl::run in main, so we will often skip it just like we do with main.

Then we write two loops within that block, each with a trpl::sleep call in it, which waits for half a second (500 milliseconds) before sending the next message. We put one loop in the body of a trpl::spawn_task and the other in a top-level for loop. We also add an .await after the sleep calls.

This version stops as soon as the for loop in the body of the main async block finishes, because the task spawned by spawn_task is shut down when the main function ends. If you want the program to run all the way to the task's completion, you will need to use a join handle to wait for the first task to finish. With threads, we used the join method to “block” until the thread was done running; here, we can use await to do the same thing, because the task handle itself is a future. Its Output type is a Result, so we also unwrap it after awaiting it.
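For comparison, here is a sketch of the thread-based analogue using only the standard library (not the trpl crate), where join blocks the same way the async version awaits the task handle. The delays are shortened from 500 milliseconds to keep the sketch quick to run; run_counts is a hypothetical helper name.

```rust
use std::thread;
use std::time::Duration;

// Thread-based analogue of the async join-handle pattern: spawn a worker,
// then block on `join` the way the async version awaits the task handle.
fn run_counts() -> (usize, usize) {
    let handle = thread::spawn(|| {
        let mut n = 0usize;
        for i in 1..10 {
            println!("hi number {i} from the spawned thread!");
            thread::sleep(Duration::from_millis(10));
            n += 1;
        }
        n
    });

    let mut m = 0usize;
    for i in 1..5 {
        println!("hi number {i} from the main thread!");
        thread::sleep(Duration::from_millis(10));
        m += 1;
    }

    // `join` blocks until the spawned thread finishes, mirroring `handle.await`.
    (handle.join().unwrap(), m)
}

fn main() {
    let (first, second) = run_counts();
    println!("first counted {first}, second counted {second}");
}
```

Without the join call at the end, the program could exit while the spawned thread is still counting, just as the first async version dropped the spawned task.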

        let handle = trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }

        handle.await.unwrap();

This updated version runs until both loops finish. The bigger difference is that we did not need to spawn another operating system thread to do this. In fact, we do not even need to spawn a task here. Because async blocks compile to anonymous futures, we can put each loop in an async block and have the runtime run them both to completion using the trpl::join function.

Recall that you can call the join method on the JoinHandle returned by std::thread::spawn. The trpl::join function is similar, but for futures. When you give it two futures, it produces a single new future whose output is a tuple containing the output of each future you passed in, once both complete. Below, we do not await fut1 and fut2 individually, but instead the new future produced by trpl::join. We ignore the output, because it is just a tuple containing two unit values.

        let fut1 = async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let fut2 = async {
            for i in 1..5 {
                println!("hi number {i} from the second task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        trpl::join(fut1, fut2).await;

Here, you will see the exact same order every time, which is very different from what we saw with threads. That is because the trpl::join function is fair, meaning it checks each future equally often, alternating between them, and never lets one race ahead if the other is ready. With threads, the operating system decides which thread to check and how long to let it run. With async Rust, the runtime decides which task to check. Runtimes do not have to guarantee fairness for any given operation, and runtimes often offer different APIs to let you choose whether you want fairness or not.

Message Passing

Sharing data between futures will also be familiar: we will use message passing again, but this time with async versions of the types and functions.

        let (tx, mut rx) = trpl::channel();

        let val = String::from("hi");
        tx.send(val).unwrap();

        let received = rx.recv().await.unwrap();
        println!("Got: {received}");

Here, we use trpl::channel, an async version of the multiple-producer, single-consumer channel API we used with threads. The async version of the API is only a little different from the thread-based version: it uses a mutable rather than an immutable receiver rx, and its recv method produces a future we need to await rather than producing the value directly. Now we can send messages from the sender to the receiver. Notice that we do not have to spawn a separate thread or even a task; we merely need to await the rx.recv call.

The synchronous Receiver::recv method in std::mpsc::channel blocks until it receives a message. The trpl::Receiver::recv method does not, because it is async. Instead of blocking, it hands control back to the runtime until either a message is received or the send side of the channel closes. By contrast, we do not await the send call, because it does not block. It does not need to, because the channel we are sending it into is unbounded.
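For contrast, here is a minimal std-only sketch of the blocking behavior described above: std's Receiver::recv parks the calling thread until a message arrives, rather than handing control back to a runtime. The function name blocking_recv_demo is an illustrative choice, not an API.

```rust
use std::sync::mpsc;
use std::thread;

// Synchronous counterpart: std's Receiver::recv blocks the calling thread
// until a message arrives (or the sending side is dropped).
fn blocking_recv_demo() -> String {
    let (tx, rx) = mpsc::channel();

    // Send from another thread; `move` transfers `tx` into the closure.
    thread::spawn(move || {
        tx.send(String::from("hi")).unwrap();
    });

    // Blocks right here until the message arrives.
    rx.recv().unwrap()
}

fn main() {
    println!("Got: {}", blocking_recv_demo());
}
```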

Because all of this async code runs in an async block in a trpl::run call, everything within it can avoid blocking. However, the code outside it will block on the run function returning. That is the whole point of the trpl::run function: it lets you choose where to block on some set of async code, and thus where to transition between sync and async code. In most async runtimes, run is actually named block_on for exactly this reason.

Notice two things about this example: First, the message will arrive right away! Second, although we use a future here, there is no concurrency yet. Everything in the listing happens in sequence, just as it would if there were no futures involved.

        let (tx, mut rx) = trpl::channel();

        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("future"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            trpl::sleep(Duration::from_millis(500)).await;
        }

        while let Some(value) = rx.recv().await {
            println!("received '{value}'");
        }

In addition to sending the messages, we need to receive them. In this case, we could do that manually, by just doing rx.recv().await four times, because we know how many messages are coming in. In the real world, though, we will generally be waiting on some unknown number of messages. In that case, we need to keep waiting until we determine that there are no more messages.

However, Rust does not yet have a way to write a for loop over an asynchronous series of items. Instead, we need a new kind of loop we have not used before: the while let conditional loop. A while let loop is the loop version of the if let construct we saw earlier. The loop continues executing as long as the pattern it specifies continues to match the value.
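The while let construct works the same way in fully synchronous code, which may make its behavior easier to see in isolation. A small sketch (drain is a hypothetical helper name):

```rust
// `while let` keeps looping as long as the pattern matches; it stops at
// the first value that fails to match (here, `None` from `pop`).
fn drain(mut stack: Vec<i32>) -> Vec<i32> {
    let mut popped = Vec::new();
    while let Some(top) = stack.pop() {
        popped.push(top);
    }
    popped
}

fn main() {
    println!("{:?}", drain(vec![1, 2, 3]));
}
```

The loop body runs once per Some value; the first None ends the loop, exactly as the first None from rx.recv().await ends the async version.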

The rx.recv call produces a future, which we await. The runtime will pause the future until it is ready. Once a message arrives, the future will resolve to Some(message), as many times as messages arrive. When the channel closes, regardless of whether any messages have arrived, the future will instead resolve to None to indicate that there are no more values and we should stop polling, that is, stop awaiting.

The while let loop pulls all of this together. If the result of calling rx.recv().await is Some(message), we get access to the message and we can use it in the loop body, just like we could with if let. If the result is None, the loop ends. Every time the loop completes, it hits the await point again, so the runtime pauses it again until another message arrives.

The code now successfully sends and receives all of the messages. Unfortunately, there are still a couple problems. For one thing, the messages do not arrive at half-second intervals. They arrive all at once, two seconds after we start the program. For another, this program also never exits! Instead, it waits forever for new messages. You will need to shut it down using ctrl-c.

Let’s start by understanding why the messages all come in at once after the full delay, rather than coming in with delays in between each one. Within a given async block, the order that .await keywords appear in the code is also the order they happen when running the program.

There is only one async block, so everything in it runs linearly; there is still no concurrency. All the tx.send calls happen, interspersed with all of the trpl::sleep calls and their associated await points. Only then does the while let loop get to go through any of the .await points on the recv calls.

To get the behavior we want, where the sleep delay happens between receiving each message, we need to put the tx and rx operations in their own async blocks. Then the runtime can execute each of them separately using trpl::join, just like in the counting example. Once again, we await the result of calling trpl::join, not the individual futures. If we awaited the individual futures in sequence, we would just end up back in a sequential flow—exactly what we are trying not to do.

        let tx_fut = async {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await

With the updated code the messages get printed at 500-millisecond intervals, rather than all in a rush after two seconds.

The program still never exits, though, because of the way the while let loop interacts with trpl::join:

  • The future returned from trpl::join only completes once both futures passed to it have completed.
  • The tx future completes once it finishes sleeping after sending the last message in vals.
  • The rx future will not complete until the while let loop ends.
  • The while let loop will not end until awaiting rx.recv produces None.
  • Awaiting rx.recv will only return None once the other end of the channel is closed.
  • The channel will only close if we call rx.close or when the sender side, tx, is dropped.
  • We do not call rx.close anywhere, and tx will not be dropped until the outermost async block passed to trpl::run ends.
  • The block cannot end because it is blocked on trpl::join completing, which takes us back to the top of this list!

We could manually close rx by calling rx.close somewhere, but that does not make much sense. Stopping after handling some arbitrary number of messages would make the program shut down, but we could miss messages. We need some other way to make sure that tx gets dropped before the end of the function.

Right now, the async block where we send the messages only borrows tx, but if we could move tx into that async block, it would be dropped once that block ends. We have seen how to use the move keyword with closures, and that we often need to move data into closures when working with threads. The same basic dynamics apply to async blocks, so the move keyword works with async blocks just like it does with closures.
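The same drop-to-close dynamic can be sketched with std threads and std::sync::mpsc: move transfers tx into the spawned closure, so tx is dropped when the thread finishes, the channel closes, and the receiving loop ends. One difference from the async channel is that std's recv returns a Result, so the loop matches Ok rather than Some. The helper name collect_messages is illustrative.

```rust
use std::sync::mpsc;
use std::thread;

// `move` transfers `tx` into the closure; when the thread finishes, `tx`
// is dropped, the channel closes, and the `while let` loop below ends.
fn collect_messages() -> Vec<String> {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        for val in ["hi", "from", "the", "thread"] {
            tx.send(val.to_string()).unwrap();
        }
        // `tx` is dropped here, closing the channel.
    });

    let mut received = Vec::new();
    while let Ok(value) = rx.recv() {
        received.push(value);
    }
    received
}

fn main() {
    println!("{:?}", collect_messages());
}
```

If the closure only borrowed tx instead, the sender would stay alive in the enclosing scope and the loop would wait forever, which is the same infinite-wait problem the async version has without async move.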

Below, we change the async block for sending messages from a plain async block to an async move block. When we run this version of the code, it shuts down gracefully after the last message is sent and received.

        let (tx, mut rx) = trpl::channel();

        let tx_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;

This async channel is also a multiple-producer channel, so we can call clone on tx if we want to send messages from multiple futures.

We clone tx, creating tx1, outside the first async block. We move tx1 into that block just as we did before with tx. Then, later, we move the original tx into a new async block, where we send more messages with a slightly slower delay. We happen to put this new async block after the async block for receiving messages, but it could go before it just as well. The key is the order in which the futures are awaited, not the order in which they are created.

Both of the async blocks for sending messages need to be async move blocks, so that both tx and tx1 get dropped when those blocks finish. Otherwise we will end up back in the same infinite loop we started out in. Finally, we switch from trpl::join to trpl::join3 to handle the additional future.

        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(1500)).await;
            }
        };

        trpl::join3(tx1_fut, tx_fut, rx_fut).await;

Now we see all the messages from both sending futures. Because the sending futures use slightly different delays after sending, the messages are also received at those different intervals.
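The multiple-producer pattern above has a direct std-only analogue: clone the sender, move each sender into its own thread, and let the receiving loop end once both senders have been dropped. This is a sketch under those assumptions; collect_all is a hypothetical helper name.

```rust
use std::sync::mpsc;
use std::thread;

// Two producers, one consumer: both senders are moved into their threads
// and dropped when those threads finish, so the receiving loop terminates.
fn collect_all() -> usize {
    let (tx, rx) = mpsc::channel();
    let tx1 = tx.clone();

    let h1 = thread::spawn(move || {
        for val in ["hi", "from", "the", "thread"] {
            tx1.send(val.to_string()).unwrap();
        }
    });
    let h2 = thread::spawn(move || {
        for val in ["more", "messages", "for", "you"] {
            tx.send(val.to_string()).unwrap();
        }
    });

    let mut count = 0;
    while let Ok(value) = rx.recv() {
        println!("received '{value}'");
        count += 1;
    }
    h1.join().unwrap();
    h2.join().unwrap();
    count
}

fn main() {
    println!("total: {}", collect_all());
}
```

The interleaving of messages from the two producers is up to the scheduler, but the total received is always eight, and the loop ends without any explicit close call, just as in the async move version.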
