Program doesn't exit after all the code has run

I want to run many calculations in parallel using threads and a channel, and I've written the code below. It seems to work, but the program never exits after all the code has run.

use std::sync::mpsc;
use std::thread;
use std::error::Error;

async fn run_once(
    unique_num: f32,
) -> f32 {
    println!("lines_{}", unique_num);
    let mut x = 0;
    for i in 0..(10.0_f32.powf(unique_num) as i64) {
        x = x + i;
    }
    println!("unique_num: {}, sum: {}", unique_num, x);
    unique_num
}


#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let t = std::time::Instant::now();
    let (tx, rx) = mpsc::channel();
    let unique_num_list = vec![8.0, 2.0, 3.0, 1.0, 8.0];
    let mut handles = vec![];
    for uni in unique_num_list {
        let txi = tx.clone();
        let handlei = thread::spawn(move || {
            let rt = tokio::runtime::Runtime::new().unwrap();
            rt.block_on(async {
                let re = run_once(uni).await;
                txi.send(re).unwrap();
            });
        });
        handles.push(handlei);
    }

    for ri in rx {
        println!("Got: {}", ri);
    }
    for hdi in handles {
        hdi.join().unwrap();
    }

    dbg!(t.elapsed());
    Ok(())
}

Your threads are executing and sending values to tx correctly, but you've got a problem in the for ri in rx loop.

The way a channel receiver's IntoIterator implementation works is that it'll keep waiting for more values and yielding them until all send handles are dropped. The txi passed into each thread gets dropped when that thread finishes, which is fine, but the problem is our original tx is still alive when we start for ri in rx. That means the loop will never end because there's always going to be a live tx handle.

What you want to do is call drop(tx) after the for uni in unique_num_list loop (updated version on the playground).

Also keep in mind that starting a tokio runtime is pretty expensive and it will create its own thread pool in the background, so I would expect the code to be a lot slower than if we dropped the async and just ran run_once() directly from each thread. For example, on the playground it ran in 4.19 seconds with the tokio runtime and 3.6 seconds without it.
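For comparison, a sketch of the synchronous variant, assuming the real work needs no .await. It departs slightly from the question's design: instead of a channel, each thread returns its result through its JoinHandle, so results come back in input order rather than completion order. run_all is an illustrative name.

```rust
use std::thread;

/// Synchronous `run_once`: the same summation, no async and no runtime.
fn run_once(unique_num: f32) -> (f32, i64) {
    let mut x: i64 = 0;
    for i in 0..(10.0_f32.powf(unique_num) as i64) {
        x += i;
    }
    (unique_num, x)
}

fn run_all(unique_num_list: Vec<f32>) -> Vec<(f32, i64)> {
    // Collect the handles first so every thread is spawned before we
    // start joining (iterators are lazy).
    let handles: Vec<_> = unique_num_list
        .into_iter()
        .map(|uni| thread::spawn(move || run_once(uni)))
        .collect();
    // `join()` hands back each thread's return value, so no channel is needed.
    handles
        .into_iter()
        .map(|h| h.join().expect("worker thread panicked"))
        .collect()
}

fn main() {
    let t = std::time::Instant::now();
    for (num, sum) in run_all(vec![8.0, 2.0, 3.0, 1.0, 8.0]) {
        println!("unique_num: {num}, sum: {sum}");
    }
    dbg!(t.elapsed());
}
```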

Hi @Michael-F-Bryan

Thanks for your explanation; I understand the problem now.

"...the code to be a lot slower than if we dropped the async and just ran run_once() directly from each thread."
Yes, I noticed that, but this is just a demo. In my real code run_once has to be async because it requests web data with .await, so we have to use an async run_once, right?

If your real code is async, I would skip the threads and do something like this:

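use futures::stream::{self, StreamExt};

async fn run_once(unique_num: f32) -> f32 {
  // ... your async work here ...
  unique_num
}

#[tokio::main]
async fn main() {
  let unique_num_list = vec![8.0, 2.0, 3.0, 1.0, 8.0];

  // Drive up to 4 `run_once` futures concurrently (4 is an arbitrary limit);
  // results arrive in completion order.
  let results: Vec<_> = stream::iter(unique_num_list)
    .map(run_once)
    .buffer_unordered(4)
    .collect()
    .await;

  for result in results {
    println!("{result}");
  }
}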

Depending on your real use case, you could skip the collect() and do a for_each() to consume the results.

You should also make sure expensive, compute-bound code is executed with something like tokio::task::spawn_blocking() so you don't lock up your runtime.

You mean I should use futures::stream instead of thread::spawn?

In fact, I had already implemented it with futures::stream, but it took longer; I guess run_once does too much pure calculation, so I switched to thread::spawn.

It's not perfect, but a good rule of thumb is:

  • if you have a sequence of things and want to run an async function on each item, you should use futures::stream
  • if your async function contains something that takes a long time (e.g. pure calculations), use something like tokio::task::spawn_blocking() to run that code in the background on a thread pool dedicated for expensive, blocking operations.

Running something that blocks (e.g. pure calculations) inside an async function is a bit of a code smell because it will block the main async runtime from servicing any other tasks.

This includes waiting on values from a std::sync::mpsc channel, by the way, because receiving puts the current thread to sleep until a value arrives. If you want to read from a channel in an async function, you should use the async-aware futures::channel::mpsc channel.

Hi @Michael-F-Bryan
Thank you for your help.
I'm very interested in what you advised above (using tokio::task::spawn_blocking()). Could you give me more details? My code looks like this:

use futures::stream::{FuturesOrdered, TryStreamExt};
use std::error::Error;

async fn process_once(web_data: Vec<f64>) -> Result<(), Box<dyn Error>> {
    // A (takes ~15 s): lots of code here, including loading local data and calculations on it;

    // B (takes ~1 s): uses `web_data` and requests more web data

    // C (takes ~2 s): further calculations;

    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let refer_list = vec![1, 2, 3, 4, 5];

    // here get `web_data` from web;
    let web_data = ...;

    let futures = FuturesOrdered::from_iter((0..refer_list.len()).map(|_| {
        process_once(web_data.clone())
    }));
    let _: Vec<()> = futures.try_collect().await?;

    Ok(())
}

Parts A, B and C share many variables: a lot of what A creates is also used in B and C. Splitting them apart would be very tedious, I think; in fact, part A is very similar to the __init__ method of a Python class.

It might feel tedious, but running long-lived blocking calculations in your async function can end up causing your app to lock up because all of tokio's IO threads will be busy running the calculations instead of allowing all other async code to make progress.

Sometimes you can get away with this because you haven't saturated the IO threads or know there aren't any other futures running at the time, but that's not a guarantee. If you want to see for yourself, try modifying the #[tokio::main] attribute to use #[tokio::main(flavor = "current_thread")] instead.

For the record, you have the same problem in every other language that has explicit async/await. If you had a long-running calculation in JavaScript code it would cause the UI thread to lock up and your browser would show that "this page is unresponsive" prompt.

@alice wrote up a really nice article about blocking code and async.

In particular, she says:

If you remember only one thing from this article, this should be it:

Async code should never spend a long time without reaching an .await.
