Await seems to be blocking

Hi, I'm a new Rust user exploring Rust's async I/O functionality.

Goal: a toy scheduler that fires off lightweight tasks which mainly wait on I/O completion.

use std::time::Duration;
use async_std::task::sleep;
use chrono::prelude::*;
use std::sync::atomic::{AtomicU32, Ordering};
use std::thread;
use async_std::stream;
use async_std::prelude::*;

static COUNTER: AtomicU32 = AtomicU32::new(0);

const FIVESECOND: Duration = Duration::from_secs(5);
const TENSECOND: Duration = Duration::from_secs(10);
const TWOSECOND: Duration = Duration::from_secs(2);

// Simulates an I/O-bound task by sleeping asynchronously, then logs its timing.
async fn io_task(name: String, exec_time_delay: Duration) {
    let starttime = get_current_time_string();
    COUNTER.fetch_add(1, Ordering::Relaxed);
    sleep(exec_time_delay).await;
    println!("{:?} : {} lasting {} seconds starts at {} and finished at {}", thread::current().id(),
             name, exec_time_delay.as_secs(), starttime, get_current_time_string());
}

fn get_current_time_string() -> String {
    Local::now().format("%H:%M:%S.%f").to_string()
}

#[tokio::main]
async fn main() {
    // Two throttled streams merged into one: s1 every 5 seconds, s2 every 10 seconds.
    let s1 = stream::repeat(1u8).take(4).throttle(FIVESECOND);
    let s2 = stream::repeat(2u8).take(2).throttle(TENSECOND);
    let mut merged_stream = futures::stream::select(s1, s2);

    while let Some(task_num) = merged_stream.next().await {
        match task_num {
            1..=3 => {
                println!("Got {} at {} on {:?}", task_num, get_current_time_string(), thread::current().id());
                io_task(task_num.to_string(), TWOSECOND).await;
            }
            _ => println!("Unmatched task {}", task_num),
        }
    }
}

Code output:

Got 1 at 18:59:54.536289300 on ThreadId(1)
ThreadId(1) : 1 lasting 2 seconds starts at 18:59:54.538308600 and finished at 18:59:56.543767000
Got 2 at 18:59:56.543767000 on ThreadId(1)
ThreadId(1) : 2 lasting 2 seconds starts at 18:59:56.546297600 and finished at 18:59:58.561440100
Got 1 at 18:59:59.544360000 on ThreadId(1)
ThreadId(1) : 1 lasting 2 seconds starts at 18:59:59.544360000 and finished at 19:00:01.545209200

So it looks like the first item from the s2 stream is only emitted after io_task completes (which sleeps asynchronously for 2 seconds). I've run some other tests on async_std::task::sleep and async/await, and in every other scenario I tested, await behaved as expected.

My guess is that the problem has to do with the stream merge:

    let s1 = stream::repeat(1u8).take(4).throttle(FIVESECOND);
    let s2 = stream::repeat(2u8).take(2).throttle(TENSECOND);
    let mut merged_stream = futures::stream::select(s1, s2);

What is a good way to create an async stream that fires with a non-constant delay between items? One possible implementation I can see is a single loop that works out how long to sleep before yielding the next item.

Something like this, which uses a different thread for execution/scheduling, works:

use futures::{StreamExt, channel::mpsc};
use std::thread;
use chrono::prelude::*;
use std::time::Duration;
use async_std::task::sleep;
use futures::executor::block_on;
use lazy_static::lazy_static;
use clap::Clap;


const FIVESECOND: Duration = Duration::from_secs(5);
const TENSECOND: Duration = Duration::from_secs(10);
const TWOSECOND: Duration = Duration::from_secs(2);

lazy_static! {
    static ref ARGS: Opts = {
        Opts::parse()
    };
}

#[tokio::main]
async fn main() {
    let (sender, receiver) = mpsc::channel::<u32>(50000);

    let future = receiver.for_each_concurrent(
        ARGS.concurrent_limit,
        |rx| async move {
            let starttime = get_current_time_string();
            println!("{:?} rx got {} at {}.", thread::current().id(), rx, starttime);
            io_task(rx, TWOSECOND).await;
            println!("{} tasks started at {} and finished at {}.", rx, starttime, get_current_time_string());
        }
    );

    thread::spawn(move || { block_on(other_thread(sender)); });
    future.await;
}

async fn other_thread(mut tx: futures::channel::mpsc::Sender<u32>) {
    use async_std::prelude::*;
    use async_std::stream;

    let s1 = stream::repeat(1u8).throttle(FIVESECOND);
    let s2 = stream::repeat(2u8).throttle(TENSECOND);
    let mut merged_stream = futures::stream::select(s1, s2);

    while let Some(task_num) = stream::StreamExt::next(&mut merged_stream).await {
        match task_num {
            1..=3 => {
                println!("{:?} Emitting {} at {}.", thread::current().id(), task_num, get_current_time_string());
                tx.try_send(task_num as u32).unwrap();
            }
            _ => println!("{:?} Unmatched task {}.", thread::current().id(), task_num)
        }
    }
}

async fn io_task(name: u32, exec_delay: Duration) {
    let starttime = get_current_time_string();
    sleep(exec_delay).await;
    println!("{:?} {} started at {} and finished at {}.", thread::current().id(), name, starttime, get_current_time_string());
}

fn get_current_time_string() -> String {
    Local::now().format("%H:%M:%S.%f").to_string()
}

#[derive(Clap)]
#[clap(version = "0.1", author = "-")]
struct Opts {
    #[clap(short, long, default_value = "12")]
    concurrent_limit: usize,
}


With the expected output:

ThreadId(2) Emitting 1 at 21:09:23.662333900.
ThreadId(2) Emitting 2 at 21:09:23.662333900.
ThreadId(1) rx got 1 at 21:09:23.662333900.
ThreadId(1) rx got 2 at 21:09:23.674434100.
ThreadId(1) 1 started at 21:09:23.674434100 and finished at 21:09:25.677448400.
1 tasks started at 21:09:23.662333900 and finished at 21:09:25.677448400.
ThreadId(1) 2 started at 21:09:23.674434100 and finished at 21:09:25.681352400.
2 tasks started at 21:09:23.674434100 and finished at 21:09:25.681352400.
ThreadId(2) Emitting 1 at 21:09:28.678743800.
ThreadId(1) rx got 1 at 21:09:28.678743800.
ThreadId(1) 1 started at 21:09:28.683369000 and finished at 21:09:30.698023000.
1 tasks started at 21:09:28.678743800 and finished at 21:09:30.700576700.
ThreadId(2) Emitting 2 at 21:09:33.665542500.
ThreadId(1) rx got 2 at 21:09:33.665542500.
ThreadId(2) Emitting 1 at 21:09:33.681319200.
ThreadId(1) rx got 1 at 21:09:33.697181100.

The original question remains: why does the io_task await seem to block in the first post? How should I write it so that the scheduling and the I/O work can both be done on a single thread?

Yes, .await waits until the thing it is used on completes. If you want to run multiple things, you should spawn a new task or use some other tool for concurrency.

Is there a way to do a "fire and forget" on the same thread without waiting for the future's completion?

You do that by spawning the future with tokio::spawn (or the equivalent in whatever runtime you are using).

Thanks!

Just realized I've been mixing tokio and async-std, which are two completely separate async runtimes, in the same project. Miraculously, they seem to work alongside one another, apart from some stream extension method name conflicts.

Will go through their APIs and docs.

The async-std runtime starts itself automatically on first use, so if you use it from inside Tokio, it does work. That said, you end up with two runtimes running at the same time, which isn't ideal for performance. Tokio doesn't start automatically, so it doesn't work if you mix them up the other way.
