Callback fn used multiple times

Hi there, and happy new year!

I want to pass in a callback Fn that is called multiple times, and I'm struggling. The use case is that I have some logic which processes every item in a list; sometimes I want each processed object ASAP, and other times I'm happy to wait and update it asynchronously:

pub(crate) async fn dispatch_async (
    put: Sender<ViewState>,
    users: UserStore,
    activity: &Activity,
) -> anyhow::Result<()> {
    let on_change = |vs| async {
        put.clone().send(vs).await?;
        Ok(())
    };
    dispatch_main(users, activity, &on_change).await
}

pub(crate) async fn dispatch_sync (
    mut results: Vec<ViewState>,
    users: UserStore,
    activity: &Activity,
) -> anyhow::Result<()> {
    let on_change = |vs| async {
        results.push(vs);
        Ok(())
    };
    dispatch_main(users, activity, &on_change).await
}

async fn dispatch_main<F, Fut>(
    users: UserStore,
    activity: &Activity,
    on_change: &F,
) -> anyhow::Result<()>
where
    F: Fn(ViewState) -> Fut,
    Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
{
    for (user_id, user) in arc.read().await.users.iter() {
        let vs: ViewState = do_something(user).await?;
        &on_change(vs).await?;
    }
    Ok(())
}

One implementation of the callback captures the context of a tokio Sender channel, so it can be cloned or moved, requiring FnOnce. The other implementation captures a mutable vector which requires FnMut.

The problem is it just won't compile because:

  • FnOnce isn't compatible with FnMut (although I guess I could make this generic)
  • FnOnce isn't Copy and so it can't be used multiple times in the loop

My real surprise is the second one - I wasn't expecting the on_change function to be moved within the loop, and so something is obviously not "clicking" for me in terms of Rust's model here.

Help :)

Your explanations / program descriptions are a bit all over the place. Some aspects:

  • FnOnce/FnMut/Fn form a hierarchy, every Fn or FnMut will also be FnOnce, so the wording “requiring FnOnce” is confusing if you really mean “limiting to FnOnce”
  • the closure you are (probably) referring to doesn’t capture anything by-value and hence should be a full Fn: put.clone().send(vs).await?; only accesses put by shared reference, and hence only captures it by shared reference. In fact, the clone step seems entirely redundant to me.
  • on the topic of redundant stuff in the code: the “&” in “&on_change(vs).await?;” doesn’t do anything either
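
To illustrate the hierarchy from the first point, here is a minimal sketch with plain (non-async) closures. The helper names call_fn, call_fn_mut, call_fn_once and hierarchy_demo are made up for this illustration and are not part of the code in the question.

```rust
// Hypothetical helpers: each one demands a different closure trait.
fn call_fn<F: Fn() -> usize>(f: F) -> usize {
    f() + f()
}

fn call_fn_mut<F: FnMut() -> usize>(mut f: F) -> usize {
    f() + f()
}

fn call_fn_once<F: FnOnce() -> usize>(f: F) -> usize {
    f()
}

fn hierarchy_demo() -> (usize, usize, usize, usize) {
    let items = vec![1, 2, 3];
    // Only reads `items` through a shared reference, so this closure is Fn.
    let read_len = || items.len();
    // An Fn closure is accepted wherever Fn, FnMut or FnOnce is required.
    let a = call_fn(&read_len);
    let b = call_fn_mut(&read_len);
    let c = call_fn_once(&read_len);

    let mut count = 0;
    // Mutates captured state, so this closure is FnMut but not Fn.
    let bump = || {
        count += 1;
        count
    };
    // Passing `bump` to call_fn instead would be rejected by the compiler.
    let d = call_fn_mut(bump);
    (a, b, c, d)
}
```

Calling hierarchy_demo() should give (6, 6, 3, 3): the read-only closure works with all three bounds, while the mutating one needs at least FnMut.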

Now, looking into the source code you provided, the biggest problem I see, and one that’s going to be hard to solve, is that this closure:

|vs| async {
    results.push(vs);
    Ok(())
}

cannot really work at all. The closure captures results by mutable reference, and then returns a future that holds onto that mutable reference. That’ll be a lifetime error, no matter how you write it or what Fn…-trait bounds you use.

The fundamental problem here is that any Fn/FnMut/FnOnce returning a future can be called again immediately once it returned the future. There’s nothing forcing the user to .await the future first, before doing the next call to the closure. Hence, the future cannot contain a mutable reference, otherwise we’d get multiple mutable references to the same thing.
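
A small sketch of that point, using only the standard library. The block_on helper and NoopWake type are hypothetical stand-ins for a real runtime such as tokio, and two_futures_alive is a name invented for this example. It shows two futures from the same closure existing side by side before either one is polled, which is why neither may hold a unique borrow of shared state.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for a runtime: it polls in a loop, which is enough
// for futures that never actually have to wait.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn two_futures_alive() -> (u32, u32) {
    let on_change = |n: u32| async move { n * 2 };
    // Both calls return immediately; nothing has been awaited yet,
    // so `first` and `second` are alive at the same time.
    let first = on_change(1);
    let second = on_change(2);
    // The caller is even free to drive them in the opposite order.
    let b = block_on(second);
    let a = block_on(first);
    (a, b)
}
```

Here the futures own their data, so this is fine. If each future instead held a mutable reference to the same Vec, `first` and `second` would be two live mutable borrows of one value, which the borrow checker has to reject.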

In the long term, I’m hoping that Rust gets some nicer support for something like “async closures” that restrict the user to getting rid of one future before the next call to the closure creates the next one. Perhaps all that’s fundamentally needed is a way to allow closure output types to contain lifetimes borrowing from the closure’s &mut self or &self argument. Anyway, in the meantime, I suppose there is no good way to fix your code other than one of these:

  • either – since your use-case looks like the mutating results.push closure doesn’t do anything asynchronous – you could do the pushing action in the closure and just return async { Ok(()) } afterwards [this approach would also need you to switch to a FnMut bound from the Fn bound in the code you posted]
  • or you could use some interior mutability primitive to work around the issue; e.g. put the Vec in a RefCell (or if that messes with some Send/Sync requirements then maybe a Mutex)
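
A rough sketch of the second option, under a few assumptions: ViewState is reduced to a tuple struct, anyhow::Result is swapped for Result<(), String> so that no external crates are needed, dispatch_main is cut down to a three-item loop, and block_on / NoopWake are hypothetical stand-ins for a real runtime. The callback only needs shared access to the Mutex, so the Fn bound from the question can stay.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

#[derive(Debug, PartialEq)]
struct ViewState(u32);

// Simplified version of dispatch_main from the question (assumed shape).
async fn dispatch_main<F, Fut>(on_change: F) -> Result<(), String>
where
    F: Fn(ViewState) -> Fut,
    Fut: Future<Output = Result<(), String>>,
{
    for i in 0..3 {
        on_change(ViewState(i)).await?;
    }
    Ok(())
}

async fn dispatch_sync(results: &Mutex<Vec<ViewState>>) -> Result<(), String> {
    // Each future copies the shared `&Mutex` and locks it only while pushing.
    let on_change = |vs: ViewState| async move {
        let mut guard = results.lock().map_err(|e| e.to_string())?;
        guard.push(vs);
        Ok::<(), String>(())
    };
    dispatch_main(on_change).await
}

// Hypothetical minimal executor, standing in for tokio.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn run_with_mutex() -> Vec<ViewState> {
    let results = Mutex::new(Vec::new());
    block_on(dispatch_sync(&results)).expect("dispatch failed");
    results.into_inner().expect("mutex poisoned")
}
```

Because the Mutex stays owned by the caller, the collected items are still available once dispatch_sync has finished. If the futures never need to be Send, a RefCell should work the same way.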

Edit: I’m only now noticing the Send + 'static bound on Fut… I mean, Send can make sense to more proactively force dispatch_main invocations to be Send, but what’s the rationale for requiring 'static here?

I created a minimal example that compiles on the playground, but I've no idea if this is sound:

use tokio::sync::mpsc::Sender;

use futures::future::{ready, Future};

#[derive(Debug)]
struct ViewState;

async fn dispatch_async (
    put: &Sender<ViewState>,
) -> anyhow::Result<()> {
    let on_change = |vs| async move {
        put.clone().send(vs).await?;
        Ok(())
    };
    
    dispatch_main(on_change).await
}

async fn dispatch_sync (
    mut results: Vec<ViewState>,
) -> anyhow::Result<()> {
    let on_change = move |vs| {
        results.push(vs);
        ready(Ok(()))
    };
    
    dispatch_main(on_change).await
}

async fn dispatch_main<F, Fut>(
    mut on_change: F,
) -> anyhow::Result<()>
where
    F: FnMut(ViewState) -> Fut,
    Fut: Future<Output = anyhow::Result<()>> + Send,
{
    for _ in 0..3 {
        on_change(ViewState).await?;
    }
    Ok(())
}

Playground.

I circumvented this by not making the closure async and instead returning Ok(()) wrapped in a future that resolves immediately, created with the ready function.

Other than that I fiddled with your types a little (e.g. on_change is now FnMut to accommodate the mutating closure from the dispatch_sync function).

But, like I said, I don't know if this actually does what you want; it only compiles.
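
One thing worth checking in the example above: dispatch_sync takes results by value and moves it into the closure, so the caller cannot read the pushed items afterwards. A possible variation, assuming the caller wants them back, borrows the vector mutably instead. As assumptions for this sketch, anyhow::Result is replaced by Result<(), String>, ViewState carries a number, ready comes from std::future rather than the futures crate, and block_on / NoopWake are hypothetical stand-ins for the tokio runtime.

```rust
use std::future::{ready, Future};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

#[derive(Debug, PartialEq)]
struct ViewState(u32);

async fn dispatch_main<F, Fut>(mut on_change: F) -> Result<(), String>
where
    F: FnMut(ViewState) -> Fut,
    Fut: Future<Output = Result<(), String>>,
{
    for i in 0..3 {
        on_change(ViewState(i)).await?;
    }
    Ok(())
}

async fn dispatch_sync(results: &mut Vec<ViewState>) -> Result<(), String> {
    // The push happens synchronously inside the closure call, so the
    // returned future holds no borrow of `results`.
    let on_change = |vs: ViewState| {
        results.push(vs);
        ready(Ok::<(), String>(()))
    };
    dispatch_main(on_change).await
}

// Hypothetical minimal executor, standing in for tokio.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn collect_view_states() -> Vec<ViewState> {
    let mut results = Vec::new();
    block_on(dispatch_sync(&mut results)).expect("dispatch failed");
    results
}
```

With this shape, collect_view_states() hands back the three items that were pushed during dispatch.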

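Yes, that’s what I meant by my first suggestion: doing the push in the closure itself and then just returning a ready-made future.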

@steffahn , @jofas - huge thank you to you both.

@steffahn, I should have clarified that some of my code sample was just "throwing spaghetti at walls".

@jofas - this is exactly what I want - thank you.

(If there's no unsafe, it's sound [1]. It might still deadlock or do nothing useful, but it's sound...)


  1. modulo bugs in your dependencies or the compiler ↩︎
