## Streams: Futures in Sequence
<!-- Old headings. Do not remove or links may break. -->
<a id="streams"></a>
So far in this chapter, we’ve mostly stuck to individual futures. The one big
exception was the async channel we used. Recall how we used the receiver for our
async channel earlier in this chapter in the [“Message
Passing”][17-02-messages]<!-- ignore --> section. The async `recv` method
produces a sequence of items over time. This is an instance of a much more
general pattern known as a _stream_.
We saw a sequence of items back in Chapter 13, when we looked at the `Iterator`
trait in the [The Iterator Trait and the `next` Method][iterator-trait]<!-- ignore
--> section, but there are two differences between iterators and the async
channel receiver. The first difference is time: iterators are synchronous, while
the channel receiver is asynchronous. The second is the API. When working
directly with `Iterator`, we call its synchronous `next` method. With the
`trpl::Receiver` stream in particular, we called an asynchronous `recv` method
instead. Otherwise, these APIs feel very similar, and that similarity
isn’t a coincidence. A stream is like an asynchronous form of iteration. Whereas
the `trpl::Receiver` specifically waits to receive messages, though, the
general-purpose stream API is much broader: it provides the next item the
way `Iterator` does, but asynchronously.
The similarity between iterators and streams in Rust means we can actually
create a stream from any iterator. As with an iterator, we can work with a
stream by calling its `next` method and then awaiting the output, as in Listing
17-30.
<Listing number="17-30" caption="Creating a stream from an iterator and printing its values" file-name="src/main.rs">
```rust,ignore,does_not_compile
{{#rustdoc_include ../listings/ch17-async-await/listing-17-30/src/main.rs:stream}}
```
</Listing>
We start with an array of numbers, which we convert to an iterator and then call
`map` on to double all the values. Then we convert the iterator into a stream
using the `trpl::stream_from_iter` function. Next, we loop over the items in the
stream as they arrive with the `while let` loop.
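For reference, the body of that listing has roughly this shape (a sketch only; the variable names and the `trpl::run` wrapper are assumptions, not the exact listing):
```rust,ignore,does_not_compile
fn main() {
    trpl::run(async {
        // Build an ordinary iterator and double each value...
        let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let iter = values.iter().map(|n| n * 2);

        // ...then convert the iterator into a stream.
        let mut stream = trpl::stream_from_iter(iter);

        // Await each item as the stream produces it.
        while let Some(value) = stream.next().await {
            println!("The value was: {value}");
        }
    });
}
```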
Unfortunately, when we try to run the code, it doesn’t compile, but instead it
reports that there’s no `next` method available:
<!-- manual-regeneration
cd listings/ch17-async-await/listing-17-30
cargo build
copy only the error output
-->
```console
error[E0599]: no method named `next` found for struct `Iter` in the current scope
--> src/main.rs:10:40
|
10 | while let Some(value) = stream.next().await {
| ^^^^
|
= note: the full type name has been written to 'file:///projects/async-await/target/debug/deps/async_await-575db3dd3197d257.long-type-14490787947592691573.txt'
= note: consider using `--verbose` to print the full type name to the console
= help: items from traits can only be used if the trait is in scope
help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
|
1 + use crate::trpl::StreamExt;
|
1 + use futures_util::stream::stream::StreamExt;
|
1 + use std::iter::Iterator;
|
1 + use std::str::pattern::Searcher;
|
help: there is a method `try_next` with a similar name
|
10 | while let Some(value) = stream.try_next().await {
| ~~~~~~~~
```
As this output explains, the reason for the compiler error is that we need the
right trait in scope to be able to use the `next` method. Given our discussion
so far, you might reasonably expect that trait to be `Stream`, but it’s actually
`StreamExt`. Short for _extension_, `Ext` is a common pattern in the
Rust community for extending one trait with another.
We’ll explain the `Stream` and `StreamExt` traits in a bit more detail at the
end of the chapter, but for now all you need to know is that the `Stream` trait
defines a low-level interface that effectively combines the `Iterator` and
`Future` traits. `StreamExt` supplies a higher-level set of APIs on top of
`Stream`, including the `next` method as well as other utility methods similar
to those provided by the `Iterator` trait. `Stream` and `StreamExt` are not yet
part of Rust’s standard library, but most ecosystem crates use the same
definition.
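If a rough picture helps in the meantime, the division of labor looks something like this simplified sketch (not the exact definitions that `trpl` re-exports):
```rust,ignore
use std::pin::Pin;
use std::task::{Context, Poll};

// The low-level trait: like `Iterator`, except each item is produced by
// polling, the way a `Future` is polled.
trait Stream {
    type Item;

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>>;
}

// The extension trait layers convenience methods, such as an awaitable
// `next`, on top of anything that implements `Stream`.
trait StreamExt: Stream {
    async fn next(&mut self) -> Option<Self::Item>
    where
        Self: Unpin;
}
```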
The fix to the compiler error is to add a `use` statement for `trpl::StreamExt`,
as in Listing 17-31.
<Listing number="17-31" caption="Successfully using an iterator as the basis for a stream" file-name="src/main.rs">
```rust
{{#rustdoc_include ../listings/ch17-async-await/listing-17-31/src/main.rs:all}}
```
</Listing>
With all those pieces put together, this code works the way we want! What’s
more, now that we have `StreamExt` in scope, we can use all of its utility
methods, just as with iterators. For example, in Listing 17-32, we use the
`filter` method to filter out everything but multiples of three and five.
<Listing number="17-32" caption="Filtering a stream with the `StreamExt::filter` method" file-name="src/main.rs">
```rust
{{#rustdoc_include ../listings/ch17-async-await/listing-17-32/src/main.rs:all}}
```
</Listing>
Of course, this isn’t very interesting, since we could do the same with normal
iterators and without any async at all. Let’s look at what
we can do that _is_ unique to streams.
### Composing Streams
Many concepts are naturally represented as streams: items becoming available in
a queue, chunks of data being pulled incrementally from the filesystem when the
full data set is too large for the computer’s memory, or data arriving over the
network over time. Because streams are futures, we can use them with any other
kind of future and combine them in interesting ways. For example, we can batch
up events to avoid triggering too many network calls, set timeouts on sequences
of long-running operations, or throttle user interface events to avoid doing
needless work.
Let’s start by building a little stream of messages as a stand-in for a stream
of data we might see from a WebSocket or another real-time communication
protocol, as shown in Listing 17-33.
<Listing number="17-33" caption="Using the `rx` receiver as a `ReceiverStream`" file-name="src/main.rs">
```rust
{{#rustdoc_include ../listings/ch17-async-await/listing-17-33/src/main.rs:all}}
```
</Listing>
First, we create a function called `get_messages` that returns `impl Stream<Item
= String>`. For its implementation, we create an async channel, loop over the
first 10 letters of the English alphabet, and send them across the channel.
We also use a new type: `ReceiverStream`, which converts the `rx` receiver from
the `trpl::channel` into a `Stream` with a `next` method. Back in `main`, we use
a `while let` loop to print all the messages from the stream.
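A sketch of roughly what those two pieces look like (the `use` line and the `ReceiverStream::new` constructor are assumptions about what `trpl` exposes):
```rust,ignore
use trpl::{ReceiverStream, Stream, StreamExt};

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    // Send the first 10 letters of the alphabet through the channel.
    let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
    for message in messages {
        tx.send(format!("Message: '{message}'")).unwrap();
    }

    // Wrap the receiver so it can be consumed as a stream.
    ReceiverStream::new(rx)
}

fn main() {
    trpl::run(async {
        let mut messages = get_messages();

        while let Some(message) = messages.next().await {
            println!("{message}");
        }
    });
}
```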
When we run this code, we get exactly the results we would expect:
<!-- Not extracting output because changes to this output aren't significant;
the changes are likely to be due to the threads running differently rather than
changes in the compiler -->
```text
Message: 'a'
Message: 'b'
Message: 'c'
Message: 'd'
Message: 'e'
Message: 'f'
Message: 'g'
Message: 'h'
Message: 'i'
Message: 'j'
```
Again, we could do this with the regular `Receiver` API or even the regular
`Iterator` API, though, so let’s add a feature that requires streams: adding a
timeout that applies to every item in the stream, and a delay on the items we
emit, as shown in Listing 17-34.
<Listing number="17-34" caption="Using the `StreamExt::timeout` method to set a time limit on the items in a stream" file-name="src/main.rs">
```rust
{{#rustdoc_include ../listings/ch17-async-await/listing-17-34/src/main.rs:timeout}}
```
</Listing>
We start by adding a timeout to the stream with the `timeout` method, which
comes from the `StreamExt` trait. Then we update the body of the `while let`
loop, because the stream now returns a `Result`. The `Ok` variant indicates a
message arrived in time; the `Err` variant indicates that the timeout elapsed
before any message arrived. We `match` on that result and either print the
message when we receive it successfully or print a notice about the timeout.
Finally, notice that we pin the messages after applying the timeout to them,
because the timeout helper produces a stream that needs to be pinned to be
polled.
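Concretely, the loop in `main` ends up with roughly this shape (a sketch; the variable names are assumptions, and it assumes `use std::pin::pin;` and `use std::time::Duration;` at the top of the file):
```rust,ignore
// Apply a 200-millisecond timeout to every item, then pin the resulting
// stream so it can be polled.
let mut messages =
    pin!(get_messages().timeout(Duration::from_millis(200)));

while let Some(result) = messages.next().await {
    match result {
        // The message arrived before the timeout elapsed.
        Ok(message) => println!("{message}"),
        // The timeout elapsed before a message arrived.
        Err(reason) => eprintln!("Problem: {reason:?}"),
    }
}
```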
However, because there are no delays between messages, this timeout does not
change the behavior of the program. Let’s add a variable delay to the messages
we send, as shown in Listing 17-35.
<Listing number="17-35" caption="Sending messages through `tx` with an async delay without making `get_messages` an async function" file-name="src/main.rs">
```rust
{{#rustdoc_include ../listings/ch17-async-await/listing-17-35/src/main.rs:messages}}
```
</Listing>
In `get_messages`, we use the `enumerate` iterator method with the `messages`
array so that we can get the index of each item we’re sending along with the
item itself. Then we apply a 100-millisecond delay to even-index items and a
300-millisecond delay to odd-index items to simulate the different delays we
might see from a stream of messages in the real world. Because our timeout is
for 200 milliseconds, this should affect half of the messages.
To sleep between messages in the `get_messages` function without blocking, we
need to use async. However, we can’t make `get_messages` itself into an async
function, because then we’d return a `Future<Output = Stream<Item = String>>`
instead of a `Stream<Item = String>`. The caller would have to await
`get_messages` itself to get access to the stream. But remember: everything in a
given future happens linearly; concurrency happens _between_ futures. Awaiting
`get_messages` would require it to send all the messages, including the sleep
delay between each message, before returning the receiver stream. As a result,
the timeout would be useless. There would be no delays in the stream itself;
they would all happen before the stream was even available.
Instead, we leave `get_messages` as a regular function that returns a stream,
and we spawn a task to handle the async `sleep` calls.
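The reworked `get_messages` looks roughly like this (a sketch; the delays match the description above, while the constructor name is an assumption):
```rust,ignore
fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    // Do the sending (and the sleeping) in a separate task so that
    // `get_messages` can return the stream immediately.
    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
        for (index, message) in messages.into_iter().enumerate() {
            // 100 milliseconds for even indexes, 300 for odd ones.
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(time_to_sleep)).await;

            tx.send(format!("Message: '{message}'")).unwrap();
        }
    });

    ReceiverStream::new(rx)
}
```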
> Note: Calling `spawn_task` in this way works because we already set up our
> runtime; had we not, it would cause a panic. Other implementations choose
> different tradeoffs: they might spawn a new runtime and avoid the panic but
> end up with a bit of extra overhead, or they may simply not provide a
> standalone way to spawn tasks without reference to a runtime. Make sure you
> know what tradeoff your runtime has chosen and write your code accordingly!
Now our code has a much more interesting result. Between every other pair of
messages, a `Problem: Elapsed(())` error appears:
<!-- Not extracting output because changes to this output aren't significant;
the changes are likely to be due to the threads running differently rather than
changes in the compiler -->
```text
Message: 'a'
Problem: Elapsed(())
Message: 'b'
Message: 'c'
Problem: Elapsed(())
Message: 'd'
Message: 'e'
Problem: Elapsed(())
Message: 'f'
Message: 'g'
Problem: Elapsed(())
Message: 'h'
Message: 'i'
Problem: Elapsed(())
Message: 'j'
```
The timeout doesn’t prevent the messages from arriving in the end. We still get
all of the original messages, because our channel is _unbounded_: it can hold as
many messages as we can fit in memory. If the message doesn’t arrive before the
timeout, our stream handler will account for that, but when it polls the stream
again, the message may now have arrived.
You can get different behavior if needed by using other kinds of channels or
other kinds of streams more generally. Let’s see one of those in practice by
combining a stream of time intervals with this stream of messages.
### Merging Streams
First, let’s create another stream, which will emit an item every millisecond if
we let it run directly. For simplicity, we can use the `sleep` function to send
a message on a delay and combine it with the same approach we used in
`get_messages` of creating a stream from a channel. The difference is that this
time, we’re going to send back the count of intervals that have elapsed, so the
return type will be `impl Stream<Item = u32>`, and we can call the function
`get_intervals` (see Listing 17-36).
<Listing number="17-36" caption="Creating a stream with a counter that will be emitted once every millisecond" file-name="src/main.rs">
```rust
{{#rustdoc_include ../listings/ch17-async-await/listing-17-36/src/main.rs:intervals}}
```
</Listing>
We start by defining a `count` in the task. (We could define it outside the
task, too, but it’s clearer to limit the scope of any given variable.) Then we
create an infinite loop. Each iteration of the loop asynchronously sleeps for
one millisecond, increments the count, and then sends it over the channel.
Because this is all wrapped in the task created by `spawn_task`, all of
it—including the infinite loop—will get cleaned up along with the runtime.
This kind of infinite loop, which ends only when the whole runtime gets torn
down, is fairly common in async Rust: many programs need to keep running
indefinitely. With async, this doesn’t block anything else, as long as there is
at least one await point in each iteration through the loop.
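Sketched out, `get_intervals` looks something like this (again assuming `ReceiverStream::new` as the wrapper):
```rust,ignore
fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let mut count = 0;
        loop {
            // The await point in each iteration keeps this infinite loop
            // from starving the rest of the program.
            trpl::sleep(Duration::from_millis(1)).await;
            count += 1;
            tx.send(count).unwrap();
        }
    });

    ReceiverStream::new(rx)
}
```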
Now, back in our main function’s async block, we can attempt to merge the
`messages` and `intervals` streams, as shown in Listing 17-37.
<Listing number="17-37" caption="Attempting to merge the `messages` and `intervals` streams" file-name="src/main.rs">
```rust,ignore,does_not_compile
{{#rustdoc_include ../listings/ch17-async-await/listing-17-37/src/main.rs:main}}
```
</Listing>
We start by calling `get_intervals`. Then we merge the `messages` and
`intervals` streams with the `merge` method, which combines multiple streams
into one stream that produces items from any of the source streams as soon as
the items are available, without imposing any particular ordering. Finally, we
loop over that combined stream instead of over `messages`.
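The attempt looks roughly like this inside `main`’s async block (a sketch, with assumed variable names):
```rust,ignore
let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals();
// Produce items from either stream as soon as they become available...
let merged = messages.merge(intervals);

// ...then run the same `while let`/`match` loop as before over `merged`.
```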
At this point, neither `messages` nor `intervals` needs to be pinned or mutable,
because both will be combined into the single `merged` stream. However, this
call to `merge` doesn’t compile! (Neither does the `next` call in the `while
let` loop, but we’ll come back to that.) This is because the two streams have
different types. The `messages` stream has the type `Timeout<impl Stream<Item =
String>>`, where `Timeout` is the type that implements `Stream` for a `timeout`
call. The `intervals` stream has the type `impl Stream<Item = u32>`. To merge
these two streams, we need to transform one of them to match the other. We’ll
rework the `intervals` stream, because `messages` is already in the basic format we
want and has to handle timeout errors (see Listing 17-38).
<!-- We cannot directly test this one, because it never stops. -->
<Listing number="17-38" caption="Aligning the type of the the `intervals` stream with the type of the `messages` stream" file-name="src/main.rs">
```rust,ignore
{{#rustdoc_include ../listings/ch17-async-await/listing-17-38/src/main.rs:main}}
```
</Listing>
First, we can use the `map` helper method to transform the `intervals` into a
string. Second, we need to match the `Timeout` from `messages`. Because we don’t
actually _want_ a timeout for `intervals`, though, we can just create a timeout
which is longer than the other durations we are using. Here, we create a
10-second timeout with `Duration::from_secs(10)`. Finally, we need to make
`stream` mutable, so that the `while let` loop’s `next` calls can iterate
through the stream, and pin it so that it’s safe to do so. That gets us _almost_
to where we need to be. Everything type checks. If you run this, though, there
will be two problems. First, it will never stop! You’ll need to stop it with
<span class="keystroke">ctrl-c</span>. Second, the messages from the English
alphabet will be buried in the midst of all the interval counter messages:
<!-- Not extracting output because changes to this output aren't significant;
the changes are likely to be due to the tasks running differently rather than
changes in the compiler -->
```text
--snip--
Interval: 38
Interval: 39
Interval: 40
Message: 'a'
Interval: 41
Interval: 42
Interval: 43
--snip--
```
Listing 17-39 shows one way to solve these last two problems.
<Listing number="17-39" caption="Using `throttle` and `take` to manage the merged streams" file-name="src/main.rs">
```rust
{{#rustdoc_include ../listings/ch17-async-await/listing-17-39/src/main.rs:throttle}}
```
</Listing>
First, we use the `throttle` method on the `intervals` stream so that it doesn’t
overwhelm the `messages` stream. _Throttling_ is a way of limiting the rate at
which a function will be called—or, in this case, how often the stream will be
polled. Once every 100 milliseconds should do, because that’s roughly how often
our messages arrive.
To limit the number of items we will accept from a stream, we apply the `take`
method to the `merged` stream, because we want to limit the final output, not
just one stream or the other.
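Putting those pieces together, the stream setup looks roughly like this (a sketch; the ordering of the adapter calls is an assumption):
```rust,ignore
let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals()
    .map(|count| format!("Interval: {count}"))
    // Poll the underlying interval stream at most every 100 milliseconds.
    .throttle(Duration::from_millis(100))
    // A timeout long enough that it never fires for this stream.
    .timeout(Duration::from_secs(10));
// Stop after 20 items total, across both streams.
let merged = messages.merge(intervals).take(20);
let mut stream = pin!(merged);

while let Some(result) = stream.next().await {
    match result {
        Ok(item) => println!("{item}"),
        Err(reason) => eprintln!("Problem: {reason:?}"),
    }
}
```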
Now when we run the program, it stops after pulling 20 items from the stream,
and the intervals don’t overwhelm the messages. We also don’t get `Interval:
100` or `Interval: 200` or so on, but instead get `Interval: 1`, `Interval: 2`,
and so on—even though we have a source stream that _can_ produce an event every
millisecond. That’s because the `throttle` call produces a new stream that wraps
the original stream so that the original stream gets polled only at the throttle
rate, not its own “native” rate. We don’t have a bunch of unhandled interval
messages we’re choosing to ignore. Instead, we never produce those interval
messages in the first place! This is the inherent “laziness” of Rust’s futures
at work again, allowing us to choose our performance characteristics.
<!-- Not extracting output because changes to this output aren't significant;
the changes are likely to be due to the threads running differently rather than
changes in the compiler -->
```text
Interval: 1
Message: 'a'
Interval: 2
Interval: 3
Problem: Elapsed(())
Interval: 4
Message: 'b'
Interval: 5
Message: 'c'
Interval: 6
Interval: 7
Problem: Elapsed(())
Interval: 8
Message: 'd'
Interval: 9
Message: 'e'
Interval: 10
Interval: 11
Problem: Elapsed(())
Interval: 12
```
There’s one last thing we need to handle: errors! With both of these
channel-based streams, the `send` calls could fail when the other side of the
channel closes—and that’s just a matter of how the runtime executes the futures
that make up the stream. Up until now, we’ve ignored this possibility by calling
`unwrap`, but in a well-behaved app, we should explicitly handle the error, at
minimum by ending the loop so we don’t try to send any more messages. Listing
17-40 shows a simple error strategy: print the issue and then `break` from the
loops.
<Listing number="17-40" caption="Handling errors and shutting down the loops">
```rust
{{#rustdoc_include ../listings/ch17-async-await/listing-17-40/src/main.rs:errors}}
```
</Listing>
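The change in each sending loop might look something like this (the wording of the error messages is an assumption):
```rust,ignore
// In the task spawned by `get_messages`:
if let Err(send_error) = tx.send(format!("Message: '{message}'")) {
    eprintln!("Cannot send message '{message}': {send_error}");
    break;
}

// In the loop inside `get_intervals`:
if let Err(send_error) = tx.send(count) {
    eprintln!("Could not send interval {count}: {send_error}");
    break;
}
```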
As usual, the correct way to handle a message send error will vary; just make
sure you have a strategy.
Now that we’ve seen a bunch of async in practice, let’s take a step back and dig
into a few of the details of how `Future`, `Stream`, and the other key traits
Rust uses to make async work.
[17-02-messages]: ch17-02-concurrency-with-async.html#message-passing
[iterator-trait]: ch13-02-iterators.html#the-iterator-trait-and-the-next-method