## Streams
So far in this chapter, we have mostly stuck to individual futures. The one big
exception was the async channel we used. Recall how we used the receiver for our
async channel in the [“Message Passing”][17-02-messages]<!-- ignore --> section
earlier in the chapter. The async `recv` method produces a sequence of items over time.
This is an instance of a much more general pattern, often called a _stream_.
A sequence of items is something we’ve seen before, when we looked at the
`Iterator` trait in Chapter 13. There are two differences between iterators and
the async channel receiver, though. The first is the element of time: iterators
are synchronous, while the channel receiver is asynchronous. The second is the
API. When working directly with an `Iterator`, we call its synchronous `next`
method. With the `trpl::Receiver` stream in particular, we called an
asynchronous `recv` method instead. These APIs otherwise feel very similar.
That similarity isn’t a coincidence. A stream is similar to an asynchronous form
of iteration. Whereas the `trpl::Receiver` specifically waits to receive
messages, though, the general-purpose stream API is much broader: it
provides the next item the way `Iterator` does, but asynchronously. The
similarity between iterators and streams in Rust means we can actually create a
stream from any iterator. As with an iterator, we can work with a stream by
calling its `next` method and then awaiting the output, as in Listing 17-30.
<Listing number="17-30" caption="Creating a stream from an iterator and printing its values" file-name="src/main.rs">
```rust,ignore,does_not_compile
{{#rustdoc_include ../listings/ch17-async-await/listing-17-30/src/main.rs:stream}}
```
</Listing>
We start with an array of numbers, convert it to an iterator, and then call
`map` on it to double all the values. Then we convert the iterator into a stream
using the `trpl::stream_from_iter` function. Then we loop over the items in the
stream as they arrive with the `while let` loop.
Unfortunately, when we try to run the code, it doesn’t compile. Instead, as we
can see in the output, it reports that there is no `next` method available.
<!-- manual-regeneration
cd listings/ch17-async-await/listing-17-30
cargo build
copy only the error output
-->
```console
error[E0599]: no method named `next` found for struct `Iter` in the current scope
--> src/main.rs:10:40
|
10 | while let Some(value) = stream.next().await {
| ^^^^
|
= note: the full type name has been written to 'file:///projects/async_await/target/debug/deps/async_await-9de943556a6001b8.long-type-1281356139287206597.txt'
= note: consider using `--verbose` to print the full type name to the console
= help: items from traits can only be used if the trait is in scope
help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
|
1 + use crate::trpl::StreamExt;
|
1 + use futures_util::stream::stream::StreamExt;
|
1 + use std::iter::Iterator;
|
1 + use std::str::pattern::Searcher;
|
help: there is a method `try_next` with a similar name
|
10 | while let Some(value) = stream.try_next().await {
| ~~~~~~~~
```
As the output suggests, the reason for the compiler error is that we need the
right trait in scope to be able to use the `next` method. Given our discussion
so far, you might reasonably expect that to be `Stream`, but the trait we need
here is actually `StreamExt`. The `Ext` there is for “extension”: this is a
common pattern in the Rust community for extending one trait with another.
Why do we need `StreamExt` instead of `Stream`, and what does the `Stream` trait
itself do? Briefly, the answer is that throughout the Rust ecosystem, the
`Stream` trait defines a low-level interface which effectively combines the
`Iterator` and `Future` traits. The `StreamExt` trait supplies a higher-level
set of APIs on top of `Stream`, including the `next` method as well as other
utility methods similar to those provided by the `Iterator` trait. We’ll return
to the `Stream` and `StreamExt` traits in a bit more detail at the end of the
chapter. For now, this is enough to let us keep moving.
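For readers who want a concrete picture of that split right away, here is a minimal sketch using only the standard library. The `Stream`, `StreamExt`, `Next`, `IterStream`, and `block_on` items below are simplified, hypothetical stand-ins written for this illustration. They are not the definitions `trpl` re-exports, and they are meant to convey only the general shape: a low-level trait with a poll-style method, plus an extension trait that layers an awaitable `next` on top of it.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical low-level trait: polled like a `Future`, but it can yield many
// items, and `None` marks the end of the sequence as it does for `Iterator`.
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Hypothetical extension trait: adds a convenient `next` method on top of
// `Stream`. The blanket impl below gives it to every `Stream` for free.
trait StreamExt: Stream {
    fn next(&mut self) -> Next<'_, Self>
    where
        Self: Sized + Unpin,
    {
        Next { stream: self }
    }
}

impl<S: Stream> StreamExt for S {}

// The future returned by `next`: awaiting it polls the stream for one item.
struct Next<'a, S> {
    stream: &'a mut S,
}

impl<S: Stream + Unpin> Future for Next<'_, S> {
    type Output = Option<S::Item>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Pin::new(&mut *self.stream).poll_next(cx)
    }
}

// A stream backed by an iterator: every item is ready immediately.
struct IterStream<I>(I);

impl<I: Iterator + Unpin> Stream for IterStream<I> {
    type Item = I::Item;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<I::Item>> {
        Poll::Ready(self.0.next())
    }
}

// A waker that does nothing, which is enough for this toy executor.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// A toy executor that polls a single future until it completes.
fn block_on<F: Future>(future: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut future = pin!(future);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => std::thread::yield_now(),
        }
    }
}

fn doubled() -> Vec<i32> {
    block_on(async {
        let mut stream = IterStream([1, 2, 3].into_iter().map(|n| n * 2));
        let mut values = Vec::new();
        // `next` comes from the extension trait, not from `Stream` itself.
        while let Some(value) = stream.next().await {
            values.push(value);
        }
        values
    })
}

fn main() {
    println!("{:?}", doubled());
}
```

If the `StreamExt` stand-in were defined in another module and not brought into scope with `use`, the `stream.next()` call would fail in the same way as the compiler error shown earlier.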
The fix to the compiler error is to add a `use` statement for `trpl::StreamExt`,
as in Listing 17-31.
<Listing number="17-31" caption="Successfully using an iterator as the basis for a stream" file-name="src/main.rs">
```rust
{{#rustdoc_include ../listings/ch17-async-await/listing-17-31/src/main.rs:all}}
```
</Listing>
With all those pieces put together, this code works the way we want! What’s
more, now that we have `StreamExt` in scope, we can use all of its utility
methods, just as with iterators. For example, in Listing 17-32, we use the
`filter` method to filter out everything but multiples of three and five.
<Listing number="17-32" caption="Filtering a `Stream` with the `StreamExt::filter` method" file-name="src/main.rs">
```rust
{{#rustdoc_include ../listings/ch17-async-await/listing-17-32/src/main.rs:all}}
```
</Listing>
Of course, this isn’t very interesting. We could do that with normal iterators
and without any async at all. So let’s look at some of the other things we can
do which are unique to streams.
### Composing Streams
Many concepts are naturally represented as streams: items becoming available in
a queue, or working with more data than can fit in a computer’s memory by only
pulling chunks of it from the file system at a time, or data arriving over the
network over time. Because streams build on futures, we can use them with any other
kind of future, too, and we can combine them in interesting ways. For example,
we can batch up events to avoid triggering too many network calls, set timeouts
on sequences of long-running operations, or throttle user interface events to
avoid doing needless work.
Let’s start by building a little stream of messages, as a stand-in for a stream
of data we might see from a WebSocket or another real-time communication
protocol. In Listing 17-33, we create a function `get_messages` which returns
`impl Stream<Item = String>`. For its implementation, we create an async
channel, loop over the first ten letters of the English alphabet, and send them
across the channel.
We also use a new type: `ReceiverStream`, which converts the `rx` receiver from
the `trpl::channel` into a `Stream` with a `next` method. Back in `main`, we use
a `while let` loop to print all the messages from the stream.
<Listing number="17-33" caption="Using the `rx` receiver as a `ReceiverStream`" file-name="src/main.rs">
```rust
{{#rustdoc_include ../listings/ch17-async-await/listing-17-33/src/main.rs:all}}
```
</Listing>
When we run this code, we get exactly the results we would expect:
<!-- Not extracting output because changes to this output aren't significant;
the changes are likely to be due to the threads running differently rather than
changes in the compiler -->
```text
Message: 'a'
Message: 'b'
Message: 'c'
Message: 'd'
Message: 'e'
Message: 'f'
Message: 'g'
Message: 'h'
Message: 'i'
Message: 'j'
```
We could do this with the regular `Receiver` API, or even the regular `Iterator`
API, though. Let’s add something that requires streams: adding a timeout which
applies to every item in the stream, and a delay on the items we emit.
In Listing 17-34, we start by adding a timeout to the stream with the `timeout`
method, which comes from the `StreamExt` trait. Then we update the body of the
`while let` loop, because the stream now returns a `Result`. The `Ok` variant
indicates a message arrived in time; the `Err` variant indicates that the
timeout elapsed before any message arrived. We `match` on that result and either
print the message when we receive it successfully, or print a notice about the
timeout. Finally, notice that we pin the messages after applying the timeout to
them, because the timeout helper produces a stream which needs to be pinned to
be polled.
<Listing number="17-34" caption="Using the `StreamExt::timeout` method to set a time limit on the items in a stream" file-name="src/main.rs">
```rust
{{#rustdoc_include ../listings/ch17-async-await/listing-17-34/src/main.rs:timeout}}
```
</Listing>
However, because there are no delays between messages, this timeout does not
change the behavior of the program. Let’s add a variable delay to the messages
we send. In `get_messages`, we use the `enumerate` iterator method with the
`messages` array so that we can get the index of each item we are sending along
with the item itself. Then we apply a 100 millisecond delay to even-index items
and a 300 millisecond delay to odd-index items, to simulate the different delays
we might see from a stream of messages in the real world. Because our timeout is
for 200 milliseconds, this should affect half of the messages.
<Listing number="17-35" caption="Sending messages through `tx` with an async delay without making `get_messages` an async function" file-name="src/main.rs">
```rust
{{#rustdoc_include ../listings/ch17-async-await/listing-17-35/src/main.rs:messages}}
```
</Listing>
To sleep between messages in the `get_messages` function without blocking, we
need to use async. However, we can’t make `get_messages` itself into an async
function, because then we’d return a `Future<Output = Stream<Item = String>>`
instead of a `Stream<Item = String>`. The caller would have to await
`get_messages` itself to get access to the stream. But remember: everything in a
given future happens linearly; concurrency happens _between_ futures. Awaiting
`get_messages` would require it to send all the messages, including sleeping
between sending each message, before returning the receiver stream. As a result,
the timeout would end up useless. There would be no delays in the stream itself:
the delays would all happen before the stream was even available.
Instead, we leave `get_messages` as a regular function which returns a stream,
and spawn a task to handle the async `sleep` calls.
> Note: calling `spawn_task` in this way works because we already set up our
> runtime. Calling this particular implementation of `spawn_task` _without_
> first setting up a runtime will cause a panic. Other implementations choose
> different tradeoffs: they might spawn a new runtime and so avoid the panic but
> end up with a bit of extra overhead, or simply not provide a standalone way to
> spawn tasks without reference to a runtime. You should make sure you know what
> tradeoff your runtime has chosen and write your code accordingly!
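The difference between doing the work before returning and handing the work to a separate task can be sketched with ordinary threads and a standard-library channel. This is only an analogy under stated assumptions: `get_messages_eager`, `get_messages_spawned`, and `time_to_receiver` are hypothetical names, `std::thread::spawn` stands in for `spawn_task`, and `std::thread::sleep` stands in for the async `sleep`.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;
use std::time::{Duration, Instant};

const DELAY: Duration = Duration::from_millis(50);

// Analogous to awaiting an async `get_messages`: every delay and every send
// happens before the caller ever sees the receiver.
fn get_messages_eager() -> Receiver<String> {
    let (tx, rx) = mpsc::channel();
    for message in ["a", "b", "c"] {
        thread::sleep(DELAY);
        tx.send(message.to_string()).unwrap();
    }
    rx
}

// Analogous to spawning a task: the receiver is returned right away, and the
// delays happen while the caller is already reading from it.
fn get_messages_spawned() -> Receiver<String> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        for message in ["a", "b", "c"] {
            thread::sleep(DELAY);
            if tx.send(message.to_string()).is_err() {
                break;
            }
        }
    });
    rx
}

// Measures how long it takes to obtain the receiver, then drains it.
fn time_to_receiver(make: fn() -> Receiver<String>) -> (Duration, Vec<String>) {
    let start = Instant::now();
    let rx = make();
    let waited = start.elapsed();
    (waited, rx.iter().collect())
}

fn main() {
    let (eager, _) = time_to_receiver(get_messages_eager);
    let (spawned, _) = time_to_receiver(get_messages_spawned);
    println!("eager: receiver available after {eager:?}");
    println!("spawned: receiver available after {spawned:?}");
}
```

In the eager version, a per-message timeout applied by the caller could never fire, because all of the waiting is over by the time the receiver exists. In Listing 17-35, the spawned task plays the role that the spawned thread plays here.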
Now our code has a much more interesting result! Between every other pair of
messages, we see an error reported: `Problem: Elapsed(())`.
<!-- manual-regeneration
cd listings/ch17-async-await/listing-17-35
cargo run
copy only the program output, *not* the compiler output
-->
```text
Message: 'a'
Problem: Elapsed(())
Message: 'b'
Message: 'c'
Problem: Elapsed(())
Message: 'd'
Message: 'e'
Problem: Elapsed(())
Message: 'f'
Message: 'g'
Problem: Elapsed(())
Message: 'h'
Message: 'i'
Problem: Elapsed(())
Message: 'j'
```
The timeout doesn’t prevent the messages from arriving in the end—we still get
all of the original messages. This is because our channel is unbounded: it can
hold as many messages as we can fit in memory. If the message doesn’t arrive
before the timeout, our stream handler will account for that, but when it polls
the stream again, the message may now have arrived.
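The same "time out, then try again" behavior can be sketched with a standard-library channel. This is a thread-based analogy, not the `trpl` API: `std::sync::mpsc::Receiver::recv_timeout` stands in for the per-item timeout, and the `run` function is a hypothetical name. The delays (100 and 300 milliseconds) and the 200 millisecond timeout mirror the values used in this section, with only four messages to keep the run short.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

// Returns the messages received and the number of timeouts observed.
fn run() -> (Vec<String>, usize) {
    let (tx, rx) = mpsc::channel();

    // Producer: short delay for even-index items, long delay for odd-index ones.
    thread::spawn(move || {
        for (index, message) in ["a", "b", "c", "d"].into_iter().enumerate() {
            let delay = if index % 2 == 0 { 100 } else { 300 };
            thread::sleep(Duration::from_millis(delay));
            if tx.send(message.to_string()).is_err() {
                break;
            }
        }
    });

    let mut received = Vec::new();
    let mut timeouts = 0;
    loop {
        match rx.recv_timeout(Duration::from_millis(200)) {
            Ok(message) => {
                println!("Message: '{message}'");
                received.push(message);
            }
            // A timeout only reports that nothing arrived in time. The message
            // is not lost: it is still delivered by a later call.
            Err(RecvTimeoutError::Timeout) => {
                println!("Problem: timed out");
                timeouts += 1;
            }
            // The producer finished and dropped its sender.
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    (received, timeouts)
}

fn main() {
    let (received, timeouts) = run();
    println!("{} messages, {timeouts} timeouts", received.len());
}
```

Every message still arrives, in order, even though some waits time out first, because the channel holds the message until the receiver asks again.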
You can get different behavior if needed by using other kinds of channels, or
other kinds of streams more generally. Let’s see one of those in practice in our
final example for this section, by combining a stream of time intervals with
this stream of messages.
### Merging Streams
First, let’s create another stream, which will emit an item every millisecond if
we let it run directly. For simplicity, we can use the `sleep` function to send
a message on a delay, and combine it with the same approach of creating a stream
from a channel we used in `get_messages`. The difference is that this time,
we’re going to send back the count of intervals that have elapsed, so the return
type will be `impl Stream<Item = u32>`, and we can call the function
`get_intervals`.
In Listing 17-36, we start by defining a `count` in the task. (We could define
it outside the task, too, but it is clearer to limit the scope of any given
variable.) Then we create an infinite loop. Each iteration of the loop
asynchronously sleeps for one millisecond, increments the count, and then sends
it over the channel. Because this is all wrapped in the task created by
`spawn_task`, all of it will get cleaned up along with the runtime, including
the infinite loop.
<Listing number="17-36" caption="Creating a stream with a counter that will be emitted once every millisecond" file-name="src/main.rs">
```rust
{{#rustdoc_include ../listings/ch17-async-await/listing-17-36/src/main.rs:intervals}}
```
</Listing>
This kind of infinite loop, which only ends when the whole runtime gets torn
down, is fairly common in async Rust: many programs need to keep running
indefinitely. With async, this doesn’t block anything else, as long as there is
at least one await point in each iteration through the loop.
Back in our main function’s async block, we start by calling `get_intervals`.
Then we merge the `messages` and `intervals` streams with the `merge` method,
which combines multiple streams into one stream that produces items from any of
the source streams as soon as the items are available, without imposing any
particular ordering. Finally, we loop over that combined stream instead of over
`messages` (Listing 17-37).
<Listing number="17-37" caption="Attempting to merge streams of messages and intervals" file-name="src/main.rs">
```rust,ignore,does_not_compile
{{#rustdoc_include ../listings/ch17-async-await/listing-17-37/src/main.rs:main}}
```
</Listing>
At this point, neither `messages` nor `intervals` needs to be pinned or mutable,
because both will be combined into the single `merged` stream. However, this
call to `merge` does not compile! (Neither does the `next` call in the
`while let` loop, but we’ll come back to that after fixing this.) The two
streams have different types. The `messages` stream has the type
`Timeout<impl Stream<Item = String>>`, where `Timeout` is the type which
implements `Stream` for a `timeout` call. Meanwhile, the `intervals` stream has
the type `impl Stream<Item = u32>`. To merge these two streams, we need to
transform one of them to match the other.
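The same kind of mismatch, and the same kind of fix, can be seen with ordinary iterators. The sketch below is a synchronous analogy only: `Iterator::chain` stands in for `merge` (it preserves order, which `merge` does not promise), and `Elapsed` and `combined` are hypothetical names introduced for this illustration.

```rust
// Hypothetical stand-in for the error type produced when a timeout elapses.
#[derive(Debug, PartialEq)]
struct Elapsed;

fn combined() -> Vec<Result<String, Elapsed>> {
    // Items here already have the shape we want: `Result<String, Elapsed>`.
    let messages = ["a", "b"]
        .into_iter()
        .map(|message| Ok::<String, Elapsed>(format!("Message: '{message}'")));

    // Items here are plain `u32` values, so `messages.chain(intervals)` would
    // be rejected by the compiler: both sides must yield the same item type.
    let intervals = 1u32..=2;

    // Transform one side to match the other before combining them.
    let intervals = intervals.map(|count| Ok::<String, Elapsed>(format!("Interval: {count}")));

    messages.chain(intervals).collect()
}

fn main() {
    for item in combined() {
        match item {
            Ok(text) => println!("{text}"),
            Err(reason) => eprintln!("Problem: {reason:?}"),
        }
    }
}
```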
In Listing 17-38, we rework the `intervals` stream, because `messages` is
already in the basic format we want and has to handle timeout errors. First, we
can use the `map` helper method to transform the `intervals` into a string.
Second, we need to match the `Timeout` from `messages`. Because we don’t
actually _want_ a timeout for `intervals`, though, we can just create a timeout
which is longer than the other durations we are using. Here, we create a
10-second timeout with `Duration::from_secs(10)`. Finally, we need to make
`stream` mutable, so that the `while let` loop’s `next` calls can iterate
through the stream, and pin it so that it’s safe to do so.
<!-- We cannot directly test this one, because it never stops. -->
<Listing number="17-38" caption="Aligning the type of the `intervals` stream with the type of the `messages` stream" file-name="src/main.rs">
```rust,ignore
{{#rustdoc_include ../listings/ch17-async-await/listing-17-38/src/main.rs:main}}
```
</Listing>
That gets us _almost_ to where we need to be. Everything type checks. If you run
this, though, there will be two problems. First, it will never stop! You’ll need
to stop it with <span class="keystroke">ctrl-c</span>. Second, the messages from
the English alphabet will be buried in the midst of all the interval counter
messages:
<!-- Not extracting output because changes to this output aren't significant;
the changes are likely to be due to the tasks running differently rather than
changes in the compiler -->
```text
--snip--
Interval: 38
Interval: 39
Interval: 40
Message: 'a'
Interval: 41
Interval: 42
Interval: 43
--snip--
```
Listing 17-39 shows one way to solve these last two problems. First, we use the
`throttle` method on the `intervals` stream, so that it doesn’t overwhelm the
`messages` stream. Throttling is a way of limiting the rate at which a function
will be called—or, in this case, how often the stream will be polled. Once every
hundred milliseconds should do, because that is in the same ballpark as how
often our messages arrive.
To limit the number of items we will accept from a stream, we can use the `take`
method. We apply it to the _merged_ stream, because we want to limit the final
output, not just one stream or the other.
<Listing number="17-39" caption="Using `throttle` and `take` to manage the merged streams" file-name="src/main.rs">
```rust
{{#rustdoc_include ../listings/ch17-async-await/listing-17-39/src/main.rs:throttle}}
```
</Listing>
Now when we run the program, it stops after pulling twenty items from the
stream, and the intervals don’t overwhelm the messages. We also don’t get
`Interval: 100` or `Interval: 200` or so on, but instead get `Interval: 1`,
`Interval: 2`, and so on—even though we have a source stream which _can_ produce
an event every millisecond. That’s because the `throttle` call produces a new
stream, wrapping the original stream, so that the original stream only gets
polled at the throttle rate, not its own “native” rate. We don’t have a bunch of
unhandled interval messages we’re choosing to ignore. Instead, we never produce
those interval messages in the first place! This is the inherent “laziness” of
Rust’s futures at work again, allowing us to choose our performance
characteristics.
<!-- manual-regeneration
cd listings/ch17-async-await/listing-17-39
cargo run
copy and paste only the program output
-->
```text
Interval: 1
Message: 'a'
Interval: 2
Interval: 3
Problem: Elapsed(())
Interval: 4
Message: 'b'
Interval: 5
Message: 'c'
Interval: 6
Interval: 7
Problem: Elapsed(())
Interval: 8
Message: 'd'
Interval: 9
Message: 'e'
Interval: 10
Interval: 11
Problem: Elapsed(())
Interval: 12
```
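The idea that a lazy source only produces what its consumer asks for can be sketched with a plain iterator. This is a synchronous analogy and not the `throttle` implementation: `produced_when_taking` is a hypothetical name, and `std::iter::from_fn` plays the role of a source that could produce values without limit.

```rust
use std::cell::Cell;

// Returns the items pulled and how many items the source actually produced.
fn produced_when_taking(limit: usize) -> (Vec<u32>, u32) {
    let produced = Cell::new(0u32);

    // A source that could yield values forever, counting each one it makes.
    let source = std::iter::from_fn(|| {
        produced.set(produced.get() + 1);
        Some(produced.get())
    });

    // The consumer decides how much work happens: only `limit` items are
    // ever requested, so only `limit` items are ever produced.
    let taken: Vec<u32> = source.take(limit).collect();
    (taken, produced.get())
}

fn main() {
    let (taken, produced) = produced_when_taking(3);
    println!("taken = {taken:?}, produced = {produced}");
}
```

Nothing beyond the requested items is computed and then thrown away, which is the same property that keeps the interval counter at 1, 2, 3 and so on in the output above.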
There’s one last thing we need to handle: errors! With both of these
channel-based streams, the `send` calls could fail when the other side of the
channel closes—and that’s just a matter of how the runtime executes the futures
which make up the stream. Up until now we have ignored this by calling `unwrap`,
but in a well-behaved app, we should explicitly handle the error, at minimum by
ending the loop so we don’t try to send any more messages! Listing 17-40 shows a
simple error strategy: print the issue and then `break` from the loops. As
usual, the correct way to handle a message send error will vary—just make sure
you have a strategy.
<Listing number="17-40" caption="Handling errors and shutting down the loops">
```rust
{{#rustdoc_include ../listings/ch17-async-await/listing-17-40/src/main.rs:errors}}
```
</Listing>
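The "report the problem and stop sending" strategy can be tried in isolation with a standard-library channel, whose `send` also returns an error once the receiver is gone. This is a sketch under stated assumptions: `send_until_closed` is a hypothetical name, and dropping the receiver partway through stands in for the other side of the channel closing.

```rust
use std::sync::mpsc;

// Returns how many values were sent successfully before the channel closed.
fn send_until_closed() -> u32 {
    let (tx, rx) = mpsc::channel::<u32>();
    let mut rx = Some(rx);
    let mut sent = 0;

    for count in 1..=5 {
        if count == 4 {
            // Simulate the receiving side going away.
            drop(rx.take());
        }

        // Instead of calling `unwrap`, report the error and leave the loop.
        if let Err(send_error) = tx.send(count) {
            eprintln!("Cannot send {count}: {send_error}");
            break;
        }
        sent += 1;
    }

    sent
}

fn main() {
    println!("sent {} values before the receiver closed", send_until_closed());
}
```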
Now that we’ve seen a bunch of async in practice, let’s take a step back and dig
into a few of the details of `Future`, `Stream`, and the other key traits
which Rust uses to make async work.
[17-02-messages]: ch17-02-concurrency-with-async.html#message-passing