Read from multiple streams simultaneously using async/await

I'd like to read from two streams simultaneously using async. Here's an example:

use async_std::os::unix::net::UnixStream;
use async_std::io::BufReader;
use async_std::prelude::*;
use async_std::task;

type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

// handle log connection
async fn read_log(id: &str) -> Result<()> {
    let stream = UnixStream::connect(format!("/logger{}.sock", id)).await?;

    let reader = BufReader::new(&stream);
    let mut lines = reader.lines();

    while let Some(line) = lines.next().await {
        let line = line?; // propagate read errors instead of panicking
        println!("{}={}", id, line); // print or send to redis
    }

    Ok(())
}

// start 2 concurrent persistent connections
async fn start_logs() -> Result<()> {
    task::spawn(read_log("1"));
    task::spawn(read_log("2"));
    Ok(())
}

// start app
fn main() -> Result<()> {
    task::block_on(start_logs())
}

I'd like to simulate multiple threads that all write to the same stdout. In the example above, the program exits immediately without waiting for the tasks to complete. I could write task::spawn(read_log("1")).await instead, but that await would never complete, so task::spawn(read_log("2")) would never start. What's the right way to handle multiple streams concurrently using async/await?

Your program stops immediately because you don't wait for the two spawned tasks. To wait for them, await the JoinHandle that each task::spawn call returns.

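async fn start_logs() -> Result<()> {
    // Spawn both tasks first so they run concurrently.
    let a = task::spawn(read_log("1"));
    let b = task::spawn(read_log("2"));
    // Awaiting a handle yields the task's Result; `?` propagates its error.
    a.await?;
    b.await?;
    Ok(())
}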

Yes, that's basically what I wrote. It turned out that logger 2 didn't have any logs, so I didn't see any output from it :).

The block_on function waits only for the future passed to it, not for tasks spawned from within it. Additionally, returning from main always kills the process immediately.
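
For comparison, the same rule applies to plain OS threads: a spawned thread is cut off when main returns unless its JoinHandle is joined first. Below is a minimal std-only sketch of that pattern, assuming in-memory strings in place of the Unix sockets. The names collect_logs and this three-argument read_log are hypothetical and exist only for this illustration; they are not part of async-std or of the code above.

```rust
use std::io::BufRead;
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-in for read_log: tags every line of `input` with `id`
// and appends it to a shared buffer (playing the role of the shared stdout).
fn read_log(id: &str, input: impl BufRead, out: &Mutex<Vec<String>>) -> std::io::Result<()> {
    for line in input.lines() {
        let line = line?;
        out.lock().unwrap().push(format!("{}={}", id, line));
    }
    Ok(())
}

// Spawns one thread per (id, text) source, then joins every handle before
// returning. Without the joins, the caller could return while the readers
// are still running.
fn collect_logs(sources: Vec<(&'static str, &'static str)>) -> Vec<String> {
    let out: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));

    let handles: Vec<_> = sources
        .into_iter()
        .map(|(id, text)| {
            let out = Arc::clone(&out);
            thread::spawn(move || read_log(id, text.as_bytes(), &out))
        })
        .collect();

    for handle in handles {
        handle
            .join()
            .expect("reader thread panicked")
            .expect("reading a log failed");
    }

    // Threads interleave in an unspecified order, so sort for a stable result.
    let mut lines = out.lock().unwrap().clone();
    lines.sort();
    lines
}

fn main() {
    for line in collect_logs(vec![("1", "a\nb"), ("2", "c")]) {
        println!("{}", line);
    }
}
```

The structure mirrors the async version: start every reader first, keep the handles, and only then wait on them, so that waiting on the first reader does not delay starting the second.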
