Propagating errors from tokio::tasks II

Hi all,

this topic is somewhat related to this topic (Propagating errors from tokio tasks).

Consider the (simplified) code block: A dedicated task is spawned for each service found in the vector. Because I do not simply want to unwrap and panic, I incorporated the error propagation & handling from the cited topic.

Consequently, I can react to the different errors (the "inner" one and the JoinError) and bubble them up if necessary.

Now to my question: Because each service tasks is (right now) running for "infinity", the loop - used for awaiting the future collection of the service tasks - will only be left in the error cases.

Consequently, the offer_services() "completes" only in that case as well. Blocking the program flow, if there are no errors at all.

Is there a way to use this error handling, without blocking the entire flow? And not using unwrap()s?

Thanks in advance

pub async fn offer_services(
    services: Vec<ServiceDescription>
) -> Result<(), ServiceDiscoveryError> {
    let mut service_handlers: FuturesUnordered<JoinHandle<Result<(), ServiceDiscoveryError>>> =
        FuturesUnordered::new();

    for service in services.iter() {
        service_handlers.push(tokio::spawn(async move {
            // Currently runs for "infinity"
            loop {
                tokio::select! {
                    // Omitted: Listen for incoming packets, propagate ServiceDiscoveryError if an error occurs

                    // Omitted: Listen for timeouts, propagate ServiceDiscoveryError if an error occurs

                    // etc.
                }
            }
        }));
    }

    loop {
        match service_handlers.next().await {
            Some(result) => match result {
                // React to "inner_result", i.e. propagated ServiceDiscoveryError
                Ok(inner_result) => match inner_result {
                    Ok(()) => (),
                    Err(e) => return Err(e),
                },
                // React to JoinError
                Err(e) => return Err(ServiceDiscoveryError::JoinHandler(e)),
            },
            None => {
                debug!("service handlers are all done with work!");
                return Ok(());
            }
        }
    }
}

Instead of using a Vec to store the futures, you can use FuturesUnordered. This will return the tasks as they are finished instead of in the original order that they were created.

Typically the way you avoid blocking the flow is to spawn tasks. Then I suspect you will ultimately have some code that you want to interrupt if something fails. You could do that with a tokio::select! in the right place.

Also, you should consider aborting the other tasks if one of them fails.

I already use FuturesUnordered ;). The vector, which I mentioned, holds only different service descriptions. For each service description then I spawn a task, which is then pushed to the FuturesUnordered.

1 Like

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.