## Async and Await
Many operations we ask the computer to do can take a while to finish. For
example, if you used a video editor to create a video of a family celebration,
exporting it could take anywhere from minutes to hours. Similarly, downloading a
video shared by someone in your family might take a long time. It would be nice
if we could do something else while we are waiting for those long-running
processes to complete.
The video export will use as much CPU and GPU power as it can. If you only had
one CPU core, and your operating system never paused that export until it
completed, you couldn't do anything else on your computer while it was running.
That would be a pretty frustrating experience, though. Instead, your computer's
operating system can—and does!—invisibly interrupt the export often enough to
let you get other work done along the way.
The file download is different. It does not take up very much CPU time. Instead,
the CPU needs to wait on data to arrive from the network. While you can start
reading the data once some of it is present, it might take a while for the rest
to show up. Even once the data is all present, a video can be quite large, so it
might take some time to load it all. Maybe it only takes a second or two—but
that's a very long time for a modern processor, which can do billions of
operations every second. It would be nice to be able to put the CPU to use for
other work while waiting for the network call to finish—so, again, your
operating system will invisibly interrupt your program so other things can
happen while the network operation is still ongoing.
> Note: The video export is the kind of operation which is often described as
> “CPU-bound” or “compute-bound.” It's limited by the speed of the computer's
> ability to process data within the *CPU* or *GPU*, and how much of that speed
> it can use. The video download is the kind of operation which is often
> described as “IO-bound,” because it's limited by the speed of the computer's
> *input and output*. It can only go as fast as the data can be sent across the
> network.
In both of these examples, the operating system's invisible interrupts provide a
form of concurrency. That concurrency only happens at the level of a whole
program, though: the operating system interrupts one program to let other
programs get work done. In many cases, because we understand our programs at a
much more granular level than the operating system does, we can spot lots of
opportunities for concurrency that the operating system cannot see.
For example, if we're building a tool to manage file downloads, we should be
able to write our program in such a way that starting one download does not lock
up the UI, and users should be able to start multiple downloads at the same
time. Many operating system APIs for interacting with the network are
*blocking*, though. That is, these APIs block the program's progress until the
data that they are processing is completely ready.
> Note: This is how *most* function calls work, if you think about it! However,
> we normally reserve the term “blocking” for function calls which interact with
> files, the network, or other resources on the computer, because those are the
> places where an individual program would benefit from the operation being
> *non*-blocking.
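To make “blocking” concrete, here is a minimal sketch using the standard
library's synchronous networking. (The host and hand-rolled HTTP request are
placeholders for illustration.) Every call below halts the calling thread
until the operation finishes:
```
use std::io::{Read, Write};
use std::net::TcpStream;

fn main() -> std::io::Result<()> {
    // `connect` blocks this thread until the TCP handshake completes.
    let mut stream = TcpStream::connect("example.com:80")?;

    // `write_all` blocks until the whole request is handed off to the OS.
    stream.write_all(b"GET / HTTP/1.0\r\nHost: example.com\r\n\r\n")?;

    // `read_to_end` blocks until the server closes the connection; the
    // thread can do nothing else while it waits on the network.
    let mut body = Vec::new();
    stream.read_to_end(&mut body)?;

    println!("read {} bytes", body.len());
    Ok(())
}
```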
We could avoid blocking our main thread by spawning a dedicated thread to
download each file. However, we would eventually find that the overhead of those
threads was a problem. It would also be nicer if the call were not blocking in
the first place. Last but not least, it would be better if we could write in the
same direct style we use in blocking code. Something similar to this:
```
let data = fetch_data_from(url).await;
println!("{data}");
```
That is exactly what Rust's async abstraction gives us. Before we see how this
works in practice, though, we need to take a short detour into the differences
between parallelism and concurrency.
### Parallelism and Concurrency
In the “Fearless Concurrency” chapter on page XX, we treated parallelism and
concurrency as mostly interchangeable. Now we need to distinguish between them
more precisely, because the differences will show up as we start working.
Consider the different ways a team could split up work on a software project. We
could assign a single individual multiple tasks, or we could assign one task per
team member, or we could do a mix of both approaches.
When an individual works on several different tasks before any of them is
complete, this is *concurrency*. Maybe you have two different projects checked
out on your computer, and when you get bored or stuck on one project, you switch
to the other. You're just one person, so you can't make progress on both tasks
at the exact same time—but you can multi-task, making progress on multiple
tasks by switching between them.
<img alt="Concurrent work flow" src="img/trpl17-01.svg" />
Figure 17-1: A concurrent workflow, switching between Task A and Task B.
When you agree to split up a group of tasks between the people on the team, with
each person taking one task and working on it alone, this is *parallelism*. Each
person on the team can make progress at the exact same time.
<img alt="Concurrent work flow" src="img/trpl17-02.svg" />
Figure 17-2: A parallel workflow, where work happens on Task A and Task B
independently.
With both of these situations, you might have to coordinate between different
tasks. Maybe you *thought* the task that one person was working on was totally
independent from everyone else's work, but it actually needs something finished
by another person on the team. Some of the work could be done in parallel, but
some of it was actually *serial*: it could only happen in a series, one thing
after the other, as in Figure 17-3.
<img alt="Concurrent work flow" src="img/trpl17-03.svg" class="center" />
Figure 17-3: A partially parallel workflow, where work happens on Task A and Task B independently until task A3 is blocked on the results of task B3.
Likewise, you might realize that one of your own tasks depends on another of
your tasks. Now your concurrent work has also become serial.
Parallelism and concurrency can intersect with each other, too. If you learn
that a colleague is stuck until you finish one of your tasks, you'll probably
focus all your efforts on that task to unblock your colleague. You and your
coworker are no longer able to work in parallel, and you're also no longer able
to work concurrently on your own tasks.
The same basic dynamics come into play with software and hardware. On a machine
with a single CPU core, the CPU can only do one operation at a time, but it can
still work concurrently. Using tools such as threads, processes, and async, the
computer can pause one activity and switch to others before eventually cycling
back to that first activity again. On a machine with multiple CPU cores, it can
also do work in parallel. One core can be doing one thing while another core
does something completely unrelated, and those actually happen at the same
time.
When working with async in Rust, we're always dealing with concurrency.
Depending on the hardware, the operating system, and the async runtime we are
using—more on async runtimes shortly!—that concurrency may also use parallelism
under the hood.
Now, let's dive into how async programming in Rust actually works! In the rest
of this chapter, we will:
* see how to use Rust's `async` and `await` syntax
* explore how to use the async model to solve some of the same challenges we
looked at in Chapter 16
* look at how multithreading and async provide complementary solutions, which
you can even use together in many cases
## Futures and the Async Syntax
The key elements of asynchronous programming in Rust are *futures* and Rust's
`async` and `await` keywords.
A *future* is a value which may not be ready now, but will become ready at some
point in the future. (This same concept shows up in many languages, sometimes
under other names such as “task” or “promise.”) Rust provides a `Future` trait
as a building block so different async operations can be implemented with
different data structures, but with a common interface. In Rust, we say that
types which implement the `Future` trait are futures. Each type which
implements `Future` holds its own information about the progress that has been
made and what “ready” means.
The `async` keyword can be applied to blocks and functions to specify that they
can be interrupted and resumed. Within an async block or async function, you can
use the `await` keyword to wait for a future to become ready, called *awaiting a
future*. Each place you await a future within an async block or function is a
place that async block or function may get paused and resumed. The process of
checking with a future to see if its value is available yet is called *polling*.
Some other languages also use `async` and `await` keywords for async
programming. If you're familiar with those languages, you may notice some
significant differences in how Rust does things, including how it handles the
syntax. That's for good reason, as we'll see!
Most of the time when writing async Rust, we use the `async` and `await`
keywords. Rust compiles them into equivalent code using the `Future` trait, much
as it compiles `for` loops into equivalent code using the `Iterator` trait.
Because Rust provides the `Future` trait, though, you can also implement it for
your own data types when you need to. Many of the functions we'll see
throughout this chapter return types with their own implementations of `Future`.
We'll return to the definition of the trait at the end of the chapter and dig
into more of how it works, but this is enough detail to keep us moving forward.
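As a rough sketch of what that kind of compilation means, here is the
`Iterator` half of the analogy spelled out by hand. The `Future` desugaring is
similar in spirit, though considerably more involved:
```
let v = vec![1, 2, 3];

// A `for` loop over `v`...
for x in &v {
    println!("{x}");
}

// ...compiles to roughly this hand-driven `Iterator` code.
let mut iter = (&v).into_iter();
while let Some(x) = iter.next() {
    println!("{x}");
}
```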
That may all feel a bit abstract. Let's write our first async program: a little
web scraper. We'll pass in two URLs from the command line, fetch both of them
concurrently, and return the result of whichever one finishes first. This
example will have a fair bit of new syntax, but don't worry. We'll explain
everything you need to know as we go.
### Our First Async Program
To keep this chapter focused on learning async, rather than juggling parts of
the ecosystem, we have created the `trpl` crate (`trpl` is short for “The Rust
Programming Language”). It re-exports all the types, traits, and functions
you'll need, primarily from the `futures` and `tokio` crates, available on
*https://crates.io*.
* The `futures` crate is an official home for Rust experimentation for async
code, and is actually where the `Future` type was originally designed.
* Tokio is the most widely used async runtime in Rust today, especially (but
not only!) for web applications. There are other great runtimes out there,
and they may be more suitable for your purposes. We use Tokio under the hood
for `trpl` because it's well-tested and widely used.
In some cases, `trpl` also renames or wraps the original APIs to let us stay
focused on the details relevant to this chapter. If you want to understand what
the crate does, we encourage you to check out its source code at
*https://github.com/rust-lang/book/tree/main/packages/trpl*.
You'll be able to see what crate each re-export comes from, and we've left
extensive comments explaining what the crate does.
Create a new binary project named `hello-async` and add the `trpl` crate as a
dependency:
```
$ cargo new hello-async
$ cd hello-async
$ cargo add trpl
```
Now we can use the various pieces provided by `trpl` to write our first async
program. We'll build a little command line tool which fetches two web pages,
pulls the `<title>` element from each, and prints out the title of whichever
finishes that whole process first.
Let's start by writing a function that takes one page URL as a parameter, makes
a request to it, and returns the text of the title element:
Filename: src/main.rs
```
use trpl::Html;

async fn page_title(url: &str) -> Option<String> {
    let response = trpl::get(url).await;
    let response_text = response.text().await;
    Html::parse(&response_text)
        .select_first("title")
        .map(|title_element| title_element.inner_html())
}
```
Listing 17-1: Defining an async function to get the title element from an HTML page
In Listing 17-1, we define a function named `page_title`, and we mark it with
the `async` keyword. Then we use the `trpl::get` function to fetch whatever URL
is passed in, and we await the response by using the `await` keyword. Then
we get the text of the response by calling its `text` method and once again
awaiting it with the `await` keyword. Both of these steps are asynchronous. For
`get`, we need to wait for the server to send back the first part of its
response, which will include HTTP headers, cookies, and so on. That part of the
response can be delivered separately from the body of the response. Especially
if the body is very large, it can take some time for it all to arrive. Because
we have to wait for the *entirety* of the response to arrive, the `text` method
is also async.
We have to explicitly await both of these futures, because futures in Rust are
*lazy*: they don't do anything until you ask them to with `await`. (In fact,
Rust will show a compiler warning if you don't use a future.) This should
remind you of our discussion of iterators back in the “Processing a Series of
Items with Iterators” section of Chapter 13 on page XX. Iterators do nothing
unless you call their `next` method—whether directly, or using `for` loops or
methods such as `map` which use `next` under the hood. With futures, the same
basic idea applies: they do nothing unless you explicitly ask them to. This
laziness allows Rust to avoid running async code until it's actually needed.
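Here is a small sketch of that laziness, reusing the same `trpl::get` and
`text` calls from Listing 17-1 (the URL is whatever the caller supplies):
```
async fn example(url: &str) {
    // Creating the future does nothing yet: no request has been sent.
    let response_future = trpl::get(url);

    // If we never awaited `response_future`, the compiler would warn us
    // that futures do nothing unless awaited or polled. Only here does
    // the request actually begin.
    let response = response_future.await;
    let text = response.text().await;
    println!("fetched {} bytes", text.len());
}
```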
> Note: This is different from the behavior we saw when using `thread::spawn` in
> the “Creating a New Thread with spawn” section of Chapter 16 on page XX,
> where the closure we passed to another thread started running immediately.
> It's also different from how many other languages approach async! But it's
> important for Rust. We'll see why that is later.
Once we have `response_text`, we can then parse it into an instance of the
`Html` type using `Html::parse`. Instead of a raw string, we now have a data
type we can use to work with the HTML as a richer data structure. In particular,
we can use the `select_first` method to find the first instance of a given CSS
selector. By passing the string `"title"`, well get the first `<title>`
element in the document, if there is one. Because there may not be any matching
element, `select_first` returns an `Option<ElementRef>`. Finally, we use the
`Option::map` method, which lets us work with the item in the `Option` if it's
present, and do nothing if it isn't. (We could also use a `match` expression
here, but `map` is more idiomatic.) In the body of the function we supply to
`map`, we call `inner_html` on the `title_element` to get its content, which is
a `String`. When all is said and done, we have an `Option<String>`.
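For comparison, the `map` call at the end of Listing 17-1 could be written as
an explicit `match` like this:
```
// Equivalent to `.map(|title_element| title_element.inner_html())`:
match Html::parse(&response_text).select_first("title") {
    Some(title_element) => Some(title_element.inner_html()),
    None => None,
}
```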
Notice that Rust's `await` keyword goes after the expression you're awaiting,
not before it. That is, it's a *postfix keyword*. This may be different from
what you might be used to if you have used async in other languages. Rust chose
this because it makes chains of methods much nicer to work with. As a result, we
can change the body of `page_title` to chain the `trpl::get` and `text`
function calls together with `await` between them, as shown in Listing 17-2:
Filename: src/main.rs
```
let response_text = trpl::get(url).await.text().await;
```
Listing 17-2: Chaining with the `await` keyword
With that, we have successfully written our first async function! Before we add
some code in `main` to call it, let's talk a little more about what we've
written and what it means.
When Rust sees a block marked with the `async` keyword, it compiles it into a
unique, anonymous data type which implements the `Future` trait. When Rust sees
a function marked with `async`, it compiles it into a non-async function whose
body is an async block. An async function's return type is the type of the
anonymous data type the compiler creates for that async block.
Thus, writing `async fn` is equivalent to writing a function which returns a
*future* of the return type. When the compiler sees a function definition such
as the `async fn page_title` in Listing 17-1, it's equivalent to a non-async
function defined like this:
```
use std::future::Future;
use trpl::Html;

fn page_title(url: &str) -> impl Future<Output = Option<String>> + '_ {
    async move {
        let text = trpl::get(url).await.text().await;
        Html::parse(&text)
            .select_first("title")
            .map(|title| title.inner_html())
    }
}
```
Let's walk through each part of the transformed version:
* It uses the `impl Trait` syntax we discussed back in the “Traits as
Parameters” section in Chapter 10 on page XX.
* The returned trait is a `Future`, with an associated type of `Output`. Notice
that the `Output` type is `Option<String>`, which is the same as the
original return type from the `async fn` version of `page_title`.
* All of the code called in the body of the original function is wrapped in an
`async move` block. Remember that blocks are expressions. This whole block is
the expression returned from the function.
* This async block produces a value with the type `Option<String>`, as described
above. That value matches the `Output` type in the return type. This is just
like other blocks you have seen.
* The new function body is an `async move` block because of how it uses the
`url` parameter. (We'll talk about `async` vs. `async move` much more later
in the chapter.)
* The new version of the function has a kind of lifetime we haven't seen before
in the output type: `'_`. Because the function returns a `Future` which refers
to a reference—in this case, the reference from the `url` parameter—we need to
tell Rust that we mean for that reference to be included. We don't have to
name the lifetime here, because Rust is smart enough to know there is only one
reference which could be involved, but we *do* have to be explicit that the
resulting `Future` is bound by that lifetime, as in the sketch following this
list.
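For illustration, here is the same function with the lifetime named as `'a`
instead of elided as `'_`; the two spellings are equivalent:
```
use std::future::Future;
use trpl::Html;

// Naming the lifetime makes the connection to `url` explicit.
fn page_title<'a>(url: &'a str) -> impl Future<Output = Option<String>> + 'a {
    async move {
        let text = trpl::get(url).await.text().await;
        Html::parse(&text)
            .select_first("title")
            .map(|title| title.inner_html())
    }
}
```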
Now we can call `page_title` in `main`. To start, we'll just get the title for
a single page. In Listing 17-3, we follow the same pattern we used for getting
command line arguments back in the “Accepting Command Line Arguments” section
of Chapter 12 on page XX. Then we pass the first URL to `page_title`, and await
the result. Because the value produced by the future is an `Option<String>`, we
use a `match` expression to print different messages to account for whether the
page had a `<title>`.
Filename: src/main.rs
```
async fn main() {
    let args: Vec<String> = std::env::args().collect();
    let url = &args[1];
    match page_title(url).await {
        Some(title) => println!("The title for {url} was {title}"),
        None => println!("{url} had no title"),
    }
}
```
Listing 17-3: Calling the `page_title` function from `main` with a
user-supplied argument
Unfortunately, this doesn't compile. The only place we can use the `await`
keyword is in async functions or blocks, and Rust won't let us mark the
special `main` function as `async`.
```
error[E0752]: `main` function is not allowed to be `async`
--> src/main.rs:6:1
|
6 | async fn main() {
| ^^^^^^^^^^^^^^^ `main` function is not allowed to be `async`
```
The reason `main` can't be marked `async` is that async code needs a *runtime*:
a Rust crate which manages the details of executing asynchronous code. A
program's `main` function can *initialize* a runtime, but it's not a runtime
*itself*. (We'll see more about why this is a bit later.) Every Rust program
that executes async code has at least one place where it sets up a runtime and
executes the futures.
Most languages which support async bundle a runtime with the language. Rust does
not. Instead, there are many different async runtimes available, each of which
makes different tradeoffs suitable to the use case they target. For example, a
high-throughput web server with many CPU cores and a large amount of RAM has
very different needs than a microcontroller with a single core, a
small amount of RAM, and no ability to do heap allocations. The crates which
provide those runtimes also often supply async versions of common functionality
such as file or network I/O.
Here, and throughout the rest of this chapter, we'll use the `run` function
from the `trpl` crate, which takes a future as an argument and runs it to
completion. Behind the scenes, calling `run` sets up a runtime to use to run the
future passed in. Once the future completes, `run` returns whatever value the
future produced.
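As a mental model, here is a minimal sketch of what a function like `run`
might look like, assuming it wraps Tokio directly; the real `trpl` crate may
configure its runtime differently:
```
use std::future::Future;

// Hypothetical sketch: build a runtime, then drive the future to
// completion on it, returning whatever the future produces.
fn run<F: Future>(future: F) -> F::Output {
    tokio::runtime::Runtime::new()
        .unwrap()
        .block_on(future)
}
```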
We could pass the future returned by `page_title` directly to `run`. Once it
completed, we would be able to match on the resulting `Option<String>`, the way
we tried to do in Listing 17-3. However, for most of the examples in the chapter
(and most async code in the real world!), we'll be doing more than just one
async function call, so instead we'll pass an `async` block and explicitly
await the result of calling `page_title`, as in Listing 17-4.
Filename: src/main.rs
```
fn main() {
    let args: Vec<String> = std::env::args().collect();

    trpl::run(async {
        let url = &args[1];
        match page_title(url).await {
            Some(title) => println!("The title for {url} was {title}"),
            None => println!("{url} had no title"),
        }
    })
}
```
Listing 17-4: Awaiting an async block with `trpl::run`
When we run this, we get the behavior we might have expected initially:
```
$ cargo run "http://www.rust-lang.org"
The title for http://www.rust-lang.org was
Rust Programming Language
```
Phew: we finally have some working async code! This now compiles, and we can run
it. Before we add code to race two sites against each other, let's briefly turn
our attention back to how futures work.
Each *await point*—that is, every place where the code uses the `await`
keyword—represents a place where control gets handed back to the runtime. To
make that work, Rust needs to keep track of the state involved in the async
block, so that the runtime can kick off some other work and then come back when
it's ready to try advancing this one again. This is an invisible state machine,
as if you wrote an enum in this way to save the current state at each `await`
point:
```
enum PageTitleFuture<'a> {
    Initial { url: &'a str },
    GetAwaitPoint { url: &'a str },
    TextAwaitPoint { response: trpl::Response },
}
```
Writing the code to transition between each state by hand would be tedious and
error-prone, especially when adding more functionality and more states to the
code later. Instead, the Rust compiler creates and manages the state machine
data structures for async code automatically. If you're wondering: yep, the
normal borrowing and ownership rules around data structures all apply. Happily,
the compiler also handles checking those for us, and has good error messages.
We'll work through a few of those later in the chapter!
Ultimately, something has to execute that state machine. That something is a
runtime. (This is why you may sometimes come across references to *executors*
when looking into runtimes: an executor is the part of a runtime responsible for
executing the async code.)
Now we can understand why the compiler stopped us from making `main` itself an
async function back in Listing 17-3. If `main` were an async function, something
else would need to manage the state machine for whatever future `main` returned,
but `main` is the starting point for the program! Instead, we call the
`trpl::run` function in `main`, which sets up a runtime and runs the future
returned by the `async` block until it returns `Ready`.
> Note: Some runtimes provide macros to make it so you *can* write an async
> main function. Those macros rewrite `async fn main() { ... }` to be a normal
> `fn main` which does the same thing we did by hand in Listing 17-4: call a
> function which runs a future to completion the way `trpl::run` does.
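For example, with Tokio's attribute macro (assuming a direct `tokio`
dependency with its `macros` feature enabled), you could write:
```
#[tokio::main]
async fn main() {
    // The macro rewrites this into a normal `fn main` that builds a
    // Tokio runtime and passes this body to it as a future to run to
    // completion, much like our `trpl::run` call.
    println!("hello from an async main");
}
```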
Let's put these pieces together and see how we can write concurrent code, by
calling `page_title` with two different URLs passed in from the command line
and racing them.
Filename: src/main.rs
```
use trpl::{Either, Html};

fn main() {
    let args: Vec<String> = std::env::args().collect();

    trpl::run(async {
        let title_fut_1 = page_title(&args[1]);
        let title_fut_2 = page_title(&args[2]);
        let (url, maybe_title) =
            match trpl::race(title_fut_1, title_fut_2).await {
                Either::Left(left) => left,
                Either::Right(right) => right,
            };

        println!("{url} returned first");
        match maybe_title {
            Some(title) => println!("Its page title is: '{title}'"),
            None => println!("Its title could not be parsed."),
        }
    })
}

async fn page_title(url: &str) -> (&str, Option<String>) {
    let text = trpl::get(url).await.text().await;
    let title = Html::parse(&text)
        .select_first("title")
        .map(|title| title.inner_html());
    (url, title)
}
```
Listing 17-5: Calling `page_title` for two URLs to see which returns first
In Listing 17-5, we begin by calling `page_title` for each of the user-supplied
URLs. We save the futures produced by calling `page_title` as `title_fut_1` and
`title_fut_2`. Remember, these don't do anything yet, because futures are lazy,
and we haven't yet awaited them. Then we pass the futures to `trpl::race`,
which returns a value to indicate which of the futures passed to it finishes
first.
> Note: Under the hood, `race` is built on a more general function, `select`,
> which you will encounter more often in real-world Rust code. A `select`
> function can do a lot of things that the `trpl::race` function can't, but it also
> has some additional complexity that we can skip over for now.
Either future can legitimately “win,” so it doesn't make sense to return a
`Result`. Instead, `race` returns a type we haven't seen before,
`trpl::Either`. The `Either` type is somewhat similar to a `Result`, in that it
has two cases. Unlike `Result`, though, there is no notion of success or
failure baked into `Either`. Instead, it uses `Left` and `Right` to indicate
“one or the other”:
```
enum Either<A, B> {
    Left(A),
    Right(B),
}
```
The `race` function returns `Left` if the first argument finishes first, with
that future's output, and `Right` with the second future argument's output if
*that* one finishes first. This matches the order the arguments appear when
calling the function: the first argument is to the left of the second argument.
We also update `page_title` to return the same URL passed in. That way, if
the page which returns first does not have a `<title>` we can resolve, we can
still print a meaningful message. With that information available, we wrap up by
updating our `println!` output to indicate both which URL finished first and
what the `<title>` was for the web page at that URL, if any.
You have built a small working web scraper now! Pick a couple URLs and run the
command line tool. You may discover that some sites are reliably faster than
others, while in other cases which site wins varies from run to run. More
importantly, you've learned the basics of working with futures, so we can now
dig into even more of the things we can do with async.
## Concurrency With Async
In this section, we'll apply async to some of the same concurrency challenges
we tackled with threads in Chapter 16. Because we already talked about a lot of
the key ideas there, in this section we'll focus on what's different between
threads and futures.
In many cases, the APIs for working with concurrency using async are very
similar to those for using threads. In other cases, they end up being shaped
quite differently. Even when the APIs *look* similar between threads and async,
they often have different behavior—and they nearly always have different
performance characteristics.
### Counting
The first task we tackled in the “Creating a New Thread with spawn” section of
Chapter 16 on page XX was counting up on two separate threads. Let's do the
same using async. The `trpl` crate supplies a `spawn_task` function which looks
very similar to the `thread::spawn` API, and a `sleep` function which is an
async version of the `thread::sleep` API. We can use these together to
implement the same counting example as with threads, in Listing 17-6.
Filename: src/main.rs
```
use std::time::Duration;

fn main() {
    trpl::run(async {
        trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }
    });
}
```
Listing 17-6: Using `spawn_task` to count with two tasks
As our starting point, we set up our `main` function with `trpl::run`, so
that our top-level function can be async.
> Note: From this point forward in the chapter, every example will include this
> exact same wrapping code with `trpl::run` in `main`, so we'll often skip it
> just as we do with `main`. Don't forget to include it in your code!
Then we write two loops within that block, each with a `trpl::sleep` call in it,
which waits for half a second (500 milliseconds) before sending the next
message. We put one loop in the body of a `trpl::spawn_task` and the other in a
top-level `for` loop. We also add an `await` after the `sleep` calls.
This does something similar to the thread-based implementation—including the
fact that you may see the messages appear in a different order in your own
terminal when you run it.
```
hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
```
This version stops as soon as the `for` loop in the body of the main async block
finishes, because the task spawned by `spawn_task` is shut down when the main
function ends. If you want to run all the way to the completion of the task, you
will need to use a join handle to wait for the first task to complete. With
threads, we used the `join` method to block until the thread was done running.
In Listing 17-7, we can use `await` to do the same thing, because the task
handle itself is a future. Its `Output` type is a `Result`, so we also unwrap it
after awaiting it.
Filename: src/main.rs
```
let handle = trpl::spawn_task(async {
    for i in 1..10 {
        println!("hi number {i} from the first task!");
        trpl::sleep(Duration::from_millis(500)).await;
    }
});

for i in 1..5 {
    println!("hi number {i} from the second task!");
    trpl::sleep(Duration::from_millis(500)).await;
}

handle.await.unwrap();
```
Listing 17-7: Using `await` with a join handle to run a task to completion
This updated version runs until *both* loops finish.
```
hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!
```
So far, it looks like async and threads give us the same basic outcomes, just
with different syntax: using `await` instead of calling `join` on the join
handle, and awaiting the `sleep` calls.
The bigger difference is that we didn't need to spawn another operating system
thread to do this. In fact, we don't even need to spawn a task here. Because
async blocks compile to anonymous futures, we can put each loop in an async
block and have the runtime run them both to completion using the `trpl::join`
function.
In the “Waiting for All Threads to Finish Using `join` Handles” section of
Chapter 16 on page XX, we showed how to use the `join` method on the
`JoinHandle` type returned when you call `std::thread::spawn`. The `trpl::join`
function is similar, but for futures. When you give it two futures, it produces
a single new future whose output is a tuple with the output of each of the
futures you passed in once *both* complete. Thus, in Listing 17-8, we use
`trpl::join` to wait for both `fut1` and `fut2` to finish. We do *not* await
`fut1` and `fut2`, but instead the new future produced by `trpl::join`. We
ignore the output, because it's just a tuple with two unit values in it.
Filename: src/main.rs
```
let fut1 = async {
    for i in 1..10 {
        println!("hi number {i} from the first task!");
        trpl::sleep(Duration::from_millis(500)).await;
    }
};

let fut2 = async {
    for i in 1..5 {
        println!("hi number {i} from the second task!");
        trpl::sleep(Duration::from_millis(500)).await;
    }
};

trpl::join(fut1, fut2).await;
```
Listing 17-8: Using `trpl::join` to await two anonymous futures
When we run this, we see both futures run to completion:
```
hi number 1 from the first task!
hi number 1 from the second task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!
```
Here, you'll see the exact same order every time, which is very different from
what we saw with threads. That is because the `trpl::join` function is *fair*,
meaning it checks each future equally often, alternating between them, and never
lets one race ahead if the other is ready. With threads, the operating system
decides which thread to check and how long to let it run. With async Rust, the
runtime decides which task to check. (In practice, the details get complicated
because an async runtime might use operating system threads under the hood as
part of how it manages concurrency, so guaranteeing fairness can be more work
for a runtime—but it's still possible!) Runtimes don't have to guarantee
fairness for any given operation, and runtimes often offer different APIs to let
you choose whether you want fairness or not.
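One more detail before you experiment: the tuple output of `join` is easier to
see with futures that produce actual values. A small sketch:
```
let a = async { 1 };
let b = async { "hello" };

// The joined future's output is a tuple of both futures' outputs.
let (n, s) = trpl::join(a, b).await;
assert_eq!((n, s), (1, "hello"));
```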
Try some of these different variations on awaiting the futures and see what they
do:
* Remove the async block from around either or both of the loops.
* Await each async block immediately after defining it.
* Wrap only the first loop in an async block, and await the resulting future
after the body of second loop.
For an extra challenge, see if you can figure out what the output will be in
each case *before* running the code!
### Message Passing
Sharing data between futures will also be familiar: we'll use message passing
again, but this time with async versions of the types and functions. We'll take
a slightly different path than we did in the “Using Message Passing to Transfer
Data Between Threads” section of Chapter 16 on page XX, to illustrate some of
the key differences between thread-based and futures-based concurrency. In
Listing 17-9, we'll begin with just a single async block—*not* spawning a
separate task as we spawned a separate thread.
Filename: src/main.rs
```
let (tx, mut rx) = trpl::channel();

let val = String::from("hi");
tx.send(val).unwrap();

let received = rx.recv().await.unwrap();
println!("Got: {received}");
```
Listing 17-9: Creating an async channel and assigning the two halves to `tx`
and `rx`
Here, we use `trpl::channel`, an async version of the multiple-producer,
single-consumer channel API we used with threads back in the “Using Message
Passing to Transfer Data Between Threads” section of Chapter 16 on page XX. The
async version of the API is only a little different from the thread-based
version: it uses a mutable rather than an immutable receiver `rx`, and its
`recv` method produces a future we need to await rather than producing the
value directly. Now we can send messages from the sender to the receiver.
Notice that we don't have to spawn a separate thread or even a task; we merely
need to await the `rx.recv` call.
The synchronous `Receiver::recv` method in `std::mpsc::channel` blocks until
it receives a message. The `trpl::Receiver::recv` method does not, because it
is async. Instead of blocking, it hands control back to the runtime until either
a message is received or the send side of the channel closes. By contrast, we
don't await the `send` call, because it doesn't block. It doesn't need to,
because the channel we're sending it into is unbounded.
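For contrast, with a *bounded* channel, `send` is itself async, because it may
need to wait for capacity to free up. A sketch using Tokio's bounded channel
directly (an assumption for illustration; `trpl::channel` is the unbounded
kind):
```
use tokio::sync::mpsc;

// A bounded channel with room for just one in-flight message.
let (tx, mut rx) = mpsc::channel(1);

// Here `send` returns a future: if the channel is full, awaiting it
// hands control back to the runtime until space frees up.
tx.send(String::from("hi")).await.unwrap();

let received = rx.recv().await.unwrap();
println!("Got: {received}");
```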
> Note: Because all of this async code runs in an async block in a `trpl::run`
> call, everything within it can avoid blocking. However, the code *outside* it
> will block on the `run` function returning. That is the whole point of the
> `trpl::run` function: it lets you *choose* where to block on some set of async
> code, and thus where to transition between sync and async code. In most async
> runtimes, `run` is actually named `block_on` for exactly this reason.
Notice two things about this example: First, the message will arrive right away!
Second, although we use a future here, there's no concurrency yet. Everything
in the listing happens in sequence, just as it would if there were no futures
involved.
Let's address the first part by sending a series of messages and sleeping in
between them, as shown in Listing 17-10:
Filename: src/main.rs
```
let (tx, mut rx) = trpl::channel();

let vals = vec![
    String::from("hi"),
    String::from("from"),
    String::from("the"),
    String::from("future"),
];

for val in vals {
    tx.send(val).unwrap();
    trpl::sleep(Duration::from_millis(500)).await;
}

while let Some(value) = rx.recv().await {
    println!("received '{value}'");
}
```
Listing 17-10: Sending and receiving multiple messages over the async channel
and sleeping with an `await` between each message
In addition to sending the messages, we need to receive them. In this case, we
could do that manually, by just doing `rx.recv().await` four times, because we
know how many messages are coming in. In the real world, though, we'll
generally be waiting on some *unknown* number of messages. In that case, we need
to keep waiting until we determine that there are no more messages.
In Listing 16-10, we used a `for` loop to process all the items received from a
synchronous channel. However, Rust doesn't yet have a way to write a `for` loop
over an *asynchronous* series of items. Instead, we need to use a new kind of
loop we haven't seen before, the `while let` conditional loop. A `while let`
loop is the loop version of the `if let` construct we saw back in the “Concise
Control Flow with `if let`” section in Chapter 6 on page XX. The loop will
continue executing as long as the pattern it specifies continues to match the
value.
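`while let` is not specific to async code. Here is the same construct in
ordinary synchronous Rust, looping until `pop` returns `None`:
```
let mut stack = vec![1, 2, 3];

// The loop body runs as long as `pop` keeps matching `Some(top)`.
while let Some(top) = stack.pop() {
    println!("{top}");
}
```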
The `rx.recv` call produces a `Future`, which we await. The runtime will pause
the `Future` until it is ready. Once a message arrives, the future will resolve
to `Some(message)`, as many times as a message arrives. When the channel closes,
regardless of whether *any* messages have arrived, the future will instead
resolve to `None` to indicate that there are no more values, and we should stop
polling—that is, stop awaiting.
The `while let` loop pulls all of this together. If the result of calling
`rx.recv().await` is `Some(message)`, we get access to the message and we can
use it in the loop body, just as we could with `if let`. If the result is
`None`, the loop ends. Every time the loop completes, it hits the await point
again, so the runtime pauses it again until another message arrives.
The code now successfully sends and receives all of the messages. Unfortunately,
there are still a couple problems. For one thing, the messages do not arrive at
half-second intervals. They arrive all at once, two seconds (2,000 milliseconds)
after we start the program. For another, this program also never exits! Instead,
it waits forever for new messages. You will need to shut it down using <span
class="keystroke">ctrl-c</span>.
Let's start by understanding why the messages all come in at once after the full
delay, rather than coming in with delays in between each one. Within a given
async block, the order that `await` keywords appear in the code is also the
order they happen when running the program.
There's only one async block in Listing 17-10, so everything in it runs
linearly. There's still no concurrency. All the `tx.send` calls happen,
interspersed with all of the `trpl::sleep` calls and their associated await
points. Only then does the `while let` loop get to go through any of the `await`
points on the `recv` calls.
To get the behavior we want, where the sleep delay happens between receiving
each message, we need to put the `tx` and `rx` operations in their own async
blocks. Then the runtime can execute each of them separately using `trpl::join`,
just as in the counting example. Once again, we await the result of calling
`trpl::join`, not the individual futures. If we awaited the individual futures
in sequence, we would just end up back in a sequential flow—exactly what we're
trying *not* to do.
Filename: src/main.rs
```
let tx_fut = async {
    let vals = vec![
        String::from("hi"),
        String::from("from"),
        String::from("the"),
        String::from("future"),
    ];

    for val in vals {
        tx.send(val).unwrap();
        trpl::sleep(Duration::from_millis(500)).await;
    }
};

let rx_fut = async {
    while let Some(value) = rx.recv().await {
        println!("received '{value}'");
    }
};

trpl::join(tx_fut, rx_fut).await;
```
Listing 17-11: Separating `send` and `recv` into their own `async` blocks and
awaiting the futures for those blocks
With the updated code in Listing 17-11, the messages get printed at
500-millisecond intervals, rather than all in a rush after two seconds.
The program still never exits, though, because of the way the `while let` loop
interacts with `trpl::join`:
* The future returned from `trpl::join` only completes once *both* futures
passed to it have completed.
* The `tx` future completes once it finishes sleeping after sending the last
message in `vals`.
* The `rx` future won't complete until the `while let` loop ends.
* The `while let` loop won't end until awaiting `rx.recv` produces `None`.
* Awaiting `rx.recv` will only return `None` once the other end of the channel
is closed.
* The channel will only close if we call `rx.close` or when the sender side,
`tx`, is dropped.
* We don't call `rx.close` anywhere, and `tx` won't be dropped until the
outermost async block passed to `trpl::run` ends.
* The block can't end because it is blocked on `trpl::join` completing, which
takes us back to the top of this list!
We could manually close `rx` by calling `rx.close` somewhere, but that doesn't
make much sense. Stopping after handling some arbitrary number of messages would
make the program shut down, but we could miss messages. We need some other way
to make sure that `tx` gets dropped *before* the end of the function.
Right now, the async block where we send the messages only borrows `tx` because
sending a message doesn't require ownership, but if we could move `tx` into
that async block, it would be dropped once that block ends. In the “Capturing
References or Moving Ownership” section of Chapter 13 on page XX, we learned
how to use the `move` keyword with closures, and in the “Using `move` Closures
with Threads” section of Chapter 16 on page XX, we saw that we often need to
move data into closures when working with threads. The same basic dynamics
apply to async blocks, so the `move` keyword works with async blocks just as it
does with closures.
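Here is a minimal sketch of that capture behavior on its own, outside the
channel example:
```
let name = String::from("world");

// `async move` takes ownership of `name`, just as a `move` closure
// would; when this future completes, `name` is dropped along with it.
let greeting = async move {
    println!("hello, {name}!");
};

// `name` is no longer usable here: it was moved into the block.
greeting.await;
```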
In Listing 17-12, we change the async block for sending messages from a plain
`async` block to an `async move` block. When we run *this* version of the code,
it shuts down gracefully after the last message is sent and received.
Filename: src/main.rs
```
let (tx, mut rx) = trpl::channel();

let tx_fut = async move {
    let vals = vec![
        String::from("hi"),
        String::from("from"),
        String::from("the"),
        String::from("future"),
    ];

    for val in vals {
        tx.send(val).unwrap();
        trpl::sleep(Duration::from_millis(500)).await;
    }
};

let rx_fut = async {
    while let Some(value) = rx.recv().await {
        println!("received '{value}'");
    }
};

trpl::join(tx_fut, rx_fut).await;
```
Listing 17-12: A working example of sending and receiving messages between
futures which correctly shuts down when complete
This async channel is also a multiple-producer channel, so we can call `clone`
on `tx` if we want to send messages from multiple futures. In Listing 17-13, we
clone `tx`, creating `tx1` outside the first async block. We move `tx1` into
that block just as we did before with `tx`. Then, later, we move the original
`tx` into a *new* async block, where we send more messages on a slightly slower
delay. We happen to put this new async block after the async block for receiving
messages, but it could go before it just as well. The key is the order in which
the futures are awaited, not the order in which they are created.
Both of the async blocks for sending messages need to be `async move` blocks, so
that both `tx` and `tx1` get dropped when those blocks finish. Otherwise we'll
end up back in the same infinite loop we started out in. Finally, we switch from
`trpl::join` to `trpl::join3` to handle the additional future.
Filename: src/main.rs
```
let (tx, mut rx) = trpl::channel();

let tx1 = tx.clone();
let tx1_fut = async move {
    let vals = vec![
        String::from("hi"),
        String::from("from"),
        String::from("the"),
        String::from("future"),
    ];

    for val in vals {
        tx1.send(val).unwrap();
        trpl::sleep(Duration::from_millis(500)).await;
    }
};

let rx_fut = async {
    while let Some(value) = rx.recv().await {
        println!("received '{value}'");
    }
};

let tx_fut = async move {
    let vals = vec![
        String::from("more"),
        String::from("messages"),
        String::from("for"),
        String::from("you"),
    ];

    for val in vals {
        tx.send(val).unwrap();
        trpl::sleep(Duration::from_millis(1500)).await;
    }
};

trpl::join3(tx1_fut, tx_fut, rx_fut).await;
```
Listing 17-13: Using multiple producers with async blocks
Now we see all the messages from both sending futures. Because the sending
futures use slightly different delays after sending, the messages are also
received at those different intervals.
```
received 'hi'
received 'more'
received 'from'
received 'the'
received 'messages'
received 'future'
received 'for'
received 'you'
```
This is a good start, but it limits us to just a handful of futures: two with
`join`, or three with `join3`. Let's see how we might work with more futures.
## Working With Any Number of Futures
When we switched from using two futures to three in the previous section, we
also had to switch from using `join` to using `join3`. It would be annoying to
have to call a different function every time we changed the number of futures we
wanted to join. Happily, we have a macro form of `join` to which we can pass an
arbitrary number of arguments. It also handles awaiting the futures itself.
Thus, we could rewrite the code from Listing 17-13 to use `join!` instead of
`join3`, as in Listing 17-14:
Filename: src/main.rs
```
trpl::join!(tx1_fut, tx_fut, rx_fut);
```
Listing 17-14: Using `join!` to wait for multiple futures
This is definitely a nice improvement over needing to swap between `join` and
`join3` and `join4` and so on! However, even this macro form only works when we
know the number of futures ahead of time. In real-world Rust, though, pushing
futures into a collection and then waiting on some or all the futures in that
collection to complete is a common pattern.
To check all the futures in some collection, we'll need to iterate over and
join on *all* of them. The `trpl::join_all` function accepts any type which
implements the `Iterator` trait, which we learned about back in “The Iterator
Trait and the `next` Method” section of Chapter 13 on page XX, so it seems like
just the ticket. Let's try putting our futures in a vector, and replace `join!`
with `join_all`.
```
let futures = vec![tx1_fut, rx_fut, tx_fut];
trpl::join_all(futures).await;
```
Listing 17-15: Storing anonymous futures in a vector and calling `join_all`
Unfortunately, this doesn't compile. Instead, we get this error:
```
error[E0308]: mismatched types
--> src/main.rs:43:37
|
8 | let tx1_fut = async move {
| _______________________-
9 | | let vals = vec![
10 | | String::from("hi"),
11 | | String::from("from"),
... |
19 | | }
20 | | };
| |_________- the expected `async` block
21 |
22 | let rx_fut = async {
| ______________________-
23 | | while let Some(value) = rx.recv().await {
24 | | println!("received '{value}'");
25 | | }
26 | | };
| |_________- the found `async` block
...
43 | let futures = vec![tx1_fut, rx_fut, tx_fut];
| ^^^^^^ expected `async` block, found a different `async` block
|
= note: expected `async` block `{async block@src/main.rs:8:23: 20:10}`
found `async` block `{async block@src/main.rs:22:22: 26:10}`
= note: no two async blocks, even if identical, have the same type
= help: consider pinning your async block and casting it to a trait object
```
This might be surprising. After all, none of the async blocks return anything,
so each block produces a `Future<Output = ()>`. However, `Future` is a trait,
not a concrete type. The concrete types are the individual data structures
generated by the compiler for async blocks. You can't put two different
hand-written structs in a `Vec`, and the same thing applies to the different
structs
generated by the compiler.
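The same issue is easy to reproduce with hand-written types. In this sketch
(the struct and trait names are made up for illustration), the `Vec` only
works once both structs are boxed behind a shared trait:
```
struct Cat;
struct Dog;

trait Pet {}
impl Pet for Cat {}
impl Pet for Dog {}

// let pets = vec![Cat, Dog]; // does not compile: two distinct types

// Boxed trait objects give both structs one common type to share.
let pets: Vec<Box<dyn Pet>> = vec![Box::new(Cat), Box::new(Dog)];
```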
To make this work, we need to use *trait objects*, just as we did in the
“Returning Errors from the run function” section in Chapter 12 on page XX.
(We'll cover trait objects in detail in Chapter 18.) Using trait objects lets
us treat each of the anonymous futures produced by these types as the same
type, because all of them implement the `Future` trait.
> Note: In the “Using an Enum to Store Multiple Types” section of Chapter 8 on
> page XX, we discussed another way to include multiple types in a `Vec`: using
> an enum to represent each of the different types which can appear in the
> vector. We can't do that here, though. For one thing, we have no way to name
> the different types, because they are anonymous. For another, the reason we
> reached for a vector and `join_all` in the first place was to be able to work
> with a dynamic collection of futures where we don't know what they will all
> be until runtime.
We start by wrapping each of the futures in the `vec!` in a `Box::new`, as shown
in Listing 17-16.
Filename: src/main.rs
```
let futures =
    vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];

trpl::join_all(futures).await;
```
Listing 17-16: Trying to use `Box::new` to align the types of the futures in a
`Vec`
Unfortunately, this still doesn't compile. In fact, we have the same basic
error we did before, but we get one for both the second and third `Box::new`
calls, and we also get new errors referring to the `Unpin` trait. We will come
back to the `Unpin` errors in a moment. First, let's fix the type errors on the
`Box::new` calls, by explicitly annotating the type of the `futures` variable:
Filename: src/main.rs
```
let futures: Vec<Box<dyn Future<Output = ()>>> =
    vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];
```
Listing 17-17: Fixing the rest of the type mismatch errors by using an explicit
type declaration
The type we had to write here is a little involved, so let's walk through it:
* The innermost type is the future itself. We note explicitly that the output of
the future is the unit type `()` by writing `Future<Output = ()>`.
* Then we annotate the trait with `dyn` to mark it as dynamic.
* The entire trait reference is wrapped in a `Box`.
* Finally, we state explicitly that `futures` is a `Vec` containing these items.
That already made a big difference. Now when we run the compiler, we only have
the errors mentioning `Unpin`. Although there are three of them, notice that
each is very similar in its contents.
```
error[E0277]: `{async block@src/main.rs:8:23: 20:10}` cannot be unpinned
--> src/main.rs:46:24
|
46 | trpl::join_all(futures).await;
| -------------- ^^^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:8:23: 20:10}`, which is required by `Box<{async block@src/main.rs:8:23: 20:10}>: std::future::Future`
| |
| required by a bound introduced by this call
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<{async block@src/main.rs:8:23: 20:10}>` to implement `std::future::Future`
note: required by a bound in `join_all`
--> ~/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:105:14
|
102 | pub fn join_all<I>(iter: I) -> JoinAll<I::Item>
| -------- required by a bound in this function
...
105 | I::Item: Future,
| ^^^^^^ required by this bound in `join_all`
error[E0277]: `{async block@src/main.rs:8:23: 20:10}` cannot be unpinned
--> src/main.rs:46:9
|
46 | trpl::join_all(futures).await;
| ^^^^^^^^^^^^^^^^^^^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:8:23: 20:10}`, which is required by `Box<{async block@src/main.rs:8:23: 20:10}>: std::future::Future`
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<{async block@src/main.rs:8:23: 20:10}>` to implement `std::future::Future`
note: required by a bound in `JoinAll`
--> ~/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:29:8
|
27 | pub struct JoinAll<F>
| ------- required by a bound in this struct
28 | where
29 | F: Future,
| ^^^^^^ required by this bound in `JoinAll`
error[E0277]: `{async block@src/main.rs:8:23: 20:10}` cannot be unpinned
--> src/main.rs:46:33
|
46 | trpl::join_all(futures).await;
| ^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:8:23: 20:10}`, which is required by `Box<{async block@src/main.rs:8:23: 20:10}>: std::future::Future`
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<{async block@src/main.rs:8:23: 20:10}>` to implement `std::future::Future`
note: required by a bound in `JoinAll`
--> ~/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:29:8
|
27 | pub struct JoinAll<F>
| ------- required by a bound in this struct
28 | where
29 | F: Future,
| ^^^^^^ required by this bound in `JoinAll`
Some errors have detailed explanations: E0277, E0308.
For more information about an error, try `rustc --explain E0277`.
```
That is a *lot* to digest, so let's pull it apart. The first part of the message
tells us that the first async block (`src/main.rs:8:23: 20:10`) does not
implement the `Unpin` trait, and suggests using `pin!` or `Box::pin` to resolve
it. Later in the chapter, we'll dig into a few more details about `Pin` and
`Unpin`. For the moment, though, we can just follow the compiler's advice to get
unstuck! In Listing 17-18, we start by updating the type annotation for
`futures`, with a `Pin` wrapping each `Box`. Second, we use `Box::pin` to pin
the futures themselves.
Filename: src/main.rs
```
let futures: Vec<Pin<Box<dyn Future<Output = ()>>>> =
    vec![Box::pin(tx1_fut), Box::pin(rx_fut), Box::pin(tx_fut)];
```
Listing 17-18: Using `Pin` and `Box::pin` to make the `Vec` type check
If we compile and run this, we finally get the output we hoped for:
```
received 'hi'
received 'more'
received 'from'
received 'messages'
received 'the'
received 'for'
received 'future'
received 'you'
```
Phew!
There's a bit more we can explore here. For one thing, using `Pin<Box<T>>`
comes with a small amount of extra overhead from putting these futures on the
heap with `Box`—and we're only doing that to get the types to line up. We don't
actually *need* the heap allocation, after all: these futures are local to this
particular function. As noted above, `Pin` is itself a wrapper type, so we can
get the benefit of having a single type in the `Vec`—the original reason we
reached for `Box`—without doing a heap allocation. We can use `Pin` directly
with each future, using the `std::pin::pin` macro.
However, we must still be explicit about the type of the pinned reference;
otherwise Rust will still not know to interpret these as dynamic trait objects,
which is what we need them to be in the `Vec`. We therefore `pin!` each future
when we define it, and define `futures` as a `Vec` containing pinned mutable
references to the dynamic `Future` type, as in Listing 17-19.
Filename: src/main.rs
```
let tx1_fut = pin!(async move {
    // --snip--
});

let rx_fut = pin!(async {
    // --snip--
});

let tx_fut = pin!(async move {
    // --snip--
});

let futures: Vec<Pin<&mut dyn Future<Output = ()>>> =
    vec![tx1_fut, rx_fut, tx_fut];
```
Listing 17-19: Using `Pin` directly with the `pin!` macro to avoid unnecessary
heap allocations
We got this far by ignoring the fact that we might have different `Output`
types. For example, in Listing 17-20, the anonymous future for `a` implements
`Future<Output = u32>`, the anonymous future for `b` implements
`Future<Output = &str>`, and the anonymous future for `c` implements
`Future<Output = bool>`.
Filename: src/main.rs
```
let a = async { 1u32 };
let b = async { "Hello!" };
let c = async { true };
let (a_result, b_result, c_result) = trpl::join!(a, b, c);
println!("{a_result}, {b_result}, {c_result}");
```
Listing 17-20: Three futures with distinct types
We can use `trpl::join!` to await them, because it allows you to pass in
multiple future types and produces a tuple of those types. We *cannot* use
`trpl::join_all`, because it requires the futures passed in all to have the same
type. Remember, that error is what got us started on this adventure with `Pin`!
This is a fundamental tradeoff: we can either deal with a dynamic number of
futures with `join_all`, as long as they all have the same type, or we can deal
with a set number of futures with the `join` functions or the `join!` macro,
even if they have different types. This is the same as working with any other
types in Rust, though. Futures are not special, even though we have some nice
syntax for working with them, and that is a good thing.
### Racing Futures
When we join futures with the `join` family of functions and macros, we
require *all* of them to finish before we move on. Sometimes, though, we only
need *some* future from a set to finish before we move on—kind of similar to
racing one future against another.
In Listing 17-21, we once again use `trpl::race` to run two futures, `slow` and
`fast`, against each other. Each one prints a message when it starts running,
pauses for some amount of time by calling and awaiting `sleep`, and then prints
another message when it finishes. Then we pass both to `trpl::race` and wait for
one of them to finish. (The outcome here wont be too surprising: `fast` wins!)
Unlike when we used `race` back in the Our First Async Program section of this
chapter on page XX, we just ignore the `Either` instance it returns here,
because all of the interesting behavior happens in the body of the async blocks.
Filename: src/main.rs
```
let slow = async {
    println!("'slow' started.");
    trpl::sleep(Duration::from_millis(100)).await;
    println!("'slow' finished.");
};

let fast = async {
    println!("'fast' started.");
    trpl::sleep(Duration::from_millis(50)).await;
    println!("'fast' finished.");
};

trpl::race(slow, fast).await;
```
Listing 17-21: Using `race` to get the result of whichever future finishes first
Notice that if you flip the order of the arguments to `race`, the order of the
started messages changes, even though the `fast` future always completes
first. Thats because the implementation of this particular `race` function is
not fair. It always runs the futures passed as arguments in the order theyre
passed. Other implementations *are* fair, and will randomly choose which future
to poll first. Regardless of whether the implementation of race were using is
fair, though, *one* of the futures will run up to the first `await` in its body
before another task can start.
Recall from the Our First Async Program section of this chapter on page XX
that at each await point, Rust gives a runtime a chance to pause the task and
switch to another one if the future being awaited isnt ready. The inverse is
also true: Rust *only* pauses async blocks and hands control back to a runtime
at an await point. Everything between await points is synchronous.
That means if you do a bunch of work in an async block without an await point,
that future will block any other futures from making progress. You may sometimes
hear this referred to as one future *starving* other futures. In some cases,
that may not be a big deal. However, if you are doing some kind of expensive
setup or long-running work, or if you have a future which will keep doing some
particular task indefinitely, youll need to think about when and where to
hand control back to the runtime.
By the same token, if you have long-running blocking operations, async can be a
useful tool for structuring how the different parts of the program relate to
each other.
But *how* would you hand control back to the runtime in those cases?
### Yielding
Lets simulate a long-running operation. Listing 17-22 introduces a `slow`
function. It uses `std::thread::sleep` instead of `trpl::sleep` so that calling
`slow` will block the current thread for some number of milliseconds. We can use
`slow` to stand in for real-world operations which are both long-running and
blocking.
Filename: src/main.rs
```
fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}
```
Listing 17-22: Using `thread::sleep` to simulate slow operations
In Listing 17-23, we use `slow` to emulate doing this kind of CPU-bound work in
a pair of futures. To begin, each future only hands control back to the runtime
*after* carrying out a bunch of slow operations.
Filename: src/main.rs
```
let a = async {
    println!("'a' started.");
    slow("a", 30);
    slow("a", 10);
    slow("a", 20);
    trpl::sleep(Duration::from_millis(50)).await;
    println!("'a' finished.");
};

let b = async {
    println!("'b' started.");
    slow("b", 75);
    slow("b", 10);
    slow("b", 15);
    slow("b", 350);
    trpl::sleep(Duration::from_millis(50)).await;
    println!("'b' finished.");
};

trpl::race(a, b).await;
```
Listing 17-23: Futures that hand control back to the runtime only after carrying out a series of slow operations
If you run this, you will see this output:
```
'a' started.
'a' ran for 30ms
'a' ran for 10ms
'a' ran for 20ms
'b' started.
'b' ran for 75ms
'b' ran for 10ms
'b' ran for 15ms
'b' ran for 350ms
'a' finished.
```
As with our earlier example, `race` still finishes as soon as `a` is done.
Theres no interleaving between the two futures, though. The `a` future does all
of its work until the `trpl::sleep` call is awaited, then the `b` future does
all of its work until its own `trpl::sleep` call is awaited, and then the `a`
future completes. To allow both futures to make progress between their slow
tasks, we need await points so we can hand control back to the runtime. That
means we need something we can await!
We can already see this kind of handoff happening in Listing 17-23: if we
removed the `trpl::sleep` at the end of the `a` future, it would complete
without the `b` future running *at all*. Maybe we could use the `sleep` function
as a starting point?
Filename: src/main.rs
```
let one_ms = Duration::from_millis(1);

let a = async {
    println!("'a' started.");
    slow("a", 30);
    trpl::sleep(one_ms).await;
    slow("a", 10);
    trpl::sleep(one_ms).await;
    slow("a", 20);
    trpl::sleep(one_ms).await;
    println!("'a' finished.");
};

let b = async {
    println!("'b' started.");
    slow("b", 75);
    trpl::sleep(one_ms).await;
    slow("b", 10);
    trpl::sleep(one_ms).await;
    slow("b", 15);
    trpl::sleep(one_ms).await;
    slow("b", 35);
    trpl::sleep(one_ms).await;
    println!("'b' finished.");
};
```
Listing 17-24: Using `sleep` to let operations switch off making progress
In Listing 17-24, we add `trpl::sleep` calls with await points between each call
to `slow`. Now the two futures’ work is interleaved:
```
'a' started.
'a' ran for 30ms
'b' started.
'b' ran for 75ms
'a' ran for 10ms
'b' ran for 10ms
'a' ran for 20ms
'b' ran for 15ms
'a' finished.
```
The `a` future still runs for a bit before handing off control to `b`, because
it calls `slow` before ever calling `trpl::sleep`, but after that the futures
swap back and forth each time one of them hits an await point. In this case, we
have done that after every call to `slow`, but we could break up the work
however makes the most sense to us.
We dont really want to *sleep* here, though: we want to make progress as fast
as we can. We just need to hand back control to the runtime. We can do that
directly, using the `yield_now` function. In Listing 17-25, we replace all those
`sleep` calls with `yield_now`.
Filename: src/main.rs
```
let a = async {
    println!("'a' started.");
    slow("a", 30);
    trpl::yield_now().await;
    slow("a", 10);
    trpl::yield_now().await;
    slow("a", 20);
    trpl::yield_now().await;
    println!("'a' finished.");
};

let b = async {
    println!("'b' started.");
    slow("b", 75);
    trpl::yield_now().await;
    slow("b", 10);
    trpl::yield_now().await;
    slow("b", 15);
    trpl::yield_now().await;
    slow("b", 35);
    trpl::yield_now().await;
    println!("'b' finished.");
};
```
Listing 17-25: Using `yield_now` to let operations switch off making progress
This is both clearer about the actual intent and can be significantly faster
than using `sleep`, because timers such as the one used by `sleep` often have
limits to how granular they can be. The version of `sleep` we are using, for
example, will always sleep for at least a millisecond, even if we pass it a
`Duration` of one nanosecond. Again, modern computers are *fast*: they can do a
lot in one millisecond!
You can see this for yourself by setting up a little benchmark, such as the one
in Listing 17-26. (This isnt an especially rigorous way to do performance
testing, but it suffices to show the difference here.) Here, we skip all the
status printing, pass a one-nanosecond `Duration` to `trpl::sleep`, and let
each future run by itself, with no switching between the futures. Then we run
for 1,000 iterations and see how long the future using `trpl::sleep` takes
compared to the future using `trpl::yield_now`.
Filename: src/main.rs
```
let one_ns = Duration::from_nanos(1);
let start = Instant::now();
async {
    for _ in 0..1000 {
        trpl::sleep(one_ns).await;
    }
}
.await;
let time = Instant::now() - start;
println!(
    "'sleep' version finished after {} seconds.",
    time.as_secs_f32()
);

let start = Instant::now();
async {
    for _ in 0..1000 {
        trpl::yield_now().await;
    }
}
.await;
let time = Instant::now() - start;
println!(
    "'yield' version finished after {} seconds.",
    time.as_secs_f32()
);
```
Listing 17-26: Comparing the performance of `sleep` and `yield_now`
The version with `yield_now` is *way* faster!
This means that async can be useful even for compute-bound tasks, depending on
what else your program is doing, because it provides a useful tool for
structuring the relationships between different parts of the program. This is a
form of *cooperative multitasking*, where each future has the power to determine
when it hands over control via await points. Each future therefore also has the
responsibility to avoid blocking for too long. In some Rust-based embedded
operating systems, this is the *only* kind of multitasking!
In real-world code, you wont usually be alternating function calls with await
points on every single line, of course. While yielding control in this way is
relatively inexpensive, its not free! In many cases, trying to break up a
compute-bound task might make it significantly slower, so sometimes its better
for *overall* performance to let an operation block briefly. You should always
measure to see what your codes actual performance bottlenecks are. The
underlying dynamic is an important one to keep in mind if you *are* seeing a
lot of work happening in serial that you expected to happen concurrently,
though!
### Building Our Own Async Abstractions
We can also compose futures together to create new patterns. For example, we can
build a `timeout` function with async building blocks we already have. When
were done, the result will be another building block we could use to build up
yet further async abstractions.
Listing 17-27 shows how we would expect this `timeout` to work with a slow
future.
Filename: src/main.rs
```
let slow = async {
    trpl::sleep(Duration::from_millis(100)).await;
    "I finished!"
};

match timeout(slow, Duration::from_millis(10)).await {
    Ok(message) => println!("Succeeded with '{message}'"),
    Err(duration) => {
        println!("Failed after {} seconds", duration.as_secs())
    }
}
```
Listing 17-27: Using our imagined `timeout` to run a slow operation with a time
limit
Lets implement this! To begin, lets think about the API for `timeout`:
* It needs to be an async function itself so we can await it.
* Its first parameter should be a future to run. We can make it generic to allow
it to work with any future.
* Its second parameter will be the maximum time to wait. If we use a `Duration`,
that will make it easy to pass along to `trpl::sleep`.
* It should return a `Result`. If the future completes successfully, the
`Result` will be `Ok` with the value produced by the future. If the timeout
elapses first, the `Result` will be `Err` with the duration that the timeout
waited for.
Listing 17-28 shows this declaration.
Filename: src/main.rs
```
async fn timeout<F: Future>(
    future_to_try: F,
    max_time: Duration,
) -> Result<F::Output, Duration> {
    // Here is where our implementation will go!
}
```
Listing 17-28: Defining the signature of `timeout`
That satisfies our goals for the types. Now lets think about the *behavior* we
need: we want to race the future passed in against the duration. We can use
`trpl::sleep` to make a timer future from the duration, and use `trpl::race` to
run that timer with the future the caller passes in.
We also know that `race` is not fair, and polls its arguments in the order
they are passed. Thus, we pass `future_to_try` to `race` first so it gets a
chance to complete even if `max_time` is a very short duration. If
`future_to_try` finishes first, `race` will return `Left` with the output from
`future_to_try`. If the timer finishes first, `race` will return `Right` with
the timer’s output of `()`.
In Listing 17-29, we match on the result of awaiting `trpl::race`. If the
`future_to_try` succeeded and we get a `Left(output)`, we return `Ok(output)`.
If the sleep timer elapsed instead and we get a `Right(())`, we ignore the `()`
with `_` and return `Err(max_time)` instead.
Filename: src/main.rs
```
use trpl::Either;

// --snip--

fn main() {
    trpl::run(async {
        let slow = async {
            trpl::sleep(Duration::from_secs(5)).await;
            "Finally finished"
        };

        match timeout(slow, Duration::from_secs(2)).await {
            Ok(message) => println!("Succeeded with '{message}'"),
            Err(duration) => {
                println!("Failed after {} seconds", duration.as_secs())
            }
        }
    });
}

async fn timeout<F: Future>(
    future_to_try: F,
    max_time: Duration,
) -> Result<F::Output, Duration> {
    match trpl::race(future_to_try, trpl::sleep(max_time)).await {
        Either::Left(output) => Ok(output),
        Either::Right(_) => Err(max_time),
    }
}
```
Listing 17-29: Defining `timeout` with `race` and `sleep`
With that, we have a working `timeout`, built out of two other async helpers. If
we run our code, it will print the failure mode after the timeout:
```
Failed after 2 seconds
```
Because futures compose with other futures, you can build really powerful tools
using smaller async building blocks. For example, you can use this same
approach to combine timeouts with retries, and in turn use those with things
such as network calls—one of the examples from the beginning of the chapter!
In practice, you will usually work directly with `async` and `await`, and
secondarily with functions and macros such as `join`, `join_all`, `race`, and
so on. Youll only need to reach for `pin` now and again to use them with those
APIs.
Weve now seen a number of ways to work with multiple futures at the same
time. Up next, well look at how we can work with multiple futures in a
sequence over time, with *streams*. Here are a couple more things you might want
to consider first, though:
* We used a `Vec` with `join_all` to wait for all of the futures in some group
to finish. How could you use a `Vec` to process a group of futures in
sequence, instead? What are the tradeoffs of doing that?
* Take a look at the `futures::stream::FuturesUnordered` type from the `futures`
crate. How would using it be different from using a `Vec`? (Dont worry about
the fact that it is from the `stream` part of the crate; it works just fine
with any collection of futures.)
## Streams
So far in this chapter, we have mostly stuck to individual futures. The one big
exception was the async channel we used. Recall how we used the receiver for
our async channel in the Message Passing section of this chapter on page XX.
The async `recv` method produces a sequence of items over time. This is an
instance of a much more general pattern, often called a *stream*.
A sequence of items is something weve seen before, when we looked at the
`Iterator` trait in The `Iterator` Trait and the `next` Method section of
Chapter 13 on page XX, but there are two differences between iterators and the
async channel receiver. The first difference is the element of time: iterators
are synchronous, while the channel receiver is asynchronous. The second
difference is the API. When working directly with an `Iterator`, we call its
synchronous `next` method. With the `trpl::Receiver` stream in particular, we
called an asynchronous `recv` method instead, but these APIs otherwise feel
very similar.
That similarity isn’t a coincidence. A stream is similar to an asynchronous
form of iteration. Whereas `trpl::Receiver` specifically waits to receive
messages, though, the general-purpose stream API is much broader: it provides
the next item the way `Iterator` does, but asynchronously. The
similarity between iterators and streams in Rust means we can actually create a
stream from any iterator. As with an iterator, we can work with a stream by
calling its `next` method and then awaiting the output, as in Listing 17-30.
Filename: src/main.rs
```
let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let iter = values.iter().map(|n| n * 2);
let mut stream = trpl::stream_from_iter(iter);

while let Some(value) = stream.next().await {
    println!("The value was: {value}");
}
```
Listing 17-30: Creating a stream from an iterator and printing its values
We start with an array of numbers, which we convert to an iterator and then
call `map` on to double all the values. Next, we convert the iterator into a
stream using the `trpl::stream_from_iter` function. Finally, we loop over the
items in the stream as they arrive with the `while let` loop.
Unfortunately, when we try to run the code, it doesnt compile. Instead, as we
can see in the output, it reports that there is no `next` method available.
```
error[E0599]: no method named `next` found for struct `Iter` in the current scope
--> src/main.rs:8:40
|
8 | while let Some(value) = stream.next().await {
| ^^^^
|
= note: the full type name has been written to '~/projects/hello-async/target/debug/deps/async_await-bbd5bb8f6851cb5f.long-type-18426562901668632191.txt'
= note: consider using `--verbose` to print the full type name to the console
= help: items from traits can only be used if the trait is in scope
help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
|
1 + use futures_util::stream::stream::StreamExt;
|
1 + use std::iter::Iterator;
|
1 + use std::str::pattern::Searcher;
|
1 + use trpl::StreamExt;
|
help: there is a method `try_next` with a similar name
|
8 | while let Some(value) = stream.try_next().await {
| ~~~~~~~~
For more information about this error, try `rustc --explain E0599`.
```
As the output suggests, the reason for the compiler error is that we need the
right trait in scope to be able to use the `next` method. Given our discussion
so far, you might reasonably expect that to be `Stream`, but the trait we need
here is actually `StreamExt`. The `Ext` there is for “extension”: this is a
common pattern in the Rust community for extending one trait with another.
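In miniature, the extension-trait pattern looks like the following sketch. (The `Greet` and `GreetExt` traits here are made up purely for illustration; they aren’t part of any crate.)
```
// The base trait defines the minimal, required interface.
trait Greet {
    fn name(&self) -> String;
}

// The extension trait adds convenience methods with default bodies on top
// of the base trait...
trait GreetExt: Greet {
    fn greet(&self) -> String {
        format!("Hello, {}!", self.name())
    }
}

// ...and a single blanket implementation makes those methods available to
// every type that implements the base trait.
impl<T: Greet + ?Sized> GreetExt for T {}
```
Splitting things this way lets a crate keep the base trait small and stable while freely growing the extension trait, which is the relationship between `Stream` and `StreamExt`.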
Why do we need `StreamExt` instead of `Stream`, and what does the `Stream` trait
itself do? Briefly, the answer is that throughout the Rust ecosystem, the
`Stream` trait defines a low-level interface which effectively combines the
`Iterator` and `Future` traits. The `StreamExt` trait supplies a higher-level
set of APIs on top of `Stream`, including the `next` method as well as other
utility methods similar to those provided by the `Iterator` trait. Well return
to the `Stream` and `StreamExt` traits in a bit more detail at the end of the
chapter. For now, this is enough to let us keep moving.
The fix to the compiler error is to add a `use` statement for `trpl::StreamExt`,
as in Listing 17-31.
Filename: src/main.rs
```
use trpl::StreamExt;

fn main() {
    trpl::run(async {
        let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let iter = values.iter().map(|n| n * 2);
        let mut stream = trpl::stream_from_iter(iter);

        while let Some(value) = stream.next().await {
            println!("The value was: {value}");
        }
    });
}
```
Listing 17-31: Successfully using an iterator as the basis for a stream
With all those pieces put together, this code works the way we want! Whats
more, now that we have `StreamExt` in scope, we can use all of its utility
methods, just as with iterators. For example, in Listing 17-32, we use the
`filter` method to filter out everything but multiples of three and five.
Filename: src/main.rs
```
use trpl::StreamExt;

fn main() {
    trpl::run(async {
        let values = 1..101;
        let iter = values.map(|n| n * 2);
        let stream = trpl::stream_from_iter(iter);

        let mut filtered =
            stream.filter(|value| value % 3 == 0 || value % 5 == 0);

        while let Some(value) = filtered.next().await {
            println!("The value was: {value}");
        }
    });
}
```
Listing 17-32: Filtering a `Stream` with the `StreamExt::filter` method
Of course, this isnt very interesting. We could do that with normal iterators
and without any async at all. So lets look at some of the other things we can
do which are unique to streams.
### Composing Streams
Many concepts are naturally represented as streams: items becoming available in
a queue, or working with more data than can fit in a computers memory by only
pulling chunks of it from the file system at a time, or data arriving over the
network over time. Because streams are futures, we can use them with any other
kind of future, too, and we can combine them in interesting ways. For example,
we can batch up events to avoid triggering too many network calls, set timeouts
on sequences of long-running operations, or throttle user interface events to
avoid doing needless work.
Lets start by building a little stream of messages, as a stand-in for a stream
of data we might see from a WebSocket or another real-time communication
protocol. In Listing 17-33, we create a function `get_messages` which returns
`impl Stream<Item = String>`. For its implementation, we create an async
channel, loop over the first ten letters of the English alphabet, and send them
across the channel.
We also use a new type: `ReceiverStream`, which converts the `rx` receiver from
the `trpl::channel` into a `Stream` with a `next` method. Back in `main`, we use
a `while let` loop to print all the messages from the stream.
Filename: src/main.rs
```
use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let mut messages = get_messages();

        while let Some(message) = messages.next().await {
            println!("{message}");
        }
    });
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
    for message in messages {
        tx.send(format!("Message: '{message}'")).unwrap();
    }

    ReceiverStream::new(rx)
}
```
Listing 17-33: Using the `rx` receiver as a `ReceiverStream`
When we run this code, we get exactly the results we would expect:
```
Message: 'a'
Message: 'b'
Message: 'c'
Message: 'd'
Message: 'e'
Message: 'f'
Message: 'g'
Message: 'h'
Message: 'i'
Message: 'j'
```
We could do this with the regular `Receiver` API, or even the regular `Iterator`
API, though. Let’s add something that requires streams: a timeout which applies
to every item in the stream, and a delay on the items we emit.
In Listing 17-34, we start by adding a timeout to the stream with the `timeout`
method, which comes from the `StreamExt` trait. Then we update the body of the
`while let` loop, because the stream now returns a `Result`. The `Ok` variant
indicates a message arrived in time; the `Err` variant indicates that the
timeout elapsed before any message arrived. We `match` on that result and either
print the message when we receive it successfully, or print a notice about the
timeout. Finally, notice that we pin the messages after applying the timeout to
them, because the timeout helper produces a stream which needs to be pinned to
be polled.
Filename: src/main.rs
```
use std::{pin::pin, time::Duration};
use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let mut messages =
            pin!(get_messages().timeout(Duration::from_millis(200)));

        while let Some(result) = messages.next().await {
            match result {
                Ok(message) => println!("{message}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),
            }
        }
    })
}
```
Listing 17-34: Using the `StreamExt::timeout` method to set a time limit on the
items in a stream
However, because there are no delays between messages, this timeout does not
change the behavior of the program. Let’s add a variable delay to the messages
we send. In `get_messages`, we use the `enumerate` iterator method with the
`messages` array so that we can get the index of each item we are sending along
with the item itself. Then we apply a 100-millisecond delay to even-index items
and a 300-millisecond delay to odd-index items, to simulate the different
delays we might see from a stream of messages in the real world. Because our
timeout is 200 milliseconds, this should affect half of the messages.
Filename: src/main.rs
```
fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(time_to_sleep)).await;

            tx.send(format!("Message: '{message}'")).unwrap();
        }
    });

    ReceiverStream::new(rx)
}
```
Listing 17-35: Sending messages through `tx` with an async delay without making
`get_messages` an async function
To sleep between messages in the `get_messages` function without blocking, we
need to use async. However, we can’t make `get_messages` itself into an async
function, because then we’d return a `Future<Output = Stream<Item = String>>`
instead of a `Stream<Item = String>`. The caller would have to await
`get_messages` itself to get access to the stream. But remember: everything in
a given future happens linearly; concurrency happens *between* futures.
Awaiting `get_messages` would require it to send all the messages, including
sleeping between sending each message, before returning the receiver stream. As
a result, the timeout would end up useless. There would be no delays in the
stream itself: the delays would all happen before the stream was even
available.
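To make that type difference concrete, here is a sketch of the two signatures side by side. (The function names and the empty-stream bodies are stand-ins so the signatures compile; the real body appears in Listing 17-35.)
```
use std::future::Future;
use trpl::Stream;

// What we actually want: the caller gets a stream back immediately, and
// the sends happen later, on a spawned task.
fn messages_now() -> impl Stream<Item = String> {
    trpl::stream_from_iter(std::iter::empty::<String>())
}

// What an async version would really mean: a future the caller must run
// to completion (every sleep and send included) before any stream exists.
fn messages_later() -> impl Future<Output = impl Stream<Item = String>> {
    async { trpl::stream_from_iter(std::iter::empty::<String>()) }
}
```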
Instead, we leave `get_messages` as a regular function which returns a stream,
and spawn a task to handle the async `sleep` calls.
> Note: Calling `spawn_task` in this way works because we already set up our
> runtime. Calling this particular implementation of `spawn_task` *without*
> first setting up a runtime will cause a panic. Other implementations choose
> different tradeoffs: they might spawn a new runtime and so avoid the panic but
> end up with a bit of extra overhead, or simply not provide a standalone way to
> spawn tasks without reference to a runtime. You should make sure you know what
> tradeoff your runtime has chosen and write your code accordingly!
Now our code has a much more interesting result! Between every other pair of
messages, we see an error reported: `Problem: Elapsed(())`.
```
Message: 'a'
Problem: Elapsed(())
Message: 'b'
Message: 'c'
Problem: Elapsed(())
Message: 'd'
Message: 'e'
Problem: Elapsed(())
Message: 'f'
Message: 'g'
Problem: Elapsed(())
Message: 'h'
Message: 'i'
Problem: Elapsed(())
Message: 'j'
```
The timeout doesnt prevent the messages from arriving in the endwe still get
all of the original messages. This is because our channel is unbounded: it can
hold as many messages as we can fit in memory. If the message doesnt arrive
before the timeout, our stream handler will account for that, but when it polls
the stream again, the message may now have arrived.
You can get different behavior if needed by using other kinds of channels, or
other kinds of streams more generally. Lets see one of those in practice in our
final example for this section, by combining a stream of time intervals with
this stream of messages.
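As a quick sketch of what that different behavior could look like, here is a standalone example using a *bounded* channel. It reaches for `tokio::sync::mpsc` directly rather than the `trpl` helpers, since the `trpl::channel` we have been using is unbounded; with a capacity of 1, the sender itself has to wait whenever the receiver falls behind.
```
use std::time::Duration;

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // A bounded channel with room for only one in-flight message.
    let (tx, mut rx) = mpsc::channel::<String>(1);

    tokio::spawn(async move {
        for i in 1..=3 {
            // Once the buffer is full, `send` waits for the receiver to
            // catch up, so a slow consumer slows the producer down too.
            tx.send(format!("message {i}")).await.unwrap();
            println!("sent message {i}");
        }
    });

    while let Some(message) = rx.recv().await {
        // Simulate a slow receiver.
        tokio::time::sleep(Duration::from_millis(100)).await;
        println!("received '{message}'");
    }
}
```
Bounded channels trade some throughput for predictable memory use; which you want depends on whether your producer can tolerate waiting.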
### Merging Streams
First, let’s create another stream, which will emit an item every millisecond
if we let it run directly. For simplicity, we can use the `sleep` function to
send a message on a delay, and combine it with the same approach of creating a
stream from a channel that we used in `get_messages`. The difference is that
this time, we’re going to send back the count of intervals that have elapsed,
so the return type will be `impl Stream<Item = u32>`, and we can call the
function `get_intervals`.
In Listing 17-36, we start by defining a `count` in the task. (We could define
it outside the task, too, but it is clearer to limit the scope of any given
variable.) Then we create an infinite loop. Each iteration of the loop
asynchronously sleeps for one millisecond, increments the count, and then sends
it over the channel. Because this is all wrapped in the task created by
`spawn_task`, all of it will get cleaned up along with the runtime, including
the infinite loop.
Filename: src/main.rs
```
fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let mut count = 0;
        loop {
            trpl::sleep(Duration::from_millis(1)).await;
            count += 1;
            tx.send(count).unwrap();
        }
    });

    ReceiverStream::new(rx)
}
```
Listing 17-36: Creating a stream with a counter that will be emitted once every
millisecond
This kind of infinite loop, which only ends when the whole runtime gets torn
down, is fairly common in async Rust: many programs need to keep running
indefinitely. With async, this doesnt block anything else, as long as there is
at least one await point in each iteration through the loop.
Back in our main functions async block, we start by calling `get_intervals`.
Then we merge the `messages` and `intervals` streams with the `merge` method,
which combines multiple streams into one stream that produces items from any of
the source streams as soon as the items are available, without imposing any
particular ordering. Finally, we loop over that combined stream instead of over
`messages` (Listing 17-37).
Filename: src/main.rs
```
let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals();
let merged = messages.merge(intervals);
```
Listing 17-37: Attempting to merge streams of messages and intervals
At this point, neither `messages` nor `intervals` needs to be pinned or
mutable, because both will be combined into the single `merged` stream.
However, this call to `merge` does not compile! (Neither does the `next` call
in the `while let` loop, but well come back to that after fixing this.) The
two streams have different types. The `messages` stream has the type
`Timeout<impl Stream<Item = String>>`, where `Timeout` is the type which
implements `Stream` for a `timeout` call. Meanwhile, the `intervals` stream has
the type `impl Stream<Item = u32>`. To merge these two streams, we need to
transform one of them to match the other.
In Listing 17-38, we rework the `intervals` stream, because `messages` is
already in the basic format we want and has to handle timeout errors. First, we
can use the `map` helper method to transform the `intervals` into a string.
Second, we need to match the `Timeout` from `messages`. Because we dont
actually *want* a timeout for `intervals`, though, we can just create a timeout
which is longer than the other durations we are using. Here, we create a
10-second timeout with `Duration::from_secs(10)`. Finally, we need to make
`stream` mutable, so that the `while let` loops `next` calls can iterate
through the stream, and pin it so that its safe to do so.
Filename: src/main.rs
```
let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals()
    .map(|count| format!("Interval: {count}"))
    .timeout(Duration::from_secs(10));
let merged = messages.merge(intervals);
let mut stream = pin!(merged);
```
Listing 17-38: Aligning the types of the `intervals` stream with the type of
the `messages` stream
That gets us *almost* to where we need to be. Everything type checks. If you run
this, though, there will be two problems. First, it will never stop! Youll
need to stop it with <span class="keystroke">ctrl-c</span>. Second, the
messages from the English alphabet will be buried in the midst of all the
interval counter messages:
```
--snip--
Interval: 38
Interval: 39
Interval: 40
Message: 'a'
Interval: 41
Interval: 42
Interval: 43
--snip--
```
Listing 17-39 shows one way to solve these last two problems. First, we use the
`throttle` method on the `intervals` stream, so that it doesnt overwhelm the
`messages` stream. Throttling is a way of limiting the rate at which a function
will be called—or, in this case, how often the stream will be polled. Once every
hundred milliseconds should do, because that is in the same ballpark as how
often our messages arrive.
To limit the number of items we will accept from a stream, we can use the `take`
method. We apply it to the *merged* stream, because we want to limit the final
output, not just one stream or the other.
Filename: src/main.rs
```
let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals()
    .map(|count| format!("Interval: {count}"))
    .throttle(Duration::from_millis(100))
    .timeout(Duration::from_secs(10));
let merged = messages.merge(intervals).take(20);
let mut stream = pin!(merged);
```
Listing 17-39: Using `throttle` and `take` to manage the merged streams
Now when we run the program, it stops after pulling twenty items from the
stream, and the intervals dont overwhelm the messages. We also dont get
`Interval: 100` or `Interval: 200` and so on, but instead get `Interval: 1`,
`Interval: 2`, and so on—even though we have a source stream which *can*
produce an event every millisecond. Thats because the `throttle` call
produces a new stream, wrapping the original stream, so that the original
stream only gets polled at the throttle rate, not its own native rate. We
dont have a bunch of unhandled interval messages were choosing to
ignore. Instead, we never produce those interval messages in the first place!
This is the inherent laziness of Rusts futures at work again, allowing us to
choose our performance characteristics.
```
Interval: 1
Message: 'a'
Interval: 2
Interval: 3
Problem: Elapsed(())
Interval: 4
Message: 'b'
Interval: 5
Message: 'c'
Interval: 6
Interval: 7
Problem: Elapsed(())
Interval: 8
Message: 'd'
Interval: 9
Message: 'e'
Interval: 10
Interval: 11
Problem: Elapsed(())
Interval: 12
```
There’s one last thing we need to handle: errors! With both of these
channel-based streams, the `send` calls could fail when the other side of the
channel closes—and that’s just a matter of how the runtime executes the futures
which make up the stream. Up until now, we have ignored this by calling
`unwrap`, but in a well-behaved app, we should explicitly handle the error, at
minimum by ending the loop so we don’t try to send any more messages! Listing
17-40 shows a simple error strategy: print the issue and then `break` from the
loops. As usual, the correct way to handle a message send error will vary—just
make sure you have a strategy.
Filename: src/main.rs
```
fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];

        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(time_to_sleep)).await;

            if let Err(send_error) = tx.send(format!("Message: '{message}'")) {
                eprintln!("Cannot send message '{message}': {send_error}");
                break;
            }
        }
    });

    ReceiverStream::new(rx)
}

fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let mut count = 0;
        loop {
            trpl::sleep(Duration::from_millis(1)).await;
            count += 1;

            if let Err(send_error) = tx.send(count) {
                eprintln!("Could not send interval {count}: {send_error}");
                break;
            }
        }
    });

    ReceiverStream::new(rx)
}
```
Listing 17-40: Handling errors and shutting down the loops
Now that we’ve seen a bunch of async in practice, let’s take a step back and
dig into a few of the details of how `Future`, `Stream`, and the other key
traits Rust uses for async actually work.
## Digging Into the Traits for Async
Throughout the chapter, weve used the `Future`, `Pin`, `Unpin`, `Stream`, and
`StreamExt` traits in various ways. So far, though, weve avoided digging too
far into the details of how they work or how they fit together. Much of the time
when writing Rust day to day, this is fine. Sometimes, though, you’ll hit
situations where understanding a few more of these details matters. In this
section, we’ll dig just far *enough* to help in those situations, while still
leaving the *really* deep dive for other documentation!
### Future
Back in the Futures and the Async Syntax section of this chapter on page XX,
we noted that `Future` is a trait. Lets start by taking a closer look at how
it works. Here is how Rust defines a `Future`:
```
use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
```
That trait definition includes a bunch of new types and also some syntax we
havent seen before, so lets walk through the definition piece by piece.
First, `Future`s associated type `Output` says what the future resolves to.
This is analogous to the `Item` associated type for the `Iterator` trait.
Second, `Future` also has the `poll` method, which takes a special `Pin`
reference for its `self` parameter and a mutable reference to a `Context` type,
and returns a `Poll<Self::Output>`. Well talk a little more about `Pin` and
`Context` later in the section. For now, lets focus on what the method returns,
the `Poll` type:
```
enum Poll<T> {
    Ready(T),
    Pending,
}
```
This `Poll` type is similar to an `Option`: it has one variant which has a value
(`Ready(T)`), and one which does not (`Pending`). It means something quite
different, though! The `Pending` variant indicates that the future still has
work to do, so the caller will need to check again later. The `Ready` variant
indicates that the `Future` has finished its work and the `T` value is
available.
> Note: With most futures, the caller should not call `poll` again after the
> future has returned `Ready`. Many futures will panic if polled again after
> becoming ready! Futures which are safe to poll again will say so explicitly in
> their documentation. This is similar to how `Iterator::next` behaves!
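To make the shape of the trait concrete, here is about the smallest hand-written future possible: a made-up `AlwaysReady` type whose `poll` never needs to return `Pending`.
```
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

// A future that is finished before it starts: polling it once is enough.
struct AlwaysReady(u32);

impl Future for AlwaysReady {
    type Output = u32;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> {
        Poll::Ready(self.0)
    }
}
```
Awaiting `AlwaysReady(5)` in an async block produces `5` immediately. Most real futures return `Pending` at least once first, which is where the rest of the machinery comes in.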
Under the hood, when you see code which uses `await`, Rust compiles that to code
which calls `poll`. If you look back at Listing 17-4, where we printed out the
page title for a single URL once it resolved, Rust compiles it into something
kind of (although not exactly) like this:
```
match page_title(url).poll() {
    Ready(page_title) => match page_title {
        Some(title) => println!("The title for {url} was {title}"),
        None => println!("{url} had no title"),
    }
    Pending => {
        // But what goes here?
    }
}
```
What should we do when the `Future` is still `Pending`? We need some way to try
again and again, and again, until the future is finally ready. In other words,
a loop:
```
let mut page_title_fut = page_title(url);

loop {
    match page_title_fut.poll() {
        Ready(value) => match value {
            Some(title) => println!("The title for {url} was {title}"),
            None => println!("{url} had no title"),
        }
        Pending => {
            // continue
        }
    }
}
```
If Rust compiled it to exactly that code, though, every `await` would be
blocking—exactly the opposite of what we were going for! Instead, Rust needs to
make sure that the loop can hand off control to something which can pause work
on this future to work on other futures, and then check this one again later.
That something is an async runtime, and this scheduling and coordination work
is one of the main jobs for a runtime.
Recall our description (in the Counting section of this chapter on page XX)
of waiting on `rx.recv`. The `recv` call returns a `Future`, and awaiting it
polls it. In our initial discussion, we noted that a runtime will pause the
future until its ready with either `Some(message)` or `None` when the channel
closes. With our deeper understanding of `Future` in place, and specifically
`Future::poll`, we can see how that works. The runtime knows the future isnt
ready when it returns `Poll::Pending`. Conversely, the runtime knows the future
is ready and advances it when `poll` returns `Poll::Ready(Some(message))` or
`Poll::Ready(None)`.
The exact details of how a runtime does that are more than we will cover in even
this deep dive section. The key here is to see the basic mechanic of futures: a
runtime *polls* each future it is responsible for, putting it back to sleep when
it is not yet ready.
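Here is a small sketch of that mechanic in code: a future that returns `Pending` exactly once, then finishes. (This is a simplified illustration of how a `yield_now`-style future can work, not the actual implementation from `trpl` or any runtime.) The `cx.waker()` call is the piece that lets the runtime know when this future is worth polling again.
```
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            // Second poll: we already yielded once, so now we are done.
            Poll::Ready(())
        } else {
            self.yielded = true;
            // Tell the runtime this future is immediately ready to be
            // polled again. Without this, nothing would ever wake us up.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}
```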
### Pinning and the Pin and Unpin Traits
When we introduced the idea of pinning while working on Listing 17-17, we ran
into a very gnarly error message. Here is the relevant part of it again:
```
error[E0277]: `{async block@src/main.rs:8:23: 20:10}` cannot be unpinned
--> src/main.rs:46:33
|
46 | trpl::join_all(futures).await;
| ^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:8:23: 20:10}`, which is required by `Box<{async block@src/main.rs:8:23: 20:10}>: std::future::Future`
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<{async block@src/main.rs:8:23: 20:10}>` to implement `std::future::Future`
note: required by a bound in `JoinAll`
--> ~/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:29:8
|
27 | pub struct JoinAll<F>
| ------- required by a bound in this struct
28 | where
29 | F: Future,
| ^^^^^^ required by this bound in `JoinAll`
Some errors have detailed explanations: E0277, E0308.
For more information about an error, try `rustc --explain E0277`.
```
When we read this error message carefully, it not only tells us that we need to
pin the values, but also tells us why pinning is required. The `trpl::join_all`
function returns a struct called `JoinAll`. That struct is generic over a type
`F`, which is constrained to implement the `Future` trait. Directly awaiting a
future with `await` pins the future implicitly. Thats why we dont need to use
`pin!` everywhere we want to await futures.
However, we’re not directly awaiting a future here. Instead, we construct a
new future, `JoinAll`, by passing a collection of futures to the `join_all`
function. The signature for `join_all` requires that the types of the items in
the collection all implement the `Future` trait, and `Box<T>` implements
`Future` only if the `T` it wraps is a future which implements the `Unpin`
trait.
Thats a lot! But we can understand it, if we dive a little further into how the
`Future` type actually works, in particular around *pinning*.
Lets look again at the definition of `Future`:
```
use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Future {
    type Output;

    // Required method
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
```
The `cx` parameter and its `Context` type are the key to how a runtime
actually knows when to check any given future while still being lazy. The
details of how that works are beyond the scope of this chapter, though: you
generally only need to worry about them when writing a custom `Future`
implementation.
Instead, well focus on the type for `self`. This is the first time weve seen
a method where `self` has a type annotation. A type annotation for `self` is
similar to type annotations for other function parameters, with two key
differences. First, when we specify the type of `self` in this way, were
telling Rust what type `self` must be to call this method. Second, a type
annotation on `self` cant be just any type. Its only allowed to be the type
on which the method is implemented, a reference or smart pointer to that type,
or a `Pin` wrapping a reference to that type. Well see more on this syntax in
Chapter 18. For now, its enough to know that if we want to poll a future (to
check whether it is `Pending` or `Ready(Output)`), we need a mutable reference
to the type, which is wrapped in a `Pin`.
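For a feel of the syntax, here are a few `self` annotations side by side, on a made-up `Widget` type:
```
use std::pin::Pin;

struct Widget;

impl Widget {
    fn by_ref(self: &Self) {}          // exactly equivalent to `&self`
    fn by_mut(self: &mut Self) {}      // exactly equivalent to `&mut self`
    fn by_box(self: Box<Self>) {}      // callable only on a `Box<Widget>`
    fn by_pin(self: Pin<&mut Self>) {} // callable only on a pinned reference
}
```
`Future::poll` uses that last form: you can only call it once you have a `Pin<&mut Self>` in hand.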
`Pin` is a wrapper type. In some ways, its similar to the `Box`, `Rc`, and
other smart pointer types we saw in Chapter 15, which also wrap other types.
Unlike those, however, `Pin` only works with *pointer types* such as references
(`&` and `&mut`) and smart pointers (`Box`, `Rc`, and so on). To be precise,
`Pin` works with types which implement the `Deref` or `DerefMut` traits, which
we covered in the Treating Smart Pointers Like Regular References with the
Deref Trait section of Chapter 15 on page XX. You can think of this restriction
as equivalent to only working with pointers, though, because implementing
`Deref` or `DerefMut` means your type behaves similarly to a pointer type. `Pin`
is also not a pointer itself, and it doesnt have any behavior of its own the
way `Rc` and `Arc` do with ref counting. Its purely a tool the compiler can use
to uphold the relevant guarantees, by wrapping pointers in the type.
Recalling that `await` is implemented in terms of calls to `poll`, this starts
to explain the error message we saw above—but that was in terms of `Unpin`, not
`Pin`. So what exactly are `Pin` and `Unpin`, how do they relate, and why does
`Future` need `self` to be in a `Pin` type to call `poll`?
In the Our First Async Program section of this chapter on page XX, we
described how a series of await points in a future get compiled into a state
machine—and noted how the compiler helps make sure that state machine follows
all of Rust’s normal rules around safety, including borrowing and ownership. To
make that work, Rust looks at what data is needed between each await point and
the next await point or the end of the async block. It then creates a
corresponding variant in the state machine it builds. Each variant gets the
access it needs to the data that will be used in that section of the source
code, whether by taking ownership of that data or by getting a mutable or
immutable reference to it.
So far so good: if we get anything wrong about the ownership or references in a
given async block, the borrow checker will tell us. When we want to move around
the future that corresponds to that block—like moving it into a `Vec` to pass to
`join_all`, the way we did back in Listing 17-17—things get trickier.
When we move a future—whether by pushing it into a data structure to use as an
iterator with `join_all` or by returning it from a function—that actually means
moving the state machine Rust creates for us. And unlike most other types in
Rust, the futures Rust creates for async blocks can end up with references to
themselves in the fields of any given variant, as in Figure 17-4 (a simplified
illustration to help you get a feel for the idea, rather than digging into what
are often fairly complicated details).
<img alt="Concurrent work flow" src="img/trpl17-04.svg" />
Figure 17-4: A self-referential data type.
By default, though, any object which has a reference to itself is unsafe to
move, because references always point to the actual memory address of the thing
they refer to. If you move the data structure itself, those internal references
will be left pointing to the old location. However, that memory location is now
invalid. For one thing, its value will not be updated when you make changes to
the data structure. For another—and more importantly!—the computer is now free
to reuse that memory for other things! You could end up reading completely
unrelated data later.
<img alt="Concurrent work flow" src="img/trpl17-05.svg" />
Figure 17-5: The unsafe result of moving a self-referential data type.
In principle, the Rust compiler could try to update every reference to an object
every time it gets moved. That would potentially be a lot of performance
overhead, especially given there can be a whole web of references that need
updating. On the other hand, if we could make sure the data structure in
question *doesnt move in memory*, we dont have to update any references.
This is exactly what Rust’s borrow checker enforces: in safe code, you can’t
move an item while anything else holds an active reference to it.
`Pin` builds on that to give us the exact guarantee we need. When we *pin* a
value by wrapping a pointer to that value in `Pin`, it can no longer move. Thus,
if you have `Pin<Box<SomeType>>`, you actually pin the `SomeType` value, *not*
the `Box` pointer. Figure 17-6 illustrates this:
<img alt="Concurrent work flow" src="img/trpl17-06.svg" />
Figure 17-6: Pinning a `Box` which points to a self-referential future type.
In fact, the `Box` pointer can still move around freely. Remember: we care about
making sure the data ultimately being referenced stays in its place. If a
pointer moves around, but the data it points to is in the same place, as in
Figure 17-7, theres no potential problem. (How you would do this with a `Pin`
wrapping a `Box` is more than well get into in this particular discussion,
but it would make for a good exercise! If you look at the docs for the types as
well as the `std::pin` module, you might be able to work out how you would do
that.) The key is that the self-referential type itself cannot move, because it
is still pinned.
<img alt="Concurrent work flow" src="img/trpl17-07.svg" />
Figure 17-7: Moving a `Box` which points to a self-referential future type.
However, most types are perfectly safe to move around, even if they happen to
be behind a `Pin` pointer. We only need to think about pinning when items have
internal references. Primitive values such as numbers and booleans dont have
any internal references, so theyre obviously safe. Neither do most types you
normally work with in Rust. A `Vec`, for example, doesnt have any internal
references it needs to keep up to date this way, so you can move it around
without worrying. If you have a `Pin<Vec<String>>`, youd have to do everything
via the safe but restrictive APIs provided by `Pin`, even though a
`Vec<String>` is always safe to move if there are no other references to it. We
need a way to tell the compiler that its actually just fine to move items
around in cases such as these. For that, we have `Unpin`.
`Unpin` is a marker trait, similar to the `Send` and `Sync` traits we saw in the
Extensible Concurrency with the `Sync` and `Send` Traits section of Chapter
16 on page XX. Recall that marker traits have no functionality of their own.
They exist only to tell the compiler that its safe to use the type which
implements a given trait in a particular context. `Unpin` informs the compiler
that a given type does *not* need to uphold any particular guarantees about
whether the value in question can be moved.
Just as with `Send` and `Sync`, the compiler implements `Unpin` automatically
for all types where it can prove it is safe. The special case, again similar to
`Send` and `Sync`, is the case where `Unpin` is *not* implemented for a type.
The notation for this is `impl !Unpin for SomeType`, where `SomeType` is the
name of a type which *does* need to uphold those guarantees to be safe whenever
a pointer to that type is used in a `Pin`.
In other words, there are two things to keep in mind about the relationship
between `Pin` and `Unpin`. First, `Unpin` is the normal case, and `!Unpin` is
the special case. Second, whether a type implements `Unpin` or `!Unpin` *only*
matters when using a pinned pointer to that type like `Pin<&mut SomeType>`.
To make that concrete, think about a `String`: it has a length and the Unicode
characters which make it up. We can wrap a `String` in `Pin`, as seen in Figure
17-8. However, `String` automatically implements `Unpin`, the same as most other
types in Rust.
<img alt="Concurrent work flow" src="img/trpl17-08.svg" />
Figure 17-8: Pinning a String, with a dotted line indicating that the String
implements the `Unpin` trait, so it is not pinned.
As a result, we can do things which would be illegal if `String` implemented
`!Unpin` instead, such as replace one string with another at the exact same
location in memory as in Figure 17-9. This doesnt violate the `Pin` contract,
because `String` has no internal references that make it unsafe to move around!
That is precisely why it implements `Unpin` rather than `!Unpin`.
<img alt="Concurrent work flow" src="img/trpl17-09.svg" />
Figure 17-9: Replacing the String with an entirely different String in memory.
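In code, that replacement is entirely safe, and it works through `Pin`’s own API precisely because `String` implements `Unpin`. Here is a minimal sketch:
```
use std::pin::Pin;

let mut s = String::from("first");

// `Pin::new` is only available when the target type implements `Unpin`.
let mut pinned: Pin<&mut String> = Pin::new(&mut s);

// Replacing the pinned value, as in Figure 17-9: fine for `Unpin` types.
pinned.set(String::from("second"));

assert_eq!(s, "second");
```
For a type that implemented `!Unpin`, safe code could not even construct the `Pin` this way, because `Pin::new` is only provided for `Unpin` types.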
Now we know enough to understand the errors reported for that `join_all` call
from back in Listing 17-17. We originally tried to move the futures produced by
async blocks into a `Vec<Box<dyn Future<Output = ()>>>`, but as weve seen,
those futures may have internal references, so they dont automatically
implement `Unpin`. Once we pin them, we can pass the resulting `Pin` type into
the `Vec`, confident that the underlying data in the futures will *not* be
moved.
`Pin` and `Unpin` are mostly important for building lower-level libraries, or
when youre building a runtime itself, rather than for day to day Rust code.
When you see these traits in error messages, though, now youll have a better
idea of how to fix the code!
> Note: This combination of `Pin` and `Unpin` allows a whole class of complex
> types to be safe in Rust which are otherwise difficult to implement because
> they’re self-referential. Types which require `Pin` show up *most* commonly
> in async Rust today, but you might—very rarely!—see them in other contexts,
> too.
>
> The specifics of how `Pin` and `Unpin` work, and the rules theyre required
> to uphold, are covered extensively in the API documentation for `std::pin`, so
> if youd like to understand them more deeply, thats a great place to start.
>
> If you want to understand how things work under the hood in even more
> detail, the official *Asynchronous Programming in Rust* book available at
> *https://rust-lang.github.io/async-book/* has you covered:
>
> * Chapter 2: Under the Hood: Executing Futures and Tasks
> * Chapter 4: Pinning
### The Stream Trait
Now that we have a deeper grasp on the `Future`, `Pin`, and `Unpin` traits, we
can turn our attention to the `Stream` trait. As described in the section
introducing streams, streams are similar to asynchronous iterators. Unlike
`Iterator` and `Future`, there is no definition of a `Stream` trait in the
standard library as of the time of writing, but there *is* a very common
definition from the `futures` crate used throughout the ecosystem.
Lets review the definitions of the `Iterator` and `Future` traits, so we can
build up to how a `Stream` trait that merges them together might look. From
`Iterator`, we have the idea of a sequence: its `next` method provides an
`Option<Self::Item>`. From `Future`, we have the idea of readiness over time:
its `poll` method provides a `Poll<Self::Output>`. To represent a sequence of
items which become ready over time, we define a `Stream` trait which puts those
features together:
```
use std::pin::Pin;
use std::task::{Context, Poll};

trait Stream {
    type Item;

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>>;
}
```
The `Stream` trait defines an associated type `Item` for the type of the items
produced by the stream. This is similar to `Iterator`, where there may be zero
to many items; it is unlike `Future`, where there is always a single `Output`,
even if it’s the unit type `()`.
`Stream` also defines a method to get those items. We call it `poll_next`, to
make it clear that it polls in the same way `Future::poll` does and produces a
sequence of items in the same way `Iterator::next` does. Its return type
combines `Poll` with `Option`. The outer type is `Poll`, because it has to be
checked for readiness, just as a future does. The inner type is `Option`,
because it needs to signal whether there are more messages, just as an iterator
does.
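Here is how those pieces fit together in a hand-written implementation, using the trait definition above. This `Counter` stream is a made-up example: every poll is ready immediately, yielding `Some` until it reaches its limit and `None` from then on.
```
use std::pin::Pin;
use std::task::{Context, Poll};

struct Counter {
    count: u32,
    limit: u32,
}

impl Stream for Counter {
    type Item = u32;

    fn poll_next(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Option<u32>> {
        if self.count < self.limit {
            self.count += 1;
            // Ready, and there is another item.
            Poll::Ready(Some(self.count))
        } else {
            // Ready, and the stream is exhausted.
            Poll::Ready(None)
        }
    }
}
```
A stream that had to wait for its next item would instead return `Poll::Pending` and arrange a wakeup via `cx`, just as a future does.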
Something very similar to this will likely end up standardized as part of Rusts
standard library. In the meantime, its part of the toolkit of most runtimes,
so you can rely on it, and everything we cover below should generally apply!
In the example we saw in the section on streaming, though, we didnt use
`poll_next` *or* `Stream`, but instead used `next` and `StreamExt`. We *could*
work directly in terms of the `poll_next` API by hand-writing our own `Stream`
state machines, of course, just as we *could* work with futures directly via
their `poll` method. Using `await` is much nicer, though, so the `StreamExt`
trait supplies the `next` method so we can do just that.
```
trait StreamExt: Stream {
    async fn next(&mut self) -> Option<Self::Item>
    where
        Self: Unpin;

    // other methods...
}
```
> Note: The actual definition we used earlier in the chapter looks slightly
> different than this, because it supports versions of Rust which did not yet
> support using async functions in traits. As a result, it looks like this:
>
> ```
> fn next(&mut self) -> Next<'_, Self> where Self: Unpin;
> ```
>
> That `Next` type is a `struct` which implements `Future` and gives a way to
> name the lifetime of the reference to `self` with `Next<'_, Self>`, so that
> `await` can work with this method!
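>
> In sketch form, such a `Next` struct might look something like the following;
> the real definition in the `futures` crate differs in its details:
>
> ```
> use std::future::Future;
> use std::pin::Pin;
> use std::task::{Context, Poll};
>
> struct Next<'a, S: ?Sized> {
>     stream: &'a mut S,
> }
>
> impl<S: Stream + Unpin + ?Sized> Future for Next<'_, S> {
>     type Output = Option<S::Item>;
>
>     fn poll(
>         mut self: Pin<&mut Self>,
>         cx: &mut Context<'_>,
>     ) -> Poll<Self::Output> {
>         // Because `S: Unpin`, we can create a fresh `Pin` around the plain
>         // mutable reference each time the future is polled.
>         Pin::new(&mut *self.stream).poll_next(cx)
>     }
> }
> ```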
The `StreamExt` trait is also the home of all the interesting methods available
to use with streams. `StreamExt` is automatically implemented for every type
which implements `Stream`, but these traits are defined separately so that the
community can iterate on the foundational trait distinctly from the convenience
APIs.
In the version of `StreamExt` used in the `trpl` crate, the trait not only
defines the `next` method but also supplies an implementation of `next` which
correctly handles the details of calling `Stream::poll_next`. This means that
even when you need to write your own streaming data type, you *only* have to
implement `Stream`, and then anyone who uses your data type can use `StreamExt`
and its methods with it automatically.
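In sketch form, that workflow might look like this, assuming the hypothetical
`Counter` stream from earlier implements the `Stream` trait that `trpl`
re-exports and that `trpl::StreamExt` is in scope:

```
use trpl::StreamExt;

fn main() {
    trpl::run(async {
        // We only implemented `Stream` for the hypothetical `Counter`, but
        // the `next` method from `StreamExt` comes along for free.
        let mut counter = Counter { count: 0 };

        while let Some(n) = counter.next().await {
            println!("got {n}");
        }
    });
}
```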
That's all we're going to cover for the lower-level details on these traits. To
wrap up, let's consider how futures (including streams), tasks, and threads all
fit together!
## Futures, Tasks, and Threads
As we saw in the “Using Threads to Run Code Simultaneously” section of Chapter
16 on page XX, threads provide one approach to concurrency. We've seen another
approach to concurrency in this chapter, using async with futures and streams.
You might be wondering why you would choose one or the other. The answer is: it
depends! And in many cases, the choice isn't threads *or* async but rather
threads *and* async.
Many operating systems have supplied threading-based concurrency models for
decades now, and many programming languages have support for them as a result.
However, threads are not without their tradeoffs. On many operating systems,
each thread uses a fair bit of memory, and threads come with some overhead for
starting up and shutting down. Threads are also only an option when your
operating system and hardware support them! Unlike mainstream desktop and
mobile computers, some embedded systems don't have an OS at all, so they also
don't have threads!
The async model provides a different (and ultimately complementary) set of
tradeoffs. In the async model, concurrent operations don't require their own
threads. Instead, they can run on tasks, as when we used `trpl::spawn_task` to
kick off work from a synchronous function throughout the streams section. A task
is similar to a thread, but instead of being managed by the operating system,
it's managed by library-level code: the runtime.
In the previous section, we saw that we could build a `Stream` by using an async
channel and spawning an async task which we could call from synchronous code. We
could do the exact same thing with a thread! In Listing 17-40, we used
`trpl::spawn_task` and `trpl::sleep`. In Listing 17-41, we replace those with
the `thread::spawn` and `thread::sleep` APIs from the standard library in the
`get_intervals` function.
Filename: src/main.rs
```
use std::{thread, time::Duration};

use trpl::{ReceiverStream, Stream};

fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    // This is *not* `trpl::spawn` but `std::thread::spawn`!
    thread::spawn(move || {
        let mut count = 0;
        loop {
            // Likewise, this is *not* `trpl::sleep` but `std::thread::sleep`!
            thread::sleep(Duration::from_millis(1));
            count += 1;

            if let Err(send_error) = tx.send(count) {
                eprintln!("Could not send interval {count}: {send_error}");
                break;
            }
        }
    });

    ReceiverStream::new(rx)
}
```
Listing 17-41: Using the `std::thread` APIs instead of the async `trpl` APIs
for the `get_intervals` function
If you run this, the output is identical. And notice how little changes here
from the perspective of the calling code! What's more, even though one of our
functions spawned an async task on the runtime and the other spawned an
OS thread, the resulting streams were unaffected by the differences.
Despite the similarities, these two approaches behave very differently, although
we might have a hard time measuring it in this very simple example. We could
spawn millions of async tasks on any modern personal computer. If we tried to do
that with threads, we would literally run out of memory!
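As a rough sketch of that difference in scale (the exact limits depend on the
runtime and the operating system), a loop like the following is unremarkable
with tasks, while the thread-based equivalent would exhaust memory or hit an
operating system limit long before finishing:

```
fn main() {
    trpl::run(async {
        // Each task is little more than a small allocation holding its
        // future's state, so spawning a million of them is feasible.
        let mut handles = Vec::new();

        for i in 0..1_000_000 {
            handles.push(trpl::spawn_task(async move { i + 1 }));
        }

        for handle in handles {
            handle.await.unwrap();
        }
    });
}
```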
However, there's a reason these APIs are so similar. Threads act as a boundary
for sets of synchronous operations; concurrency is possible *between* threads.
Tasks act as a boundary for sets of *asynchronous* operations; concurrency is
possible both *between* and *within* tasks, because a task can switch between
futures in its body. Finally, futures are Rust's most granular unit of
concurrency, and each future may represent a tree of other futures. The
runtime (specifically, its executor) manages tasks, and tasks manage futures.
In that regard, tasks are similar to lightweight, runtime-managed threads with
added capabilities that come from being managed by a runtime instead of by the
operating system.
This doesn't mean that async tasks are always better than threads, any more
than threads are always better than tasks.
Concurrency with threads is in some ways a simpler programming model than
concurrency with `async`. That can be a strength or a weakness. Threads are
somewhat “fire and forget”: they have no native equivalent to a future, so they
simply run to completion without interruption, except by the operating system
itself. That is, they have no built-in support for *intra-task concurrency* the
way futures do. Threads in Rust also have no mechanisms for cancellation, a
subject we haven't covered in depth in this chapter, but which is implicit in
the fact that whenever we ended a future, its state got cleaned up correctly.
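To make that cleanup behavior concrete, here is a minimal sketch using the
`trpl::race` helper we saw earlier in the chapter. When the faster future
finishes, the slower one is simply dropped, and dropping a future is how it
gets cancelled; threads have no analogous way to be stopped partway through
their work.

```
use std::time::Duration;

fn main() {
    trpl::run(async {
        let slow = async {
            trpl::sleep(Duration::from_secs(5)).await;
            println!("this never prints; the future is dropped mid-await");
        };

        let fast = async {
            trpl::sleep(Duration::from_millis(50)).await;
        };

        // When `fast` completes, `race` stops polling `slow` and drops it,
        // cleaning up its state. That drop *is* the cancellation.
        trpl::race(fast, slow).await;
    });
}
```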
These limitations also make threads harder to compose than futures. It's much
more difficult, for example, to use threads to build helpers such as the
`timeout` we built in the “Building Our Own Async Abstractions” section of this
chapter on page XX or the `throttle` method we used with streams in the
“Composing Streams” section of this chapter on page XX. The fact that futures
are richer data structures means they can be composed together more naturally,
as we have seen.
Tasks then give *additional* control over futures, allowing you to choose where
and how to group the futures. And it turns out that threads and tasks often
work very well together, because tasks can (at least in some runtimes) be moved
around between threads. We haven't mentioned it up until now, but under the
hood the `Runtime` we have been using, including the `spawn_blocking` and
`spawn_task` functions, is multithreaded by default! Many runtimes use an
approach called *work stealing* to transparently move tasks around between
threads based on the current utilization of the threads, with the aim of
improving the overall performance of the system. Building that kind of runtime
actually requires threads *and* tasks, and therefore futures.
As a default way of thinking about which to use when:
* If the work is *very parallelizable*, such as processing a bunch of data where
each part can be processed separately, threads are a better choice.
* If the work is *very concurrent*, such as handling messages from a bunch of
different sources which may come in at different intervals and different rates,
async is a better choice.
And if you need some mix of parallelism and concurrency, you don't have to
choose between threads and async. You can use them together freely, letting each
one serve the part it is best at. Listing 17-42 shows a fairly common example
of this kind of mix in real-world Rust code.
Filename: src/main.rs
```
use std::{thread, time::Duration};

fn main() {
    let (tx, mut rx) = trpl::channel();

    thread::spawn(move || {
        for i in 1..11 {
            tx.send(i).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    trpl::run(async {
        while let Some(message) = rx.recv().await {
            println!("{message}");
        }
    });
}
```
Listing 17-42: Sending messages with blocking code in a thread and awaiting the
messages in an async block
We begin by creating an async channel. Then we spawn a thread which takes
ownership of the sender side of the channel. Within the thread, we send the
numbers 1 through 10, and sleep for a second in between each. Finally, we run a
future created with an async block passed to `trpl::run` just as we have
throughout the chapter. In that future, we await those messages, just as in
the other message-passing examples we have seen.
To return to the examples we opened the chapter with: you could imagine running
a set of video encoding tasks using a dedicated thread, because video encoding
is compute bound, but notifying the UI that those operations are done with an
async channel. Examples of this kind of mix abound!
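In sketch form, that pattern might look like the following, where
`encode_video` is a hypothetical stand-in for the real compute-bound work:

```
use std::thread;

// A hypothetical stand-in for compute-bound encoding work.
fn encode_video(id: u32) -> String {
    format!("video-{id}.mp4")
}

fn main() {
    let (tx, mut rx) = trpl::channel();

    for id in 1..=3 {
        let tx = tx.clone();
        // Each encode runs on a dedicated OS thread, because the work is
        // compute-bound.
        thread::spawn(move || {
            let output = encode_video(id);
            tx.send(output).unwrap();
        });
    }

    // Drop the original sender so the channel closes once every encoding
    // thread finishes and drops its own clone.
    drop(tx);

    // The async side awaits the completion notifications, as a UI event
    // loop might.
    trpl::run(async {
        while let Some(finished) = rx.recv().await {
            println!("finished encoding {finished}");
        }
    });
}
```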
## Summary
This isn't the last you'll see of concurrency in this book: the project in
Chapter 21 will use the concepts in this chapter in a more realistic situation
than the smaller examples discussed here, and compare more directly what it
looks like to solve these kinds of problems with threading vs. with tasks and
futures. Whether with threads, with futures and tasks, or with the combination
of them all, Rust gives you the tools you need to write safe, fast, concurrent
code, whether for a high-throughput web server or an embedded operating system.
Next, we'll talk about idiomatic ways to model problems and structure solutions
as your Rust programs get bigger. In addition, we'll discuss how Rust's idioms
relate to those you might be familiar with from object-oriented programming.