For a Rust program I'm writing, tokio::task::JoinSet
is perfect for my needs, despite being marked "unstable." I'm using it roughly as shown in the following minimal example:
use chrono::prelude::Local;
use std::time::Duration;
use tokio::task::JoinSet;
use tokio::time::sleep;
#[allow(dead_code)]
#[derive(Debug)]
struct SomeError {
msg: String,
}
fn hms() -> String {
Local::now().format("%H:%M:%S").to_string()
}
async fn operate(time: u64, result: Option<i64>) -> Result<i64, SomeError> {
match result {
Some(x) => {
println!("{}: Spending {time} seconds doing operations ...", hms());
sleep(Duration::from_secs(time)).await;
println!("{}: Operations done after {time} seconds!", hms());
Ok(x)
}
None => {
println!("{}: Spending {time} seconds doing risky things ...", hms());
sleep(Duration::from_secs(time)).await;
Err(SomeError {
msg: format!("Operation failed after {time} seconds!"),
})
}
}
}
#[tokio::main]
async fn main() -> Result<(), SomeError> {
let mut tasks = JoinSet::new();
tasks.spawn(operate(5, Some(42)));
tasks.spawn(operate(1, Some(17)));
tasks.spawn(operate(3, None));
tasks.spawn(operate(2, Some(23)));
tasks.spawn(operate(4, Some(3)));
while let Some(r) = tasks.join_next().await {
match r {
Ok(Ok(x)) => println!("Got: {x}"),
Ok(Err(e)) => {
tasks.shutdown().await;
return Err(e);
}
Err(e) => {
println!("Error: {e}");
tasks.shutdown().await;
break;
}
}
}
Ok(())
}
This works (or at least appears to), with the remaining in-progress tasks being cancelled as soon as the third task returns Err
.
In my actual code, I'm going to be doing the while let Some(r) = tasks.join_next().await
loop several times, so I'd like to put it into a function. This is my attempt:
use async_stream::stream;
use chrono::prelude::Local;
use std::time::Duration;
use tokio::task::JoinSet;
use tokio::time::sleep;
use tokio_stream::{Stream, StreamExt};
#[allow(dead_code)]
#[derive(Debug)]
struct SomeError {
msg: String,
}
fn hms() -> String {
Local::now().format("%H:%M:%S").to_string()
}
async fn operate(time: u64, result: Option<i64>) -> Result<i64, SomeError> {
match result {
Some(x) => {
println!("{}: Spending {time} seconds doing operations ...", hms());
sleep(Duration::from_secs(time)).await;
println!("{}: Operations done after {time} seconds!", hms());
Ok(x)
}
None => {
println!("{}: Spending {time} seconds doing risky things ...", hms());
sleep(Duration::from_secs(time)).await;
Err(SomeError {
msg: format!("Operation failed after {time} seconds!"),
})
}
}
}
#[tokio::main]
async fn main() -> Result<(), SomeError> {
let mut tasks = JoinSet::new();
tasks.spawn(operate(5, Some(42)));
tasks.spawn(operate(1, Some(17)));
tasks.spawn(operate(3, None));
tasks.spawn(operate(2, Some(23)));
tasks.spawn(operate(4, Some(3)));
let mut stream = aiter_until_error(tasks);
while let Some(r) = stream.next().await {
match r {
Ok(x) => println!("Got: {x}"),
Err(e) => {
return Err(e);
}
}
}
Ok(())
}
fn aiter_until_error<T: 'static, E: 'static>(mut tasks: JoinSet<Result<T, E>>) -> impl Stream<Item = Result<T, E>> {
stream! {
while let Some(r) = tasks.join_next().await {
match r {
Ok(Ok(r)) => yield Ok(r),
Ok(Err(e)) => {
tasks.shutdown().await;
yield Err(e);
break;
},
Err(_) => {
println!("Error: {e}");
tasks.shutdown().await;
break;
}
}
}
}
}
Unfortunately, this fails to build with some very lengthy errors that I don't know how to handle:
error[E0277]: `std::future::from_generator::GenFuture<[static generator@/Users/jwodder/.cargo/registry/src/github.com-1ecc6299db9ec823/async-stream-0.3.3/src/lib.rs:201:9: 201:67]>` cannot be unpinned
--> src/main.rs:45:32
|
45 | while let Some(r) = stream.next().await {
| ^^^^ within `impl tokio_stream::Stream<Item = std::result::Result<i64, SomeError>>`, the trait `std::marker::Unpin` is not implemented for `std::future::from_generator::GenFuture<[static generator@/Users/jwodder/.cargo/registry/src/github.com-1ecc6299db9ec823/async-stream-0.3.3/src/lib.rs:201:9: 201:67]>`
...
56 | fn aiter_until_error<T: 'static, E: 'static>(mut tasks: JoinSet<Result<T, E>>) -> impl Stream<Item = Result<T, E>> {
| -------------------------------- within this `impl tokio_stream::Stream<Item = std::result::Result<i64, SomeError>>`
|
= note: consider using `Box::pin`
= note: required because it appears within the type `impl std::future::Future<Output = ()>`
= note: required because it appears within the type `async_stream::AsyncStream<std::result::Result<i64, SomeError>, impl std::future::Future<Output = ()>>`
= note: required because it appears within the type `impl tokio_stream::Stream<Item = std::result::Result<i64, SomeError>>`
note: required by a bound in `tokio_stream::StreamExt::next`
--> /Users/jwodder/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-stream-0.1.9/src/stream_ext.rs:139:15
|
139 | Self: Unpin,
| ^^^^^ required by this bound in `tokio_stream::StreamExt::next`
error[E0277]: `std::future::from_generator::GenFuture<[static generator@/Users/jwodder/.cargo/registry/src/github.com-1ecc6299db9ec823/async-stream-0.3.3/src/lib.rs:201:9: 201:67]>` cannot be unpinned
--> src/main.rs:45:38
|
45 | while let Some(r) = stream.next().await {
| ^^^^^^ within `impl tokio_stream::Stream<Item = std::result::Result<i64, SomeError>>`, the trait `std::marker::Unpin` is not implemented for `std::future::from_generator::GenFuture<[static generator@/Users/jwodder/.cargo/registry/src/github.com-1ecc6299db9ec823/async-stream-0.3.3/src/lib.rs:201:9: 201:67]>`
...
56 | fn aiter_until_error<T: 'static, E: 'static>(mut tasks: JoinSet<Result<T, E>>) -> impl Stream<Item = Result<T, E>> {
| -------------------------------- within this `impl tokio_stream::Stream<Item = std::result::Result<i64, SomeError>>`
|
= note: consider using `Box::pin`
= note: required because it appears within the type `impl std::future::Future<Output = ()>`
= note: required because it appears within the type `async_stream::AsyncStream<std::result::Result<i64, SomeError>, impl std::future::Future<Output = ()>>`
= note: required because it appears within the type `impl tokio_stream::Stream<Item = std::result::Result<i64, SomeError>>`
= note: required because of the requirements on the impl of `std::future::Future` for `tokio_stream::stream_ext::next::Next<'_, impl tokio_stream::Stream<Item = std::result::Result<i64, SomeError>>>`
= note: required because of the requirements on the impl of `std::future::IntoFuture` for `tokio_stream::stream_ext::next::Next<'_, impl tokio_stream::Stream<Item = std::result::Result<i64, SomeError>>>`
help: remove the `.await`
|
45 - while let Some(r) = stream.next().await {
45 + while let Some(r) = stream.next() {
|
For more information about this error, try `rustc --explain E0277`.
Little help?