Trying to understand how to use StreamExt::for_each

I'm learning Rust and getting really frustrated trying to use async-tungstenite to make a program which simultaneously:

  1. Receives and handles messages from a server that arrive via WebSocket.
  2. Monitor the local filesystem and send messages to the server when local files change.

The client example in async-tungstenite demonstrates how to use StreamExt::for_each with a Source to asynchronously receive and handle incoming messages. It takes a closure which returns a future. I've been trying all day to get this working, but running into issues either constructing the closure or a function which returns the closure to be passed to for_each.

The following MVCE illustrates the problem I'm running into right now. It's probably something I misunderstand about Rust, so I could use help understanding this properly. What I'd like to know how to do is write a function which returns a closure which returns a future. I'm trying to write the closure such that it delegates to an asynchronous function. The compiler doesn't seem to get it that the asynchronous function, which returns a future, is not compatible with what the closure is expected to return, also a future, with the same output. Is this a shortcoming of Rust, or bad syntax, or a misunderstanding of mine on how to implement this in Rust?

async fn foo() {
    println!("foo");
}

// // This is preferred, but currently results in this error:
// //
// // error[E0562]: `impl Trait` not allowed outside of function and inherent method return types
// //
// fn make_foo_closure() -> impl FnMut() -> impl futures::Future<Output = ()> {
//     println!("make_foo_closure");
//     || foo()
// }

fn make_foo_closure<'a, Fut>() -> impl FnMut() -> Fut + 'a where
    Fut: futures::Future<Output = ()>
{
    println!("make_foo_closure");
    || foo()
}

fn main() {
    println!("Making a closure which calls foo");
    let foo_closure = make_foo_closure();
    println!("Calling the closure");
    futures::executor::block_on(
        foo_closure()
    );
    println!("Done");
}

(Playground)

Errors:

   Compiling playground v0.0.1 (/playground)
error[E0308]: mismatched types
  --> src/main.rs:18:8
   |
1  | async fn foo() {
   |                - the `Output` of this `async fn`'s found opaque type
...
14 | fn make_foo_closure<'a, Fut>() -> impl FnMut() -> Fut + 'a where
   |                         --- this type parameter
...
18 |     || foo()
   |        ^^^^^ expected type parameter `Fut`, found opaque type
   |
   = note: expected type parameter `Fut`
                 found opaque type `impl core::future::future::Future`
   = help: type parameters must be constrained to match other types
   = note: for more information, visit https://doc.rust-lang.org/book/ch10-02-traits.html#traits-as-parameters

error[E0282]: type annotations needed
  --> src/main.rs:23:23
   |
23 |     let foo_closure = make_foo_closure();
   |         -----------   ^^^^^^^^^^^^^^^^ cannot infer type for type parameter `Fut` declared on the function `make_foo_closure`
   |         |
   |         consider giving `foo_closure` a type

error: aborting due to 2 previous errors

Some errors have detailed explanations: E0282, E0308.
For more information about an error, try `rustc --explain E0282`.
error: could not compile `playground`.

To learn more, run the command again with --verbose.

First, the reason your make_foo_closure function doesn't compile is that the type that the function it returns, returns (i.e. the type of foo) doesn't necessarily match the type parameter Fut like its function signature says it will. For example, if I call make_foo_closure like this:

make_foo_closure<'_, SomeFutureStruct>()

where SomeFutureStruct implements Future<Output = ()>, we'd expect make_foo_closure to give us a function that returns SomeFutureStruct when called but instead we get a function that gives us back the unnamed type that async fn foo corresponds to. This is what the type error you get is complaining about.

When manually writing futures (i.e. if you do the above without using the async keyword) this isn't a problem since you can explicitly name the type:

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

struct FooFuture;

impl Future for FooFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
        println!("foo");
        Poll::Ready(())
    }
}

fn foo() -> FooFuture {
    FooFuture
}

// The above is ~roughly~ equivalent to:
// ```rust
// async fn foo() { println!("foo") }
// ```

fn make_foo_closure<'a>() -> impl FnMut() -> FooFuture {
    || foo()
}

fn main() {
    futures::executor::block_on(make_foo_closure()());
}

(Playground)

However, resorting to writing all our async functions manually if far from an ideal solution.

To fix the type error you'd have to name the corresponding future type of foo explicitly (since, as you mention, you can't use impl Trait there) which, afaik, there isn't a way to do or have some kind of existential type that lets us pin down the type of foo without being able to name it explicitly (besides impl Trait I don't know a way to do this on stable Rust).

Fortunately there is something of a solution, though it also isn't ideal.

We've got another tool that let's us abstract over concrete types: trait objects. We can use a trait object (dyn Future<Output = ()) to talk about foo's return value which lets us more or less do what you want, I think:

use std::pin::Pin;

async fn foo() { println!("foo"); }

fn make_foo_closure<'a>() -> impl FnMut() -> Pin<Box<dyn futures::Future<Output = ()> + 'a>> {
    || Box::pin(foo())
}

fn main() {
    futures::executor::block_on(make_foo_closure()());
}

(Playground)

The downside is that unlike impl Trait, using trait objects to abstract over types usually has runtime cost.

The upside (though I'm not sure this is needed for your use case) is that you could have, for example, make_foo_closure's return function produce one of several different Futures that produce (), even though they have different concrete types.

Sorry, I missed the part about StreamExt::for_each.

If you don't need the extra generality of having a function produce the function that for_each takes, then something like this should work:

use futures::stream::{self, StreamExt};

async fn consume(x: u16) {
    println!("{}", x);
}

fn main() {
    let fut = stream::repeat(1).take(3).for_each(|item| {
        consume(item)
    });

    futures::executor::block_on(fut)
}

(Playground)

Thank you for the very thorough explanation! I'm slowly picking up the nuances of Rust lifetimes, async/.await and futures, and closures, and you've helped me along a bit, which I really appreciate!

1 Like

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.