How to pass a variable to "Stream::then"

I am struggling to compile a simple program that uses a stream's then. I understand that I can't move ownership into the closure, because then may call it multiple times and each returned future needs its own value. How can I pass a non-Copy variable into then?

use futures::stream::{self, StreamExt};

#[derive(Clone)]
struct A {
    pub a: usize
}

async fn a() {
    let a = A {a: 3};

    let stream = stream::iter(1..=3);

    // here: `A` may be complex; can I capture a clone of it?
    let stream = stream.then(|x| async move { x + a.a });

    assert_eq!(vec![4, 5, 6], stream.collect::<Vec<_>>().await);
}

#[tokio::main]
async fn main() {
    a().await;
}

Well, in this case you can just copy the field out first, since usize is Copy:

    let aa = a.a;
    let stream = stream.then(|x| async move { x + aa });

This also works:

    use futures::future;

    let stream = stream.then(|x| future::ready(x + a.a));

But at this point it's more straightforward to use map instead:

    let stream = stream.map(|x| x + a.a);

Depending on your real case, the workaround may vary.

Thank you. Yeah, the use-case is a bit more complex. Let me try another minimal example:

async fn d(v: usize, a: A) -> usize {
    // more complex; it spawns and awaits
    v + a.a
}

async fn a() {
    let a = A {a: 3};

    let stream = stream::iter(1..=3);
    let stream = stream.then(move |x| async move { d(x, a).await });

    assert_eq!(vec![4, 5, 6], stream.collect::<Vec<_>>().await);
}

Essentially, I am trying to partially apply d, binding one of its arguments to a value from the current scope.

The main thing to understand is that |x| async move { ... } is not a single construct, but an ordinary closure whose body is an async block. Every call to the closure creates a new future, and any value that future needs to own must be produced in the closure body on each call, before the async block takes it.

For example, you could clone a on every call:

let stream = stream.then(move |x| {
    let a_clone = a.clone();
    async move { d(x, a_clone).await }
});

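Alternatively, if the function can take a reference, capture a shared reference instead. &A is Copy, so each future gets its own copy of it: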

async fn d(v: usize, a: &A) -> usize {
    // more complex; it spawns and awaits
    v + a.a
}

let a_ref = &a;
let stream = stream.then(move |x| {
    async move { d(x, a_ref).await }
});
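
To make both workarounds easy to try without any dependencies, here is a minimal std-only sketch. It is not the futures crate's implementation: then_all is a hypothetical stand-in for StreamExt::then that calls the closure once per item and awaits each returned future in order, block_on is a tiny hand-rolled executor, and d_owned, d_ref and run_both are names made up for this example. The closures themselves have the same shape as the ones above.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

#[derive(Clone)]
struct A {
    a: usize,
}

// Owned and borrowed variants of `d` (simplified: no spawning here).
async fn d_owned(v: usize, a: A) -> usize {
    v + a.a
}

async fn d_ref(v: usize, a: &A) -> usize {
    v + a.a
}

// Hypothetical stand-in for `StreamExt::then`: calls `f` once per item
// and awaits each returned future before moving on to the next item.
async fn then_all<I, F, Fut>(items: I, mut f: F) -> Vec<Fut::Output>
where
    I: IntoIterator,
    F: FnMut(I::Item) -> Fut,
    Fut: Future,
{
    let mut out = Vec::new();
    for item in items {
        out.push(f(item).await);
    }
    out
}

// Minimal executor: polls the future on the current thread and parks
// the thread until the waker unparks it.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}

fn run_both() -> (Vec<usize>, Vec<usize>) {
    let a = A { a: 3 };

    // Workaround 1: clone in the closure body, then move the clone
    // into the async block. The closure only borrows `a`.
    let cloned = block_on(then_all(1..=3usize, |x| {
        let a_clone = a.clone();
        async move { d_owned(x, a_clone).await }
    }));

    // Workaround 2: a shared reference is `Copy`, so moving it into
    // the async block on every call just copies the reference.
    let a_ref = &a;
    let borrowed = block_on(then_all(1..=3usize, move |x| async move {
        d_ref(x, a_ref).await
    }));

    (cloned, borrowed)
}
```

Calling run_both() from main returns [4, 5, 6] for both variants. Moving a.clone() inside the async block instead would not help, because the block would then have to take a itself out of a closure that can be called again.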