Async and Await

Many operations we ask the computer to do can take a while to finish. For example, if you used a video editor to create a video of a family celebration, exporting it could take anywhere from minutes to hours. Similarly, downloading a video shared by someone in your family might take a long time. It would be nice if we could do something else while we are waiting for those long-running processes to complete.

The video export will use as much CPU and GPU power as it can. If you only had one CPU core, and your operating system never paused that export until it completed, you couldn’t do anything else on your computer while it was running. That would be a pretty frustrating experience, though. Instead, your computer’s operating system can—and does!—invisibly interrupt the export often enough to let you get other work done along the way.

The file download is different. It does not take up very much CPU time. Instead, the CPU needs to wait on data to arrive from the network. While you can start reading the data once some of it is present, it might take a while for the rest to show up. Even once the data is all present, a video can be quite large, so it might take some time to load it all. Maybe it only takes a second or two—but that’s a very long time for a modern processor, which can do billions of operations every second. It would be nice to be able to put the CPU to use for other work while waiting for the network call to finish—so, again, your operating system will invisibly interrupt your program so other things can happen while the network operation is still ongoing.

Note: The video export is the kind of operation which is often described as “CPU-bound” or “compute-bound”. It’s limited by the speed of the computer’s ability to process data within the CPU or GPU, and how much of that speed it can use. The video download is the kind of operation which is often described as “IO-bound,” because it’s limited by the speed of the computer’s input and output. It can only go as fast as the data can be sent across the network.

In both of these examples, the operating system’s invisible interrupts provide a form of concurrency. That concurrency only happens at the level of a whole program, though: the operating system interrupts one program to let other programs get work done. In many cases, because we understand our programs at a much more granular level than the operating system does, we can spot lots of opportunities for concurrency that the operating system cannot see.

For example, if we’re building a tool to manage file downloads, we should be able to write our program in such a way that starting one download does not lock up the UI, and users should be able to start multiple downloads at the same time. Many operating system APIs for interacting with the network are blocking, though. That is, these APIs block the program’s progress until the data that they are processing is completely ready.

Note: This is how most function calls work, if you think about it! However, we normally reserve the term “blocking” for function calls which interact with files, the network, or other resources on the computer, because those are the places where an individual program would benefit from the operation being non-blocking.

We could avoid blocking our main thread by spawning a dedicated thread to download each file. However, we would eventually find that the overhead of those threads was a problem. It would also be nicer if the call were not blocking in the first place. Last but not least, it would be better if we could write in the same direct style we use in blocking code. Something similar to this:

let data = fetch_data_from(url).await;
println!("{data}");

That is exactly what Rust’s async abstraction gives us. Before we see how this works in practice, though, we need to take a short detour into the differences between parallelism and concurrency.

Parallelism and Concurrency

In the “Fearless Concurrency” chapter on page XX, we treated parallelism and concurrency as mostly interchangeable. Now we need to distinguish between them more precisely, because the differences will show up as we start working.

Consider the different ways a team could split up work on a software project. We could assign a single individual multiple tasks, or we could assign one task per team member, or we could do a mix of both approaches.

When an individual works on several different tasks before any of them is complete, this is concurrency. Maybe you have two different projects checked out on your computer, and when you get bored or stuck on one project, you switch to the other. You’re just one person, so you can’t make progress on both tasks at the exact same time—but you can multi-task, making progress on multiple tasks by switching between them.

Figure 17-1: A concurrent workflow, switching between Task A and Task B.

When you agree to split up a group of tasks between the people on the team, with each person taking one task and working on it alone, this is parallelism. Each person on the team can make progress at the exact same time.

Figure 17-2: A parallel workflow, where work happens on Task A and Task B independently.

With both of these situations, you might have to coordinate between different tasks. Maybe you thought the task that one person was working on was totally independent from everyone else’s work, but it actually needs something finished by another person on the team. Some of the work could be done in parallel, but some of it was actually serial: it could only happen in a series, one thing after the other, as in Figure 17-3.

Figure 17-3: A partially parallel workflow, where work happens on Task A and Task B independently until task A3 is blocked on the results of task B3.

Likewise, you might realize that one of your own tasks depends on another of your tasks. Now your concurrent work has also become serial.

Parallelism and concurrency can intersect with each other, too. If you learn that a colleague is stuck until you finish one of your tasks, you’ll probably focus all your efforts on that task to “unblock” your colleague. You and your coworker are no longer able to work in parallel, and you’re also no longer able to work concurrently on your own tasks.

The same basic dynamics come into play with software and hardware. On a machine with a single CPU core, the CPU can only do one operation at a time, but it can still work concurrently. Using tools such as threads, processes, and async, the computer can pause one activity and switch to others before eventually cycling back to that first activity again. On a machine with multiple CPU cores, it can also do work in parallel. One core can be doing one thing while another core does something completely unrelated, and those actually happen at the same time.

When working with async in Rust, we’re always dealing with concurrency. Depending on the hardware, the operating system, and the async runtime we are using—more on async runtimes shortly!—that concurrency may also use parallelism under the hood.

Now, let’s dive into how async programming in Rust actually works! In the rest of this chapter, we will:

  • see how to use Rust’s async and await syntax
  • explore how to use the async model to solve some of the same challenges we looked at in Chapter 16
  • look at how multithreading and async provide complementary solutions, which you can even use together in many cases

Futures and the Async Syntax

The key elements of asynchronous programming in Rust are futures and Rust’s async and await keywords.

A future is a value which may not be ready now, but will become ready at some point in the future. (This same concept shows up in many languages, sometimes under other names such as “task” or “promise”.) Rust provides a Future trait as a building block so different async operations can be implemented with different data structures, but with a common interface. In Rust, we say that types which implement the Future trait are futures. Each type which implements Future holds its own information about the progress that has been made and what “ready” means.

The async keyword can be applied to blocks and functions to specify that they can be interrupted and resumed. Within an async block or async function, you can use the await keyword to wait for a future to become ready, called awaiting a future. Each place you await a future within an async block or function is a place that async block or function may get paused and resumed. The process of checking with a future to see if its value is available yet is called polling.
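
Before our first real example, here is a minimal sketch of these keywords in action. (This sketch is ours, not part of the chapter’s running example; std::future::ready is a standard library helper that produces a future which is immediately ready with its value.)

use std::future::ready;

// An async function: the compiler allows it to be paused and resumed.
async fn add_one(x: u32) -> u32 {
    // Awaiting a future: a point where this function may pause and resume.
    let value = ready(x).await;
    value + 1
}

Nothing runs when add_one is called; calling it merely constructs a future. We’ll see why that matters shortly.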

Some other languages also use async and await keywords for async programming. If you’re familiar with those languages, you may notice some significant differences in how Rust does things, including how it handles the syntax. That’s for good reason, as we’ll see!

Most of the time when writing async Rust, we use the async and await keywords. Rust compiles them into equivalent code using the Future trait, much as it compiles for loops into equivalent code using the Iterator trait. Because Rust provides the Future trait, though, you can also implement it for your own data types when you need to. Many of the functions we’ll see throughout this chapter return types with their own implementations of Future. We’ll return to the definition of the trait at the end of the chapter and dig into more of how it works, but this is enough detail to keep us moving forward.

That may all feel a bit abstract. Let’s write our first async program: a little web scraper. We’ll pass in two URLs from the command line, fetch both of them concurrently, and return the result of whichever one finishes first. This example will have a fair bit of new syntax, but don’t worry. We’ll explain everything you need to know as we go.

Our First Async Program

To keep this chapter focused on learning async, rather than juggling parts of the ecosystem, we have created the trpl crate (trpl is short for “The Rust Programming Language”). It re-exports all the types, traits, and functions you’ll need, primarily from the futures and tokio crates, available on https://crates.io.

  • The futures crate is an official home for Rust experimentation for async code, and is actually where the Future trait was originally designed.

  • Tokio is the most widely used async runtime in Rust today, especially (but not only!) for web applications. There are other great runtimes out there, and they may be more suitable for your purposes. We use Tokio under the hood for trpl because it’s well-tested and widely used.

In some cases, trpl also renames or wraps the original APIs to let us stay focused on the details relevant to this chapter. If you want to understand what the crate does, we encourage you to check out its source code at https://github.com/rust-lang/book/tree/main/packages/trpl. You’ll be able to see what crate each re-export comes from, and we’ve left extensive comments explaining what the crate does.

Create a new binary project named hello-async and add the trpl crate as a dependency:

$ cargo new hello-async
$ cd hello-async
$ cargo add trpl

Now we can use the various pieces provided by trpl to write our first async program. We’ll build a little command line tool which fetches two web pages, pulls the <title> element from each, and prints out the title of whichever finishes that whole process first.

Let’s start by writing a function that takes one page URL as a parameter, makes a request to it, and returns the text of the title element:

Filename: src/main.rs

use trpl::Html;

async fn page_title(url: &str) -> Option<String> {
    let response = trpl::get(url).await;
    let response_text = response.text().await;
    Html::parse(&response_text)
        .select_first("title")
        .map(|title_element| title_element.inner_html())
}

Listing 17-1: Defining an async function to get the title element from an HTML page

In Listing 17-1, we define a function named page_title, and we mark it with the async keyword. Then we use the trpl::get function to fetch whatever URL is passed in, and we await the response by using the await keyword. Then we get the text of the response by calling its text method and once again awaiting it with the await keyword. Both of these steps are asynchronous. For get, we need to wait for the server to send back the first part of its response, which will include HTTP headers, cookies, and so on. That part of the response can be delivered separately from the body of the response. Especially if the body is very large, it can take some time for it all to arrive. Because we have to wait for the entirety of the response to arrive, the text method is also async.

We have to explicitly await both of these futures, because futures in Rust are lazy: they don’t do anything until you ask them to with await. (In fact, Rust will show a compiler warning if you don’t use a future.) This should remind you of our discussion of iterators back in the “Processing a Series of Items with Iterators” section of Chapter 13 on page XX. Iterators do nothing unless you call their next method—whether directly, or using for loops or methods such as map which use next under the hood. With futures, the same basic idea applies: they do nothing unless you explicitly ask them to. This laziness allows Rust to avoid running async code until it’s actually needed.
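
To see that laziness concretely, here is a sketch (ours, not part of the running example) of a main function that calls page_title from Listing 17-1 but never awaits the result:

fn main() {
    // This only constructs a future; no network request happens. The
    // compiler warns that futures do nothing unless you .await or
    // poll them.
    page_title("https://www.rust-lang.org");
}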

Note: This is different from the behavior we saw when using thread::spawn in the “Creating a New Thread with spawn” section of Chapter 16 on page XX, where the closure we passed to another thread started running immediately. It’s also different from how many other languages approach async! But it’s important for Rust. We’ll see why that is later.

Once we have response_text, we can then parse it into an instance of the Html type using Html::parse. Instead of a raw string, we now have a data type we can use to work with the HTML as a richer data structure. In particular, we can use the select_first method to find the first instance of a given CSS selector. By passing the string "title", we’ll get the first <title> element in the document, if there is one. Because there may not be any matching element, select_first returns an Option<ElementRef>. Finally, we use the Option::map method, which lets us work with the item in the Option if it’s present, and do nothing if it isn’t. (We could also use a match expression here, but map is more idiomatic.) In the body of the function we supply to map, we call inner_html on the title_element to get its content, which is a String. When all is said and done, we have an Option<String>.

Notice that Rust’s await keyword goes after the expression you’re awaiting, not before it. That is, it’s a postfix keyword. This may be different from what you’re used to if you have used async in other languages. Rust chose this because it makes chains of methods much nicer to work with. As a result, we can change the body of page_title to chain the trpl::get and text function calls together with await between them, as shown in Listing 17-2:

Filename: src/main.rs

    let response_text = trpl::get(url).await.text().await;

Listing 17-2: Chaining with the await keyword

With that, we have successfully written our first async function! Before we add some code in main to call it, let’s talk a little more about what we’ve written and what it means.

When Rust sees a block marked with the async keyword, it compiles it into a unique, anonymous data type which implements the Future trait. When Rust sees a function marked with async, it compiles it into a non-async function whose body is an async block. An async function’s return type is the type of the anonymous data type the compiler creates for that async block.

Thus, writing async fn is equivalent to writing a function which returns a future of the return type. When the compiler sees a function definition such as the async fn page_title in Listing 17-1, it’s equivalent to a non-async function defined like this:

use std::future::Future;
use trpl::Html;

fn page_title(url: &str) -> impl Future<Output = Option<String>> + '_ {
    async move {
        let text = trpl::get(url).await.text().await;
        Html::parse(&text)
            .select_first("title")
            .map(|title| title.inner_html())
    }
}

Let’s walk through each part of the transformed version:

  • It uses the impl Trait syntax we discussed back in the “Traits as Parameters” section in Chapter 10 on page XX.
  • The returned trait is a Future, with an associated type of Output. Notice that the Output type is Option<String>, which is the same as the original return type from the async fn version of page_title.
  • All of the code called in the body of the original function is wrapped in an async move block. Remember that blocks are expressions. This whole block is the expression returned from the function.
  • This async block produces a value with the type Option<String>, as described above. That value matches the Output type in the return type. This is just like other blocks you have seen.
  • The new function body is an async move block because of how it uses the url parameter. (We’ll talk about async vs. async move much more later in the chapter.)
  • The new version of the function has a kind of lifetime we haven’t seen before in the output type: '_. Because the function returns a Future which refers to a reference—in this case, the reference from the url parameter—we need to tell Rust that we mean for that reference to be included. We don’t have to name the lifetime here, because Rust is smart enough to know there is only one reference which could be involved, but we do have to be explicit that the resulting Future is bound by that lifetime.
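
If we did want to name that lifetime explicitly, the signature would look something like this sketch, which is equivalent to the version above:

fn page_title<'a>(url: &'a str) -> impl Future<Output = Option<String>> + 'a {
    // --snip--
}

Here 'a ties the returned future to the borrow of url, which is exactly what '_ expresses without a name.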

Now we can call page_title in main. To start, we’ll just get the title for a single page. In Listing 17-3, we follow the same pattern we used for getting command line arguments back in the “Accepting Command Line Arguments” section of Chapter 12 on page XX. Then we pass the first URL to page_title, and await the result. Because the value produced by the future is an Option<String>, we use a match expression to print different messages to account for whether the page had a <title>.

Filename: src/main.rs

async fn main() {
    let args: Vec<String> = std::env::args().collect();
    let url = &args[1];
    match page_title(url).await {
        Some(title) => println!("The title for {url} was {title}"),
        None => println!("{url} had no title"),
    }
}

Listing 17-3: Calling the page_title function from main with a user-supplied argument

Unfortunately, this doesn’t compile. The only place we can use the await keyword is in async functions or blocks, and Rust won’t let us mark the special main function as async.

error[E0752]: `main` function is not allowed to be `async`
 --> src/main.rs:6:1
  |
6 | async fn main() {
  | ^^^^^^^^^^^^^^^ `main` function is not allowed to be `async`

The reason main can’t be marked async is that async code needs a runtime: a Rust crate which manages the details of executing asynchronous code. A program’s main function can initialize a runtime, but it’s not a runtime itself. (We’ll see more about why this is a bit later.) Every Rust program that executes async code has at least one place where it sets up a runtime and executes the futures.

Most languages which support async bundle a runtime with the language. Rust does not. Instead, there are many different async runtimes available, each of which makes different tradeoffs suitable to the use case they target. For example, a high-throughput web server with many CPU cores and a large amount of RAM has very different needs than a microcontroller with a single core, a small amount of RAM, and no ability to do heap allocations. The crates which provide those runtimes also often supply async versions of common functionality such as file or network I/O.

Here, and throughout the rest of this chapter, we’ll use the run function from the trpl crate, which takes a future as an argument and runs it to completion. Behind the scenes, calling run sets up a runtime to use to run the future passed in. Once the future completes, run returns whatever value the future produced.

We could pass the future returned by page_title directly to run. Once it completed, we would be able to match on the resulting Option<String>, the way we tried to do in Listing 17-3. However, for most of the examples in the chapter (and most async code in the real world!), we’ll be doing more than just one async function call, so instead we’ll pass an async block and explicitly await the result of calling page_title, as in Listing 17-4.

Filename: src/main.rs

fn main() {
    let args: Vec<String> = std::env::args().collect();

    trpl::run(async {
        let url = &args[1];
        match page_title(url).await {
            Some(title) => println!("The title for {url} was {title}"),
            None => println!("{url} had no title"),
        }
    })
}

Listing 17-4: Awaiting an async block with trpl::run

When we run this, we get the behavior we might have expected initially:

$ cargo run "http://www.rust-lang.org"
The title for http://www.rust-lang.org was
            Rust Programming Language

Phew: we finally have some working async code! This now compiles, and we can run it. Before we add code to race two sites against each other, let’s briefly turn our attention back to how futures work.

Each await point—that is, every place where the code uses the await keyword—represents a place where control gets handed back to the runtime. To make that work, Rust needs to keep track of the state involved in the async block, so that the runtime can kick off some other work and then come back when it’s ready to try advancing this one again. This is an invisible state machine, as if you wrote an enum like this to save the current state at each await point:

enum PageTitleFuture<'a> {
    Initial { url: &'a str },
    GetAwaitPoint { url: &'a str },
    TextAwaitPoint { response: trpl::Response },
}

Writing the code to transition between each state by hand would be tedious and error-prone, especially when adding more functionality and more states to the code later. Instead, the Rust compiler creates and manages the state machine data structures for async code automatically. If you’re wondering: yep, the normal borrowing and ownership rules around data structures all apply. Happily, the compiler also handles checking those for us, and has good error messages. We’ll work through a few of those later in the chapter!

Ultimately, something has to execute that state machine. That something is a runtime. (This is why you may sometimes come across references to executors when looking into runtimes: an executor is the part of a runtime responsible for executing the async code.)

Now we can understand why the compiler stopped us from making main itself an async function back in Listing 17-3. If main were an async function, something else would need to manage the state machine for whatever future main returned, but main is the starting point for the program! Instead, we call the trpl::run function in main, which sets up a runtime and runs the future returned by the async block until it returns Ready.

Note: Some runtimes provide macros to make it so you can write an async main function. Those macros rewrite async fn main() { ... } to be a normal fn main which does the same thing we did by hand in Listing 17-4: call a function which runs a future to completion the way trpl::run does.

Let’s put these pieces together and see how we can write concurrent code, by calling page_title with two different URLs passed in from the command line and racing them.

Filename: src/main.rs

use trpl::{Either, Html};

fn main() {
    let args: Vec<String> = std::env::args().collect();

    trpl::run(async {
        let title_fut_1 = page_title(&args[1]);
        let title_fut_2 = page_title(&args[2]);

        let (url, maybe_title) =
            match trpl::race(title_fut_1, title_fut_2).await {
                Either::Left(left) => left,
                Either::Right(right) => right,
            };

        println!("{url} returned first");
        match maybe_title {
            Some(title) => println!("Its page title is: '{title}'"),
            None => println!("Its title could not be parsed."),
        }
    })
}

async fn page_title(url: &str) -> (&str, Option<String>) {
    let text = trpl::get(url).await.text().await;
    let title = Html::parse(&text)
        .select_first("title")
        .map(|title| title.inner_html());
    (url, title)
}

Listing 17-5: Calling page_title for two URLs to see which returns first

In Listing 17-5, we begin by calling page_title for each of the user-supplied URLs. We save the futures produced by calling page_title as title_fut_1 and title_fut_2. Remember, these don’t do anything yet, because futures are lazy, and we haven’t yet awaited them. Then we pass the futures to trpl::race, which returns a value to indicate which of the futures passed to it finishes first.

Note: Under the hood, race is built on a more general function, select, which you will encounter more often in real-world Rust code. A select function can do a lot of things that the trpl::race function can’t, but it also has some additional complexity that we can skip over for now.

Either future can legitimately “win,” so it doesn’t make sense to return a Result. Instead, race returns a type we haven’t seen before, trpl::Either. The Either type is somewhat similar to a Result, in that it has two cases. Unlike Result, though, there is no notion of success or failure baked into Either. Instead, it uses Left and Right to indicate “one or the other”.

enum Either<A, B> {
    Left(A),
    Right(B),
}

The race function returns Left with the first future’s output if the first argument finishes first, and Right with the second future’s output if that one finishes first. This matches the order the arguments appear in when calling the function: the first argument is to the left of the second argument.

We also update page_title to return the same URL passed in. That way, if the page which returns first does not have a <title> we can resolve, we can still print a meaningful message. With that information available, we wrap up by updating our println! output to indicate both which URL finished first and what the <title> was for the web page at that URL, if any.

You have built a small working web scraper now! Pick a couple URLs and run the command line tool. You may discover that some sites are reliably faster than others, while in other cases which site “wins” varies from run to run. More importantly, you’ve learned the basics of working with futures, so we can now dig into even more of the things we can do with async.

Concurrency With Async

In this section, we’ll apply async to some of the same concurrency challenges we tackled with threads in Chapter 16. Because we already talked about a lot of the key ideas there, in this section we’ll focus on what’s different between threads and futures.

In many cases, the APIs for working with concurrency using async are very similar to those for using threads. In other cases, they end up being shaped quite differently. Even when the APIs look similar between threads and async, they often have different behavior—and they nearly always have different performance characteristics.

Counting

The first task we tackled in the “Creating a New Thread with spawn” section of Chapter 16 on page XX was counting up on two separate threads. Let’s do the same using async. The trpl crate supplies a spawn_task function which looks very similar to the thread::spawn API, and a sleep function which is an async version of the thread::sleep API. We can use these together to implement the same counting example as with threads, in Listing 17-6.

Filename: src/main.rs

use std::time::Duration;

fn main() {
    trpl::run(async {
        trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }
    });
}

Listing 17-6: Using spawn_task to count with two tasks

As our starting point, we set up our main function with trpl::run, so that our top-level function can be async.

Note: From this point forward in the chapter, every example will include this exact same wrapping code with trpl::run in main, so we’ll often skip it just as we do with main. Don’t forget to include it in your code!

Then we write two loops within that block, each with a trpl::sleep call in it, which waits for half a second (500 milliseconds) before printing the next message. We put one loop in the body of a trpl::spawn_task and the other in a top-level for loop. We also add an await after the sleep calls.

This does something similar to the thread-based implementation—including the fact that you may see the messages appear in a different order in your own terminal when you run it.

hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!

This version stops as soon as the for loop in the body of the main async block finishes, because the task spawned by spawn_task is shut down when the main function ends. If you want to run all the way to the completion of the task, you will need to use a join handle to wait for the first task to complete. With threads, we used the join method to “block” until the thread was done running. In Listing 17-7, we can use await to do the same thing, because the task handle itself is a future. Its Output type is a Result, so we also unwrap it after awaiting it.

Filename: src/main.rs

let handle = trpl::spawn_task(async {
    for i in 1..10 {
        println!("hi number {i} from the first task!");
        trpl::sleep(Duration::from_millis(500)).await;
    }
});

for i in 1..5 {
    println!("hi number {i} from the second task!");
    trpl::sleep(Duration::from_millis(500)).await;
}

handle.await.unwrap();

Listing 17-7: Using await with a join handle to run a task to completion

This updated version runs until both loops finish.

hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!

So far, it looks like async and threads give us the same basic outcomes, just with different syntax: using await instead of calling join on the join handle, and awaiting the sleep calls.

The bigger difference is that we didn’t need to spawn another operating system thread to do this. In fact, we don’t even need to spawn a task here. Because async blocks compile to anonymous futures, we can put each loop in an async block and have the runtime run them both to completion using the trpl::join function.

In the “Waiting for All Threads to Finish Using join Handles” section of Chapter 16 on page XX, we showed how to use the join method on the JoinHandle type returned when you call std::thread::spawn. The trpl::join function is similar, but for futures. When you give it two futures, it produces a single new future whose output is a tuple with the output of each of the futures you passed in once both complete. Thus, in Listing 17-8, we use trpl::join to wait for both fut1 and fut2 to finish. We do not await fut1 and fut2, but instead the new future produced by trpl::join. We ignore the output, because it’s just a tuple with two unit values in it.

Filename: src/main.rs

let fut1 = async {
    for i in 1..10 {
        println!("hi number {i} from the first task!");
        trpl::sleep(Duration::from_millis(500)).await;
    }
};

let fut2 = async {
    for i in 1..5 {
        println!("hi number {i} from the second task!");
        trpl::sleep(Duration::from_millis(500)).await;
    }
};

trpl::join(fut1, fut2).await;

Listing 17-8: Using trpl::join to await two anonymous futures

When we run this, we see both futures run to completion:

hi number 1 from the first task!
hi number 1 from the second task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!

Here, you’ll see the exact same order every time, which is very different from what we saw with threads. That is because the trpl::join function is fair, meaning it checks each future equally often, alternating between them, and never lets one race ahead if the other is ready. With threads, the operating system decides which thread to check and how long to let it run. With async Rust, the runtime decides which task to check. (In practice, the details get complicated because an async runtime might use operating system threads under the hood as part of how it manages concurrency, so guaranteeing fairness can be more work for a runtime—but it’s still possible!) Runtimes don’t have to guarantee fairness for any given operation, and runtimes often offer different APIs to let you choose whether you want fairness or not.

Try some of these different variations on awaiting the futures and see what they do:

  • Remove the async block from around either or both of the loops.
  • Await each async block immediately after defining it.
  • Wrap only the first loop in an async block, and await the resulting future after the body of the second loop.

For an extra challenge, see if you can figure out what the output will be in each case before running the code!

Message Passing

Sharing data between futures will also be familiar: we’ll use message passing again, but this time with async versions of the types and functions. We’ll take a slightly different path than we did in the “Using Message Passing to Transfer Data Between Threads” section of Chapter 16 on page XX, to illustrate some of the key differences between thread-based and futures-based concurrency. In Listing 17-9, we’ll begin with just a single async block—not spawning a separate task as we spawned a separate thread.

Filename: src/main.rs

let (tx, mut rx) = trpl::channel();

let val = String::from("hi");
tx.send(val).unwrap();

let received = rx.recv().await.unwrap();
println!("Got: {received}");

Listing 17-9: Creating an async channel and assigning the two halves to tx and rx

Here, we use trpl::channel, an async version of the multiple-producer, single-consumer channel API we used with threads back in the “Using Message Passing to Transfer Data Between Threads” section of Chapter 16 on page XX. The async version of the API is only a little different from the thread-based version: it uses a mutable rather than an immutable receiver rx, and its recv method produces a future we need to await rather than producing the value directly. Now we can send messages from the sender to the receiver. Notice that we don’t have to spawn a separate thread or even a task; we merely need to await the rx.recv call.

The synchronous Receiver::recv method in std::sync::mpsc blocks until it receives a message. The trpl::Receiver::recv method does not, because it is async. Instead of blocking, it hands control back to the runtime until either a message is received or the send side of the channel closes. By contrast, we don’t await the send call, because it doesn’t block. It doesn’t need to, because the channel we’re sending it into is unbounded.
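
For contrast, here is a sketch of a bounded channel, where send is itself async because it may have to wait for capacity. (This sketch uses the Tokio API directly rather than trpl, and is not part of our running example.)

use tokio::sync::mpsc;

async fn bounded_demo() {
    // A bounded channel with room for only one in-flight message.
    let (tx, mut rx) = mpsc::channel::<String>(1);

    // This send succeeds immediately because the channel is empty; a
    // second send would wait here until the receiver made room.
    tx.send(String::from("hi")).await.unwrap();

    let received = rx.recv().await.unwrap();
    println!("Got: {received}");
}

With an unbounded channel like trpl::channel, there is never any capacity to wait for, so send can return immediately.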

Note: Because all of this async code runs in an async block in a trpl::run call, everything within it can avoid blocking. However, the code outside it will block on the run function returning. That is the whole point of the trpl::run function: it lets you choose where to block on some set of async code, and thus where to transition between sync and async code. In most async runtimes, run is actually named block_on for exactly this reason.

Notice two things about this example: First, the message will arrive right away! Second, although we use a future here, there’s no concurrency yet. Everything in the listing happens in sequence, just as it would if there were no futures involved.

Let’s address the first part by sending a series of messages and sleeping in between them, as shown in Listing 17-10:

Filename: src/main.rs

let (tx, mut rx) = trpl::channel();

let vals = vec![
    String::from("hi"),
    String::from("from"),
    String::from("the"),
    String::from("future"),
];

for val in vals {
    tx.send(val).unwrap();
    trpl::sleep(Duration::from_millis(500)).await;
}

while let Some(value) = rx.recv().await {
    println!("received '{value}'");
}

Listing 17-10: Sending and receiving multiple messages over the async channel and sleeping with an await between each message

In addition to sending the messages, we need to receive them. In this case, we could do that manually, by just doing rx.recv().await four times, because we know how many messages are coming in. In the real world, though, we’ll generally be waiting on some unknown number of messages. In that case, we need to keep waiting until we determine that there are no more messages.

In Listing 16-10, we used a for loop to process all the items received from a synchronous channel. However, Rust doesn’t yet have a way to write a for loop over an asynchronous series of items. Instead, we need to use a new kind of loop we haven’t seen before, the while let conditional loop. A while let loop is the loop version of the if let construct we saw back in the “Concise Control Flow with if let” section in Chapter 6 on page XX. The loop will continue executing as long as the pattern it specifies continues to match the value.
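
If you haven’t seen while let on its own before, here is a quick synchronous sketch of the construct, separate from channels and async:

let mut stack = vec![1, 2, 3];

// The body runs as long as pop produces Some; the loop ends at the
// first None, once the vector is empty.
while let Some(top) = stack.pop() {
    println!("{top}");
}

This prints 3, then 2, then 1, and the loop ends when pop returns None.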

The rx.recv call produces a future, which we await. The runtime will pause the future until it is ready. Once a message arrives, the future will resolve to Some(message), and it will do so each time a message arrives. When the channel closes, regardless of whether any messages have arrived, the future will instead resolve to None to indicate that there are no more values, and we should stop polling—that is, stop awaiting.

The while let loop pulls all of this together. If the result of calling rx.recv().await is Some(message), we get access to the message and we can use it in the loop body, just as we could with if let. If the result is None, the loop ends. Every time the loop completes, it hits the await point again, so the runtime pauses it again until another message arrives.

The code now successfully sends and receives all of the messages. Unfortunately, there are still a couple of problems. For one thing, the messages do not arrive at half-second intervals. They arrive all at once, two seconds (2,000 milliseconds) after we start the program. For another, this program also never exits! Instead, it waits forever for new messages. You will need to shut it down using ctrl-c.

Let’s start by understanding why the messages all come in at once after the full delay, rather than coming in with delays in between each one. Within a given async block, the order in which await keywords appear in the code is also the order in which they happen when the program runs.

There’s only one async block in Listing 17-10, so everything in it runs linearly. There’s still no concurrency. All the tx.send calls happen, interspersed with all of the trpl::sleep calls and their associated await points. Only then does the while let loop get to go through any of the await points on the recv calls.

To get the behavior we want, where the sleep delay happens between receiving each message, we need to put the tx and rx operations in their own async blocks. Then the runtime can execute each of them separately using trpl::join, just as in the counting example. Once again, we await the result of calling trpl::join, not the individual futures. If we awaited the individual futures in sequence, we would just end up back in a sequential flow—exactly what we’re trying not to do.

Filename: src/main.rs

let tx_fut = async {
    let vals = vec![
        String::from("hi"),
        String::from("from"),
        String::from("the"),
        String::from("future"),
    ];

    for val in vals {
        tx.send(val).unwrap();
        trpl::sleep(Duration::from_millis(500)).await;
    }
};

let rx_fut = async {
    while let Some(value) = rx.recv().await {
        println!("received '{value}'");
    }
};

trpl::join(tx_fut, rx_fut).await;

Listing 17-11: Separating send and recv into their own async blocks and awaiting the futures for those blocks

With the updated code in Listing 17-11, the messages get printed at 500-millisecond intervals, rather than all in a rush after two seconds.

The program still never exits, though, because of the way the while let loop interacts with trpl::join:

  • The future returned from trpl::join only completes once both futures passed to it have completed.
  • The tx future completes once it finishes sleeping after sending the last message in vals.
  • The rx future won’t complete until the while let loop ends.
  • The while let loop won’t end until awaiting rx.recv produces None.
  • Awaiting rx.recv will only return None once the other end of the channel is closed.
  • The channel will only close if we call rx.close or when the sender side, tx, is dropped.
  • We don’t call rx.close anywhere, and tx won’t be dropped until the outermost async block passed to trpl::run ends.
  • The block can’t end because it is blocked on trpl::join completing, which takes us back to the top of this list!

We could manually close rx by calling rx.close somewhere, but that doesn’t make much sense. Stopping after handling some arbitrary number of messages would make the program shut down, but we could miss messages. We need some other way to make sure that tx gets dropped before the end of the function.

Right now, the async block where we send the messages only borrows tx because sending a message doesn’t require ownership, but if we could move tx into that async block, it would be dropped once that block ends. In the “Capturing References or Moving Ownership” section of Chapter 13 on page XX, we learned how to use the move keyword with closures, and in the “Using move Closures with Threads” section of Chapter 16 on page XX, we saw that we often need to move data into closures when working with threads. The same basic dynamics apply to async blocks, so the move keyword works with async blocks just as it does with closures.

In Listing 17-12, we change the async block for sending messages from a plain async block to an async move block. When we run this version of the code, it shuts down gracefully after the last message is sent and received.

Filename: src/main.rs

let (tx, mut rx) = trpl::channel();

let tx_fut = async move {
    let vals = vec![
        String::from("hi"),
        String::from("from"),
        String::from("the"),
        String::from("future"),
    ];

    for val in vals {
        tx.send(val).unwrap();
        trpl::sleep(Duration::from_millis(500)).await;
    }
};

let rx_fut = async {
    while let Some(value) = rx.recv().await {
        println!("received '{value}'");
    }
};

trpl::join(tx_fut, rx_fut).await;

Listing 17-12: A working example of sending and receiving messages between futures which correctly shuts down when complete

This async channel is also a multiple-producer channel, so we can call clone on tx if we want to send messages from multiple futures. In Listing 17-13, we clone tx, creating tx1 outside the first async block. We move tx1 into that block just as we did before with tx. Then, later, we move the original tx into a new async block, where we send more messages on a slightly slower delay. We happen to put this new async block after the async block for receiving messages, but it could go before it just as well. The key is the order in which the futures are awaited, not the order in which they are created.

Both of the async blocks for sending messages need to be async move blocks, so that both tx and tx1 get dropped when those blocks finish. Otherwise we’ll end up back in the same infinite loop we started out in. Finally, we switch from trpl::join to trpl::join3 to handle the additional future.

Filename: src/main.rs

let (tx, mut rx) = trpl::channel();

let tx1 = tx.clone();
let tx1_fut = async move {
    let vals = vec![
        String::from("hi"),
        String::from("from"),
        String::from("the"),
        String::from("future"),
    ];

    for val in vals {
        tx1.send(val).unwrap();
        trpl::sleep(Duration::from_millis(500)).await;
    }
};

let rx_fut = async {
    while let Some(value) = rx.recv().await {
        println!("received '{value}'");
    }
};

let tx_fut = async move {
    let vals = vec![
        String::from("more"),
        String::from("messages"),
        String::from("for"),
        String::from("you"),
    ];

    for val in vals {
        tx.send(val).unwrap();
        trpl::sleep(Duration::from_millis(1500)).await;
    }
};

trpl::join3(tx1_fut, tx_fut, rx_fut).await;

Listing 17-13: Using multiple producers with async blocks

Now we see all the messages from both sending futures. Because the sending futures use slightly different delays after sending, the messages are also received at those different intervals.

received 'hi'
received 'more'
received 'from'
received 'the'
received 'messages'
received 'future'
received 'for'
received 'you'

This is a good start, but it limits us to just a handful of futures: two with join, or three with join3. Let’s see how we might work with more futures.

Working With Any Number of Futures

When we switched from using two futures to three in the previous section, we also had to switch from using join to using join3. It would be annoying to have to call a different function every time we changed the number of futures we wanted to join. Happily, we have a macro form of join to which we can pass an arbitrary number of arguments. It also handles awaiting the futures itself. Thus, we could rewrite the code from Listing 17-13 to use join! instead of join3, as in Listing 17-14:

Filename: src/main.rs

trpl::join!(tx1_fut, tx_fut, rx_fut);

Listing 17-14: Using join! to wait for multiple futures

This is definitely a nice improvement over needing to swap between join and join3 and join4 and so on! However, even this macro form only works when we know the number of futures ahead of time. In real-world Rust, though, pushing futures into a collection and then waiting on some or all the futures in that collection to complete is a common pattern.

To check all the futures in some collection, we’ll need to iterate over and join on all of them. The trpl::join_all function accepts any type which implements the Iterator trait, which we learned about back in “The Iterator Trait and the next Method” section of Chapter 13 on page XX, so it seems like just the ticket. Let’s try putting our futures in a vector, and replace join! with join_all, as in Listing 17-15.

Filename: src/main.rs

let futures = vec![tx1_fut, rx_fut, tx_fut];

trpl::join_all(futures).await;

Listing 17-15: Storing anonymous futures in a vector and calling join_all

Unfortunately, this doesn’t compile. Instead, we get this error:

error[E0308]: mismatched types
  --> src/main.rs:43:37
   |
8  |           let tx1_fut = async move {
   |  _______________________-
9  | |             let vals = vec![
10 | |                 String::from("hi"),
11 | |                 String::from("from"),
...  |
19 | |             }
20 | |         };
   | |_________- the expected `async` block
21 |
22 |           let rx_fut = async {
   |  ______________________-
23 | |             while let Some(value) = rx.recv().await {
24 | |                 println!("received '{value}'");
25 | |             }
26 | |         };
   | |_________- the found `async` block
...
43 |           let futures = vec![tx1_fut, rx_fut, tx_fut];
   |                                       ^^^^^^ expected `async` block, found a different `async` block
   |
   = note: expected `async` block `{async block@src/main.rs:8:23: 20:10}`
              found `async` block `{async block@src/main.rs:22:22: 26:10}`
   = note: no two async blocks, even if identical, have the same type
   = help: consider pinning your async block and casting it to a trait object

This might be surprising. After all, none of them return anything, so each block produces a Future<Output = ()>. However, Future is a trait, not a concrete type. The concrete types are the individual data structures generated by the compiler for async blocks. You can’t put two different hand-written structs in a Vec, and the same thing applies to the different structs generated by the compiler.

To make this work, we need to use trait objects, just as we did in the “Returning Errors from the run function” section in Chapter 12 on page XX. (We’ll cover trait objects in detail in Chapter 18.) Using trait objects lets us treat each of the anonymous futures produced by these types as the same type, because all of them implement the Future trait.
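
As a non-async sketch of the same idea (not part of our running example), here is how two unrelated structs can share a Vec once they are behind a trait object:

trait Describe {
    fn describe(&self) -> String;
}

struct Cat;
struct Dog;

impl Describe for Cat {
    fn describe(&self) -> String {
        String::from("a cat")
    }
}

impl Describe for Dog {
    fn describe(&self) -> String {
        String::from("a dog")
    }
}

fn main() {
    // vec![Cat, Dog] would not compile, because the elements have two
    // different types. Boxed as trait objects, every element has the
    // same type: Box<dyn Describe>.
    let animals: Vec<Box<dyn Describe>> = vec![Box::new(Cat), Box::new(Dog)];
    for animal in animals {
        println!("{}", animal.describe());
    }
}

We will do the same thing with our futures, with one extra wrinkle we will meet shortly.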

Note: In the “Using an Enum to Store Multiple Types” section of Chapter 8 on page XX, we discussed another way to include multiple types in a Vec: using an enum to represent each of the different types which can appear in the vector. We can’t do that here, though. For one thing, we have no way to name the different types, because they are anonymous. For another, the reason we reached for a vector and join_all in the first place was to be able to work with a dynamic collection of futures where we don’t know what they will all be until runtime.

We start by wrapping each of the futures in the vec! in a Box::new, as shown in Listing 17-16.

Filename: src/main.rs

let futures =
    vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];

trpl::join_all(futures).await;

Listing 17-16: Trying to use Box::new to align the types of the futures in a Vec

Unfortunately, this still doesn’t compile. In fact, we have the same basic error we did before, but we get one for both the second and third Box::new calls, and we also get new errors referring to the Unpin trait. We will come back to the Unpin errors in a moment. First, let’s fix the type errors on the Box::new calls, by explicitly annotating the type of the futures variable:

Filename: src/main.rs

let futures: Vec<Box<dyn Future<Output = ()>>> =
    vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];

Listing 17-17: Fixing the rest of the type mismatch errors by using an explicit type declaration

The type we had to write here is a little involved, so let’s walk through it:

  • The innermost type is the future itself. We note explicitly that the output of the future is the unit type () by writing Future<Output = ()>.
  • Then we annotate the trait with dyn to mark it as dynamic.
  • The entire trait reference is wrapped in a Box.
  • Finally, we state explicitly that futures is a Vec containing these items.

That already made a big difference. Now when we run the compiler, we only have the errors mentioning Unpin. Although there are three of them, notice that each is very similar in its contents.

error[E0277]: `{async block@src/main.rs:8:23: 20:10}` cannot be unpinned
   --> src/main.rs:46:24
    |
46  |         trpl::join_all(futures).await;
    |         -------------- ^^^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:8:23: 20:10}`, which is required by `Box<{async block@src/main.rs:8:23: 20:10}>: std::future::Future`
    |         |
    |         required by a bound introduced by this call
    |
    = note: consider using the `pin!` macro
            consider using `Box::pin` if you need to access the pinned value outside of the current scope
    = note: required for `Box<{async block@src/main.rs:8:23: 20:10}>` to implement `std::future::Future`
note: required by a bound in `join_all`
   --> ~/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:105:14
    |
102 | pub fn join_all<I>(iter: I) -> JoinAll<I::Item>
    |        -------- required by a bound in this function
...
105 |     I::Item: Future,
    |              ^^^^^^ required by this bound in `join_all`

error[E0277]: `{async block@src/main.rs:8:23: 20:10}` cannot be unpinned
  --> src/main.rs:46:9
   |
46 |         trpl::join_all(futures).await;
   |         ^^^^^^^^^^^^^^^^^^^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:8:23: 20:10}`, which is required by `Box<{async block@src/main.rs:8:23: 20:10}>: std::future::Future`
   |
   = note: consider using the `pin!` macro
           consider using `Box::pin` if you need to access the pinned value outside of the current scope
   = note: required for `Box<{async block@src/main.rs:8:23: 20:10}>` to implement `std::future::Future`
note: required by a bound in `JoinAll`
  --> ~/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:29:8
   |
27 | pub struct JoinAll<F>
   |            ------- required by a bound in this struct
28 | where
29 |     F: Future,
   |        ^^^^^^ required by this bound in `JoinAll`

error[E0277]: `{async block@src/main.rs:8:23: 20:10}` cannot be unpinned
  --> src/main.rs:46:33
   |
46 |         trpl::join_all(futures).await;
   |                                 ^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:8:23: 20:10}`, which is required by `Box<{async block@src/main.rs:8:23: 20:10}>: std::future::Future`
   |
   = note: consider using the `pin!` macro
           consider using `Box::pin` if you need to access the pinned value outside of the current scope
   = note: required for `Box<{async block@src/main.rs:8:23: 20:10}>` to implement `std::future::Future`
note: required by a bound in `JoinAll`
  --> ~/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:29:8
   |
27 | pub struct JoinAll<F>
   |            ------- required by a bound in this struct
28 | where
29 |     F: Future,
   |        ^^^^^^ required by this bound in `JoinAll`

Some errors have detailed explanations: E0277, E0308.
For more information about an error, try `rustc --explain E0277`.

That is a lot to digest, so let’s pull it apart. The first part of the message tells us that the first async block (src/main.rs:8:23: 20:10) does not implement the Unpin trait, and suggests using pin! or Box::pin to resolve it. Later in the chapter, we’ll dig into a few more details about Pin and Unpin. For the moment, though, we can just follow the compiler’s advice to get unstuck! In Listing 17-18, we first update the type annotation for futures, with a Pin wrapping each Box. Second, we use Box::pin to pin the futures themselves.

Filename: src/main.rs

let futures: Vec<Pin<Box<dyn Future<Output = ()>>>> =
    vec![Box::pin(tx1_fut), Box::pin(rx_fut), Box::pin(tx_fut)];

Listing 17-18: Using Pin and Box::pin to make the Vec type check

If we compile and run this, we finally get the output we hoped for:

received 'hi'
received 'more'
received 'from'
received 'messages'
received 'the'
received 'for'
received 'future'
received 'you'

Phew!

There’s a bit more we can explore here. For one thing, using Pin<Box<T>> comes with a small amount of extra overhead from putting these futures on the heap with Box—and we’re only doing that to get the types to line up. We don’t actually need the heap allocation, after all: these futures are local to this particular function. Pin is itself a wrapper type, so we can get the benefit of having a single type in the Vec—the original reason we reached for Box—without doing a heap allocation. We can use Pin directly with each future, using the std::pin::pin macro.

However, we must still be explicit about the type of the pinned reference; otherwise Rust will still not know to interpret these as dynamic trait objects, which is what we need them to be in the Vec. We therefore pin! each future when we define it, and define futures as a Vec containing pinned mutable references to the dynamic Future type, as in Listing 17-19.

Filename: src/main.rs

let tx1_fut = pin!(async move {
    // --snip--
});

let rx_fut = pin!(async {
    // --snip--
});

let tx_fut = pin!(async move {
    // --snip--
});

let futures: Vec<Pin<&mut dyn Future<Output = ()>>> =
    vec![tx1_fut, rx_fut, tx_fut];

Listing 17-19: Using Pin directly with the pin! macro to avoid unnecessary heap allocations

We got this far by ignoring the fact that we might have different Output types. For example, in Listing 17-20, the anonymous future for a implements Future<Output = u32>, the anonymous future for b implements Future<Output = &str>, and the anonymous future for c implements Future<Output = bool>.

Filename: src/main.rs

let a = async { 1u32 };
let b = async { "Hello!" };
let c = async { true };

let (a_result, b_result, c_result) = trpl::join!(a, b, c);
println!("{a_result}, {b_result}, {c_result}");

Listing 17-20: Three futures with distinct types

We can use trpl::join! to await them, because it allows you to pass in multiple future types and produces a tuple of those types. We cannot use trpl::join_all, because it requires the futures passed in all to have the same type. Remember, that error is what got us started on this adventure with Pin!

This is a fundamental tradeoff: we can either deal with a dynamic number of futures with join_all, as long as they all have the same type, or we can deal with a set number of futures with the join functions or the join! macro, even if they have different types. This is the same as working with any other types in Rust, though. Futures are not special, even though we have some nice syntax for working with them, and that is a good thing.
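
To see both sides of that tradeoff next to each other, here is a minimal sketch; assume fut_one, fut_two, and fut_three are futures pinned with pin! as in Listing 17-19:

// A dynamic number of futures, all of which must share one type:
let same_type: Vec<Pin<&mut dyn Future<Output = ()>>> =
    vec![fut_one, fut_two, fut_three];
trpl::join_all(same_type).await;

// A fixed number of futures, each of which may have its own Output type:
let (a, b, c) = trpl::join!(async { 1u32 }, async { "hi" }, async { true });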

Racing Futures

When we “join” futures with the join family of functions and macros, we require all of them to finish before we move on. Sometimes, though, we only need some future from a set to finish before we move on—kind of similar to racing one future against another.

In Listing 17-21, we once again use trpl::race to run two futures, slow and fast, against each other. Each one prints a message when it starts running, pauses for some amount of time by calling and awaiting sleep, and then prints another message when it finishes. Then we pass both to trpl::race and wait for one of them to finish. (The outcome here won’t be too surprising: fast wins!) Unlike when we used race back in the “Our First Async Program” section of this chapter on page XX, we just ignore the Either instance it returns here, because all of the interesting behavior happens in the body of the async blocks.

Filename: src/main.rs

let slow = async {
    println!("'slow' started.");
    trpl::sleep(Duration::from_millis(100)).await;
    println!("'slow' finished.");
};

let fast = async {
    println!("'fast' started.");
    trpl::sleep(Duration::from_millis(50)).await;
    println!("'fast' finished.");
};

trpl::race(slow, fast).await;

Listing 17-21: Using race to get the result of whichever future finishes first

Notice that if you flip the order of the arguments to race, the order of the “started” messages changes, even though the fast future always completes first. That’s because the implementation of this particular race function is not fair. It always runs the futures passed as arguments in the order they’re passed. Other implementations are fair, and will randomly choose which future to poll first. Regardless of whether the implementation of race we’re using is fair, though, one of the futures will run up to the first await in its body before another task can start.
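
For example, with the futures from Listing 17-21, flipping the arguments is a one-line change (this snippet assumes the same slow and fast blocks):

// 'fast' now prints its "started" message first, because this unfair
// race polls its arguments in order—though 'fast' wins either way.
trpl::race(fast, slow).await;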

Recall from the “Our First Async Program” section of this chapter on page XX that at each await point, Rust gives a runtime a chance to pause the task and switch to another one if the future being awaited isn’t ready. The inverse is also true: Rust only pauses async blocks and hands control back to a runtime at an await point. Everything between await points is synchronous.

That means if you do a bunch of work in an async block without an await point, that future will block any other futures from making progress. You may sometimes hear this referred to as one future starving other futures. In some cases, that may not be a big deal. However, if you are doing some kind of expensive setup or long-running work, or if you have a future which will keep doing some particular task indefinitely, you’ll need to think about when and where to hand control back to the runtime.

By the same token, if you have long-running blocking operations, async can be a useful tool for providing ways for different parts of the program to relate to each other.

But how would you hand control back to the runtime in those cases?

Yielding

Let’s simulate a long-running operation. Listing 17-22 introduces a slow function. It uses std::thread::sleep instead of trpl::sleep so that calling slow will block the current thread for some number of milliseconds. We can use slow to stand in for real-world operations which are both long-running and blocking.

Filename: src/main.rs

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}

Listing 17-22: Using thread::sleep to simulate slow operations

In Listing 17-23, we use slow to emulate doing this kind of CPU-bound work in a pair of futures. To begin, each future only hands control back to the runtime after carrying out a bunch of slow operations.

Filename: src/main.rs

let a = async {
    println!("'a' started.");
    slow("a", 30);
    slow("a", 10);
    slow("a", 20);
    trpl::sleep(Duration::from_millis(50)).await;
    println!("'a' finished.");
};

let b = async {
    println!("'b' started.");
    slow("b", 75);
    slow("b", 10);
    slow("b", 15);
    slow("b", 350);
    trpl::sleep(Duration::from_millis(50)).await;
    println!("'b' finished.");
};

trpl::race(a, b).await;

Listing 17-23: Using slow to emulate CPU-bound work in a pair of futures

If you run this, you will see this output:

'a' started.
'a' ran for 30ms
'a' ran for 10ms
'a' ran for 20ms
'b' started.
'b' ran for 75ms
'b' ran for 10ms
'b' ran for 15ms
'b' ran for 350ms
'a' finished.

As with our earlier example, race still finishes as soon as a is done. There’s no interleaving between the two futures, though. The a future does all of its work until the trpl::sleep call is awaited, then the b future does all of its work until its own trpl::sleep call is awaited, and then the a future completes. To allow both futures to make progress between their slow tasks, we need await points so we can hand control back to the runtime. That means we need something we can await!

We can already see this kind of handoff happening in Listing 17-23: if we removed the trpl::sleep at the end of the a future, it would complete without the b future running at all. Maybe we could use the sleep function as a starting point?

Filename: src/main.rs

let one_ms = Duration::from_millis(1);

let a = async {
    println!("'a' started.");
    slow("a", 30);
    trpl::sleep(one_ms).await;
    slow("a", 10);
    trpl::sleep(one_ms).await;
    slow("a", 20);
    trpl::sleep(one_ms).await;
    println!("'a' finished.");
};

let b = async {
    println!("'b' started.");
    slow("b", 75);
    trpl::sleep(one_ms).await;
    slow("b", 10);
    trpl::sleep(one_ms).await;
    slow("b", 15);
    trpl::sleep(one_ms).await;
    slow("b", 35);
    trpl::sleep(one_ms).await;
    println!("'b' finished.");
};

Listing 17-24: Using sleep to let operations switch off making progress

In Listing 17-24, we add trpl::sleep calls with await points between each call to slow. Now the two futures’ work is interleaved:

'a' started.
'a' ran for 30ms
'b' started.
'b' ran for 75ms
'a' ran for 10ms
'b' ran for 10ms
'a' ran for 20ms
'b' ran for 15ms
'a' finished.

The a future still runs for a bit before handing off control to b, because it calls slow before ever calling trpl::sleep, but after that the futures swap back and forth each time one of them hits an await point. In this case, we have done that after every call to slow, but we could break up the work however makes the most sense to us.

We don’t really want to sleep here, though: we want to make progress as fast as we can. We just need to hand back control to the runtime. We can do that directly, using the yield_now function. In Listing 17-25, we replace all those sleep calls with yield_now.

Filename: src/main.rs

let a = async {
    println!("'a' started.");
    slow("a", 30);
    trpl::yield_now().await;
    slow("a", 10);
    trpl::yield_now().await;
    slow("a", 20);
    trpl::yield_now().await;
    println!("'a' finished.");
};

let b = async {
    println!("'b' started.");
    slow("b", 75);
    trpl::yield_now().await;
    slow("b", 10);
    trpl::yield_now().await;
    slow("b", 15);
    trpl::yield_now().await;
    slow("b", 35);
    trpl::yield_now().await;
    println!("'b' finished.");
};

Listing 17-25: Using yield_now to let operations switch off making progress

This version is not only clearer about our actual intent, it can also be significantly faster than using sleep, because timers such as the one used by sleep often have limits to how granular they can be. The version of sleep we are using, for example, will always sleep for at least a millisecond, even if we pass it a Duration of one nanosecond. Again, modern computers are fast: they can do a lot in one millisecond!

You can see this for yourself by setting up a little benchmark, such as the one in Listing 17-26. (This isn’t an especially rigorous way to do performance testing, but it suffices to show the difference here.) Here, we skip all the status printing, pass a one-nanosecond Duration to trpl::sleep, and let each future run by itself, with no switching between the futures. Then we run for 1,000 iterations and see how long the future using trpl::sleep takes compared to the future using trpl::yield_now.

Filename: src/main.rs

let one_ns = Duration::from_nanos(1);
let start = Instant::now();
async {
    for _ in 1..=1000 {
        trpl::sleep(one_ns).await;
    }
}
.await;
let time = Instant::now() - start;
println!(
    "'sleep' version finished after {} seconds.",
    time.as_secs_f32()
);

let start = Instant::now();
async {
    for _ in 1..=1000 {
        trpl::yield_now().await;
    }
}
.await;
let time = Instant::now() - start;
println!(
    "'yield' version finished after {} seconds.",
    time.as_secs_f32()
);

Listing 17-26: Comparing the performance of sleep and yield_now

The version with yield_now is way faster!

This means that async can be useful even for compute-bound tasks, depending on what else your program is doing, because it provides a useful tool for structuring the relationships between different parts of the program. This is a form of cooperative multitasking, where each future has the power to determine when it hands over control via await points. Each future therefore also has the responsibility to avoid blocking for too long. In some Rust-based embedded operating systems, this is the only kind of multitasking!

In real-world code, you won’t usually be alternating function calls with await points on every single line, of course. While yielding control in this way is relatively inexpensive, it’s not free! In many cases, trying to break up a compute-bound task might make it significantly slower, so sometimes it’s better for overall performance to let an operation block briefly. You should always measure to see what your code’s actual performance bottlenecks are. The underlying dynamic is an important one to keep in mind if you are seeing a lot of work happening in serial that you expected to happen concurrently, though!

Building Our Own Async Abstractions

We can also compose futures together to create new patterns. For example, we can build a timeout function with async building blocks we already have. When we’re done, the result will be another building block we could use to build up yet further async abstractions.

Listing 17-27 shows how we would expect this timeout to work with a slow future.

Filename: src/main.rs

let slow = async {
    trpl::sleep(Duration::from_millis(100)).await;
    "I finished!"
};

match timeout(slow, Duration::from_millis(10)).await {
    Ok(message) => println!("Succeeded with '{message}'"),
    Err(duration) => {
        println!("Failed after {} seconds", duration.as_secs())
    }
}

Listing 17-27: Using our imagined timeout to run a slow operation with a time limit

Let’s implement this! To begin, let’s think about the API for timeout:

  • It needs to be an async function itself so we can await it.
  • Its first parameter should be a future to run. We can make it generic to allow it to work with any future.
  • Its second parameter will be the maximum time to wait. If we use a Duration, that will make it easy to pass along to trpl::sleep.
  • It should return a Result. If the future completes successfully, the Result will be Ok with the value produced by the future. If the timeout elapses first, the Result will be Err with the duration that the timeout waited for.

Listing 17-28 shows this declaration.

Filename: src/main.rs

async fn timeout<F: Future>(
    future_to_try: F,
    max_time: Duration,
) -> Result<F::Output, Duration> {
    // Here is where our implementation will go!
}

Listing 17-28: Defining the signature of timeout

That satisfies our goals for the types. Now let’s think about the behavior we need: we want to race the future passed in against the duration. We can use trpl::sleep to make a timer future from the duration, and use trpl::race to run that timer with the future the caller passes in.

We also know that race is not fair, and polls its arguments in the order they are passed. Thus, we pass future_to_try to race first so it gets a chance to complete even if max_time is a very short duration. If future_to_try finishes first, race will return Left with its output. If the timer elapses first, race will return Right with the timer’s output of ().

In Listing 17-29, we match on the result of awaiting trpl::race. If the future_to_try succeeded and we get a Left(output), we return Ok(output). If the sleep timer elapsed instead and we get a Right(()), we ignore the () with _ and return Err(max_time) instead.

Filename: src/main.rs

use trpl::Either;

// --snip--

fn main() {
    trpl::run(async {
        let slow = async {
            trpl::sleep(Duration::from_secs(5)).await;
            "Finally finished"
        };

        match timeout(slow, Duration::from_secs(2)).await {
            Ok(message) => println!("Succeeded with '{message}'"),
            Err(duration) => {
                println!("Failed after {} seconds", duration.as_secs())
            }
        }
    });
}

async fn timeout<F: Future>(
    future_to_try: F,
    max_time: Duration,
) -> Result<F::Output, Duration> {
    match trpl::race(future_to_try, trpl::sleep(max_time)).await {
        Either::Left(output) => Ok(output),
        Either::Right(_) => Err(max_time),
    }
}

Listing 17-29: Defining timeout with race and sleep

With that, we have a working timeout, built out of two other async helpers. If we run our code, it will print the failure mode after the timeout:

Failed after 2 seconds

Because futures compose with other futures, you can build really powerful tools using smaller async building blocks. For example, you can use this same approach to combine timeouts with retries, and in turn use those with things such as network calls—one of the examples from the beginning of the chapter!
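
For instance, here is a minimal sketch of a retry helper layered on top of our timeout; the retry name, the max_tries parameter, and the closure-returning-a-future shape are our own inventions for illustration, not part of trpl:

async fn retry<F, Fut>(
    mut operation: F,
    max_time: Duration,
    max_tries: u32,
) -> Result<Fut::Output, Duration>
where
    F: FnMut() -> Fut,
    Fut: Future,
{
    // Make all but the final attempt, ignoring timeouts along the way.
    for _ in 1..max_tries {
        if let Ok(output) = timeout(operation(), max_time).await {
            return Ok(output);
        }
    }
    // The final attempt: if this one also times out, report the failure.
    timeout(operation(), max_time).await
}

Calling retry(|| async { /* some network call */ }, Duration::from_millis(100), 3).await would then make up to three attempts before giving up.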

In practice, you will usually work directly with async and await, and secondarily with functions and macros such as join, join_all, race, and so on. You’ll only need to reach for pin now and again to use futures with those APIs.

We’ve now seen a number of ways to work with multiple futures at the same time. Up next, we’ll look at how we can work with multiple futures in a sequence over time, with streams. Here are a couple more things you might want to consider first, though:

  • We used a Vec with join_all to wait for all of the futures in some group to finish. How could you use a Vec to process a group of futures in sequence, instead? What are the tradeoffs of doing that?

  • Take a look at the futures::stream::FuturesUnordered type from the futures crate. How would using it be different from using a Vec? (Don’t worry about the fact that it is from the stream part of the crate; it works just fine with any collection of futures.)

Streams

So far in this chapter, we have mostly stuck to individual futures. The one big exception was the async channel we used. Recall how we used the receiver for our async channel in the “Message Passing” section of this chapter on page XX. The async recv method produces a sequence of items over time. This is an instance of a much more general pattern, often called a stream.

A sequence of items is something we’ve seen before, when we looked at the Iterator trait in “The Iterator Trait and the next Method” section of Chapter 13 on page XX, but there are two differences between iterators and the async channel receiver. The first difference is the element of time: iterators are synchronous, while the channel receiver is asynchronous. The second difference is the API. When working directly with an Iterator, we call its synchronous next method. With the trpl::Receiver stream in particular, we called an asynchronous recv method instead, but these APIs otherwise feel very similar.

That similarity isn’t a coincidence. A stream is similar to an asynchronous form of iteration. Whereas the trpl::Receiver specifically waits to receive messages, though, the general-purpose stream API is much broader: it provides the next item the way Iterator does, but asynchronously. The similarity between iterators and streams in Rust means we can actually create a stream from any iterator. As with an iterator, we can work with a stream by calling its next method and then awaiting the output, as in Listing 17-30.

Filename: src/main.rs

let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let iter = values.iter().map(|n| n * 2);
let mut stream = trpl::stream_from_iter(iter);

while let Some(value) = stream.next().await {
    println!("The value was: {value}");
}

Listing 17-30: Creating a stream from an iterator and printing its values

We start with an array of numbers, which we convert to an iterator and then call map on to double all the values. Then we convert the iterator into a stream using the trpl::stream_from_iter function. Finally, we loop over the items in the stream as they arrive with the while let loop.

Unfortunately, when we try to run the code, it doesn’t compile. Instead, as we can see in the output, it reports that there is no next method available.

error[E0599]: no method named `next` found for struct `Iter` in the current scope
 --> src/main.rs:8:40
  |
8 |         while let Some(value) = stream.next().await {
  |                                        ^^^^
  |
  = note: the full type name has been written to '~/projects/hello-async/target/debug/deps/async_await-bbd5bb8f6851cb5f.long-type-18426562901668632191.txt'
  = note: consider using `--verbose` to print the full type name to the console
  = help: items from traits can only be used if the trait is in scope
help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
  |
1 + use futures_util::stream::stream::StreamExt;
  |
1 + use std::iter::Iterator;
  |
1 + use std::str::pattern::Searcher;
  |
1 + use trpl::StreamExt;
  |
help: there is a method `try_next` with a similar name
  |
8 |         while let Some(value) = stream.try_next().await {
  |                                        ~~~~~~~~

For more information about this error, try `rustc --explain E0599`.

As the output suggests, the reason for the compiler error is that we need the right trait in scope to be able to use the next method. Given our discussion so far, you might reasonably expect that to be Stream, but the trait we need here is actually StreamExt. The Ext there is for “extension”: this is a common pattern in the Rust community for extending one trait with another.
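
If you haven’t run into the extension trait pattern before, a minimal sketch (with a made-up Draw trait, not anything from trpl) looks like this:

trait Draw {
    fn draw(&self) -> String;
}

// The extension trait adds convenience methods on top of Draw...
trait DrawExt: Draw {
    fn draw_twice(&self) -> String {
        format!("{}{}", self.draw(), self.draw())
    }
}

// ...and a blanket implementation makes those methods available on
// every type which implements Draw.
impl<T: Draw + ?Sized> DrawExt for T {}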

Why do we need StreamExt instead of Stream, and what does the Stream trait itself do? Briefly, the answer is that throughout the Rust ecosystem, the Stream trait defines a low-level interface which effectively combines the Iterator and Future traits. The StreamExt trait supplies a higher-level set of APIs on top of Stream, including the next method as well as other utility methods similar to those provided by the Iterator trait. We’ll return to the Stream and StreamExt traits in a bit more detail at the end of the chapter. For now, this is enough to let us keep moving.

The fix to the compiler error is to add a use statement for trpl::StreamExt, as in Listing 17-31.

Filename: src/main.rs

use trpl::StreamExt;

fn main() {
    trpl::run(async {
        let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let iter = values.iter().map(|n| n * 2);
        let mut stream = trpl::stream_from_iter(iter);

        while let Some(value) = stream.next().await {
            println!("The value was: {value}");
        }
    });
}

Listing 17-31: Successfully using an iterator as the basis for a stream

With all those pieces put together, this code works the way we want! What’s more, now that we have StreamExt in scope, we can use all of its utility methods, just as with iterators. For example, in Listing 17-32, we use the filter method to filter out everything but multiples of three and five.

Filename: src/main.rs

use trpl::StreamExt;

fn main() {
    trpl::run(async {
        let values = 1..101;
        let iter = values.map(|n| n * 2);
        let stream = trpl::stream_from_iter(iter);

        let mut filtered =
            stream.filter(|value| value % 3 == 0 || value % 5 == 0);

        while let Some(value) = filtered.next().await {
            println!("The value was: {value}");
        }
    });
}

Listing 17-32: Filtering a Stream with the StreamExt::filter method

Of course, this isn’t very interesting. We could do that with normal iterators and without any async at all. So let’s look at some of the other things we can do which are unique to streams.

Composing Streams

Many concepts are naturally represented as streams: items becoming available in a queue, or working with more data than can fit in a computer’s memory by only pulling chunks of it from the file system at a time, or data arriving over the network over time. Because streams are futures, we can use them with any other kind of future, too, and we can combine them in interesting ways. For example, we can batch up events to avoid triggering too many network calls, set timeouts on sequences of long-running operations, or throttle user interface events to avoid doing needless work.

Let’s start by building a little stream of messages, as a stand-in for a stream of data we might see from a WebSocket or another real-time communication protocol. In Listing 17-33, we create a function get_messages which returns impl Stream<Item = String>. For its implementation, we create an async channel, loop over the first ten letters of the English alphabet, and send them across the channel.

We also use a new type: ReceiverStream, which converts the rx receiver from the trpl::channel into a Stream with a next method. Back in main, we use a while let loop to print all the messages from the stream.

Filename: src/main.rs

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let mut messages = get_messages();

        while let Some(message) = messages.next().await {
            println!("{message}");
        }
    });
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
    for message in messages {
        tx.send(format!("Message: '{message}'")).unwrap();
    }

    ReceiverStream::new(rx)
}

Listing 17-33: Using the rx receiver as a ReceiverStream

When we run this code, we get exactly the results we would expect:

Message: 'a'
Message: 'b'
Message: 'c'
Message: 'd'
Message: 'e'
Message: 'f'
Message: 'g'
Message: 'h'
Message: 'i'
Message: 'j'

We could do this with the regular Receiver API, or even the regular Iterator API, though. Let’s add something that requires streams: a timeout that applies to every item in the stream, and a delay on the items we emit.

In Listing 17-34, we start by adding a timeout to the stream with the timeout method, which comes from the StreamExt trait. Then we update the body of the while let loop, because the stream now returns a Result. The Ok variant indicates a message arrived in time; the Err variant indicates that the timeout elapsed before any message arrived. We match on that result and either print the message when we receive it successfully, or print a notice about the timeout. Finally, notice that we pin the messages after applying the timeout to them, because the timeout helper produces a stream which needs to be pinned to be polled.

Filename: src/main.rs

use std::{pin::pin, time::Duration};
use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let mut messages =
            pin!(get_messages().timeout(Duration::from_millis(200)));

        while let Some(result) = messages.next().await {
            match result {
                Ok(message) => println!("{message}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),
            }
        }
    })
}

Listing 17-34: Using the StreamExt::timeout method to set a time limit on the items in a stream

However, because there are no delays between messages, this timeout does not change the behavior of the program. Let’s add a variable delay to the messages we send. In get_messages, we use the enumerate iterator method with the messages array so that we can get the index of each item we are sending along with the item itself. Then we apply a 100 millisecond delay to even-index items and a 300 millisecond delay to odd-index items, to simulate the different delays we might see from a stream of messages in the real world. Because our timeout is for 200 milliseconds, this should affect half of the messages.

Filename: src/main.rs

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(time_to_sleep)).await;

            tx.send(format!("Message: '{message}'")).unwrap();
        }
    });

    ReceiverStream::new(rx)
}

Listing 17-35: Sending messages through tx with an async delay without making get_messages an async function

To sleep between messages in the get_messages function without blocking, we need to use async. However, we can’t make get_messages itself into an async function, because then we’d return a Future<Output = Stream<Item = String>> instead of a Stream<Item = String>. The caller would have to await get_messages itself to get access to the stream. But remember: everything in a given future happens linearly; concurrency happens between futures. Awaiting get_messages would require it to send all the messages, including sleeping between sending each message, before returning the receiver stream. As a result, the timeout would end up useless. There would be no delays in the stream itself: the delays would all happen before the stream was even available.

Instead, we leave get_messages as a regular function which returns a stream, and spawn a task to handle the async sleep calls.

Note: Calling spawn_task in this way works because we already set up our runtime. Calling this particular implementation of spawn_task without first setting up a runtime will cause a panic. Other implementations choose different tradeoffs: they might spawn a new runtime and so avoid the panic but end up with a bit of extra overhead, or simply not provide a standalone way to spawn tasks without reference to a runtime. You should make sure you know what tradeoff your runtime has chosen and write your code accordingly!

Now our code has a much more interesting result! Between every other pair of messages, we see an error reported: Problem: Elapsed(()).

Message: 'a'
Problem: Elapsed(())
Message: 'b'
Message: 'c'
Problem: Elapsed(())
Message: 'd'
Message: 'e'
Problem: Elapsed(())
Message: 'f'
Message: 'g'
Problem: Elapsed(())
Message: 'h'
Message: 'i'
Problem: Elapsed(())
Message: 'j'

The timeout doesn’t prevent the messages from arriving in the end—we still get all of the original messages. This is because our channel is unbounded: it can hold as many messages as we can fit in memory. If the message doesn’t arrive before the timeout, our stream handler will account for that, but when it polls the stream again, the message may now have arrived.
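
If we instead wanted the sender to wait when the receiver falls behind, we would reach for a bounded channel, whose send is itself async. Here is a minimal sketch using the tokio crate directly, as an assumption about what your runtime offers—trpl’s own channel is unbounded:

// A bounded channel with room for at most two in-flight messages.
let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(2);

// This send is async: if the buffer already held two messages, it
// would wait here for the receiver to make room, instead of letting
// the queue grow without limit.
tx.send(String::from("Message: 'a'")).await.unwrap();

println!("{}", rx.recv().await.unwrap());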

You can get different behavior if needed by using other kinds of channels, or other kinds of streams more generally. Let’s see one of those in practice in our final example for this section, by combining a stream of time intervals with this stream of messages.

Merging Streams

First, let’s create another stream, which will emit an item every millisecond if we let it run directly. For simplicity, we can use the sleep function to send a message on a delay, and combine it with the same approach of creating a stream from a channel we used in get_messages. The difference is that this time, we’re going to send back the count of intervals that have elapsed, so the return type will be impl Stream<Item = u32>, and we can call the function get_intervals.

In Listing 17-36, we start by defining a count in the task. (We could define it outside the task, too, but it is clearer to limit the scope of any given variable.) Then we create an infinite loop. Each iteration of the loop asynchronously sleeps for one millisecond, increments the count, and then sends it over the channel. Because this is all wrapped in the task created by spawn_task, all of it will get cleaned up along with the runtime, including the infinite loop.

Filename: src/main.rs

fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let mut count = 0;
        loop {
            trpl::sleep(Duration::from_millis(1)).await;
            count += 1;
            tx.send(count).unwrap();
        }
    });

    ReceiverStream::new(rx)
}

Listing 17-36: Creating a stream with a counter that will be emitted once every millisecond

This kind of infinite loop, which only ends when the whole runtime gets torn down, is fairly common in async Rust: many programs need to keep running indefinitely. With async, this doesn’t block anything else, as long as there is at least one await point in each iteration through the loop.

Back in our main function’s async block, we start by calling get_intervals. Then we merge the messages and intervals streams with the merge method, which combines multiple streams into one stream that produces items from any of the source streams as soon as the items are available, without imposing any particular ordering. Finally, we loop over that combined stream instead of over messages (Listing 17-37).

Filename: src/main.rs

let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals();
let merged = messages.merge(intervals);

Listing 17-37: Attempting to merge streams of messages and intervals

At this point, neither messages nor intervals needs to be pinned or mutable, because both will be combined into the single merged stream. However, this call to merge does not compile! (Neither does the next call in the while let loop, but we’ll come back to that after fixing this.) The two streams have different types. The messages stream has the type Timeout<impl Stream<Item = String>>, where Timeout is the type which implements Stream for a timeout call. Meanwhile, the intervals stream has the type impl Stream<Item = u32>. To merge these two streams, we need to transform one of them to match the other.

In Listing 17-38, we rework the intervals stream, because messages is already in the basic format we want and has to handle timeout errors. First, we can use the map helper method to transform the intervals into a string. Second, we need to match the Timeout from messages. Because we don’t actually want a timeout for intervals, though, we can just create a timeout which is longer than the other durations we are using. Here, we create a 10-second timeout with Duration::from_secs(10). Finally, we need to make stream mutable, so that the while let loop’s next calls can iterate through the stream, and pin it so that it’s safe to do so.

Filename: src/main.rs

let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals()
    .map(|count| format!("Interval: {count}"))
    .timeout(Duration::from_secs(10));
let merged = messages.merge(intervals);
let mut stream = pin!(merged);

Listing 17-38: Aligning the types of the intervals stream with the type of the messages stream

That gets us almost to where we need to be. Everything type checks. If you run this, though, there will be two problems. First, it will never stop! You’ll need to stop it with ctrl-c. Second, the messages from the English alphabet will be buried in the midst of all the interval counter messages:

--snip--
Interval: 38
Interval: 39
Interval: 40
Message: 'a'
Interval: 41
Interval: 42
Interval: 43
--snip--

Listing 17-39 shows one way to solve these last two problems. First, we use the throttle method on the intervals stream, so that it doesn’t overwhelm the messages stream. Throttling is a way of limiting the rate at which a function will be called—or, in this case, how often the stream will be polled. Once every hundred milliseconds should do, because that is in the same ballpark as how often our messages arrive.

To limit the number of items we will accept from a stream, we can use the take method. We apply it to the merged stream, because we want to limit the final output, not just one stream or the other.

Filename: src/main.rs

let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals()
    .map(|count| format!("Interval: {count}"))
    .throttle(Duration::from_millis(100))
    .timeout(Duration::from_secs(10));
let merged = messages.merge(intervals).take(20);
let mut stream = pin!(merged);

Listing 17-39: Using throttle and take to manage the merged streams

Now when we run the program, it stops after pulling twenty items from the stream, and the intervals don’t overwhelm the messages. We also don’t get Interval: 100 or Interval: 200 or so on, but instead get Interval: 1, Interval: 2, and so on—even though we have a source stream which can produce an event every millisecond. That’s because the throttle call produces a new stream, wrapping the original stream, so that the original stream only gets polled at the throttle rate, not its own “native” rate. We don’t have a bunch of unhandled interval messages we’re choosing to ignore. Instead, we never produce those interval messages in the first place! This is the inherent “laziness” of Rust’s futures at work again, allowing us to choose our performance characteristics.

Interval: 1
Message: 'a'
Interval: 2
Interval: 3
Problem: Elapsed(())
Interval: 4
Message: 'b'
Interval: 5
Message: 'c'
Interval: 6
Interval: 7
Problem: Elapsed(())
Interval: 8
Message: 'd'
Interval: 9
Message: 'e'
Interval: 10
Interval: 11
Problem: Elapsed(())
Interval: 12

There’s one last thing we need to handle: errors! With both of these channel-based streams, the send calls could fail when the other side of the channel closes—and that’s just a matter of how the runtime executes the futures which make up the stream. Up until now we have ignored this by calling unwrap, but in a well-behaved app, we should explicitly handle the error, at minimum by ending the loop so we don’t try to send any more messages! Listing 17-40 shows a simple error strategy: print the issue and then break from the loops. As usual, the correct way to handle a message send error will vary—just make sure you have a strategy.

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];

        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(time_to_sleep)).await;

            if let Err(send_error) = tx.send(format!("Message: '{message}'")) {
                eprintln!("Cannot send message '{message}': {send_error}");
                break;
            }
        }
    });

    ReceiverStream::new(rx)
}

fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let mut count = 0;
        loop {
            trpl::sleep(Duration::from_millis(1)).await;
            count += 1;

            if let Err(send_error) = tx.send(count) {
                eprintln!("Could not send interval {count}: {send_error}");
                break;
            };
        }
    });

    ReceiverStream::new(rx)
}

Listing 17-40: Handling errors and shutting down the loops

Now that we’ve seen a bunch of async in practice, let’s take a step back and dig into a few of the details of how Future, Stream, and the other key traits which Rust uses to make async work.

Digging Into the Traits for Async

Throughout the chapter, we’ve used the Future, Pin, Unpin, Stream, and StreamExt traits in various ways. So far, though, we’ve avoided digging too far into the details of how they work or how they fit together. Much of the time when writing Rust day to day, this is fine. Sometimes, though, you’ll hit situations where understanding a few more of these details matters. In this section, we’ll dig just deep enough to help with those situations—while still leaving the really deep dive for other documentation!

Future

Back in the “Futures and the Async Syntax” section of this chapter on page XX, we noted that Future is a trait. Let’s start by taking a closer look at how it works. Here is how Rust defines a Future:

use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

That trait definition includes a bunch of new types and also some syntax we haven’t seen before, so let’s walk through the definition piece by piece.

First, Future’s associated type Output says what the future resolves to. This is analogous to the Item associated type for the Iterator trait. Second, Future also has the poll method, which takes a special Pin reference for its self parameter and a mutable reference to a Context type, and returns a Poll<Self::Output>. We’ll talk a little more about Pin and Context later in the section. For now, let’s focus on what the method returns, the Poll type:

enum Poll<T> {
    Ready(T),
    Pending,
}

This Poll type is similar to an Option: it has one variant which has a value (Ready(T)), and one which does not (Pending). It means something quite different, though! The Pending variant indicates that the future still has work to do, so the caller will need to check again later. The Ready variant indicates that the Future has finished its work and the T value is available.

Note: With most futures, the caller should not call poll again after the future has returned Ready. Many futures will panic if polled again after becoming ready! Futures which are safe to poll again will say so explicitly in their documentation. This is similar to how Iterator::next behaves!

Under the hood, when you see code which uses await, Rust compiles that to code which calls poll. If you look back at Listing 17-4, where we printed out the page title for a single URL once it resolved, Rust compiles it into something kind of (although not exactly) like this:

match page_title(url).poll() {
    Ready(page_title) => match page_title {
        Some(title) => println!("The title for {url} was {title}"),
        None => println!("{url} had no title"),
    }
    Pending => {
        // But what goes here?
    }
}

What should we do when the Future is still Pending? We need some way to try again… and again, and again, until the future is finally ready. In other words, a loop:

let mut page_title_fut = page_title(url);
loop {
    match page_title_fut.poll() {
        Ready(value) => match value {
            Some(title) => println!("The title for {url} was {title}"),
            None => println!("{url} had no title"),
        }
        Pending => {
            // continue
        }
    }
}

If Rust compiled it to exactly that code, though, every await would be blocking—exactly the opposite of what we were going for! Instead, Rust needs to make sure that the loop can hand off control to something which can pause work on this future, work on other futures, and check this one again later. That “something” is an async runtime, and this scheduling and coordination work is one of the main jobs for a runtime.

Recall our description (in the “Counting” section of this chapter on page XX) of waiting on rx.recv. The recv call returns a Future, and awaiting it polls it. In our initial discussion, we noted that a runtime will pause the future until it’s ready with either Some(message) or None when the channel closes. With our deeper understanding of Future in place, and specifically Future::poll, we can see how that works. The runtime knows the future isn’t ready when it returns Poll::Pending. Conversely, the runtime knows the future is ready and advances it when poll returns Poll::Ready(Some(message)) or Poll::Ready(None).

The exact details of how a runtime does that are more than we will cover in even this deep dive section. The key here is to see the basic mechanic of futures: a runtime polls each future it is responsible for, putting it back to sleep when it is not yet ready.
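
To make that mechanic concrete, here is a minimal hand-written future—a hypothetical PollCounter type of our own, not something from trpl—which reports Pending a few times before it is Ready:

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

struct PollCounter {
    remaining: u32,
}

impl Future for PollCounter {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.remaining == 0 {
            Poll::Ready(())
        } else {
            self.remaining -= 1;
            // Ask the runtime to poll us again. Without waking the waker,
            // a well-behaved runtime has no reason to ever come back to us.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

Awaiting PollCounter { remaining: 3 } completes only after the runtime has polled it four times.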

Pinning and the Pin and Unpin Traits

When we introduced the idea of pinning while working on Listing 17-17, we ran into a very gnarly error message. Here is the relevant part of it again:

error[E0277]: `{async block@src/main.rs:8:23: 20:10}` cannot be unpinned
  --> src/main.rs:46:33
   |
46 |         trpl::join_all(futures).await;
   |                                 ^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:8:23: 20:10}`, which is required by `Box<{async block@src/main.rs:8:23: 20:10}>: std::future::Future`
   |
   = note: consider using the `pin!` macro
           consider using `Box::pin` if you need to access the pinned value outside of the current scope
   = note: required for `Box<{async block@src/main.rs:8:23: 20:10}>` to implement `std::future::Future`
note: required by a bound in `JoinAll`
  --> ~/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:29:8
   |
27 | pub struct JoinAll<F>
   |            ------- required by a bound in this struct
28 | where
29 |     F: Future,
   |        ^^^^^^ required by this bound in `JoinAll`

Some errors have detailed explanations: E0277, E0308.
For more information about an error, try `rustc --explain E0277`.

When we read this error message carefully, it not only tells us that we need to pin the values, but also tells us why pinning is required. The trpl::join_all function returns a struct called JoinAll. That struct is generic over a type F, which is constrained to implement the Future trait. Directly awaiting a future with await pins the future implicitly. That’s why we don’t need to use pin! everywhere we want to await futures.

However, we’re not directly awaiting a future here. Instead, we construct a new future, JoinAll, by passing a collection of futures to the join_all function. The signature for join_all requires that the types of the items in the collection all implement the Future trait, and Box<T> only implements Future if the T that it wraps is a future which implements the Unpin trait.

That’s a lot! But we can understand it, if we dive a little further into how the Future type actually works, in particular around pinning.

Let’s look again at the definition of Future:

use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Future {
    type Output;

    // Required method
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

The cx parameter and its Context type is the key to how a runtime actually knows when to check any given future, while still being lazy. The details of how that works are beyond the scope of this chapter, though: you generally only need to worry about it when writing a custom Future implementation.

Instead, we’ll focus on the type for self. This is the first time we’ve seen a method where self has a type annotation. A type annotation for self is similar to type annotations for other function parameters, with two key differences. First, when we specify the type of self in this way, we’re telling Rust what type self must be to call this method. Second, a type annotation on self can’t be just any type. It’s only allowed to be the type on which the method is implemented, a reference or smart pointer to that type, or a Pin wrapping a reference to that type. We’ll see more on this syntax in Chapter 18. For now, it’s enough to know that if we want to poll a future (to check whether it is Pending or Ready(Output)), we need a mutable reference to the type, which is wrapped in a Pin.
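
Here is a quick sketch of the allowed forms, using a made-up type:

struct Message;

impl Message {
    fn consume(self) {}                            // takes ownership
    fn inspect(&self) {}                           // shared reference
    fn update(&mut self) {}                        // mutable reference
    fn via_box(self: Box<Self>) {}                 // smart pointer to Self
    fn via_pin(self: std::pin::Pin<&mut Self>) {}  // what Future::poll uses
}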

Pin is a wrapper type. In some ways, it’s similar to the Box, Rc, and other smart pointer types we saw in Chapter 15, which also wrap other types. Unlike those, however, Pin only works with pointer types such as references (& and &mut) and smart pointers (Box, Rc, and so on). To be precise, Pin works with types which implement the Deref or DerefMut traits, which we covered in the “Treating Smart Pointers Like Regular References with the Deref Trait” section of Chapter 15 on page XX. You can think of this restriction as equivalent to only working with pointers, though, because implementing Deref or DerefMut means your type behaves similarly to a pointer type. Pin is also not a pointer itself, and it doesn’t have any behavior of its own the way Rc and Arc do with ref counting. It’s purely a tool the compiler can use to uphold the relevant guarantees, by wrapping pointers in the type.

Recalling that await is implemented in terms of calls to poll, this starts to explain the error message we saw above—but that was in terms of Unpin, not Pin. So what exactly are Pin and Unpin, how do they relate, and why does Future need self to be in a Pin type to call poll?

In the “Our First Async Program” section of this chapter on page XX, we described how a series of await points in a future get compiled into a state machine—and noted how the compiler helps make sure that state machine follows all of Rust’s normal rules around safety, including borrowing and ownership. To make that work, Rust looks at what data is needed between each await point and either the next await point or the end of the async block. It then creates a corresponding variant in the resulting state machine. Each variant gets the access it needs to the data that will be used in that section of the source code, whether by taking ownership of that data or by getting a mutable or immutable reference to it.

So far so good: if we get anything wrong about the ownership or references in a given async block, the borrow checker will tell us. When we want to move around the future that corresponds to that block—like moving it into a Vec to pass to join_all, the way we did back in Listing 17-17—things get trickier.

When we move a future—whether by pushing it into a data structure to use with join_all or by returning it from a function—that actually means moving the state machine Rust creates for us. And unlike most other types in Rust, the futures Rust creates for async blocks can end up with references to themselves in the fields of any given variant, as in Figure 17-4 (a simplified illustration to help you get a feel for the idea, rather than digging into what are often fairly complicated details).

Figure 17-4: A self-referential data type.

By default, though, any object which has a reference to itself is unsafe to move, because references always point to the actual memory address of the thing they refer to. If you move the data structure itself, those internal references will be left pointing to the old location. However, that memory location is now invalid. For one thing, its value will not be updated when you make changes to the data structure. For another—and more importantly!—the computer is now free to reuse that memory for other things! You could end up reading completely unrelated data later.

Figure 17-5: The unsafe result of moving a self-referential data type.
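
We can watch this happen (safely) with raw pointers, which opt out of the borrow checking that would otherwise prevent it. This sketch is purely illustrative and only prints addresses rather than dereferencing the stale pointer:

struct SelfRef {
    value: String,
    // Intended to point at `value` above. A raw pointer lets us sidestep
    // the borrow checker, which is exactly what makes this dangerous.
    ptr: *const String,
}

fn main() {
    let mut first = SelfRef {
        value: String::from("hello"),
        ptr: std::ptr::null(),
    };
    first.ptr = &first.value;

    let moved = first; // moving the struct moves `value` with it...
    // ...but `ptr` still holds the old address. Dereferencing it now
    // would be undefined behavior, so we only print the two addresses.
    println!("value lives at {:p}", &moved.value);
    println!("ptr still says  {:p}", moved.ptr);
}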

In principle, the Rust compiler could try to update every reference to an object every time it gets moved. That would potentially be a lot of performance overhead, especially given there can be a whole web of references that need updating. On the other hand, if we could make sure the data structure in question doesn’t move in memory, we don’t have to update any references. This is exactly what Rust’s borrow checker requires: you can’t move an item which has any active references to it using safe code.

Pin builds on that to give us the exact guarantee we need. When we pin a value by wrapping a pointer to that value in Pin, it can no longer move. Thus, if you have Pin<Box<SomeType>>, you actually pin the SomeType value, not the Box pointer. Figure 17-6 illustrates this:

Figure 17-6: Pinning a Box which points to a self-referential future type.

In fact, the Box pointer can still move around freely. Remember: we care about making sure the data ultimately being referenced stays in its place. If a pointer moves around, but the data it points to is in the same place, as in Figure 17-7, there’s no potential problem. (How you would do this with a Pin wrapping a Box is more than we’ll get into in this particular discussion, but it would make for a good exercise! If you look at the docs for the types as well as the std::pin module, you might be able to work out how you would do that.) The key is that the self-referential type itself cannot move, because it is still pinned.

Figure 17-7: Moving a Box which points to a self-referential future type.

However, most types are perfectly safe to move around, even if they happen to be behind a Pin pointer. We only need to think about pinning when items have internal references. Primitive values such as numbers and booleans don’t have any internal references, so they’re obviously safe. Neither do most types you normally work with in Rust. A Vec, for example, doesn’t have any internal references it needs to keep up to date this way, so you can move it around without worrying. If you have a Pin<Vec<String>>, you’d have to do everything via the safe but restrictive APIs provided by Pin, even though a Vec<String> is always safe to move if there are no other references to it. We need a way to tell the compiler that it’s actually just fine to move items around in cases such as these. For that, we have Unpin.

Unpin is a marker trait, similar to the Send and Sync traits we saw in the “Extensible Concurrency with the Sync and Send Traits” section of Chapter 16 on page XX. Recall that marker traits have no functionality of their own. They exist only to tell the compiler that it’s safe to use the type which implements a given trait in a particular context. Unpin informs the compiler that a given type does not need to uphold any particular guarantees about whether the value in question can be moved.

Just as with Send and Sync, the compiler implements Unpin automatically for all types where it can prove it is safe. The special case, again similar to Send and Sync, is the case where Unpin is not implemented for a type. The notation for this is impl !Unpin for SomeType, where SomeType is the name of a type which does need to uphold those guarantees to be safe whenever a pointer to that type is used in a Pin.

In other words, there are two things to keep in mind about the relationship between Pin and Unpin. First, Unpin is the “normal” case, and !Unpin is the special case. Second, whether a type implements Unpin or !Unpin only matters when using a pinned pointer to that type like Pin<&mut SomeType>.

To make that concrete, think about a String: it has a length and the Unicode characters which make it up. We can wrap a String in Pin, as seen in Figure 17-8. However, String automatically implements Unpin, the same as most other types in Rust.

Figure 17-8: Pinning a String, with a dotted line indicating that the String implements the Unpin trait, so it is not pinned.

As a result, we can do things which would be illegal if String implemented !Unpin instead, such as replacing one string with another at the exact same location in memory as in Figure 17-9. This doesn’t violate the Pin contract, because String has no internal references that make it unsafe to move around! That is precisely why it implements Unpin rather than !Unpin.

Figure 17-9: Replacing the String with an entirely different String in memory.
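
In code, that freedom shows up in the fact that Pin::new is available at all: it only exists for pointers whose target implements Unpin, and the pinned String stays fully usable. A minimal sketch:

use std::pin::Pin;

let mut favorite = String::from("Pinning");
// Pin::new only compiles because String implements Unpin.
let mut pinned: Pin<&mut String> = Pin::new(&mut favorite);
// For the same reason, the pin doesn't restrict us: we can still
// mutate the String, or replace it entirely, as in Figure 17-9.
pinned.push_str(" a String");
*pinned = String::from("an entirely different String");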

Now we know enough to understand the errors reported for that join_all call from back in Listing 17-17. We originally tried to move the futures produced by async blocks into a Vec<Box<dyn Future<Output = ()>>>, but as we’ve seen, those futures may have internal references, so they don’t automatically implement Unpin. Once we pin them, we can pass the resulting Pin type into the Vec, confident that the underlying data in the futures will not be moved.

Pin and Unpin are mostly important for building lower-level libraries, or when you’re building a runtime itself, rather than for day to day Rust code. When you see these traits in error messages, though, now you’ll have a better idea of how to fix the code!

Note: This combination of Pin and Unpin allows a whole class of complex types to be safe in Rust which are otherwise difficult to implement because they’re self-referential. Types which require Pin show up most commonly in async Rust today, but you might—very rarely!—see it in other contexts, too.

The specifics of how Pin and Unpin work, and the rules they’re required to uphold, are covered extensively in the API documentation for std::pin, so if you’d like to understand them more deeply, that’s a great place to start.

If you want to understand how things work “under the hood” in even more detail, the official Asynchronous Programming in Rust book available at https://rust-lang.github.io/async-book/ has you covered:

  • Chapter 2: Under the Hood: Executing Futures and Tasks
  • Chapter 4: Pinning

The Stream Trait

Now that we have a deeper grasp on the Future, Pin, and Unpin traits, we can turn our attention to the Stream trait. As described in the section introducing streams, streams are similar to asynchronous iterators. Unlike Iterator and Future, there is no definition of a Stream trait in the standard library as of the time of writing, but there is a very common definition from the futures crate used throughout the ecosystem.

Let’s review the definitions of the Iterator and Future traits, so we can build up to how a Stream trait that merges them together might look. From Iterator, we have the idea of a sequence: its next method provides an Option<Self::Item>. From Future, we have the idea of readiness over time: its poll method provides a Poll<Self::Output>. To represent a sequence of items which become ready over time, we define a Stream trait which puts those features together:

use std::pin::Pin;
use std::task::{Context, Poll};

trait Stream {
    type Item;

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<Option<Self::Item>>;
}

The Stream trait defines an associated type Item for the type of the items produced by the stream. This is similar to Iterator, where there may be zero to many items; it is unlike Future, where there is always a single Output (even if it’s the unit type ()).

Stream also defines a method to get those items. We call it poll_next, to make it clear that it polls in the same way Future::poll does and produces a sequence of items in the same way Iterator::next does. Its return type combines Poll with Option. The outer type is Poll, because it has to be checked for readiness, just as a future does. The inner type is Option, because it needs to signal whether there are more messages, just as an iterator does.

Something very similar to this will likely end up standardized as part of Rust’s standard library. In the meantime, it’s part of the toolkit of most runtimes, so you can rely on it, and everything we cover below should generally apply!
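
To make that concrete, here is a sketch of a hand-written stream using the Stream trait as defined above: a hypothetical Counter type which yields the numbers 1 through 3 and then ends.

use std::pin::Pin;
use std::task::{Context, Poll};

struct Counter {
    count: u32,
}

impl Stream for Counter {
    type Item = u32;

    fn poll_next(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        if self.count < 3 {
            self.count += 1;
            // An item is ready immediately. A stream waiting on I/O
            // would return Poll::Pending instead and arrange for the
            // waker in the context to be notified later.
            Poll::Ready(Some(self.count))
        } else {
            // Ready(None) signals the end of the stream, just as next
            // returning None does for an iterator.
            Poll::Ready(None)
        }
    }
}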

In the example we saw in the section on streaming, though, we didn’t use poll_next or Stream, but instead used next and StreamExt. We could work directly in terms of the poll_next API by hand-writing our own Stream state machines, of course, just as we could work with futures directly via their poll method. Using await is much nicer, though, so the StreamExt trait supplies the next method so we can do just that.

trait StreamExt: Stream {
    async fn next(&mut self) -> Option<Self::Item>
    where
        Self: Unpin;

    // other methods...
}

Note: The actual definition we used earlier in the chapter looks slightly different than this, because it supports versions of Rust which did not yet support using async functions in traits. As a result, it looks like this:

fn next(&mut self) -> Next<'_, Self> where Self: Unpin;

That Next type is a struct which implements Future, and writing Next<'_, Self> gives us a way to name the lifetime of the reference to self, so that await can work with this method!
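
A sketch of how such a struct can work, using the Stream trait defined above (the details here are illustrative, not the actual internals of the futures crate):

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

// A future representing a single call to next on some stream.
struct Next<'a, S: ?Sized> {
    stream: &'a mut S,
}

impl<'a, S: Stream + Unpin + ?Sized> Future for Next<'a, S> {
    type Output = Option<S::Item>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Because S: Unpin, it's safe to pin the reference on the fly
        // and delegate to the stream's poll_next.
        Pin::new(&mut *self.stream).poll_next(cx)
    }
}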

The StreamExt trait is also the home of all the interesting methods available to use with streams. StreamExt is automatically implemented for every type which implements Stream, but these traits are defined separately so that the community can iterate on the foundational trait distinctly from the convenience APIs.
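
The “automatically implemented” part comes down to a blanket implementation. Assuming every StreamExt method has a default body, as in the futures crate, it is just one line:

// Every type that implements Stream gets all of StreamExt's methods.
impl<S: Stream + ?Sized> StreamExt for S {}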

In the version of StreamExt used in the trpl crate, the trait not only defines the next method but also supplies an implementation of it which correctly handles the details of calling Stream::poll_next. This means that even when you need to write your own streaming data type, you only have to implement Stream, and then anyone who uses your data type can use StreamExt and its methods with it automatically.

That’s all we’re going to cover for the lower-level details on these traits. To wrap up, let’s consider how futures (including streams), tasks, and threads all fit together!

Futures, Tasks, and Threads

As we saw in the “Using Threads to Run Code Simultaneously” section of Chapter 16 on page XX, threads provide one approach to concurrency. We’ve seen another approach to concurrency in this chapter, using async with futures and streams. You might be wondering why you would choose one or the other. The answer is: it depends! And in many cases, the choice isn’t threads or async but rather threads and async.

Many operating systems have supplied threading-based concurrency models for decades now, and many programming languages have support for them as a result. However, they are not without their tradeoffs. On many operating systems, they use a fair bit of memory for each thread, and they come with some overhead for starting up and shutting down. Threads are also only an option when your operating system and hardware support them! Unlike mainstream desktop and mobile computers, some embedded systems don’t have an OS at all, so they also don’t have threads!

The async model provides a different—and ultimately complementary—set of tradeoffs. In the async model, concurrent operations don’t require their own threads. Instead, they can run on tasks, as when we used trpl::spawn_task to kick off work from a synchronous function throughout the streams section. A task is similar to a thread, but instead of being managed by the operating system, it’s managed by library-level code: the runtime.

In the previous section, we saw that we could build a Stream by using an async channel and spawning an async task which we could call from synchronous code. We could do the exact same thing with a thread! In Listing 17-40, we used trpl::spawn_task and trpl::sleep. In Listing 17-41, we replace those with the thread::spawn and thread::sleep APIs from the standard library in the get_intervals function.

Filename: src/main.rs

use std::{thread, time::Duration};

use trpl::{ReceiverStream, Stream};

fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    // This is *not* `trpl::spawn_task` but `std::thread::spawn`!
    thread::spawn(move || {
        let mut count = 0;
        loop {
            // Likewise, this is *not* `trpl::sleep` but `std::thread::sleep`!
            thread::sleep(Duration::from_millis(1));
            count += 1;

            if let Err(send_error) = tx.send(count) {
                eprintln!("Could not send interval {count}: {send_error}");
                break;
            };
        }
    });

    ReceiverStream::new(rx)
}

Listing 17-41: Using the std::thread APIs instead of the async trpl APIs for the get_intervals function

If you run this, the output is identical. And notice how little changes here from the perspective of the calling code! What’s more, even though one of our functions spawned an async task on the runtime and the other spawned an OS thread, the resulting streams were unaffected by the differences.

Despite the similarities, these two approaches behave very differently, although we might have a hard time measuring it in this very simple example. We could spawn millions of async tasks on any modern personal computer. If we tried to do that with threads, we would literally run out of memory!

However, there’s a reason these APIs are so similar. Threads act as a boundary for sets of synchronous operations; concurrency is possible between threads. Tasks act as a boundary for sets of asynchronous operations; concurrency is possible both between and within tasks, because a task can switch between futures in its body. Finally, futures are Rust’s most granular unit of concurrency, and each future may represent a tree of other futures. The runtime—specifically, its executor—manages tasks, and tasks manage futures. In that regard, tasks are similar to lightweight, runtime-managed threads with added capabilities that come from being managed by a runtime instead of by the operating system.

This doesn’t mean that async tasks are always better than threads, any more than it means that threads are always better than tasks.

Concurrency with threads is in some ways a simpler programming model than concurrency with async. That can be a strength or a weakness. Threads are somewhat “fire and forget”: they have no native equivalent of a future, so they simply run to completion, without interruption except by the operating system itself. That is, they have no built-in support for intra-task concurrency the way futures do. Threads in Rust also have no mechanisms for cancellation—a subject we haven’t covered in depth in this chapter, but which is implicit in the fact that whenever we ended a future, its state got cleaned up correctly.
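
To get a feel for that cancellation, here is a small sketch using the trpl helpers from this chapter. Racing a slow future against a quick one drops the loser, and dropping a future cancels it and cleans up its state; the printed strings are placeholders.

use std::time::Duration;

fn main() {
    trpl::run(async {
        let slow = async {
            trpl::sleep(Duration::from_secs(5)).await;
            println!("this never prints");
        };

        let quick = async {
            trpl::sleep(Duration::from_millis(50)).await;
            println!("quick finished");
        };

        // race polls both futures; as soon as one completes, the other
        // is dropped, cancelling it. Threads have no such mechanism.
        trpl::race(slow, quick).await;
    });
}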

These limitations also make threads harder to compose than futures. It’s much more difficult, for example, to use threads to build helpers such as the timeout we built in the “Building Our Own Async Abstractions” section of this chapter on page XX or the throttle method we used with streams in the “Composing Streams” section of this chapter on page XX. The fact that futures are richer data structures means they can be composed together more naturally, as we have seen.

Tasks then give additional control over futures, allowing you to choose where and how to group the futures. And it turns out that threads and tasks often work very well together, because tasks can (at least in some runtimes) be moved around between threads. We haven’t mentioned it up until now, but under the hood the Runtime we have been using, including the spawn_blocking and spawn_task functions, is multithreaded by default! Many runtimes use an approach called work stealing to transparently move tasks around between threads based on the current utilization of the threads, with the aim of improving the overall performance of the system. Building that kind of design actually requires threads and tasks, and therefore futures.

As a default way of thinking about which to use when:

  • If the work is very parallelizable, such as processing a bunch of data where each part can be processed separately, threads are a better choice.
  • If the work is very concurrent, such as handling messages from a bunch of different sources which may come in at different intervals or different rates, async is a better choice.

And if you need some mix of parallelism and concurrency, you don’t have to choose between threads and async. You can use them together freely, letting each one serve the part it is best at. For example, Listing 17-42 shows a fairly common example of this kind of mix in real-world Rust code.

Filename: src/main.rs

use std::{thread, time::Duration};

fn main() {
    let (tx, mut rx) = trpl::channel();

    thread::spawn(move || {
        for i in 1..11 {
            tx.send(i).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    trpl::run(async {
        while let Some(message) = rx.recv().await {
            println!("{message}");
        }
    });
}

Listing 17-42: Sending messages with blocking code in a thread and awaiting the messages in an async block

We begin by creating an async channel. Then we spawn a thread which takes ownership of the sender side of the channel. Within the thread, we send the numbers 1 through 10, and sleep for a second in between each. Finally, we run a future created with an async block passed to trpl::run just as we have throughout the chapter. In that future, we await those messages, just as in the other message-passing examples we have seen.

To return to the examples we opened the chapter with: you could imagine running a set of video encoding tasks using a dedicated thread, because video encoding is compute-bound, but notifying the UI that those operations are done with an async channel. Examples of this kind of mix abound!

Summary

This isn’t the last you’ll see of concurrency in this book: the project in Chapter 21 will use the concepts in this chapter in a more realistic situation than the smaller examples discussed here—and compare more directly what it looks like to solve these kinds of problems with threading vs. with tasks and futures.

Whether with threads, with futures and tasks, or with the combination of them all, Rust gives you the tools you need to write safe, fast, concurrent code—whether for a high-throughput web server or an embedded operating system.

Next, we’ll talk about idiomatic ways to model problems and structure solutions as your Rust programs get bigger. In addition, we’ll discuss how Rust’s idioms relate to those you might be familiar with from object-oriented programming.