How to run fallible async functions in parallel, returning on the first error?

I have two async functions:

async fn renderer(opt: Opt, mut rx: mpsc::Receiver<Command>) -> eyre::Result<()> { ... }
async fn listener(opt: Opt, tx: mpsc::Sender<Command>) -> eyre::Result<()> { ... }

I need them to run in parallel, so I use tokio::spawn:

#[tokio::main]
async fn main() -> Result<(), eyre::Report> {
    color_eyre::install()?;
    let opt = Opt::from_args();
    let (tx, rx): (mpsc::Sender<Command>, _) = mpsc::channel(100);
    let renderer = tokio::spawn(renderer(opt.clone(), rx));
    let listener = tokio::spawn(listener(opt.clone(), tx));

I want to exit the program as soon as either renderer or listener returns an Err, but I can't work out how to do so.

I've tried using tokio::try_join! like this:

    match tokio::try_join!(renderer, listener) {
        Ok(val) => {
            println!("success: {:?}", val);
            Ok(())
        }
        Err(val) => {
            println!("failure: {:?}", val);
            Err(val.into())
        }
    }

but that prints "success" with a tuple of two errors after it (for the sake of testing, both functions currently return an error), so I think if only one of the functions errors out, the other will keep running.

I've tried using tokio::select! like this:

    tokio::select! {
        result = renderer.await => {
            result?
        }
        result = listener.await => {
            result?
        }
    }

but I get error messages I don't understand from the compiler, no matter where I put the question marks:

error[E0599]: no method named `poll` found for struct `Pin<&mut std::result::Result<std::result::Result<(), ErrReport>, JoinError>>` in the current scope
   --> src/main.rs:336:5
    |
336 | /     tokio::select! {
337 | |         result = renderer.await => {
338 | |             result?
339 | |         }
...   |
342 | |         }
343 | |     }
    | |_____^ method not found in `Pin<&mut std::result::Result<std::result::Result<(), ErrReport>, JoinError>>`

It's been a while since I last touched this project, so I've forgotten most of the things I read about Rust async. What should I be doing to run these functions in parallel, and why isn't what I'm doing currently working?

This is because, the return type of something you spawn is wrapped in Result<T, JoinError> in case the task panics or is aborted. To fix this, define a helper:

async fn flatten<T>(handle: JoinHandle<Result<T, SomeError>>) -> Result<T, SomeError> {
    match handle.await {
        Ok(Ok(result)) => Ok(result),
        Ok(Err(err)) => Err(err),
        Err(err) => Err(err.into()),
    }
}

You can now use try_join!.

match tokio::try_join!(flatten(renderer), flatten(listener)) {
    Ok(val) => {
        println!("success: {:?}", val);
        Ok(())
    }
    Err(val) => {
        println!("failure: {:?}", val);
        Err(val.into())
    }
}

Regarding tokio::select!, you should not include the call to .await in the selection. The macro automatically awaits them.

1 Like

Ahh, right – that's because JoinHandle implements Future like this, right?

impl<T> Future for JoinHandle<T> {
    type Output = super::Result<T>; // elsewhere: pub(crate) type Result<T> = std::result::Result<T, JoinError>;

Thanks! In that case (given I don't actually need to match on the result, I can just return it from main) I think this also works:

    tokio::select! {
        result = renderer => result??,
        result = listener => result??,
    }
    Ok(())

(I think I could alternatively Ok(tokio::select! { ... }), but that seems to make clippy unhappy)

That select would also kill the other task if it completes successfully.

1 Like

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.