Tokio async parallel futures

Hello!

i am junior dev and very new to rust but i did manage to write a small program using tokio.
it runs forever and executes some code at intervals in parallel.
it looks something like this:

fn repeat<W: Send + Fn() -> ()>(work: W, interval: u64) -> impl Future<Item = (), Error = ()> + Send {
    Interval::new(Instant::now(), Duration::from_secs(interval))
        .for_each(move |_| {
            work();
            Ok(())
        }).map_err(|e| panic!("task error: {:?}", e))
}

fn work1() -> impl Future<Item = (), Error = ()> + Send {
    repeat(|| {
        println!("work");
    }, 5)
}

fn work2() -> impl Future<Item = (), Error = ()> + Send {
    repeat(|| {
        println!("more work");
    }, 10)
}

// fn main()

tokio::run(lazy(|| {
    tokio::spawn(work1());
	tokio::spawn(work2());
    Ok(())
}));

then i decided to do some testing to leverage/learn async/await:

async fn repeat<W: Send + 'static + Fn() -> ()>(work: W, interval: u64) {
     tokio::time::interval_at(tokio::time::Instant::now(), Duration::from_secs(interval))
        .for_each(move |_| {
            work();
            ready(())
        }).await
}

async fn work1() {
    repeat(|| {
        println!("work");
    }, 5).await
}

async fn work2() {
    repeat(|| {
        println!("more work");
    }, 10).await
}

//#[tokio::main]
//async fn main() -> ()

let tasks = vec![
    tokio::spawn(async move { work1().await }),
    tokio::spawn(async move { work2().await })
];

futures::future::join_all(tasks).await;

but this does not run in parallel =(
only way i can get it to run in parallel (so far) is like this:

let runtime1 = tokio::runtime::Runtime::new().unwrap();
let runtime2 = tokio::runtime::Runtime::new().unwrap();

runtime1.spawn(async move { work1().await });
runtime2.spawn(async move { work2().await });

loop {}

but this is bananas, what am i missing and doing wrong?

edit for clarity: I want the intervals in work1 and work2 to execute in parallel, not concurrently, which is what I observe when using future::join_all(). one tokio::runtime per work() method achieves parallelism but does not look/feel right. how to do it properly?

Where is interval_at coming from? Are you using tokio::time::Instant or std::time::Instant? And do you mean you want it to run in parallel or concurrently?

thanks! edited for clarity

Are you wanting the output to be in-step?

The two (tokio 0.2. not run old) are giving the same result.
The lazy way to do so is add a small delay between spawning. The runtime isn't realtime so it's possible to glitch and temporarily not have the in-step output.

figured it out, thanks for the help!

//#[tokio::main(core_threads = 2, max_threads = 4)]
//async fn main() -> ()

tokio::spawn(async move { work1().await });
tokio::spawn(async move { work2().await });

loop {}

also refactored repeat function

async fn repeat<W: Fn()>(work: W, interval: u64) {
    let mut interval = interval_at(Instant::now(), Duration::from_secs(interval));

	loop {
        interval.tick().await;
        work();
        tokio::task::yield_now().await;
	}
}

still have not figured out how to prevent the program from exiting without a loop or read_line() ...

FYI, I tried your original code and it does run in parallel. (Though maybe your code was a little more elaborate) I simplified it to prove it. sleep will block a whole thread, which is not what you would typically want to do in async code, but if you run that, both values print at the same time indicating that they are running on two separate threads. If it was just concurrent on a single thread, one would print 2 seconds after the other.

use std::time::Duration;

async fn work1() {
    std::thread::sleep(Duration::from_secs(5));
    println!("HI")
}

async fn work2() {
    std::thread::sleep(Duration::from_secs(5));
    println!("HI")
}

#[tokio::main]
async fn main() -> () {
    let tasks = vec![
        tokio::spawn(async move { work1().await }),
        tokio::spawn(async move { work2().await }),
    ];

    futures::future::join_all(tasks).await;
}


Nice! but i am fairly certain that my code was only parallel after i configured threads on the main macro, maybe because i have my dev environment in docker or because i use interval_at, not sure.

anyway awaiting a join_all on tasks that run async intervals just exits the program =|

Did you have either the rt-threaded or full feature enabled when you first tried it? If not, according to the documentation tokio only runs code single threaded. Maybe that’s what you were seeing?

For your second comment, if you show some sample code that you expect to work but isn’t it’s easier to debug. Particularly it’s helpful to use the rust playground Link where you can actually run the code and share a link to it.

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.