Optimal concurrency with async

Hello! In most cases I can see how to achieve optimal concurrency between dependent tasks by composing futures in Rust.

However, there are cases where I am not quite sure how to do it without having to circumvent the borrow checker, which very reasonably is not able to prove that my code is safe.

Consider for example the following scenario.

  • first_future_a : requires immutable access to a
  • first_future_b : requires immutable access to b
  • first_future_ab : requires immutable access to a and b
  • second_future_a: requires mutable access to a, and must execute after first_future_a and first_future_ab
  • second_future_b: requires mutable access to b, and must execute after first_future_b and first_future_ab.

I would like second_future_a to be able to run as soon as first_future_a and first_future_ab are completed.
I would also like second_future_b to be able to run as soon as first_future_b and first_future_ab are completed.

For example one may try to write the following code:

        let mut a = ...;
        let mut b = ...;
        let my_future = async {
            let first_fut_a = async {
                    println!("A from first_fut_a: {:?}", a.get()); // immutable access to a
            };

            let first_fut_b = async {
                    println!("B from first_fut_b: {:?}", b.get());  // immutable access to b
            };

            let first_fut_ab = async {
                    println!("A from first_fut_ab: {:?}", a.get());  // immutable access to a
                    println!("B from first_fut_ab: {:?}", b.get());  // immutable access to b
            };


            let second_fut_a = async {
                first_fut_a.await;
                first_fut_ab.await;
                // This only happens after the immutable refs to a are not used anymore, 
                // but the borrow checker doesn't know that.
                a.increase(1); // mutable access to a, the borrow checker is sad :(
            };

            let second_fut_b =  async {
                first_fut_b.await;
                first_fut_ab.await;
                // This only happens after the immutable refs to b are not used anymore, 
                // but the borrow checker doesn't know that.
                b.increase(1); // mutable access to b, the borrow checker is sad :(
            };

            future::zip(second_fut_a, second_fut_b).await;
        };

Is there a way to make sure that
second_fut_a can run as soon as first_fut_a and first_fut_ab are done, and
second_fut_b can run as soon as first_fut_b and first_fut_ab are done
(whichever happens first) while maintaining borrow checking at compile time (no RefCell please :wink: )?

1 Like

I don’t think it is possible to meet all of your requirements. Borrow checking never affects how a program executes; therefore it cannot affect the timing of when futures are started; therefore, any mechanism to do this scheduling will involve some kind of explicit synchronization mechanism to notice when a is available after its two users finish; and the borrow checker cannot analyze that kind of code and prove it sound.

So, you will need RefCell or similar. However, you may be able to wrap up the RefCells in a helper that prevents misuse by managing both the scheduling and the RefCells, so you do not need to worry about the application code causing a panic via a logic bug.

You can use the run-time-borrow-checking mechanism as the synchronization mechanism, by using an async RwLock and allowing second_fut_a to await lock acquisition — though if you do this you need to make sure all four read locks are acquired before the write lock is attempted, as otherwise the subtasks could possibly execute in the opposite order or deadlock. Example:

use tokio::sync::RwLock;

#[tokio::main]
async fn main() {
    let a = RwLock::new(100);
    let b = RwLock::new(200);

    let first_a_guard = a.read().await;
    let first_fut_a = async move {
        println!("A from first_fut_a: {:?}", *first_a_guard);
    };

    let first_b_guard = b.read().await;
    let first_fut_b = async move {
        println!("B from first_fut_b: {:?}", *first_b_guard);
    };

    let first_ab_a_guard = a.read().await;
    let first_ab_b_guard = b.read().await;
    let first_fut_ab = async move {
        println!("A from first_fut_ab: {:?}", *first_ab_a_guard);
        println!("B from first_fut_ab: {:?}", *first_ab_b_guard);
    };


    let second_fut_a = async {
        *a.write().await += 1;
    };

    let second_fut_b =  async {
        *b.write().await += 1;
    };

    tokio::join!(first_fut_a, first_fut_b, first_fut_ab, second_fut_a, second_fut_b);
    println!("{} {}", *a.read().await, *b.read().await);
}

Another option would be to use unchecked interior mutability, i.e. UnsafeCell. However, you’d still need some additional synchronization — your code as written will not compile because second_fut_a and second_fut_b cannot both own the first_futs; you'd need something like FutureExt::shared() to let them share access, and that is itself a synchronization and interior mutability mechanism. So, on net, this is not a better option.

The only truly zero-overhead way to do this would be to write a custom future combinator that owns all five futures, knows the dependency graph, and can thus decide when to start polling the second_futs. But it’d have to own a and b and hand out borrows only when possible. (This can make sense — for example, the scheduler in Bevy ECS does exactly this, albeit at run time, identifying dependencies and running systems in parallel when possible — but it's not simple to do from scratch or fully statically.)

I recommend you just go ahead and use RefCell or async RwLock. It is very unlikely that this will be a significant cost in your application (either for performance or for development effort); everything else is equally or more complex.

4 Likes

Thank you very much @kpreid, I think your analysis is spot on!

I think that your suggestion of creating a custom future combinator is the right approach. I will look into it and update this post if I manage to figure it out.

Just to make sure that I am not missing anything, let me point out that you mention at the beginning of your post that my requirements can't be met, yet it seems to me that the future combinator solution (if achievable) would effectively allow me to schedule the concurrent execution that I desire while still allowing the borrow checker to validate my data access at compile time. Isn't that a contradiction? Or am I missing something?

It's also funny that you mentioned bevy, since what I am trying to do is very similar to what bevy does when scheduling systems, yet I am specifically trying to achieve that fully statically.

1 Like

To be honest, I wrote the post and didn't go back and revise the beginning of it after I had written down all my ideas, so it wasn't accurate. But also, note that you will need to modify all the code to cooperate with the combinator/scheduler. And doing the scheduling fully statically in a safe and sound way will be a hard problem.

1 Like

Thank you!

While I still think that the future combinator approach you suggested is the right solution, it seems like I have opened a can of worms (which I wasn't fully aware of) related to self-referential structures.

I am having trouble getting the 5 futures owned by the future combinator to contain references to a and b (which as you mentioned are also owned by the future combinator).

I am trying to understand whether there are ways to do this without relying on unsafe and pinning, but I am really not sure if that can be done.

Still planning to update here if I find a solution, and if anybody has hints on how to write the combinator I would be happy to receive them!

Self-referential data structures require unsafe, but there are crates containing the unsafe code that are fairly well tested. The first two listed here are fairly popular. The first one, ouroboros, is fairly general-purpose and therefore more complex. The second one, self_cell, is simpler but only allows the references to be read-only.

1 Like

I feel like I am being a bit picky here, but I was really hoping to find a way to do this without forcing a number of heap allocations proportional to the number of futures I am scheduling :sweat_smile:

Do I understand correctly that implementing the future combinator solution proposed by @kpreid would necessarily require a self-referential structure?

And if yes, are there ways to create self referential structures that do not involve any heap allocation?

Yes, but don't think of this as a general self-reference problem; you're in async land and you should use the self-reference capabilities of async blocks. You can use poll_fn() to mix a custom polling strategy into what an async block does.

I'm working on an example of doing this.

1 Like

Well, this turned out to be a huge mess. It works (though I have not thoroughly tested it) but it has a probably-fatal limitation: its future is not Send. This won’t be fixable until we get the ability to specify Send bounds for AsyncFnOnce futures. (I tried my favorite helper async_fn_traits but the lifetimes don't work out for those not-real-async-fns.)

Also, there is probably a better way to implement polling the five futures, but I didn't find one rummaging through the helpers in std or the futures crate.

use futures::future::{Fuse, FusedFuture as _, FutureExt as _, MaybeDone};
use std::cell::UnsafeCell;
use std::future::Future as _;
use std::pin::{Pin, pin};
use std::task::Poll;

#[tokio::main]
async fn main() {
    let mut a = 100;
    let mut b = 200;

    five_part_combination(
        &mut a,
        &mut b,
        async |a| {
            println!("A from first_fut_a: {:?}", a);
        },
        async |b| {
            println!("B from first_fut_b: {:?}", b);
        },
        async |a, b| {
            println!("A from first_fut_ab: {:?}", a);
            println!("B from first_fut_ab: {:?}", b);
        },
        async |a| {
            *a += 1;
        },
        async |b| {
            *b += 1;
        },
    )
    .await;

    assert_eq!((a, b), (101, 201));
    println!("Complete");
}

async fn five_part_combination<'a>(
    a: &'a mut i32,
    b: &'a mut i32,
    // Note: The signatures of these functions do not allow the references to
    // outlive the calls! This is necessary for soundness!
    fn_first_a: impl AsyncFnOnce(&i32),
    fn_first_b: impl AsyncFnOnce(&i32),
    fn_first_ab: impl AsyncFnOnce(&i32, &i32),
    fn_second_a: impl AsyncFnOnce(&mut i32) + 'a,
    fn_second_b: impl AsyncFnOnce(&mut i32) + 'a,
) {
    let mut a = Some(UnsafeCell::new(a));
    let mut b = Some(UnsafeCell::new(b));
    let mut fn_second_a = Some(fn_second_a);
    let mut fn_second_b = Some(fn_second_b);

    let (mut first_fut_a, mut first_fut_b, mut first_fut_ab) = {
        let a_shared = unsafe { &**a.as_ref().unwrap().get() };
        let b_shared = unsafe { &**b.as_ref().unwrap().get() };
        (
            pin!(fn_first_a(a_shared).fuse()),
            pin!(fn_first_b(b_shared).fuse()),
            pin!(fn_first_ab(a_shared, b_shared).fuse()),
        )
    };

    // Lightly abusing `MaybeDone` as a pin-projectable `Option`.
    // Note that we're using `Gone` to denote “not started”.
    let mut second_fut_a: Pin<&mut MaybeDone<Fuse<_>>> = pin!(MaybeDone::Gone);
    let mut second_fut_b: Pin<&mut MaybeDone<Fuse<_>>> = pin!(MaybeDone::Gone);

    std::future::poll_fn(|ctx| {
        let mut busy = false;

        // Poll 'first' futures
        if !first_fut_a.is_terminated() {
            busy |= first_fut_a.as_mut().poll(ctx).is_pending();
        }
        if !first_fut_b.is_terminated() {
            busy |= first_fut_b.as_mut().poll(ctx).is_pending();
        }
        if !first_fut_ab.is_terminated() {
            busy |= first_fut_ab.as_mut().poll(ctx).is_pending();
        }

        // If the relevant 'first' futures have finished, start the 'second'
        if first_fut_a.is_terminated()
            && first_fut_ab.is_terminated()
            && matches!(second_fut_a.as_ref().get_ref(), MaybeDone::Gone)
        {
            Pin::set(
                &mut second_fut_a,
                MaybeDone::Future(
                    fn_second_a.take().unwrap()(a.take().unwrap().into_inner()).fuse(),
                ),
            );
        }
        if first_fut_b.is_terminated()
            && first_fut_ab.is_terminated()
            && matches!(second_fut_b.as_ref().get_ref(), MaybeDone::Gone)
        {
            Pin::set(
                &mut second_fut_b,
                MaybeDone::Future(
                    fn_second_b.take().unwrap()(b.take().unwrap().into_inner()).fuse(),
                ),
            );
        }

        // Poll the 'second' futures
        if !second_fut_a.is_terminated() {
            busy |= second_fut_a.as_mut().poll(ctx).is_pending();
        }
        if !second_fut_b.is_terminated() {
            busy |= second_fut_b.as_mut().poll(ctx).is_pending();
        }

        if busy { Poll::Pending } else { Poll::Ready(()) }
    })
    .await
}

You will almost certainly be much better off using my previous suggestion of "use the run-time-borrow-checking mechanism as the synchronization mechanism" than using this sort of code in production. But, this does demonstrate the principle, and could be improved eventually.

3 Likes

To answer this explicitly: Pin (which is used by Futures) is precisely about providing the ability to create self-referential structures without requiring additional separate allocations. The reason why libraries like ouroboros don't use Pin is:

  • Pin has some problems still to be worked out; it isn't really possible to write truly correct code that actually makes use of Pin self-reference “from scratch” until UnsafePinned is available. (My above code is OK because it makes use of the self-reference created by borrowing from an async block's field … wait, actually, it isn't okay, as Miri tells me. The poll_fn(move |ctx| has to be changed to poll_fn(|ctx| to avoid moving the previously borrowed things. I've edited the previous post.)

  • Pin is only helpful rather than a complication if you are planning to compose multiple self-referential types (such as async blocks awaiting other futures), and those types don't start out self-referential when created (a Future never has any self-reference until polled).

2 Likes

Is it possible to dispense with async/await entirely?

This problem is an example of the can of worms that async/await opens. It makes the easy parts of asynchronous coding easier, and the hard parts harder.

I spend my C coding time on asynchronous code, and given some simple data structures to handle events, it is easier than using async/await in the corner cases.

I have experimented with async/await in Rust, and with doing asynchronous code without it. The latter takes more code, but it is simple code. I am not going back to async/await myself.

Also it may be worth considering using synchronous blocking code and parallelism for your concurrency. Rust really shines in that domain.

I see far too many Rust crates going the async/await route when there is nothing to be gained; do not follow those examples without a good reason.

This is a very good point. @thekipplemaker, are you just trying to learn about async, or do you have a specific use case in mind? If you are working on a specific use case, have you already determined that it is one where async provides benefits? For example, significant time spent waiting on file or network IO, under high concurrency (like tens of thousands of simultaneous connections)? If not, async might not be particularly relevant to what you're trying to achieve. If you describe what you're working on in more detail, someone might be able to guide you toward an appropriate approach to concurrency.

1 Like

@kpreid Thank you so much for taking the time to work out the solution above: seeing it and reasoning through it was a great learning experience for me.

@worik and @abyrd : Good point I should probably take a step back and understand whether I need async or just parallelism, especially since my current approach is not proving fruitful.

I created another topic to describe my use case, happy to get your opinion over there: