Pinning issues when creating a stream from tokio's JoinSet

For a Rust program I'm writing, tokio::task::JoinSet is perfect for my needs, despite being marked "unstable." I'm using it roughly as shown in the following minimal example:

use chrono::prelude::Local;
use std::time::Duration;
use tokio::task::JoinSet;
use tokio::time::sleep;

#[allow(dead_code)]
#[derive(Debug)]
struct SomeError {
    msg: String,
}

fn hms() -> String {
    Local::now().format("%H:%M:%S").to_string()
}

async fn operate(time: u64, result: Option<i64>) -> Result<i64, SomeError> {
    match result {
        Some(x) => {
            println!("{}: Spending {time} seconds doing operations ...", hms());
            sleep(Duration::from_secs(time)).await;
            println!("{}: Operations done after {time} seconds!", hms());
            Ok(x)
        }
        None => {
            println!("{}: Spending {time} seconds doing risky things ...", hms());
            sleep(Duration::from_secs(time)).await;
            Err(SomeError {
                msg: format!("Operation failed after {time} seconds!"),
            })
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), SomeError> {
    let mut tasks = JoinSet::new();
    tasks.spawn(operate(5, Some(42)));
    tasks.spawn(operate(1, Some(17)));
    tasks.spawn(operate(3, None));
    tasks.spawn(operate(2, Some(23)));
    tasks.spawn(operate(4, Some(3)));
    while let Some(r) = tasks.join_next().await {
        match r {
            Ok(Ok(x)) => println!("Got: {x}"),
            Ok(Err(e)) => {
                tasks.shutdown().await;
                return Err(e);
            }
            Err(e) => {
                println!("Error: {e}");
                tasks.shutdown().await;
                break;
            }
        }
    }
    Ok(())
}

This works (or at least appears to), with the remaining in-progress tasks being cancelled as soon as the third task returns Err.

In my actual code, I'm going to be doing the while let Some(r) = tasks.join_next().await loop several times, so I'd like to put it into a function. This is my attempt:

use async_stream::stream;
use chrono::prelude::Local;
use std::time::Duration;
use tokio::task::JoinSet;
use tokio::time::sleep;
use tokio_stream::{Stream, StreamExt};

#[allow(dead_code)]
#[derive(Debug)]
struct SomeError {
    msg: String,
}

fn hms() -> String {
    Local::now().format("%H:%M:%S").to_string()
}

async fn operate(time: u64, result: Option<i64>) -> Result<i64, SomeError> {
    match result {
        Some(x) => {
            println!("{}: Spending {time} seconds doing operations ...", hms());
            sleep(Duration::from_secs(time)).await;
            println!("{}: Operations done after {time} seconds!", hms());
            Ok(x)
        }
        None => {
            println!("{}: Spending {time} seconds doing risky things ...", hms());
            sleep(Duration::from_secs(time)).await;
            Err(SomeError {
                msg: format!("Operation failed after {time} seconds!"),
            })
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), SomeError> {
    let mut tasks = JoinSet::new();
    tasks.spawn(operate(5, Some(42)));
    tasks.spawn(operate(1, Some(17)));
    tasks.spawn(operate(3, None));
    tasks.spawn(operate(2, Some(23)));
    tasks.spawn(operate(4, Some(3)));
    let mut stream = aiter_until_error(tasks);
    while let Some(r) = stream.next().await {
        match r {
            Ok(x) => println!("Got: {x}"),
            Err(e) => {
                return Err(e);
            }
        }
    }
    Ok(())
}

fn aiter_until_error<T: 'static, E: 'static>(mut tasks: JoinSet<Result<T, E>>) -> impl Stream<Item = Result<T, E>> {
    stream! {
        while let Some(r) = tasks.join_next().await {
            match r {
                Ok(Ok(r)) => yield Ok(r),
                Ok(Err(e)) => {
                    tasks.shutdown().await;
                    yield Err(e);
                    break;
                },
                Err(_) => {
                    println!("Error: {e}");
                    tasks.shutdown().await;
                    break;
                }
            }
        }
    }
}

Unfortunately, this fails to build with some very lengthy errors that I don't know how to handle:

error[E0277]: `std::future::from_generator::GenFuture<[static generator@/Users/jwodder/.cargo/registry/src/github.com-1ecc6299db9ec823/async-stream-0.3.3/src/lib.rs:201:9: 201:67]>` cannot be unpinned
   --> src/main.rs:45:32
    |
45  |     while let Some(r) = stream.next().await {
    |                                ^^^^ within `impl tokio_stream::Stream<Item = std::result::Result<i64, SomeError>>`, the trait `std::marker::Unpin` is not implemented for `std::future::from_generator::GenFuture<[static generator@/Users/jwodder/.cargo/registry/src/github.com-1ecc6299db9ec823/async-stream-0.3.3/src/lib.rs:201:9: 201:67]>`
...
56  | fn aiter_until_error<T: 'static, E: 'static>(mut tasks: JoinSet<Result<T, E>>) -> impl Stream<Item = Result<T, E>> {
    |                                                                                   -------------------------------- within this `impl tokio_stream::Stream<Item = std::result::Result<i64, SomeError>>`
    |
    = note: consider using `Box::pin`
    = note: required because it appears within the type `impl std::future::Future<Output = ()>`
    = note: required because it appears within the type `async_stream::AsyncStream<std::result::Result<i64, SomeError>, impl std::future::Future<Output = ()>>`
    = note: required because it appears within the type `impl tokio_stream::Stream<Item = std::result::Result<i64, SomeError>>`
note: required by a bound in `tokio_stream::StreamExt::next`
   --> /Users/jwodder/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-stream-0.1.9/src/stream_ext.rs:139:15
    |
139 |         Self: Unpin,
    |               ^^^^^ required by this bound in `tokio_stream::StreamExt::next`

error[E0277]: `std::future::from_generator::GenFuture<[static generator@/Users/jwodder/.cargo/registry/src/github.com-1ecc6299db9ec823/async-stream-0.3.3/src/lib.rs:201:9: 201:67]>` cannot be unpinned
  --> src/main.rs:45:38
   |
45 |     while let Some(r) = stream.next().await {
   |                                      ^^^^^^ within `impl tokio_stream::Stream<Item = std::result::Result<i64, SomeError>>`, the trait `std::marker::Unpin` is not implemented for `std::future::from_generator::GenFuture<[static generator@/Users/jwodder/.cargo/registry/src/github.com-1ecc6299db9ec823/async-stream-0.3.3/src/lib.rs:201:9: 201:67]>`
...
56 | fn aiter_until_error<T: 'static, E: 'static>(mut tasks: JoinSet<Result<T, E>>) -> impl Stream<Item = Result<T, E>> {
   |                                                                                   -------------------------------- within this `impl tokio_stream::Stream<Item = std::result::Result<i64, SomeError>>`
   |
   = note: consider using `Box::pin`
   = note: required because it appears within the type `impl std::future::Future<Output = ()>`
   = note: required because it appears within the type `async_stream::AsyncStream<std::result::Result<i64, SomeError>, impl std::future::Future<Output = ()>>`
   = note: required because it appears within the type `impl tokio_stream::Stream<Item = std::result::Result<i64, SomeError>>`
   = note: required because of the requirements on the impl of `std::future::Future` for `tokio_stream::stream_ext::next::Next<'_, impl tokio_stream::Stream<Item = std::result::Result<i64, SomeError>>>`
   = note: required because of the requirements on the impl of `std::future::IntoFuture` for `tokio_stream::stream_ext::next::Next<'_, impl tokio_stream::Stream<Item = std::result::Result<i64, SomeError>>>`
help: remove the `.await`
   |
45 -     while let Some(r) = stream.next().await {
45 +     while let Some(r) = stream.next() {
   | 

For more information about this error, try `rustc --explain E0277`.

Little help?

You can do this:

let stream = aiter_until_error(tasks);
tokio::pin!(stream);
while let Some(r) = stream.next().await {
3 Likes

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.