Tokio_stream .then() usage

Hello,
I'm trying to return a Stream from my then() function. Unfortunately, I'm having difficulty writing an async version in the same manner as my map() function.

The following code fails with lifetime errors (and a borrow error inside pin!):

use tokio_stream::{self as stream, Stream, StreamExt};


fn map(stream: impl Stream<Item=u32>) -> impl Stream<Item=u32> {
    stream.map(|x| x * x)
}

async fn test(value: u32) -> u32 {
    // call functions that require async here
    value * value
}

fn then(stream: impl Stream<Item=u32>) -> impl Stream<Item=u32> {
    // https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html#method.then
    let stream = stream.then(test);
    tokio::pin!(stream);
    stream
}

#[tokio::main]
async fn main() {
    // OK
    let stream = stream::iter(vec![0, 1, 2]);
    let mut mp = map(stream);
    while let Some(value) = mp.next().await {
        dbg!(&value);
    }
    
    // NOT OK
    let stream = stream::iter(vec![0, 1, 2]);
    let mut mp = then(stream);
    // error: cannot be unpinned; rustc suggests the `pin!` macro or `Box::pin`
    while let Some(value) = mp.next().await {
        dbg!(&value);
    }
}
   Compiling test_stream v0.1.0 (/Users/h0neybadger/Documents/ezcli/test_stream)
error[E0310]: the parameter type `impl Stream<Item = u32>` may not live long enough
  --> src/main.rs:17:5
   |
17 |     stream
   |     ^^^^^^ ...so that the type `impl Stream<Item = u32>` will meet its required lifetime bounds
   |
help: consider adding an explicit lifetime bound...
   |
13 | fn then(stream: impl Stream<Item=u32> + 'static) -> impl Stream<Item=u32> {
   |                                       +++++++++

error[E0310]: the parameter type `impl Stream<Item = u32>` may not live long enough
  --> src/main.rs:16:5
   |
16 |     tokio::pin!(stream);
   |     ^^^^^^^^^^^^^^^^^^^ ...so that the type `impl Stream<Item = u32>` will meet its required lifetime bounds
   |
   = note: this error originates in the macro `tokio::pin` (in Nightly builds, run with -Z macro-backtrace for more info)
help: consider adding an explicit lifetime bound...
   |
13 | fn then(stream: impl Stream<Item=u32> + 'static) -> impl Stream<Item=u32> {
   |                                       +++++++++

error[E0597]: `stream` does not live long enough
  --> src/main.rs:16:5
   |
16 |     tokio::pin!(stream);
   |     ^^^^^^^^^^^^^^^^^^^
   |     |
   |     borrowed value does not live long enough
   |     binding `stream` declared here
17 |     stream
   |     ------ opaque type requires that `stream` is borrowed for `'static`
18 | }
   | - `stream` dropped here while still borrowed
   |
   = note: this error originates in the macro `tokio::pin` (in Nightly builds, run with -Z macro-backtrace for more info)

Some errors have detailed explanations: E0310, E0597.
For more information about an error, try `rustc --explain E0310`.
error: could not compile `test_stream` (bin "test_stream") due to 4 previous errors

I don't know exactly what is wrong, especially since the input stream is not 'static.
I would be very grateful for your help.

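You can't use tokio::pin! inside of then. The macro pins the value to the stack frame of the function it is called in and rebinds stream as a Pin<&mut _> that borrows that local, so the result cannot outlive the function. You have to do it in main instead: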

#[tokio::main]
async fn main() {
    let stream = stream::iter(vec![0, 1, 2]);
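    // No `mut` needed: tokio::pin! takes ownership and rebinds `mp` itself.
    let mp = then(stream);
    // Pin the stream on main's stack; `mp` is now a Pin<&mut _>.
    tokio::pin!(mp);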
    while let Some(value) = mp.next().await {
        dbg!(&value);
    }
}

Thank you very much.
:wink: