Hello everyone. When I spawn tasks asynchronously, I noticed that after a certain number of tasks, the later ones only start running once an earlier task has finished. Tokio probably uses a queue to run tasks. How can I tell Tokio to start each task as soon as it is spawned, without waiting?
The code:
use std::time::Instant;
use tokio::net::TcpListener;
use tokio::io::Result;
use tokio::io::BufReader;
use tokio::io::AsyncReadExt;
use mixed::time;

#[tokio::main]
async fn main() -> Result<()> {
    let mut listener = TcpListener::bind("127.0.0.1:2020").await?;
    println!("[ {} ] Waiting for files..", console::style("START").green().bold());
    loop {
        let (stream, addr) = listener.accept().await?;
        let now = Instant::now();
        println!(
            "[ {} ] Processing file from {}",
            console::style("NEW").green(),
            console::style(addr).yellow()
        );
        tokio::spawn(async move {
            let mut reader = BufReader::new(stream);
            let mut buf = [0u8; 8 * 1024];
            let mut _nb_alphas = 0;
            while let Ok(len) = reader.read(&mut buf).await {
                if len == 0 { break }
                for b in &buf[..len] {
                    if matches!(*b, b'a'..=b'z' | b'A'..=b'Z') {
                        _nb_alphas += 1;
                    }
                }
            }
            time!(now, addr);
        });
    }
}
The output:
[ START ] Waiting for files..
[ NEW ] Processing file from 127.0.0.1:42836
[ NEW ] Processing file from 127.0.0.1:42838
[ NEW ] Processing file from 127.0.0.1:42828
[ NEW ] Processing file from 127.0.0.1:42832
[ NEW ] Processing file from 127.0.0.1:42834
[ NEW ] Processing file from 127.0.0.1:42830
[ NEW ] Processing file from 127.0.0.1:42870
[ NEW ] Processing file from 127.0.0.1:42872
[ 42838 ] FINISH after 6.248263595 secs 6248263595 nanos
[ NEW ] Processing file from 127.0.0.1:42874
[ NEW ] Processing file from 127.0.0.1:42876
[ 42836 ] FINISH after 6.255094505 secs 6255094505 nanos
[ 42832 ] FINISH after 6.257284565 secs 6257284565 nanos
[ 42828 ] FINISH after 6.268471828 secs 6268471828 nanos
[ 42872 ] FINISH after 12.476897331 secs 12476897331 nanos
[ 42830 ] FINISH after 12.48180191 secs 12481801910 nanos
[ 42834 ] FINISH after 12.49408727 secs 12494087270 nanos
[ 42870 ] FINISH after 12.526076067 secs 12526076067 nanos
[ 42876 ] FINISH after 12.445826284 secs 12445826284 nanos
[ 42874 ] FINISH after 12.462071506000001 secs 12462071506 nanos
Tokio doesn't have some sort of queue where tasks wait for others to finish. This kind of behaviour is an indication that you are blocking in async code. For some intuition about what it means to block in async code: if it is not near-instant to go from one .await to the next, you are blocking.
I can't quite tell what is causing the blocking in this case. What does your time! macro do? Are you perhaps not posting the correct snippet?
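To make that rule of thumb concrete, here is a minimal sketch that uses only the standard library (no Tokio). The names NoopWaker, longest_poll, blocking_task and quick_task are made up for this illustration. It polls a future by hand and records how long the slowest single poll call takes; a future that blocks between two .await points shows up as one long poll.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Wake, Waker};
use std::thread;
use std::time::{Duration, Instant};

/// Waker that does nothing; enough for the busy-polling loop below.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical helper: polls `fut` to completion on the current thread and
/// returns the duration of the slowest single `poll` call.
fn longest_poll<F: Future>(fut: F) -> Duration {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut longest = Duration::ZERO;
    loop {
        let start = Instant::now();
        let poll = fut.as_mut().poll(&mut cx);
        longest = longest.max(start.elapsed());
        if poll.is_ready() {
            return longest;
        }
    }
}

/// Stand-in for blocking work inside an async fn: the thread sleeps, so an
/// executor could not run anything else on it in the meantime.
async fn blocking_task() {
    thread::sleep(Duration::from_millis(50));
}

/// Does nothing slow between its start and its end.
async fn quick_task() {}

fn main() {
    println!("blocking: {:?}", longest_poll(blocking_task()));
    println!("quick:    {:?}", longest_poll(quick_task()));
}
```

The blocking task reports a poll of at least 50 ms, while the quick one should report a tiny fraction of that. A long CPU-bound loop behaves like the sleep here: the executor gets no chance to switch tasks until the poll returns.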
I changed the code to use async_std so I could compare the results:
use std::time::Instant;
use async_std::task;
use async_std::net::TcpListener;
use async_std::io::Result;
use async_std::io::BufReader;
use async_std::io::ReadExt;
use console::style;
use mixed::time;

#[async_std::main]
async fn main() -> Result<()> {
    let listener = TcpListener::bind("127.0.0.1:2020").await?;
    println!("[ {} ] Waiting for files..", style("START").green().bold());
    loop {
        let (stream, addr) = listener.accept().await?;
        let now = Instant::now();
        task::spawn(async move {
            let mut reader = BufReader::new(stream);
            let mut buf = [0u8; 8*1024];
            let mut _nb_alphas = 0usize;
            while let Ok(len) = reader.read(&mut buf).await {
                if len == 0 { break }
                for b in &buf[..len] {
                    if matches!(*b, b'a'..=b'z' | b'A'..=b'Z') {
                        _nb_alphas += 1;
                    }
                }
            }
            time!(now, addr);
        });
    }
}
The output was:
[ START ] Waiting for files..
[ 43176 ] FINISH after 11.002829602 secs 11002829602 nanos
[ 43164 ] FINISH after 11.038157373 secs 11038157373 nanos
[ 43174 ] FINISH after 11.097669984 secs 11097669984 nanos
[ 43182 ] FINISH after 11.137937226 secs 11137937226 nanos
[ 43162 ] FINISH after 11.178168638 secs 11178168638 nanos
[ 43178 ] FINISH after 11.205169906 secs 11205169906 nanos
[ 43166 ] FINISH after 11.322661753 secs 11322661753 nanos
[ 43180 ] FINISH after 11.380813708 secs 11380813708 nanos
[ 43184 ] FINISH after 17.319071485 secs 17319071485 nanos
[ 43186 ] FINISH after 17.346556001 secs 17346556001 nanos
The files here are all the same size (200 MB) and are exactly the ones I used with Tokio above, sent in the same order. The time difference between Tokio and async_std in debug mode is large, about 5 seconds. I also noticed that the last files took longer: when I sent those files on their own, they took only about 6 seconds.
Ah... I know why. The data is sent over localhost, so it arrives fast enough that data is always ready when your while loop asks for it. This means that even though you have an .await on the call to read, it never actually yields, so each task keeps its worker thread busy until its file is done. Testing locally, this will complete all the files simultaneously:
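while let Ok(len) = reader.read(&mut buf).await {
    if len == 0 { break }
    for b in &buf[..len] {
        if matches!(*b, b'a'..=b'z' | b'A'..=b'Z') {
            _nb_alphas += 1;
        }
    }
    // Force the task to give up its worker thread after every chunk.
    // delay_for is the Tokio 0.2 name; Tokio 1.x calls it tokio::time::sleep.
    tokio::time::delay_for(tokio::time::Duration::from_millis(10)).await;
}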
There is some work in Tokio on detecting always-ready tasks like this one and yielding, but it is ongoing, and I am unsure of its current state.
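The "it never actually yields" part can be reproduced without any networking. The sketch below uses only the standard library. AlwaysReady, YieldNow, count_yields and read_loop are names invented for this illustration, and YieldNow is a hand-written stand-in for the idea behind tokio::task::yield_now, not Tokio's implementation. It counts how often a task hands control back to whoever polls it, that is, how often poll returns Poll::Pending.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Models a read whose data has already arrived: ready on the first poll.
struct AlwaysReady;

impl Future for AlwaysReady {
    type Output = usize;
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<usize> {
        Poll::Ready(1024)
    }
}

/// Returns Pending exactly once and asks to be polled again right away.
struct YieldNow(bool);

impl Future for YieldNow {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Polls `fut` to completion and counts the Pending results, i.e. the number
/// of points at which an executor could have switched to another task.
fn count_yields<F: Future>(fut: F) -> usize {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut yields = 0;
    while fut.as_mut().poll(&mut cx).is_pending() {
        yields += 1;
    }
    yields
}

/// `chunks` reads that are always ready, each optionally followed by a yield.
async fn read_loop(chunks: usize, yield_each_time: bool) -> usize {
    let mut total = 0;
    for _ in 0..chunks {
        total += AlwaysReady.await;
        if yield_each_time {
            YieldNow(false).await;
        }
    }
    total
}

fn main() {
    println!("without yield: {}", count_yields(read_loop(100, false)));
    println!("with yield:    {}", count_yields(read_loop(100, true)));
}
```

Without the explicit yield the loop runs through all 100 reads in a single poll (0 yields), even though every read is followed by .await. With the yield there are 100 points where another task could run.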
@alice I added delay_for inside the while loop, but it makes everything really slow. I also tried from_nanos, but it is still too slow. I changed the output a bit to check whether the last task really only starts once a slot becomes available (maybe the await ends up blocking?).
This is the new output (notice [ 43558 ] Processing started after 6.220336009 secs):
[ START ] Waiting for files..
[ 43552 ] Processing started after 0.000162125 secs
[ 43550 ] Processing started after 0.000819948 secs
[ 43560 ] Processing started after 0.000225599 secs
[ 43556 ] Processing started after 0.000189614 secs
[ 43552 ] 6.220525728 secs 6220525728 nanos
[ 43558 ] Processing started after 6.220336009 secs
[ 43550 ] 6.25330095 secs 6253300950 nanos
[ 43560 ] 6.258060068 secs 6258060068 nanos
[ 43556 ] 6.269005652 secs 6269005652 nanos
[ 43558 ] 12.210194317 secs 12210194317 nanos
You can try to run the program with this main, which shows the progress of every connection as a percentage while it runs. Remember to adjust the TOTAL_LEN constant, which is the file size in bytes.
#[tokio::main]
async fn main() -> Result<()> {
    let mut listener = TcpListener::bind("127.0.0.1:2020").await?;
    println!("[ {} ] Waiting for files..", console::style("START").green().bold());

    use std::sync::{mpsc, atomic::{AtomicU32, Ordering}, Arc};
    let (send, recv) = mpsc::channel();

    // Reporter thread: prints one percentage per connection every 100 ms.
    std::thread::spawn(move || {
        let mut counters: Vec<Arc<AtomicU32>> = vec![];
        // File size in bytes; adjust to match the files being sent.
        const TOTAL_LEN: u64 = 20000000;
        loop {
            std::thread::sleep(std::time::Duration::from_millis(100));
            while let Ok(counter) = recv.try_recv() {
                counters.push(counter);
            }
            for counter in &counters {
                // Multiply in u64: in u32 this overflows above roughly 42 MB.
                print!("{:>3} ", counter.load(Ordering::Relaxed) as u64 * 100 / TOTAL_LEN);
            }
            println!();
        }
    });

    loop {
        let (stream, addr) = listener.accept().await?;
        let now = Instant::now();
        println!(
            "[ {} ] Processing file from {}",
            console::style("NEW").green(),
            console::style(addr).yellow()
        );
        let counter = Arc::new(AtomicU32::new(0));
        send.send(counter.clone()).unwrap();
        tokio::spawn(async move {
            let mut reader = BufReader::new(stream);
            let mut buf = [0u8; 1024];
            let mut _nb_alphas = 0;
            while let Ok(len) = reader.read(&mut buf).await {
                if len == 0 { break }
                for b in &buf[..len] {
                    if matches!(*b, b'a'..=b'z' | b'A'..=b'Z') {
                        _nb_alphas += 1;
                    }
                }
                counter.fetch_add(len as u32, Ordering::Relaxed);
                // Give other tasks a chance to run after every chunk.
                tokio::task::yield_now().await;
            }
            time!(now, addr);
        });
    }
}
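Why the yield_now call makes the connections progress together can be seen in a toy model. This is a deliberately simplified simulation, not Tokio's actual scheduler: time is counted in abstract ticks, the pool size of 4 is an assumption read off the earlier output (four connections started at once, the fifth only after 6.22 s), and simulate is a made-up function.

```rust
use std::collections::VecDeque;

/// Toy model of a pool of `workers` threads running `tasks` tasks that each
/// need `work` ticks of CPU time. With `slice = None` a task never yields and
/// keeps its worker until it is done; with `Some(n)` it yields every `n` ticks
/// and goes to the back of the ready queue.
/// Returns (tick of first run, tick of completion) for every task.
fn simulate(workers: usize, tasks: usize, work: u64, slice: Option<u64>) -> Vec<(u64, u64)> {
    assert!(workers > 0 && work > 0 && slice != Some(0));
    let mut times = vec![(u64::MAX, 0u64); tasks];
    // Ready queue of (task id, remaining work).
    let mut ready: VecDeque<(usize, u64)> = (0..tasks).map(|id| (id, work)).collect();
    // Per worker: (task id, remaining work, ticks left before the next yield).
    let mut running: Vec<Option<(usize, u64, u64)>> = vec![None; workers];
    let mut tick = 0u64;
    let mut done = 0;

    while done < tasks {
        // Idle workers pick up the next ready task.
        for slot in running.iter_mut() {
            if slot.is_none() {
                if let Some((id, remaining)) = ready.pop_front() {
                    times[id].0 = times[id].0.min(tick);
                    *slot = Some((id, remaining, slice.unwrap_or(u64::MAX)));
                }
            }
        }
        tick += 1;
        // Every busy worker performs one tick of work.
        for slot in running.iter_mut() {
            if let Some((id, remaining, budget)) = slot.take() {
                let (remaining, budget) = (remaining - 1, budget - 1);
                if remaining == 0 {
                    times[id].1 = tick;
                    done += 1;
                } else if budget == 0 {
                    ready.push_back((id, remaining)); // the task yields
                } else {
                    *slot = Some((id, remaining, budget));
                }
            }
        }
    }
    times
}

fn main() {
    println!("never yields:     {:?}", simulate(4, 5, 6, None));
    println!("yields each tick: {:?}", simulate(4, 5, 6, Some(1)));
}
```

With slice = None the fifth task first runs at tick 6 and completes at tick 12, the same shape as the measured output. With Some(1) every task has started by tick 1 and all of them complete by tick 8: each one takes a little longer, but none has to wait for a free worker.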