Async/await and threadpools in futures-await 0.1.1

I've started to play around with the async/await combination in the futures-await crate. I'm comfortable with async in C++1x. I noticed that the documentation for Rust futures focuses on IO. Can I assume it will also spawn threads for computation? For example:

#[async]
fn reflect_trade(file: &'static Path) -> Result<i32, Error> {
    // Async Load trade
    let buffer = await!(load_file(file))?; //IO
    // Convert to pdl
    let pdl = pdl::PdlDictionary::from(buffer.as_slice()); // CPU bound
    let v1 = await!(calculation1(&pdl))?; // CPU bound
    let v2 = await!(calculation2(&pdl))?; // CPU bound
    let v3 = await!(calculation3(&pdl))?; // CPU bound

    // sync point
    let final_value = combine(v1, v2, v3);
    Ok(final_value)
}

Here I have some IO followed by 3 calculations that are independent and (presumably) long running. Will await!() use some internal threadpool to distribute these across cores, as C++ may do? Or do I have to do this manually?

My understanding, which might be incorrect, is that each of those await! calls is a suspension point. The generated state machine does not move on to the next suspension point until the previous one resolves. In other words, the 3 await! statements doing CPU work will not run in parallel as written.

Instead, you may need to use async_block! for each of them.

As for the actual execution, I think this will depend on the underlying executor that’s used. If it’s, for example, tokio 0.2 in default configuration, then it’ll have a threadpool for executing futures.

Did you mean await! by any chance?

Yes, I did - fixed. Thanks!

Thanks. It seems to work. Given it's an unstable feature, I will report back if I find any issues.

By "seems to work", you mean it runs the 3 CPU bound workloads in parallel? If so, with async_block! or with your original code using await!?

Ok, it's been a couple of days since I could look at this problem, so apologies. But it definitely doesn't behave as I expected. Here's the code:

#[async]
fn ramp_connection() -> Result<ramp::RampConnection, Error> {
    let r = ramp::RampConnection::new();
    // This takes a while to startup
    r.startup();
    println!("Ramp started");
    Ok(r)
}

#[async]
fn fixing_mapping(p: PathBuf) -> Result<FnvHashMap<u64, String>, Error> {
    // Waste time here
    for i in 0..100000000 {
        if i % 100000 == 0 {
            print!(".")
        }
    }
    println!("Loaded fixingmapping");
    Ok(FnvHashMap::default())
}

#[async]
fn load_pdl_file(pth: PathBuf) -> Result<pdl::PdlDictionary, Error> {
    pdl::PdlDictionary::try_from(File::open(pth).expect("pdl file doesn't parse"))
}

fn reflect_trades(p: &Path) -> Result<(), Error> {
    let s = async_block! {
        let r : ramp::RampConnection = await!(ramp_connection())?;
        let fixing_mapping = await!(fixing_mapping(Path::new("fixingmappings.csv").to_owned()))?;
        for pdl_path in controller::get_files_iter(p, ".pdl")? {
            let pdl = await!(load_pdl_file(pdl_path.to_path_buf()))?;
            println!("{:?} {:?}",pdl_path,pdl.pdl_dict.len());
        }
        Ok(())
    }.wait();
    s
}

#[test]
fn test_async_loading() {
    assert!(reflect_trades(Path::new("pdl")).is_ok());
}

My expectation was that the output would look something like the following:

Ramp started
"pdl\\12345.pdl" 2141
"pdl\\6789.pdl" 999
Loaded fixingmapping   // this is after a couple of files have been processed as it takes a while to run
"pdl\\101112.pdl" 5435

But instead everything happens in order

Ramp started
Loaded fixingmapping  
"pdl\\12345.pdl" 2141
"pdl\\6789.pdl" 999
"pdl\\101112.pdl" 5435

Am I missing something in the await!() syntax?

Every time await!() is called, the code following it only runs once the async code inside the await!() completes. In other words, I think these two snippets are equivalent from an asynchrony standpoint:

let res = await!(some_async_fn());
<more code>

some_async_fn().and_then(|res| <more code>)

So whenever you have back-to-back await! calls, they proceed sequentially in terms of their execution - they form a continuation chain, except the source code looks linear rather than closure-based like with stock futures.

If you want parallelism I suspect you’ll need to do this manually, perhaps with futures-cpupool, and schedule the work yourself.
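To make "schedule the work yourself" concrete, here is a minimal sketch of a hand-rolled pool built only on the standard library. It is not the futures-cpupool API: `TinyPool`, `spawn_with_result` and `sum_to` are hypothetical names invented for this illustration, and the result comes back over a channel rather than as a future.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

// A boxed unit of work that a worker thread runs exactly once.
type Job = Box<dyn FnOnce() + Send + 'static>;

// Hypothetical minimal pool for illustration; not the futures-cpupool API.
struct TinyPool {
    sender: Option<Sender<Job>>,
    workers: Vec<thread::JoinHandle<()>>,
}

impl TinyPool {
    fn new(size: usize) -> TinyPool {
        let (sender, receiver) = channel::<Job>();
        let receiver = Arc::new(Mutex::new(receiver));
        let mut workers = Vec::new();
        for _ in 0..size {
            let receiver = Arc::clone(&receiver);
            workers.push(thread::spawn(move || loop {
                // The lock is released at the end of this statement,
                // so other workers can fetch jobs while this one runs.
                let job = receiver.lock().unwrap().recv();
                match job {
                    Ok(job) => job(),
                    Err(_) => break, // sender dropped: shut down
                }
            }));
        }
        TinyPool { sender: Some(sender), workers }
    }

    // Schedule `f` on the pool and return a receiver that yields its result.
    fn spawn_with_result<T, F>(&self, f: F) -> Receiver<T>
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static,
    {
        let (tx, rx) = channel();
        let job: Job = Box::new(move || {
            // The caller may have dropped the receiver; ignore that case.
            let _ = tx.send(f());
        });
        self.sender
            .as_ref()
            .expect("pool is shut down")
            .send(job)
            .expect("workers are gone");
        rx
    }
}

impl Drop for TinyPool {
    fn drop(&mut self) {
        // Dropping the sender makes every worker's recv() fail, ending its loop.
        drop(self.sender.take());
        for worker in self.workers.drain(..) {
            let _ = worker.join();
        }
    }
}

// Stand-in for a CPU-bound calculation.
fn sum_to(n: u64) -> u64 {
    (0..n).sum()
}

fn main() {
    let pool = TinyPool::new(3);
    // All three calculations are queued before any result is waited on.
    let v1 = pool.spawn_with_result(|| sum_to(1_000));
    let v2 = pool.spawn_with_result(|| sum_to(2_000));
    let v3 = pool.spawn_with_result(|| sum_to(3_000));
    let total = v1.recv().unwrap() + v2.recv().unwrap() + v3.recv().unwrap();
    println!("combined: {}", total);
}
```

The point of the sketch is the ordering: every job is submitted first, and only then does the caller block on the results, which is the opposite of awaiting each calculation in turn.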

I might be wrong - I’ve not studied futures-await in great detail - but it looks mostly like a way to chain futures without all the closure-based nesting. It’s not a parallel execution scheduler.

cc @alexcrichton who’d have an expert view on this.

ramp_connection and fixing_mapping both lack any await!s, that means they will run synchronously when await!ed in reflect_trades.

Also, as mentioned by @vitalyd you are not using any of the concurrency combinators in reflect_trades so that function will run each sub-future it's await!ing in series.

Once ramp_connection and fixing_mapping are made to be actually asynchronous (in ramp_connection you need to make r.startup() return a future and await it, in fixing_mapping you need to use a thread pool to do your busy work on), you would then want to use join in reflect_trades to process them in parallel, e.g.

fn reflect_trades(p: &Path) -> Result<(), Error> {
    let s = async_block! {
        let loading_pdl = async_block! {
            for pdl_path in controller::get_files_iter(p, ".pdl")? {
                let pdl = await!(load_pdl_file(pdl_path.to_path_buf()))?;
                println!("{:?} {:?}", pdl_path, pdl.pdl_dict.len());
            }
            // The block uses `?`, so it must itself evaluate to a Result.
            Ok(())
        };
        let map = fixing_mapping(Path::new("fixingmappings.csv").to_owned());
        // join3 drives all three futures and resolves once all of them are done.
        let joined = ramp_connection().join3(map, loading_pdl);
        let (r, fixing_mapping, _) = await!(joined)?;
        Ok(())
    }.wait();
    s
}

EDIT: If RampConnection::startup can't be made asynchronous you could change ramp_connection to also use a thread pool for that.

@Nemo157: By marking ramp_connection() #[async], is it returning an impl Future?

I'm comparing to C++ or C# where if I call

auto&& a = std::async([]() { /* do some long running task */ return 1234;});
auto&& b = std::async([]() { /* do some long running task */ return 1234;});
auto result = a.get() + b.get();

I expect those two to run asynchronously and just wait at the final line until both completed. I don't have a guarantee whether it's run on threads or a single thread, but they are concurrent.

I don't have to manage anything in my C++ code to do this, which is what I was expecting from await!().

Ah, actually went back and re-read your original post

Definitely not. #[async] and await! are a relatively simple transform that lets you define sequential flows of logic that wait on asynchronous IO, and turns them into a state machine. Whenever you hit an await!, the code in this function stops executing until the await!ed future resolves. If you want concurrency, you need to do this yourself using one of the join/select combinators. If you also want CPU-bound work to fit into this asynchronous IO model, you need to run it on a thread pool (I'd presume tokio-threadpool would be a good starting point, although it lacks the ability to return values from your spawned futures).

Yes, it is, but it is a Future that completes in a single step without yielding back to the caller because it doesn't contain any await!s.
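A small sketch of what "completes in a single step" means. Note the assumption: this uses the `std::future::Future` trait from today's standard library rather than the futures 0.1 trait that futures-await builds on, so the signatures differ, but the idea carries over. `BusySum`, `NoopWaker` and `poll_once` are hypothetical names made up for the example.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// A waker that does nothing; enough to poll a future by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical future that does all of its CPU work inside one poll call,
// like an #[async] fn whose body contains no await!.
struct BusySum {
    n: u64,
}

impl Future for BusySum {
    type Output = u64;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u64> {
        let mut total = 0u64;
        for i in 0..self.n {
            total += i;
        }
        // Never returns Pending, so the caller is blocked for the whole loop.
        Poll::Ready(total)
    }
}

// Poll a future exactly once and report what it returned.
fn poll_once<F: Future>(fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    fut.as_mut().poll(&mut cx)
}

fn main() {
    match poll_once(BusySum { n: 1_000 }) {
        Poll::Ready(v) => println!("ready on the first poll: {}", v),
        Poll::Pending => println!("still pending"),
    }
}
```

Because the future never yields, whoever polls it (an executor, or a join combinator) cannot make progress on anything else on that thread until the loop finishes.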

I haven't used C++ futures but I believe in that example std::async -> thread_pool.spawn_with_handle and .get() -> await!, so you would write that:

let a = thread_pool.spawn_with_handle(async_block! { /* do some long running task */; 1234 });
let b = thread_pool.spawn_with_handle(async_block! { /* do some long running task */; 1234 });
let result = await!(a) + await!(b);

This is almost identical to how you would do this in C#:

var a = Task.Run(async () => { /* do some long running task */; return 1234; });
var b = Task.Run(async () => { /* do some long running task */; return 1234; });
var result = (await a) + (await b);
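For comparison, the same shape can be written with plain OS threads from the Rust standard library, with no futures involved. This is only a rough analogue and rests on an assumption: `thread::spawn` always starts a new thread, whereas C++ `std::async` with the default launch policy is allowed to defer the work. `long_running_task` and `run_both` are hypothetical stand-ins.

```rust
use std::thread;

// Stand-in for real CPU-bound work.
fn long_running_task(seed: u64) -> u64 {
    (0..1_000_000u64).fold(seed, |acc, i| acc.wrapping_add(i % 7))
}

// Start both tasks first, then wait for both results.
fn run_both() -> u64 {
    let a = thread::spawn(|| long_running_task(1));
    let b = thread::spawn(|| long_running_task(2));
    // join() plays the role of .get(): it blocks until the thread finishes.
    a.join().expect("task a panicked") + b.join().expect("task b panicked")
}

fn main() {
    println!("result = {}", run_both());
}
```

Both threads are already running by the time the first `join()` is reached, which is what makes the two tasks overlap.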

That definitely helps.
But that ties me to making every line potentially await, which is not going to happen for 3rd party libraries (in this case the ramp lib is a C++ lib). And a threadpool is not really what I need; is a coroutine pool what's relevant here? Do you have a pointer to any docs on that?

I don't think you need a threadpool. If you use tokio, you can execute all these futures on its runtime, which in tokio 0.2 consists of a threadpool (by default). To get concurrency, you need to create the futures representing the execution, and then use combinators like join to create yet another future that will resolve once all the joined futures are done; then continue working with that future. If you're using async/await, you need to be deliberate about where you place await!: it creates a continuation from that point forward, and none of the code after it runs until the await!ed operation completes.

From what I'm gathering, await! is mostly sugar for Future::and_then that you'd write by hand, linearizing the code rather than having nested closures. I think the dependency chain is perhaps easier to spot when you see fut1.and_then(|res| fut2).and_then(|res2| fut3)...
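To show what a join combinator does differently from a chain of await! calls, here is a toy sketch written against today's `std::future::Future` trait (an assumption; futures-await targets the futures 0.1 trait, whose `join` has a different signature). `Countdown`, `Join2`, `join2` and `block_on` are hypothetical helpers written for this illustration, and the busy-polling `block_on` is not how a real executor works.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical future that needs `remaining` extra polls before yielding `value`.
struct Countdown {
    remaining: u32,
    value: u32,
}

impl Future for Countdown {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        if self.remaining == 0 {
            Poll::Ready(self.value)
        } else {
            self.remaining -= 1;
            cx.waker().wake_by_ref(); // ask to be polled again
            Poll::Pending
        }
    }
}

// Toy join: polls both children on every call, so they make progress together.
struct Join2<A: Future, B: Future> {
    a: Pin<Box<A>>,
    b: Pin<Box<B>>,
    a_out: Option<A::Output>,
    b_out: Option<B::Output>,
}

fn join2<A: Future, B: Future>(a: A, b: B) -> Join2<A, B> {
    Join2 { a: Box::pin(a), b: Box::pin(b), a_out: None, b_out: None }
}

impl<A, B> Future for Join2<A, B>
where
    A: Future,
    B: Future,
    A::Output: Unpin,
    B::Output: Unpin,
{
    type Output = (A::Output, B::Output);

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &mut *self;
        if this.a_out.is_none() {
            if let Poll::Ready(v) = this.a.as_mut().poll(cx) {
                this.a_out = Some(v);
            }
        }
        if this.b_out.is_none() {
            if let Poll::Ready(v) = this.b.as_mut().poll(cx) {
                this.b_out = Some(v);
            }
        }
        if this.a_out.is_some() && this.b_out.is_some() {
            Poll::Ready((this.a_out.take().unwrap(), this.b_out.take().unwrap()))
        } else {
            Poll::Pending
        }
    }
}

// Toy executor: polls in a loop until the future is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
            return v;
        }
    }
}

fn main() {
    let slow = Countdown { remaining: 3, value: 10 };
    let fast = Countdown { remaining: 1, value: 20 };
    let (a, b) = block_on(join2(slow, fast));
    println!("joined: {} and {}", a, b);
}
```

With sequential awaits the second future would not be polled at all until the first had finished; here both are polled in every round, and the joined future resolves once the slower one is done. This only interleaves work that actually yields, so CPU-bound code still has to be moved onto other threads to overlap.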