.await for an implemented Stream not clear to me

I’m looking at the fahrenheit repo to learn how to implement a simple executor for asynchronous IO. There is one line in one of its examples that I don’t understand:

while let Some(stream) = incoming.next().await {
    fahrenheit::spawn(process(stream));
}

I have two questions:

I looked and looked for where the next() method comes from, but I can’t find it anywhere in the documentation. The futures::Stream trait doesn’t have it, and I couldn’t find any iterator trait implemented for it that would provide next().

One other thing. What does .await actually do there? I read the async book, and it explains that

.await doesn’t block the current thread, but instead asynchronously waits for the future to complete,

I don’t know what that means in concrete terms. I thought only the executor polled futures, and that I would have to call executor::spawn/run on them. What is .await actually doing in the context of the executor, and how does .await get hooked into a custom executor implementation to actually do something?

The futures crate has a rather complicated history, as it went through many iterations of Rust’s async design story. At some point, its maintainers decided to temporarily stop breaking its API and to iterate on the design in a separate crate, futures-preview, instead. This is what fahrenheit uses.

The next() method of Stream is provided via an extension trait, StreamExt, presumably in an attempt to keep the Stream trait itself minimal.
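To make the extension-trait pattern concrete, here is a std-only sketch. MyStream, MyStreamExt, Next, Counter and block_on are hypothetical names invented for illustration, not the futures crate’s code; the shape of next() (taking &mut self and requiring Unpin) is modeled on StreamExt::next, but treat it as an approximation. The part that matters is the blanket impl: every type implementing MyStream gets next() as soon as MyStreamExt is in scope.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Stand-in for futures::Stream: just the one required method.
trait MyStream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Extension trait: convenience methods built on top of poll_next.
trait MyStreamExt: MyStream {
    /// Future resolving to the next item, or None once the stream ends.
    fn next(&mut self) -> Next<'_, Self>
    where
        Self: Sized + Unpin,
    {
        Next { stream: self }
    }
}

// Blanket impl: every MyStream gets next() once MyStreamExt is in scope.
impl<S: MyStream> MyStreamExt for S {}

/// Future returned by next(): forwards each poll to poll_next.
struct Next<'a, S> {
    stream: &'a mut S,
}

impl<S: MyStream + Unpin> Future for Next<'_, S> {
    type Output = Option<S::Item>;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        Pin::new(&mut *this.stream).poll_next(cx)
    }
}

/// Toy stream yielding 1..=max, every item immediately ready.
struct Counter {
    n: u32,
    max: u32,
}

impl MyStream for Counter {
    type Item = u32;
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.n < self.max {
            self.n += 1;
            Poll::Ready(Some(self.n))
        } else {
            Poll::Ready(None)
        }
    }
}

async fn collect_all(mut s: Counter) -> Vec<u32> {
    let mut out = Vec::new();
    // Same loop shape as the fahrenheit example.
    while let Some(x) = s.next().await {
        out.push(x);
    }
    out
}

/// Waker that unparks the thread running block_on.
struct ThreadWaker(Thread);
impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal executor: poll, park the thread until woken, repeat.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}
```

If the traits lived in another module, s.next() would only compile with MyStreamExt imported, which is why code using futures brings StreamExt into scope (often through a prelude).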

It can be helpful to think about .await in async fn as syntactic sugar for future combinators like FutureExt::then(). This is not a 100% accurate description, as .await allows things that future combinators cannot do, such as borrowing data across “await points”, but hopefully it is a good enough intuition to get you started.

Your executor is not silently invoked: async fn just builds a complex future that you will ultimately feed to your executor once you’re satisfied with it.
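A quick way to see that calling an async fn runs nothing by itself is to give it a side effect and check when that effect happens. This is a minimal std-only sketch; mark, laziness_demo and NoopWaker are hypothetical names, and the single manual poll stands in for what an executor would do.

```rust
use std::cell::Cell;
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for a single manual poll.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical async fn with an observable side effect.
async fn mark(flag: &Cell<bool>) -> u32 {
    flag.set(true);
    42
}

/// Returns (flag after the call, flag after one poll, result of that poll).
fn laziness_demo() -> (bool, bool, Poll<u32>) {
    let flag = Cell::new(false);
    // Calling the async fn only constructs the future: no body code runs yet.
    let fut = mark(&flag);
    let before = flag.get();
    // Pin it and poll it once by hand, which is what an executor would do.
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let result = fut.as_mut().poll(&mut cx);
    (before, flag.get(), result)
}
```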

More details about async fn

What async fn actually does is build a state machine (more precisely, a coroutine) that implements the Future trait by internally constructing and polling a sequence of inner futures, yielding to the outer executor whenever it cannot make progress.

The action to be taken on the next poll (inner future to be polled, code to be executed…) varies depending on the current state, and the rules for state transitions are specified using normal Rust code, which is definitely nicer than future combinators for complex programs.

Because it is built using Compiler Magic™, this coroutine-based Future is able to hold borrows of its own data, which the Rust borrow checker does not normally allow, because moving a self-referential object around would leave those borrows dangling. Safety is preserved by the pinning mechanism, which prevents safe code from moving the future while it is borrowing from itself.
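Here is a small sketch of the self-borrowing case, using nothing beyond std. sum_across_await keeps a reference into its own local Vec alive across an .await, so once polled, the future contains a pointer into itself. Future::poll takes Pin<&mut Self>, so the future has to be pinned (here with Box::pin) before it can be polled. YieldOnce, NoopWaker and run_pinned are hypothetical helpers, and the busy-poll loop is only acceptable because this future is known to finish after two polls.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical leaf future: Pending on the first poll, Ready on the second.
struct YieldOnce(bool);
impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Holds a borrow of its own local `data` across an .await, so once it has
/// been polled the future is self-referential and must not move.
async fn sum_across_await() -> i32 {
    let data = vec![1, 2, 3];
    let first = &data[0]; // borrow stored inside the future's own state
    YieldOnce(false).await; // suspension point: `first` must stay valid
    *first + data.iter().sum::<i32>()
}

/// Pins the future on the heap, then polls it until it completes.
/// Returns (number of polls, result).
fn run_pinned() -> (u32, i32) {
    // Future::poll needs Pin<&mut Self>; Box::pin gives the future a fixed address.
    let mut fut = Box::pin(sum_across_await());
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut polls: u32 = 0;
    loop {
        polls += 1;
        if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
            return (polls, v);
        }
    }
}
```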

Overall, async fn does not invoke your executor unless you ask it to. All it does is build an arbitrarily complex future that you can ultimately feed to your executor.
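As for how .await hooks into a custom executor: the executor passes a Context (carrying a Waker) into the outer future’s poll, and each .await forwards that same Context to the inner future it is waiting on. A leaf future that returns Pending is responsible for arranging a call to wake() later, and the executor uses that signal to know when to poll again. The following is a minimal single-threaded block_on sketch under those assumptions; ThreadWaker, YieldOnce and two_steps are hypothetical, and a real I/O future would register its waker with a reactor instead of waking itself immediately.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waker that unparks the thread running block_on.
struct ThreadWaker(Thread);
impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal executor for a single future: poll it, and park the thread
/// until some inner future calls wake().
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // park() may also return spuriously; we then simply poll again.
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical leaf future: Pending once, then Ready. It wakes itself right
/// away; a real I/O future would hand the waker to a reactor instead.
struct YieldOnce(bool);
impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Each .await forwards the executor's Context down to YieldOnce::poll.
async fn two_steps() -> &'static str {
    YieldOnce(false).await;
    YieldOnce(false).await;
    "done"
}
```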

Great explanation! If I understand correctly, the following code should make sense.

If I have

async fn foo() {
    some_non_async_work();
}

Then this compiles to a future whose poll is implemented roughly as follows:

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
    some_non_async_work();
    Poll::Ready(())
}

But if I have this

async fn foo() {
    some_async_work().await;
}

then

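fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
    // inner_work is the future returned by some_async_work(); .await polls it.
    // (Sketch: assumes inner_work is Unpin, so Pin::new is allowed.)
    match Pin::new(&mut self.inner_work).poll(cx) {
        Poll::Ready(result) => Poll::Ready(result),
        Poll::Pending => Poll::Pending,
    }
}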

If we have multiple .await statements, then the match above would check each future sequentially. Is that about right?

I figured next would have to come from some kind of extension trait. I always feel iffy about extension traits for exactly this reason: hidden source-code dependencies. I’ve done this myself: I’ve “extended” Iterator in the past, and I found it useful that I could. But I always felt awful that using the extension trait wasn’t more restricted.

I could be using a type that implements TraitA from one of the 32 libraries I depend on without realizing that somewhere else, miles away and not in the source file of TraitA, there is a TraitExt I have no idea about, replacing functions or adding new ones. It seems like a security issue. Plus, code completion doesn’t work, so the only way to know it’s happening, or that it’s there at all, is to grep the source code. I always thought that to use TraitExt, I should have to import it with use somemodule::TraitExt. That way it’s opt-in, or there is a chain of trust from one source import down to the TraitExt source import.

To use the source code, I must first see the source code

Not quite. The future generated by async fn holds some internal state (like an enum) that tells it which inner future to poll, so that you don’t get O(N) polling overhead if you sequentially poll N futures in your async fn, and so that you stop polling futures which have already completed.

So this…

async fn foo() {
    some_async_work().await;
    some_other_async_work().await;
}

…should ultimately compile down to something like this:

fn poll(...) -> ... {
    match self.state {
        State::First(fut1) => fut1.poll(),
        State::Second(fut2) => fut2.poll(),
    }
}
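For the curious, here is a fuller hand-written version of that state machine, including the state transitions the mock-up above leaves out. It is a sketch under simplifying assumptions, not what rustc actually generates: the inner future type Work and the helper functions are hypothetical, and Work is Unpin, so no unsafe pin projection is needed. The behaviors to notice are that a finished inner future is dropped and never polled again, and that Pending is returned as soon as the current inner future is not ready.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical leaf future: Pending `left` times, then Ready(label).
struct Work {
    label: &'static str,
    left: u32,
}

impl Future for Work {
    type Output = &'static str;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<&'static str> {
        if self.left == 0 {
            return Poll::Ready(self.label);
        }
        self.left -= 1;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

// Hypothetical stand-ins for the two async calls in foo().
fn some_async_work() -> Work {
    Work { label: "first", left: 2 }
}
fn some_other_async_work() -> Work {
    Work { label: "second", left: 1 }
}

/// Hand-written equivalent of:
///   async fn foo() { some_async_work().await; some_other_async_work().await; }
enum Foo {
    First(Work),
    Second(Work),
    Done,
}

impl Future for Foo {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let this = self.get_mut(); // allowed: Foo is Unpin because Work is
        loop {
            match &mut *this {
                Foo::First(fut1) => match Pin::new(fut1).poll(cx) {
                    // fut1 done: replace the state (dropping fut1) and start fut2 now.
                    Poll::Ready(_) => *this = Foo::Second(some_other_async_work()),
                    // fut1 not ready: yield to the executor; fut2 does not exist yet.
                    Poll::Pending => return Poll::Pending,
                },
                Foo::Second(fut2) => match Pin::new(fut2).poll(cx) {
                    Poll::Ready(_) => *this = Foo::Done,
                    Poll::Pending => return Poll::Pending,
                },
                Foo::Done => return Poll::Ready(()),
            }
        }
    }
}

/// Busy-polls with a no-op waker (fine here: Work progresses on every poll)
/// and returns how many outer polls foo needed.
fn drive(mut fut: Foo) -> u32 {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut polls: u32 = 0;
    loop {
        polls += 1;
        if Pin::new(&mut fut).poll(&mut cx).is_ready() {
            return polls;
        }
    }
}
```

Driving Foo::First(some_async_work()) this way takes 4 outer polls: two Pending polls of fut1, one poll that finishes fut1 and then gets Pending from the freshly created fut2, and a final one that finishes fut2.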

Extension traits are definitely a bit magical / unergonomic, but on the other hand, crates like rayon showcase how useful they can be… As usual, I guess, great power comes with great responsibility…

Indeed, you need to import the extension trait in order to use it. It’s just that some crates export their extension trait as part of a “prelude” module which you are expected (but not forced) to import via use some_crate::prelude::*;.

IIRC, conflicting methods coming from extension traits are handled by emitting a compiler error, asking you to disambiguate via <T as Trait>::method() syntax. So an extension trait cannot silently replace an object’s methods without the user noticing.

I would consider code completion not displaying methods from extension traits as a bug in the code completion engine that should be reported.

Ok, I cross-checked and the rule for method selection actually is…

  • If you have an inherent method Object::foo() and an extension Trait::foo(), then x.foo() will silently pick the inherent method Object::foo().
  • If you have two different extensions Trait1::bar() and Trait2::bar(), then the compiler will give up and refuse to compile x.bar() with E0034, asking you to disambiguate.
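Those two rules can be checked with a small std-only example; Object, Trait, Trait1 and Trait2 reuse the placeholder names from the list above. The commented-out x.bar() call is the one that would be rejected with E0034.

```rust
struct Object;

impl Object {
    /// Inherent method.
    fn foo(&self) -> &'static str {
        "inherent"
    }
}

trait Trait {
    fn foo(&self) -> &'static str;
}
impl Trait for Object {
    fn foo(&self) -> &'static str {
        "Trait::foo"
    }
}

trait Trait1 {
    fn bar(&self) -> &'static str;
}
trait Trait2 {
    fn bar(&self) -> &'static str;
}
impl Trait1 for Object {
    fn bar(&self) -> &'static str {
        "Trait1::bar"
    }
}
impl Trait2 for Object {
    fn bar(&self) -> &'static str {
        "Trait2::bar"
    }
}

fn resolution_demo() -> [&'static str; 4] {
    let x = Object;
    [
        x.foo(),                     // inherent method wins, silently
        Trait::foo(&x),              // trait version must be named explicitly
        // x.bar(),                  // error[E0034]: multiple applicable items in scope
        Trait1::bar(&x),             // disambiguated call
        <Object as Trait2>::bar(&x), // fully qualified syntax
    ]
}
```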

Ah okay. I suppose then when poll executes this line

State::First(fut1) => fut1.poll(),

And if fut1.poll() returns Ready, it will “remember” not to poll it again and move on to fut2. If fut1 returns Pending (not ready), it will still go on to poll fut2, as before. This way, the inner futures make progress at the “same time”: not truly simultaneously, since they are asynchronous rather than parallel, but at the same time as far as async goes. Unless, that is, we spawn an actual thread for each inner future:

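async fn foo() {
    // Call without .await to get the futures themselves, then hand them off.
    // executor_thread_pool is a placeholder for some thread-pool executor.
    let fut1 = async_work();
    let fut2 = async_work2();
    executor_thread_pool::spawn(fut1);
    executor_thread_pool::spawn(fut2);
}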

If that’s wrong, then I imagine the only other way for sequential futures to execute would be one after another. That is, State::Second(fut2) => fut2.poll() won’t actually happen until State::First(fut1) => fut1.poll() returns Ready. But I don’t think this is what’s happening? I think it’s the first, above way, right?

Okay, I feel 100% better about extension traits now. I went and looked at my old Iterator extension and saw that I was indeed explicitly opting in by importing the extension trait. I didn’t know about the disambiguation because I never tried using the same method name in two traits, but that is also very good to hear.

Ah, yes, I forgot to add this part to my mock-up code.

Nope. If fut1 returns Pending, the outer future just stops and yields immediately, without polling fut2, strictly replicating the semantics of blocking code.

There should be some syntax in async fn for concurrently polling multiple futures, things like all!(fut1, fut2).await or any!(fut1, fut2).await. But it probably shouldn’t be the default: ergonomics arguments aside, what would infinite or very large loops do otherwise?

async fn foo() {
    while let Some(stuff) = infinite_stream.next().await {
        /* ... */
    }
}
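To illustrate the difference between sequential .awaits and concurrent polling, here is a hand-rolled join of two futures next to the equivalent pair of sequential .awaits. Join2, join2, Work, count_polls and NoopWaker are hypothetical; the futures crate ships ready-made combinators for this (such as its join! and select! macros), but this sketch is not their implementation. Each Work future below needs two Pending polls before it is ready, so the sequential version needs 5 outer polls while the join needs only 3, because every poll of Join2 drives both unfinished futures.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical leaf future: Pending `left` times, then Ready(label).
struct Work {
    label: &'static str,
    left: u32,
}

impl Future for Work {
    type Output = &'static str;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<&'static str> {
        if self.left == 0 {
            return Poll::Ready(self.label);
        }
        self.left -= 1;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Hand-rolled join: completes once both `a` and `b` have completed.
struct Join2<A: Future, B: Future> {
    a: A,
    b: B,
    a_out: Option<A::Output>,
    b_out: Option<B::Output>,
}

fn join2<A: Future, B: Future>(a: A, b: B) -> Join2<A, B> {
    Join2 { a, b, a_out: None, b_out: None }
}

impl<A, B> Future for Join2<A, B>
where
    A: Future + Unpin,
    B: Future + Unpin,
    A::Output: Unpin,
    B::Output: Unpin,
{
    type Output = (A::Output, B::Output);
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        // Unlike sequential .awaits, every poll drives *each* unfinished future.
        if this.a_out.is_none() {
            if let Poll::Ready(v) = Pin::new(&mut this.a).poll(cx) {
                this.a_out = Some(v);
            }
        }
        if this.b_out.is_none() {
            if let Poll::Ready(v) = Pin::new(&mut this.b).poll(cx) {
                this.b_out = Some(v);
            }
        }
        if this.a_out.is_some() && this.b_out.is_some() {
            Poll::Ready((this.a_out.take().unwrap(), this.b_out.take().unwrap()))
        } else {
            Poll::Pending
        }
    }
}

/// The same two steps written with sequential .awaits.
async fn sequential() -> (&'static str, &'static str) {
    let a = Work { label: "a", left: 2 }.await;
    let b = Work { label: "b", left: 2 }.await;
    (a, b)
}

/// Busy-polls with a no-op waker (acceptable only because Work progresses on
/// every poll) and reports how many outer polls were needed.
fn count_polls<F: Future>(fut: F) -> (u32, F::Output) {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut polls: u32 = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return (polls, out);
        }
    }
}
```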

Note that most executors do not actually spawn one OS thread per future in flight; instead, they dispatch future-related tasks onto a finite thread pool. This is usually more efficient, because typical OS schedulers have trouble dealing with many threads, but it also has implications for correctness: you can deadlock the executor if you put mutually dependent blocking tasks on it.

Oh I see, so it’s the second way, more or less: fut1.await, then fut2.await, so that .await really does simulate synchronous control flow. I should have reasoned that out; that is obviously what await means.

As for infinite loops, how about this? :p

async fn foo() {
    while let Some(stuff) = infinite_stream.next().anticipate {
        /* ... */
    }
}

Thank you for your help. I feel like the mind fog around async/await is starting to clear up.

Maybe a stream wasn’t the best example (as AFAIK you need to wait for an item before you can request the next one), and a vector/iterator of futures would have been a better one.

But my intuitive understanding is that if futures take a long time to complete, such a loop could create a large number of futures and have to store them in the state machine before any of them completes, leading to unbounded growth of the working set.

So you would definitely need to set some “anticipation window size”.

Which does exist in futures-preview 😉

async fn foo() {
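    // buffered(10) expects a stream whose items are futures; it keeps up to 10
    // of them in flight and yields their outputs in the original order.
    let mut stream = infinite_stream.buffered(10);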
    while let Some(stuff) = stream.next().await {
        /* ... */
    }
}
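The idea behind buffered(n) can be sketched without the futures crate: keep at most n futures in a window, poll all unfinished ones, and release outputs from the front so they come out in source order. This is an illustrative approximation, not how futures-util implements Buffered; run_buffered, Work and NoopWaker are hypothetical, and the source here is a plain iterator rather than a Stream.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical leaf future: Pending `left` times, then Ready(label).
struct Work {
    label: &'static str,
    left: u32,
}

impl Future for Work {
    type Output = &'static str;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<&'static str> {
        if self.left == 0 {
            return Poll::Ready(self.label);
        }
        self.left -= 1;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Drives futures from `source` with at most `window` held at once,
/// returning outputs in source order plus the peak number held.
fn run_buffered<I>(source: I, window: usize) -> (Vec<&'static str>, usize)
where
    I: IntoIterator<Item = Work>,
{
    let mut source = source.into_iter();
    let mut in_flight: VecDeque<(Work, Option<&'static str>)> = VecDeque::new();
    let mut results = Vec::new();
    let mut peak: usize = 0;
    // Busy-polling with a no-op waker is fine here: Work progresses on every poll.
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        // Refill the window; never pull more than `window` items ahead.
        while in_flight.len() < window {
            match source.next() {
                Some(fut) => in_flight.push_back((fut, None)),
                None => break,
            }
        }
        if in_flight.is_empty() {
            return (results, peak);
        }
        peak = peak.max(in_flight.len());
        // Poll every unfinished future in the window once.
        for (fut, out) in in_flight.iter_mut() {
            if out.is_none() {
                if let Poll::Ready(v) = Pin::new(fut).poll(&mut cx) {
                    *out = Some(v);
                }
            }
        }
        // Emit finished outputs from the front only, so order is preserved.
        while in_flight.front().map_or(false, |(_, out)| out.is_some()) {
            let (_, out) = in_flight.pop_front().unwrap();
            results.push(out.unwrap());
        }
    }
}
```

The window bounds the working set: with window = 2, at most two futures are ever held, even when a later future would finish sooner than an earlier one.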