How can I make Tokio spawn tasks at the same time?

Hello everyone. When I spawn tasks asynchronously, I noticed that after a certain number of tasks, the later ones only run once an earlier task has finished. Tokio probably uses a queue to run tasks. How can I tell Tokio to start each task as soon as it is spawned, without waiting?

The code:

use std::time::Instant;

use tokio::net::TcpListener;
use tokio::io::Result;
use tokio::io::BufReader;
use tokio::io::AsyncReadExt;

use mixed::time;

#[tokio::main]
async fn main() -> Result<()> {
    let mut listener = TcpListener::bind("127.0.0.1:2020").await?;

    println!("[ {} ] Waiting for files..", console::style("START").green().bold());

    loop {
        let (stream, addr) = listener.accept().await?;
        let now = Instant::now();

        println!(
            "[  {}  ] Processing file from {}",
            console::style("NEW").green(),
            console::style(addr).yellow()
        );

        tokio::spawn(async move {
            let mut reader = BufReader::new(stream);
            let mut buf = [0u8; 8 * 1024];
            let mut _nb_alphas = 0;

            while let Ok(len) = reader.read(&mut buf).await {
                if len == 0 { break }

                for b in &buf[..len] {
                    if matches!(*b, b'a'..=b'z' | b'A'..=b'Z') {
                        _nb_alphas += 1;
                    }
                }
            }

            time!(now, addr);
        });
    }
}

The output:

[ START ] Waiting for files..
[  NEW  ] Processing file from 127.0.0.1:42836
[  NEW  ] Processing file from 127.0.0.1:42838
[  NEW  ] Processing file from 127.0.0.1:42828
[  NEW  ] Processing file from 127.0.0.1:42832
[  NEW  ] Processing file from 127.0.0.1:42834
[  NEW  ] Processing file from 127.0.0.1:42830
[  NEW  ] Processing file from 127.0.0.1:42870
[  NEW  ] Processing file from 127.0.0.1:42872
[ 42838 ] FINISH after     6.248263595  secs   6248263595 nanos
[  NEW  ] Processing file from 127.0.0.1:42874
[  NEW  ] Processing file from 127.0.0.1:42876
[ 42836 ] FINISH after     6.255094505  secs   6255094505 nanos
[ 42832 ] FINISH after     6.257284565  secs   6257284565 nanos
[ 42828 ] FINISH after     6.268471828  secs   6268471828 nanos
[ 42872 ] FINISH after     12.476897331 secs  12476897331 nanos
[ 42830 ] FINISH after     12.48180191  secs  12481801910 nanos
[ 42834 ] FINISH after     12.49408727  secs  12494087270 nanos
[ 42870 ] FINISH after     12.526076067 secs  12526076067 nanos
[ 42876 ] FINISH after     12.445826284 secs  12445826284 nanos
[ 42874 ] FINISH after     12.462071506000001 secs  12462071506 nanos

Tokio doesn't have some sort of queue where tasks wait for others to finish. This kind of behaviour is an indication that you are blocking in async code. For some intuition about what it means to block in async code: if it is not near-instant to go from one .await to the next, you are blocking.

I can't quite tell what is causing the blocking in this case. What does your time! macro do? Are you perhaps not posting the correct snippet?

@alice I have only two files: main.rs, whose content is shown above, and lib.rs, which contains the time! macro:

#[macro_export]
macro_rules! time {
    ($now:expr, $addr:expr) => {
        let elapsed = $now.elapsed();

        println!(
            "[ {} ] {} after     {:<12} {} {:>12} {}",
            console::style($addr.port()).dim(),
            console::style("FINISH").red(),
            console::style(elapsed.as_secs_f64()).green(),
            console::style("secs").bold(),
            console::style(elapsed.as_nanos()).green(),
            console::style("nanos").bold()
        );
    }
}

I tried running with the --release flag, but I got the same pattern: from the 5th task onward, each one takes double the time the earlier files took.

Just for info, I split a big 1 GB file into equal parts of 200 MB and sent them to this program using socat.

I changed the code to use async_std so I can compare the results:

use std::time::Instant;

use async_std::task;
use async_std::net::TcpListener;
use async_std::io::Result;
use async_std::io::BufReader;
use async_std::io::ReadExt;

use console::style;

use mixed::time;

#[async_std::main]
async fn main() -> Result<()> {
    let listener = TcpListener::bind("127.0.0.1:2020").await?;

    println!("[ {} ] Waiting for files..", style("START").green().bold());

    loop {
        let (stream, addr) = listener.accept().await?;
        let now = Instant::now();

        task::spawn(async move {
            let mut reader = BufReader::new(stream);
            let mut buf = [0u8; 8*1024];
            let mut _nb_alphas = 0usize;

            while let Ok(len) = reader.read(&mut buf).await {
                if len == 0 { break }

                for b in &buf[..len] {
                    if matches!(*b, b'a'..=b'z' | b'A'..=b'Z') {
                        _nb_alphas += 1;
                    }
                }
            }

            time!(now, addr);
        });
    }
}

The output was:

[ START ] Waiting for files..
[ 43176 ] FINISH after     11.002829602 secs  11002829602 nanos
[ 43164 ] FINISH after     11.038157373 secs  11038157373 nanos
[ 43174 ] FINISH after     11.097669984 secs  11097669984 nanos
[ 43182 ] FINISH after     11.137937226 secs  11137937226 nanos
[ 43162 ] FINISH after     11.178168638 secs  11178168638 nanos
[ 43178 ] FINISH after     11.205169906 secs  11205169906 nanos
[ 43166 ] FINISH after     11.322661753 secs  11322661753 nanos
[ 43180 ] FINISH after     11.380813708 secs  11380813708 nanos
[ 43184 ] FINISH after     17.319071485 secs  17319071485 nanos
[ 43186 ] FINISH after     17.346556001 secs  17346556001 nanos

The files here are of equal size (200 MB) and are exactly the same ones I used with Tokio above, sent in exactly the same order. The time difference between Tokio and async_std in debug mode is big, about 5 seconds, but I also noticed that the last files took longer. When I sent these files on their own, they took only about 6 seconds.

Ah... I know why. The data is sent over localhost, so it arrives fast enough that your while loop always has data ready when it asks for it. This means that even though you have an .await on the call to read, it never actually yields. Testing locally, this will complete all the files simultaneously:

while let Ok(len) = reader.read(&mut buf).await {
    if len == 0 { break }

    for b in &buf[..len] {
        if matches!(*b, b'a'..=b'z' | b'A'..=b'Z') {
            _nb_alphas += 1;
        }
    }

    tokio::time::delay_for(tokio::time::Duration::from_millis(10)).await;
}

There is some work in Tokio on detecting always-ready tasks like this one and making them yield, but it is ongoing, and I am unsure of its current state.
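To illustrate why an .await on a source that is always ready never hands control back, here is a minimal sketch using only the standard library. It polls a future by hand and counts how many polls it takes to finish. The names `Source`, `ReadChunk`, `read_all`, and `polls_to_finish` are hypothetical and are not Tokio APIs; `Source` merely stands in for a socket whose data is either always available (as on localhost) or arrives with gaps (as on a slower network).

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; sufficient when we poll in a plain loop.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical stand-in for a socket (an assumption for illustration).
struct Source {
    chunks_left: u32,
    slow: bool,    // if true, every read has to wait once before data arrives
    stalled: bool, // internal: the current read has already waited
}

/// Future for one read: resolves to true for a chunk, false at end of stream.
struct ReadChunk<'a>(&'a mut Source);

impl Future for ReadChunk<'_> {
    type Output = bool;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool> {
        let src = &mut *self.get_mut().0;
        if src.slow && !src.stalled {
            // Data "not there yet": this is the only place control is returned.
            src.stalled = true;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        src.stalled = false;
        if src.chunks_left == 0 {
            return Poll::Ready(false);
        }
        src.chunks_left -= 1;
        Poll::Ready(true)
    }
}

/// Same shape as the read loop in the thread: await a read, process, repeat.
async fn read_all(src: &mut Source) -> u32 {
    let mut chunks = 0;
    while ReadChunk(&mut *src).await {
        chunks += 1;
    }
    chunks
}

/// Polls `fut` until it completes and reports how many polls that took.
fn polls_to_finish<F: Future>(fut: F) -> u32 {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    let mut polls = 0;
    loop {
        polls += 1;
        if fut.as_mut().poll(&mut cx).is_ready() {
            return polls;
        }
    }
}

fn main() {
    let mut fast = Source { chunks_left: 3, slow: false, stalled: false };
    let mut slow = Source { chunks_left: 3, slow: true, stalled: false };
    // Always-ready source: the whole loop runs inside a single poll.
    println!("always ready: {} poll(s)", polls_to_finish(read_all(&mut fast))); // 1
    // Source with gaps: the executor regains control before every read.
    println!("with gaps:    {} poll(s)", polls_to_finish(read_all(&mut slow))); // 5
}
```

In the always-ready case the executor never gets a chance to run anything else on that thread until the task is done, which matches the behaviour seen in the outputs above.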

@alice I added delay_for inside the while loop, but it makes everything really slow. I also tried from_nanos, but it is still too slow. I changed the output a bit to check whether the last task actually starts only after a spot becomes available (maybe the await is done in a blocking way..).

And this is the new output (notice [ 43558 ] Processing started after 6.220336009 secs):

[ START ] Waiting for files..
[ 43552 ] Processing started after  0.000162125 secs
[ 43550 ] Processing started after  0.000819948 secs
[ 43560 ] Processing started after  0.000225599 secs
[ 43556 ] Processing started after  0.000189614 secs
[ 43552 ] 6.220525728  secs   6220525728 nanos
[ 43558 ] Processing started after  6.220336009 secs
[ 43550 ] 6.25330095   secs   6253300950 nanos
[ 43560 ] 6.258060068  secs   6258060068 nanos
[ 43556 ] 6.269005652  secs   6269005652 nanos
[ 43558 ] 12.210194317 secs  12210194317 nanos

You can use yield_now instead of sleeping:

tokio::task::yield_now().await;

It doesn't give timings quite as equal as the sleep does, but once the data stops arriving from localhost at super speed, it should work quite well.

You can try running the program with this main, which shows the progress percentage of every connection as it runs. Remember to adjust the TOTAL_LEN constant, which is the file size in bytes.

#[tokio::main]
async fn main() -> Result<()> {
    let mut listener = TcpListener::bind("127.0.0.1:2020").await?;

    println!("[ {} ] Waiting for files..", console::style("START").green().bold());

    use std::sync::{mpsc, atomic::{AtomicU32, Ordering}, Arc};

    let (send, recv) = mpsc::channel();
    std::thread::spawn(move || {
        let mut counters: Vec<Arc<AtomicU32>> = vec![];
        const TOTAL_LEN: u32 = 20000000;
        loop {
            std::thread::sleep(std::time::Duration::from_millis(100));
            while let Ok(counter) = recv.try_recv() {
                counters.push(counter);
            }
            for counter in &counters {
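                // Widen to u64 first: in u32, `bytes * 100` overflows for files above ~42 MB.
                print!("{:>3} ", u64::from(counter.load(Ordering::Relaxed)) * 100 / u64::from(TOTAL_LEN));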
            }
            println!();
        }
    });

    loop {
        let (stream, addr) = listener.accept().await?;
        let now = Instant::now();

        println!(
            "[  {}  ] Processing file from {}",
            console::style("NEW").green(),
            console::style(addr).yellow()
        );

        let counter = Arc::new(AtomicU32::new(0));
        send.send(counter.clone()).unwrap();

        tokio::spawn(async move {
            let mut reader = BufReader::new(stream);
            let mut buf = [0u8; 1024];
            let mut _nb_alphas = 0;

            while let Ok(len) = reader.read(&mut buf).await {
                if len == 0 { break }

                for b in &buf[..len] {
                    if matches!(*b, b'a'..=b'z' | b'A'..=b'Z') {
                        _nb_alphas += 1;
                    }
                }

                counter.fetch_add(len as u32, Ordering::Relaxed);

                tokio::task::yield_now().await;
            }

            time!(now, addr);
        });
    }
}
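When adjusting TOTAL_LEN for larger files, keep in mind that computing `bytes * 100` in u32 overflows once more than about 42.9 MB have been counted, which panics in a debug build and wraps around in release. The sketch below shows the same arithmetic done in u64 instead; `percent` is a hypothetical helper, and the 200 MB figure is taken from the file size mentioned earlier in the thread.

```rust
/// Integer percentage of `done` out of `total`, widened to u64 so that
/// `done * 100` cannot overflow for any u32 input.
/// Panics if `total` is 0 (division by zero).
fn percent(done: u32, total: u32) -> u64 {
    u64::from(done) * 100 / u64::from(total)
}

fn main() {
    let total: u32 = 200_000_000; // 200 MB

    // In u32, multiplying the full byte count by 100 does not fit.
    assert_eq!(total.checked_mul(100), None);

    // In u64 the result is what one would expect.
    assert_eq!(percent(total / 2, total), 50);
    assert_eq!(percent(total, total), 100);
    println!("{}%", percent(50_000_000, total)); // prints "25%"
}
```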

@alice does tokio::task::yield_now().await cause any delay when processing files?

The yield_now function will merely yield control to the executor once to allow other tasks time to run. It will not perform any sleeping.
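As a rough mental model of "yield once, no sleeping", here is a minimal sketch built only on the standard library. It is not Tokio's implementation: `YieldOnce`, `worker`, and `run_round_robin` are hypothetical names, and the executor is a toy single-threaded round-robin queue. A yielding future returns Pending exactly once and immediately asks to be polled again, so the only cost is one trip through the scheduler.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; the toy executor below re-queues tasks itself.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Returns `Pending` exactly once, then `Ready`. No timer is involved.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            return Poll::Ready(());
        }
        self.yielded = true;
        cx.waker().wake_by_ref(); // ask to be polled again right away
        Poll::Pending
    }
}

type Log = Rc<RefCell<Vec<String>>>;

/// Three steps of "work"; optionally yields after each step.
async fn worker(name: &'static str, log: Log, cooperative: bool) {
    for step in 0..3 {
        log.borrow_mut().push(format!("{}{}", name, step));
        if cooperative {
            YieldOnce { yielded: false }.await;
        }
    }
}

/// Toy single-threaded executor: poll the front task, re-queue it if pending.
fn run_round_robin(cooperative: bool) -> Vec<String> {
    let log: Log = Rc::new(RefCell::new(Vec::new()));
    let mut queue: VecDeque<Pin<Box<dyn Future<Output = ()>>>> = VecDeque::new();
    queue.push_back(Box::pin(worker("A", log.clone(), cooperative)));
    queue.push_back(Box::pin(worker("B", log.clone(), cooperative)));

    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    while let Some(mut task) = queue.pop_front() {
        if task.as_mut().poll(&mut cx).is_pending() {
            queue.push_back(task);
        }
    }
    let result = log.borrow().clone();
    result
}

fn main() {
    // Without yielding, A runs to completion before B starts.
    println!("{:?}", run_round_robin(false)); // ["A0", "A1", "A2", "B0", "B1", "B2"]
    // With a yield after each step, the two tasks take turns.
    println!("{:?}", run_round_robin(true)); // ["A0", "B0", "A1", "B1", "A2", "B2"]
}
```

A real runtime has several worker threads and a more elaborate scheduler, so the exact interleaving will differ, but the principle is the same: yielding adds scheduling overhead, not waiting time.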
