Spawn loop and select! on closure

The block method works fine. I want to do exactly the same thing as the block method but by passing in a closure instead.

It seems that theoretically, this should be doable. The block method moves obj into the async move block and owns it and so is able to call it on each loop.

But I've tried many things on the closure approach and can't get it to work. with_closure has an identical signature. I want to be able to make run_closure a general method/function somewhere and so that's why I'm trying to make this work. It's a little different than this, but this is a simplified version to ask this question.

The overall goal is to accept code that takes any number and types of parameters and produces a future returning X (bool in this case), which is why a closure makes sense.

I might have found one solution that involves cloning everything in between the move and async move, but... the block function does not require obj to be Clone, so theoretically it shouldn't have to be, and it wouldn't work well for my case to require it.

(ignore the Arc<Self>'s, not pertinent to this question)

use std::future::Future; // needed before the 2024 edition, where Future is not in the prelude
use std::sync::Arc;

pub trait Rec<T>: Send + Sync + 'static {
    fn drum(&mut self) -> impl Future<Output = bool> + Send + Sync;
}

struct Test();

impl Test {
    pub fn block<T: Rec<T>>(arc_self: Arc<Self>, mut obj: T) {
        tokio::spawn(async move {
            loop {
                tokio::select! {
                    result = obj.drum() => {
                        if !result {
                            break;
                        }
                    }
                    // ...
                }
            }
        });
    }

    pub async fn with_closure<T: Rec<T>>(arc_self: Arc<Self>, mut obj: T) {
        Self::run_closure(arc_self, move || async move { obj.drum().await }).await;
    }

    pub async fn run_closure<F, Fut>(arc_self: Arc<Self>, mut task: F)
    where
        F: FnMut() -> Fut + Send + 'static,
        Fut: Future<Output = bool> + Send,
    {
        tokio::spawn(async move {
            loop {
                tokio::select! {
                    result = task() => {
                        if !result {
                            break;
                        }
                    }
                    // ...
                }
            }
        });
    }
}

It's impossible to use async move { captured_var } inside FnMut, because move is a destructive operation that can only be done once, but FnMut promises it can be called many times. The second time it's called, it would have nothing left to move into the async block.

This is the case where you need AsyncFnMut and real async closures. You need the captured object to be moved into the closure, but only borrow it in the Future being returned. Regular closures aren't allowed to lend captured variables, so they can't do it.

If you need to use FnMut, you will have to clone on each call.
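
As a minimal sketch of that clone-per-call workaround, here is a version that runs without tokio. The Counter type, call_three_times, and the tiny block_on executor are hypothetical stand-ins, not part of the original code. Note the catch it illustrates: each call mutates a fresh clone, so state does not carry over between calls.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical no-op waker so the sketch needs no async runtime.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Busy-polls a future to completion; only suitable for futures that finish promptly.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

// Hypothetical stand-in for `obj`: drum() returns false once n reaches 3.
#[derive(Clone)]
struct Counter {
    n: u32,
}

impl Counter {
    async fn drum(&mut self) -> bool {
        self.n += 1;
        self.n < 3
    }
}

// Calls the FnMut three times and collects the results.
fn call_three_times<F, Fut>(mut task: F) -> Vec<bool>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = bool>,
{
    (0..3).map(|_| block_on(task())).collect()
}

fn cloning_demo() -> Vec<bool> {
    let obj = Counter { n: 0 };
    // `move || async move { obj.drum().await }` would only implement FnOnce here,
    // because the async block takes ownership of `obj` on the first call.
    call_three_times(move || {
        let mut obj = obj.clone(); // fresh copy for every call
        async move { obj.drum().await }
    })
}
```

Every call starts again from n = 0, so cloning_demo() returns [true, true, true]; a loop that relies on drum() eventually returning false would never stop. That is one reason cloning is not a drop-in replacement for the block version.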

In principle, what you need is an AsyncFnMut closure, which allows the future it returns to borrow the obj owned by the closure:

    pub async fn with_closure<T: Rec<T>>(arc_self: Arc<Self>, mut obj: T) {
        Self::run_closure(arc_self, async move || obj.drum().await).await;
    }

    pub async fn run_closure<F>(arc_self: Arc<Self>, mut task: F)
    where
        F: AsyncFnMut() -> bool + Send + 'static,
    {

Unfortunately, there is currently no stable way to express that the future returned by F should be Send, which is required to use it with tokio::spawn(). You can do it on nightly with an unstable feature:

#![feature(async_fn_traits)]
...
    pub async fn run_closure<F>(arc_self: Arc<Self>, mut task: F)
    where
        F: AsyncFnMut() -> bool + Send + 'static,
        for<'a> F::CallRefFuture<'a>: Send,
    {

If you don't need &mut self access for drum(), you can use an Arc instead to share it outright:

    pub async fn with_closure<T: Rec<T>>(arc_self: Arc<Self>, obj: T) {
        let obj = Arc::new(obj);
        Self::run_closure(arc_self, move || {
            let obj = obj.clone();
            // Only compiles if drum() takes &self rather than &mut self.
            async move { obj.drum().await }
        })
        .await;
    }

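And of course you can regain &mut self access by wrapping obj in a Mutex (Arc<Mutex<T>>). Since the lock would be held across an .await, this needs an async-aware mutex such as tokio::sync::Mutex; a std::sync::MutexGuard is not Send, so holding one across an .await would make the future unusable with tokio::spawn().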

To get the necessary bounds without unstable features or run-time overhead, you can replace FnMut with a trait of your own, at the cost of needing to declare explicit structs instead of closures:

trait MyAsyncFnMut {
    fn call(&mut self) -> impl Future<Output = bool> + Send;
}
struct DrumFn<T> {
    obj: T,
}
impl<T: Rec<T>> MyAsyncFnMut for DrumFn<T> {
    fn call(&mut self) -> impl Future<Output = bool> + Send {
        async { self.obj.drum().await }
    }
}

...

    pub async fn with_closure<T: Rec<T>>(arc_self: Arc<Self>, obj: T) {
        Self::run_closure(arc_self, DrumFn { obj }).await;
    }

    pub async fn run_closure<F>(arc_self: Arc<Self>, mut task: F)
    where
        F: MyAsyncFnMut + Send + 'static,
    {
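
For reference, the custom-trait approach can be sketched end to end without tokio. This is a simplified illustration under stated assumptions: Rec is reduced to a non-generic trait, and Countdown, run_task, assert_send, and the small block_on executor are hypothetical names introduced only for this example. The assert_send call checks at compile time that the whole loop future is Send, which is the property tokio::spawn() needs.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical no-op waker so the sketch needs no async runtime.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Busy-polls a future to completion; only suitable for futures that finish promptly.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

// Simplified (non-generic) version of the Rec trait from the question.
trait Rec: Send + 'static {
    fn drum(&mut self) -> impl Future<Output = bool> + Send;
}

// The hand-written replacement for an async FnMut whose future is known to be Send.
trait MyAsyncFnMut {
    fn call(&mut self) -> impl Future<Output = bool> + Send;
}

// Hypothetical Rec implementation: returns false once the count hits zero.
struct Countdown(u32);

impl Rec for Countdown {
    fn drum(&mut self) -> impl Future<Output = bool> + Send {
        async move {
            self.0 = self.0.saturating_sub(1);
            self.0 > 0
        }
    }
}

// The struct owns obj; the returned future only borrows it for one call.
struct DrumFn<T> {
    obj: T,
}

impl<T: Rec> MyAsyncFnMut for DrumFn<T> {
    fn call(&mut self) -> impl Future<Output = bool> + Send {
        self.obj.drum()
    }
}

// Same loop shape as run_closure, minus select! and spawn; returns the number of calls.
async fn run_task<F>(mut task: F) -> u32
where
    F: MyAsyncFnMut + Send + 'static,
{
    let mut calls = 0;
    loop {
        calls += 1;
        if !task.call().await {
            break;
        }
    }
    calls
}

// Compile-time check that a value is Send.
fn assert_send<T: Send>(value: T) -> T {
    value
}

fn demo() -> u32 {
    let fut = assert_send(run_task(DrumFn { obj: Countdown(3) }));
    block_on(fut)
}
```

With Countdown(3), drum() returns true twice and then false, so demo() returns 3. Unlike the cloning workaround, the state lives in one place and persists across calls.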

Finally, one unrelated note on your code:

    fn drum(&mut self) -> impl Future<Output = bool> + Send + Sync;

There is (almost) never a reason to have a Future + Sync bound. Futures are always accessed exclusively, so you only need Send, and having a Sync bound can needlessly constrain what the future can do (e.g. having a Cell in a local variable).
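
To illustrate the Cell point, here is a small sketch (uses_cell, assert_send, and the block_on helper are hypothetical names for this example). The future keeps a Cell alive across an .await, which makes it Send but not Sync; a Sync bound on the return type would reject it for no benefit.

```rust
use std::cell::Cell;
use std::future::{ready, Future};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical no-op waker so the sketch needs no async runtime.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Busy-polls a future to completion; only suitable for futures that finish promptly.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

// Compile-time check that a value is Send.
fn assert_send<T: Send>(value: T) -> T {
    value
}

// The Cell is stored inside the future because it is used after the .await.
// Cell<bool> is Send but not Sync, so this future is Send but not Sync.
async fn uses_cell() -> bool {
    let flag = Cell::new(false);
    ready(()).await;
    flag.set(true);
    flag.get()
}

fn cell_demo() -> bool {
    // Compiles: the future is Send. An equivalent `assert_sync` helper
    // (with a `T: Sync` bound) would be rejected for this future.
    let fut = assert_send(uses_cell());
    block_on(fut)
}
```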

This is very helpful. I found it rather odd and concerning that what seems very simple when together in a single function suddenly becomes a mess when trying to extract part of it.

Your suggestion of the unstable AsyncFnMut might work for me. That seems to suggest that it's just a current missing piece in the Rust type system that is causing this issue for me. That or the trait approach should work.

Thank you!

To be clear to everyone: using AsyncFnMut is stable (since 1.85). What is unstable is being able to refer to its associated future type ::CallRefFuture<'a>.
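
As a small sketch of the stable part (assuming Rust 1.85 or later; Counter, run_local, and block_on are hypothetical names for this example), an async closure can own obj and lend it to each returned future, as long as the loop is awaited in place rather than handed to tokio::spawn(), so no Send bound on the future is needed:

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical no-op waker so the sketch needs no async runtime.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Busy-polls a future to completion; only suitable for futures that finish promptly.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

// Hypothetical stand-in for `obj`: drum() returns false once n reaches 3.
struct Counter {
    n: u32,
}

impl Counter {
    async fn drum(&mut self) -> bool {
        self.n += 1;
        self.n < 3
    }
}

// No Send bound on the returned future, so this works on stable,
// but the loop must run in the caller's task instead of being spawned.
async fn run_local<F>(mut task: F) -> u32
where
    F: AsyncFnMut() -> bool,
{
    let mut calls = 0;
    loop {
        calls += 1;
        if !task().await {
            break;
        }
    }
    calls
}

fn async_closure_demo() -> u32 {
    let mut obj = Counter { n: 0 };
    // The closure owns obj; each future it returns borrows obj mutably.
    block_on(run_local(async move || obj.drum().await))
}
```

Here the state persists across calls (no Clone required), so async_closure_demo() returns 3.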
