Joining Tokio futures across a thread boundary

main:

let arcmut = Arc::new(Mutex::new(Vec::new()));

separate task:

tokio::spawn(async move {
    let futures = arcmut.lock().unwrap();
    let results = join_all(futures).await;
});

The error message says that MutexGuard<Vec<JoinHandle<Vec<T>>>> is not an iterator, along with a note that MutexGuard is not Send.

What is the recommended way to get results?

This was posted in another thread, where the suggestion was to use tokio::sync::Mutex (whose guard is Send), but the same error messages appear, and it's unclear whether that would work for futures coming from task::spawn_blocking that need to move across threads. The other suggestion there was to use std::mem::replace to swap the Vec with an empty one and drop the mutex guard before the await, but I'm struggling to understand why that would work and have not been able to make it work either.

What does the code look like that you’re using to append futures to the vec?

let fut = task::spawn(encode_frame(frame));
arcmut.lock().unwrap().push(fut);

Using mem::replace should indeed work if you make sure to drop the Mutex guard before the await. This is because you are not allowed to keep an ordinary Mutex locked across an await. Note that you have to actually make the Mutex guard go out of scope - it's not enough to call drop, due to how the compiler detects which variables are live at an await.

As for the recommendation of Tokio's Mutex, it is somewhat relevant as you are allowed to keep it locked across an await, but it is not the solution you want in this case.


To show what @alice is talking about, here's an example of code that does that.

#[tokio::main]
async fn main() {
    let arcmut = std::sync::Arc::new(std::sync::Mutex::new(vec![
        futures::future::ready(1),
        futures::future::ready(2),
    ]));

    tokio::spawn(async move {
        let mut futures = {
            // Take the Vec out of the Mutex; the guard goes out of scope at the
            // end of this block, so it is not held across the .await below.
            let mut guard = arcmut.lock().unwrap();
            std::mem::take(&mut *guard)
        };
        let results = futures::future::join_all(futures.iter_mut()).await;
        println!("results = {:?}", results);
    });
}

Playground Link


Note that since the vector contains join handles, I do not recommend using join_all. Just loop through the vector and await the handles sequentially.

There's no reason to pay for concurrency twice.
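
A minimal sketch of what that could look like, reusing the arcmut and the Vec of JoinHandles from earlier in the thread:

// Take the handles out of the Mutex (the guard is dropped before any .await),
// then await each JoinHandle in turn. The spawned tasks are already running
// concurrently, so this just collects their results in order.
let handles = {
    let mut guard = arcmut.lock().unwrap();
    std::mem::take(&mut *guard)
};

let mut results = Vec::with_capacity(handles.len());
for handle in handles {
    // JoinHandle implements Future; awaiting it yields Result<T, JoinError>.
    results.push(handle.await.unwrap());
}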


Okay, super awesome that this all worked. Thank you both: I don't think I would have figured this out without the example, and using a for loop is indeed far more performant. Unsure which to mark as the solution, though.

Would you mind explaining how this works? I understood dropping the MutexGuard since it can't be sent across threads, but why is it the .await that causes this, since we are already in a separate async thread? Previously I was using std::mem::drop, but, as mentioned above, I'm clearly not understanding how the compiler handles this around an await.

Also curious why using join_all leads to doubling concurrency cost, but happy to close without an answer.

Without forcing it to drop, the lock is held until join_all.await returns. When you call await, the task is effectively paused until something wakes it again. When it's woken, it might be moved to another thread to resume its work. This requires any associated variables to be Send. The forced drop means you are only taking the lock on whichever thread the async task is currently running on, just long enough to take the contents of the underlying Vec, so the guard doesn't need to be Send.
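
To illustrate, this is roughly the shape of the failing version from the original post - the guard is still live at the .await, so it would have to travel with the task:

// The guard is part of the future's state across the .await, and
// std::sync::MutexGuard is not Send, so this does not compile.
tokio::spawn(async move {
    let mut guard = arcmut.lock().unwrap();
    // error: future cannot be sent between threads safely
    let results = futures::future::join_all(guard.iter_mut()).await;
    println!("{:?}", results);
});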

I don't understand the inner workings of async well enough to give you a good answer for your last question.


Async/await makes use of what's called cooperative scheduling, which means that the tasks themselves decide when to yield control to other tasks. In the case of async/await, this happens only at .await points, which means that values that never exist at an .await point are never moved across threads.

Note that this also means that code spending a lot of time without reaching an .await will prevent other tasks from running. This is called "blocking the thread".
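
For example (just a sketch - expensive here is a stand-in for whatever CPU-bound work is being done), a long loop with no .await never yields, so you either insert explicit yield points or move the work off the async threads with spawn_blocking:

fn expensive(x: u64) -> u64 {
    // Placeholder for the real CPU-bound work.
    x.wrapping_mul(2654435761)
}

// Without the yield_now, this loop would hold the executor thread until it
// finishes, preventing other tasks scheduled on that thread from running.
async fn busy(work: Vec<u64>) -> u64 {
    let mut sum = 0;
    for (i, x) in work.iter().enumerate() {
        sum += expensive(*x);
        if i % 1000 == 0 {
            tokio::task::yield_now().await;
        }
    }
    sum
}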

Regarding join_all, its implementation is rather poor, and has quadratic complexity in the number of futures. Concurrency by spawning is much cheaper.


This is extremely useful. So much so that I'm rethinking the use of task::spawn(encode_frame(frame)), since it returns a JoinHandle.

Saw that tokio-util includes a module called codec that provides utilities for encoding/decoding frame streams. The encode_frame fn above just returns a raw frame Vec<u8>. What would be the benefits of refactoring to conform to the codec utilities, so that it goes from Bytes -> Stream instead of Vec<u8> -> JoinHandle?

You should probably explain in more detail what encode_frame does. It doesn't appear anywhere in this thread.

Thought I had mentioned it, apologies. Right now it's compression, specifically a wrapper around flate2.

Have:

pub async fn enc(frame: Wrapper<Vec<u8>>, sink: Vec<u8>) -> Vec<u8> {
    let frame = tokio::task::spawn_blocking(move || {
        let buf = do_preprocessing(frame);

        encode_with_flate(
            ..
            &sink,
            &buf,
        );

        sink
    }).await.unwrap();

    frame
}

Reason for using spawn_blocking is that work is CPU bound.

Once processed, it should be sent over the network, but I'm still experimenting with how to do this.

Using codec for this does indeed seem to be a good idea. You do not need to spawn_blocking it if it's just a small operation, which I assume the flate thing is (assuming that frames are not massive).
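
A minimal sketch of what an Encoder impl could look like, assuming tokio_util's codec feature is enabled; the DeflateFrameCodec name and the u32 length prefix are just assumptions for illustration, not something from this thread:

use bytes::BytesMut;
use flate2::{write::DeflateEncoder, Compression};
use std::io::Write;
use tokio_util::codec::Encoder;

// Hypothetical codec: deflate-compresses each frame and length-prefixes it.
struct DeflateFrameCodec;

impl Encoder<Vec<u8>> for DeflateFrameCodec {
    type Error = std::io::Error;

    fn encode(&mut self, frame: Vec<u8>, dst: &mut BytesMut) -> Result<(), Self::Error> {
        // Compress the frame into a temporary buffer.
        let mut enc = DeflateEncoder::new(Vec::new(), Compression::default());
        enc.write_all(&frame)?;
        let compressed = enc.finish()?;

        // Length-prefix so the receiver knows where each frame ends.
        dst.extend_from_slice(&(compressed.len() as u32).to_be_bytes());
        dst.extend_from_slice(&compressed);
        Ok(())
    }
}

Wrapping the network writer in tokio_util::codec::FramedWrite::new(writer, DeflateFrameCodec) would then give a Sink of frames to send into, instead of collecting JoinHandles.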

Does the flate thing care about the size of chunks?

Does the flate thing care about the size of chunks?

Spent a little while looking into this, and I think the answer is no. Not totally sure, though. Edit: to get a simple example working where a stream is returned, it's okay to assume the answer is no for now.

assuming that frames are not massive

Not exactly sure what massive means, but for some context, in the current example each frame's capacity is in the hundred-thousands.

Have you checked the async_compression crate?

Looked into stream::DeflateEncoder, which seems like it could be useful and does appear to be similar to Tokio's codec. Will definitely give this a look if I'm unable to get codec to work, but since a lot of DeflateEncoder is implemented with macros I don't understand yet, and the encoding logic seemed fairly similar to codec, I think it's probably better to stick with the codec traits for now.
