Struggling porting sync code to tokio runtime

I just started experimenting with async Rust and I'm having a bit of trouble with it.

I have the following piece of code which runs synchronously on a single thread:

use std::sync::Arc;

use tokio::runtime::Runtime;
use tokio::sync::mpsc::{Receiver, Sender};

struct CompletionItem {
    // ...
}

type Completions = Vec<CompletionItem>;

trait CompletionSource {
    fn complete(&self) -> Completions;
}

type Sources = Vec<Arc<dyn CompletionSource>>;

struct State {
    completions: Completions,
    sources: Sources,

    runtime: Runtime,
    tx: Arc<Sender<Completions>>,
    rx: Receiver<Completions>,
}

// I can't make this an `async fn`.
fn has_completions(state: &mut State) -> bool {
    let completions = &mut state.completions;
    completions.clear();
    for source in &state.sources {
        completions.append(&mut source.complete());
    }
    !completions.is_empty()
}

I'd like to make this code multithreaded by using tokio's Runtime thread pool. I modified the has_completions function to:

fn has_completions(state: &mut State) -> bool {
    let runtime = &state.runtime;
    let tx = &state.tx;
    let rx = &mut state.rx;

    let completions = &mut state.completions;
    completions.clear();

    runtime.block_on(async {
        for source in &state.sources {
            let s = source.clone();
            let t = tx.clone();
            let _handle = runtime.spawn(async move {
                let comps = s.complete().await;
                if let Err(_) = t.send(comps).await {
                    println!("receiver dropped");
                    return;
                }
            });
        }

        // This awaits forever.
        while let Some(comps) = &mut rx.recv().await {
            completions.append(comps);
        }
    });

    !completions.is_empty()
}

and I also made the complete function of the CompletionSource trait an async fn so that I can .await it.

Unfortunately this just blocks forever because rx.recv().await keeps listening for new messages.

What I'd like to accomplish is:

  1. the completion items are computed concurrently by running s.complete() on separate threads;
  2. every time a s.complete() finishes executing, the returned Completions are appended to the completions vector;
  3. when all the sources have finished executing I return the value of !completions.is_empty().

Am I even supposed to use channels here? I know streams are the async cousins of iterators; would those be a better option?

Thanks a lot.

Are you using tokio channels?

Yes

Since your complete function is not async, I assume that it performs a blocking CPU-bound computation (read here for why I conclude this). Therefore, the following quote on Tokio's website applies:

When not to use Tokio

Speeding up CPU-bound computations by running them in parallel on several threads. Tokio is designed for IO-bound applications where each individual task spends most of its time waiting for IO. If the only thing your application does is run computations in parallel, you should be using rayon. That said, it is still possible to "mix & match" if you need to do both.

source

As described above, you should be using rayon for this. You can implement your desired operation in parallel in the following way:

use rayon::prelude::*;

struct State {
    completions: Completions,
    sources: Sources,
}

fn complete_all(state: &mut State) {
    let par_iter = state.sources
        .par_drain(..)
        .map(|source| source.complete())
        .flatten_iter();

    state.completions.par_extend(par_iter);
}

The rayon library takes care of all of the parallelization for you.

Explanation for why this works:

  1. The par_drain function returns a parallel iterator that takes ownership of all items in state.sources, leaving the vector empty at the end.
  2. The map operation tells the parallel iterator what should happen with each item in the sequence. It returns a new parallel iterator whose item type is Vec<CompletionItem>.
  3. The flatten_iter operation will flatten the vectors so that the resulting item type of the parallel iterator is CompletionItem.
  4. The par_extend operation takes the parallel iterator and pushes all of the items to the state.completions vector.

Note that I used flatten_iter rather than flatten because I did not need to perform any parallel operations on the flattened items, and flatten_iter is faster when that is not necessary.

Note that I used par_drain rather than into_par_iter because into_par_iter consumes the vector it is called on, and you cannot move a vector out from behind an &mut reference. I used par_drain rather than par_iter to take ownership of the items. If you don't want state.sources to be empty at the end, then you should be able to use par_iter instead.

If my assumption that you didn't need to do anything parallel was wrong, then let me know and we can discuss how to parallelize async operations.


Hi Alice, thanks for your answer.

complete may be CPU or IO bound depending on who implements it, however it's safe to assume most implementations will be IO bound; that's why I turned it into an async fn complete(&self) -> Completions in the second example.

I wanted to reuse the thread pool of the tokio runtime without using rayon because in another part of the code I need to perform basically the same operation, without the constraint of having to block the main thread.

This is what I have now:

use futures::prelude::*;
use futures::stream::FuturesUnordered;

// -- snip --

fn has_completions(state: &mut State) -> bool {
    let runtime = &state.runtime;

    let completions = &mut state.completions;
    completions.clear();

    let mut handles = state
        .sources
        .iter()
        .map(|source| {
            let s = source.clone();
            runtime.spawn(async move { s.complete().await })
        })
        .collect::<FuturesUnordered<_>>();

    runtime.block_on(async {
        while let Some(Ok(mut res)) = handles.next().await {
            completions.append(&mut res);
        }
    });

    !completions.is_empty()
}

which I believe does everything I want. Is there something that could be improved with this approach?

It is better to spawn them on the runtime than to use FuturesUnordered. This is for several reasons: One is that FuturesUnordered doesn't allow for parallelism (they will all run on the same thread), and another is that even on a single thread, the performance of FuturesUnordered is worse than just spawning on an actual single-threaded runtime.

To implement this, just spawn them all, resulting in a Vec<JoinHandle<Completions>>. Then iterate over the vector, await each join handle, and append the results to the output vector.

Sidenote: You should use extend rather than append for adding them to your vector.


I tested that with 2 sources by placing a std::thread::sleep(std::time::Duration::from_secs(5)) in the body of both their complete functions. The has_completions function ran in 5 seconds, not 10 as I would expect if both futures were running on the same thread.

Anyways, I turned the block_on block into

    runtime.block_on(async {
        for handle in handles {
            if let Ok(comps) = handle.await {
                completions.extend(comps);
            }
        }
    });

and it seems to work great. Thanks a lot!

Ah, I see. I didn't read your code closely enough. You are spawning them. You are just using FuturesUnordered<JoinHandle<Completions>> as a replacement for Vec<JoinHandle<Completions>>. In this case, your code will work, but is less efficient than just using Vec<JoinHandle<Completions>>.

I note that you are currently ignoring errors. You will get an error if one of the completions panics.


This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.