Sending data over channels with async_std

Hello! I'm trying to send data over channels to async tasks. I'm sure I'm doing something wrong, but I have no idea how to make it work.

use async_std::task;
use futures::channel::mpsc::{channel, Receiver};

type Error = Box<(dyn std::error::Error + Send + Sync + 'static)>;

async fn generator() -> Result<Receiver<u32>, Error> {
    let (mut tx, rx) = channel(0);

    let nums = vec![3, 6, 2, 7, 5, 2, 5, 8, 4, 2, 5, 7];
    task::spawn(async move {
        for num in nums {
            tx.try_send(num)
                .expect("Could not send the generated number over the channel")
        }
    });
    Ok(rx)
}

async fn fan_out(mut rx_gen: Receiver<u32>) -> Result<(), Error> {
    let mut tasks = vec![];
    while let Some(num) = rx_gen.try_next()? {
        let task = task::spawn(async move { println!("the received number is {}", num) });
        tasks.push(task);
    }

    for task in tasks.into_iter() {
        task.await;
    }

    Ok(())
}

#[async_std::main]
async fn main() -> Result<(), Error> {
    fan_out(generator().await?).await?;

    Ok(())
}

I'm getting:

$ cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 0.03s
     Running `target/debug/test_fanout`
thread 'Error: async-std/executorTryRecvError' panicked at '
Could not send the generated number over the channel: TrySendError { kind: Disconnected }', src/main.rs:12:13
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

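Your fan_out function calls try_next, which returns immediately without a message, because the task you spawned in generator has not had time to send anything yet. With futures::channel::mpsc, try_next reports an open but empty channel as an Err, and your ? operator propagates it as the TryRecvError shown in the output. The while loop body therefore never runs, tasks stays empty, and fan_out returns.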

Once it returns, the receiver is dropped, causing a panic in the spawned task, as try_send returns an error if the receiver has been disconnected.

You should probably be using the async operations (send on the sender and next on the receiver, each followed by .await) instead of the try variants.

Thank you Alice. I could not make it work with futures::channel, so I moved to std::sync::mpsc::channel, which works well. Initially, I thought that it was not possible to mix sync channels with async code.

That's a really really bad idea because you are blocking the thread. You should definitely be using the async channel.

Your code will grind to a halt once you have more tasks blocked on the channel than you have CPU cores on your system.


You have to import the StreamExt and SinkExt traits to use the channels like that:

use async_std::task;
use futures::channel::mpsc::{channel, Receiver};
use futures::stream::StreamExt;
use futures::sink::SinkExt;

type Error = Box<(dyn std::error::Error + Send + Sync + 'static)>;

async fn generator() -> Result<Receiver<u32>, Error> {
    let (mut tx, rx) = channel(0);

    let nums = vec![3u32, 6, 2, 7, 5, 2, 5, 8, 4, 2, 5, 7];
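    task::spawn(async move {
        for num in nums {
            // send waits until the channel has room instead of failing immediately.
            tx.send(num)
                .await
                .expect("Could not send the generated number over the channel");
        }
        // tx is dropped here, which closes the channel and ends the receiver's stream.
    });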
    Ok(rx)
}

async fn fan_out(mut rx_gen: Receiver<u32>) -> Result<(), Error> {
    let mut tasks = vec![];
    // next().await yields None only after every sender has been dropped.
    while let Some(num) = rx_gen.next().await {
        let task = task::spawn(async move { println!("the received number is {}", num) });
        tasks.push(task);
    }

    for task in tasks.into_iter() {
        task.await;
    }

    Ok(())
}

#[async_std::main]
async fn main() -> Result<(), Error> {
    fan_out(generator().await?).await?;

    Ok(())
}

Using the std channels is the sort of thing that appears to work in simple cases, just like std::thread::sleep, but both of these will completely destroy your performance in async code once you start running more tasks at once.

Thanks again Alice, you saved my day. I was not aware of the StreamExt and SinkExt traits.

To see why the sync channel is bad, consider this code:

use async_std::task::{self, JoinHandle};
use std::time::Duration;
use std::sync::mpsc::channel;

type Error = Box<(dyn std::error::Error + Send + Sync + 'static)>;

fn spawn_pair(i: i32, handles: &mut Vec<JoinHandle<()>>) {
    let (tx, rx) = channel();

    let h1 = task::spawn(async move {
        let num = rx.recv().unwrap();
        println!("the received number is {}", num);
    });

    let h2 = task::spawn(async move {
        task::sleep(Duration::from_secs(1)).await;
        println!("sending {}", i);
        tx.send(i).unwrap();
    });

    handles.push(h1);
    handles.push(h2);
}

#[async_std::main]
async fn main() -> Result<(), Error> {
    let mut handles = Vec::new();
    for i in 0..100 {
        spawn_pair(i, &mut handles);
    }

    for handle in handles {
        handle.await;
    }

    Ok(())
}

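This would work fine with async channels, but using std channels it will deadlock and never complete. Each receiving task blocks an executor thread inside rx.recv(), so once every executor thread is blocked (assuming there are fewer threads than receiving tasks), nothing is left to run the tasks that send.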
