Hello! I'm trying to send data over channels to async tasks. I'm sure I'm doing something wrong, but I have no idea how to make it work.
use async_std::task;
use futures::channel::mpsc::{channel, Receiver};

type Error = Box<(dyn std::error::Error + Send + Sync + 'static)>;

async fn generator() -> Result<Receiver<u32>, Error> {
    let (mut tx, rx) = channel(0);
    let nums = vec![3, 6, 2, 7, 5, 2, 5, 8, 4, 2, 5, 7];

    task::spawn(async move {
        for num in nums {
            tx.try_send(num)
                .expect("Could not send the generated number over the channel")
        }
    });

    Ok(rx)
}

async fn fan_out(mut rx_gen: Receiver<u32>) -> Result<(), Error> {
    let mut tasks = vec![];

    while let Some(num) = rx_gen.try_next()? {
        let task = task::spawn(async move { println!("the received number is {}", num) });
        tasks.push(task);
    }

    for task in tasks.into_iter() {
        task.await;
    }

    Ok(())
}

#[async_std::main]
async fn main() -> Result<(), Error> {
    fan_out(generator().await?).await?;
    Ok(())
}
I'm getting:
$ cargo run
Finished dev [unoptimized + debuginfo] target(s) in 0.03s
Running `target/debug/test_fanout`
thread 'Error: async-std/executorTryRecvError' panicked at '
Could not send the generated number over the channel: TrySendError { kind: Disconnected }', src/main.rs:12:13
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Your fan_out function calls try_next, which does not wait: the task you spawned in generator has not had time to send any messages yet, so the call comes back without a message. The "Error: TryRecvError" mixed into your output suggests it came back as an error, which the ? operator propagated, so fan_out returned before spawning any tasks.
Once it returns, the receiver is dropped, causing a panic in the spawned task, as try_send returns an error if the receiver has been disconnected.
You should probably be using the async send and receive operations (for futures::channel::mpsc, send from SinkExt and next from StreamExt) instead of the try variants.
Thank you, Alice. I could not make it work with futures::channel, so I moved to std::sync::mpsc::channel, which works well. Initially, I thought that it was not possible to mix sync channels with async code.
Using the std channels is the sort of thing that appears to work in simple cases, just like std::thread::sleep, but both block the executor thread they run on. Other tasks scheduled on that thread cannot make progress in the meantime, which will completely destroy your performance once you start running more tasks at once.