Multiple async callback level issue

I am trying to write a Rust program that has multiple levels of async Fn callbacks. The real code in my application is substantially more complicated but I have been able to duplicate my issue in the code below. I have spent multiple hours trying to find the right combination to work including wrapping Fn in Arc, etc... and haven't been able to find a workable solution yet.

Error:

error[E0507]: cannot move out of `fn_processor`, a captured variable in an `Fn` closure
  --> src/main.rs:62:41
   |
52 |           fn_processor: F,
   |           ------------ captured outer variable
...
62 |                       Box::pin(async move {
   |  _________________________________________^
63 | |                         let fn_processor_future = fn_processor(params);
   | |                                                   ------------
   | |                                                   |
   | |                                                   move occurs because `fn_processor` has type `F`, which does not implement the `Copy` trait
   | |                                                   move occurs due to use in generator
64 | |                         let fn_process_result = fn_processor_future.await;
65 | |                         match fn_process_result {
...  |
69 | |                         Ok(())
70 | |                     })
   | |_____________________^ move out of `fn_processor` occurs here

For more information about this error, try `rustc --explain E0507`.
error: could not compile `asynccallbacks` due to previous error

main.rs

#[derive(Default)]
struct Settings {
    field_a: String,
}

#[derive(Default)]
struct ConsumeParams {
    id: String,
}
#[derive(Default)]
struct Payload {
    id: String,
}

struct Queuer {}

impl Queuer {
    fn new() -> Self {
        Self {}
    }

    async fn consume<F>(&self, fn_consume: F, _: Settings) -> Result<(), anyhow::Error>
    where
        F: Fn(ConsumeParams) -> futures::future::BoxFuture<'static, Result<(), anyhow::Error>>,
    {
        let mut counter = 0;
        loop {
            if counter > 0 {
                return Ok(());
            }
            counter = counter + 1;
            match fn_consume(ConsumeParams::default()).await {
                Ok(_) => println!("success"),
                Err(err) => return Err(err),
            }
        }
    }
}

struct Syncer {
    queuer: Queuer,
}

impl Syncer {
    fn new() -> Self {
        Self {
            queuer: Queuer::new(),
        }
    }
    pub async fn perform_actions<F, RespMsg>(
        &self,
        fn_processor: F,
        consume_settings: Settings,
    ) -> Result<(), anyhow::Error>
    where
        F: Fn(ConsumeParams) -> futures::future::BoxFuture<'static, Result<RespMsg, anyhow::Error>> + Send + 'static,
        RespMsg: Into<Payload> + Send,
    {
        self.queuer
            .consume(
                |params: ConsumeParams| {
                    Box::pin(async move {
                        let fn_processor_future = fn_processor(params);
                        let fn_process_result = fn_processor_future.await;
                        match fn_process_result {
                            Ok(_) => println!("Success"),
                            Err(err) => return Err(err),
                        };
                        Ok(())
                    })
                },
                consume_settings,
            )
            .await?;
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let queuer = Queuer::new();
    queuer
        .consume(
            |_: ConsumeParams| Box::pin(async move { panic!("testing consume") }),
            Settings::default(),
        )
        .await
        .unwrap();
    let syncer = Syncer::new();
    syncer
        .perform_actions(
            |_: ConsumeParams| {
                Box::pin(async move {
                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                    Ok(Payload::default())
                })
            },
            Settings::default(),
        )
        .await
        .unwrap();
    println!("Hello, world!");
}

Cargo.toml

[package]
name = "asynccallbacks"
version = "0.1.0"
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = "1.0.41"
futures = "0.3.17"
tokio = { version = "1", features = ["full"] }

fn_processor starts out owned by perform_actions. Then, async move { on line 62 tries to move fn_processor (a variable it mentions) into itself, because that's what move means — move all variables. You do need to move something in, so that consume can have what it wants.

One possible fix — I haven't looked hard enough to see if this is the best thing for the task — is to share ownership of fn_processor. That looks like the below; I added comments to each change I made.

    pub async fn perform_actions<F, RespMsg>(
        &self,
        fn_processor: F,
        consume_settings: Settings,
    ) -> Result<(), anyhow::Error>
    where
        F: Fn(ConsumeParams) -> futures::future::BoxFuture<'static, Result<RespMsg, anyhow::Error>> + Send + Sync + 'static,
        //  The function must be additionally declared Sync                                                ^^^^^^
        RespMsg: Into<Payload> + Send,
    {
        let shared_processor = Arc::new(fn_processor);
        // ^^^ Put the function in an Arc (shared ownership)
        self.queuer
            .consume(
                |params: ConsumeParams| {
                    let shared_processor = shared_processor.clone();
                    // ^^^ Clone the Arc so that the `async move` block can own the clone
                    // and the `|params|` closure still has access to the original.
                    Box::pin(async move {
                        let fn_processor_future = shared_processor(params);

Another possibility is to do

    let fn_processor = &fn_processor;

at the beginning of Syncer::perform_actions, but this requires relaxing the signature of Queuer::consume to allow returning a non-'static future:

async fn consume<F>(&self, fn_consume: F, _: Settings) -> Result<(), Error>
where
    F: Fn(ConsumeParams) -> BoxFuture<'_, Result<(), Error>>,

I don't know whether that 'static requirement is important in context. (This also requires F: Sync on perform_actions, like the Arc solution.)

And yet another alternative is to require F (the type of fn_processor) to be Copy. Function pointers are always Copy, and closures are Copy when they only capture Copy values, so this might actually be a workable constraint. If so, there's no need for F: Sync.

Finally, I think there might be something to say here about the subtleties of |_| async move { ... } vs. move |_| async move { ... } vs. move |_| async { ... }, but I didn't get anywhere thinking about it :‌)

1 Like

I tried sharing ownership of the fn_processor and got this error:

Error:

error: future cannot be sent between threads safely
  --> src/main.rs:65:21
   |
65 | /                     Box::pin(async move {
66 | |                         let fn_processor_future = shared_processor_clone(params);
67 | |                         let fn_process_result = fn_processor_future.await;
68 | |                         match fn_process_result {
...  |
72 | |                         Ok(())
73 | |                     })
   | |______________________^ future created by async block is not `Send`
   |
note: captured value is not `Send`
  --> src/main.rs:66:51
   |
66 |                         let fn_processor_future = shared_processor_clone(params);
   |                                                   ^^^^^^^^^^^^^^^^^^^^^^ has type `Arc<F>` which is not `Send`
   = note: required for the cast to the object type `dyn futures::Future<Output = Result<(), anyhow::Error>> + std::marker::Send`
help: consider further restricting this bound
   |
57 |             + Send + std::marker::Sync,
   |                    ^^^^^^^^^^^^^^^^^^^

error: future cannot be sent between threads safely
  --> src/main.rs:65:21
   |
65 | /                     Box::pin(async move {
66 | |                         let fn_processor_future = shared_processor_clone(params);
67 | |                         let fn_process_result = fn_processor_future.await;
68 | |                         match fn_process_result {
...  |
72 | |                         Ok(())
73 | |                     })
   | |______________________^ future created by async block is not `Send`
   |
   = help: the trait `std::marker::Send` is not implemented for `dyn futures::Future<Output = Result<RespMsg, anyhow::Error>>`
note: future is not `Send` as it awaits another future which is not `Send`
  --> src/main.rs:67:49
   |
67 |                         let fn_process_result = fn_processor_future.await;
   |                                                 ^^^^^^^^^^^^^^^^^^^ await occurs here on type `Pin<Box<dyn futures::Future<Output = Result<RespMsg, anyhow::Error>>>>`, which is not `Send`
   = note: required for the cast to the object type `dyn futures::Future<Output = Result<(), anyhow::Error>> + std::marker::Send`

error[E0308]: mismatched types
  --> src/main.rs:96:17
   |
96 | /                 Box::new(async move {
97 | |                     tokio::time::sleep(std::time::Duration::from_secs(1)).await;
98 | |                     Ok(Payload::default())
99 | |                 })
   | |__________________^ expected struct `Pin`, found struct `Box`
   |
   = note: expected struct `Pin<Box<(dyn futures::Future<Output = Result<_, anyhow::Error>> + 'static)>>`
              found struct `Box<impl futures::Future>`
   = help: use `Box::pin`

For more information about this error, try `rustc --explain E0308`.
error: could not compile `asynccallbacks` due to 3 previous errors

Code:

#[derive(Default)]
struct Settings {
    field_a: String,
}

#[derive(Default)]
struct ConsumeParams {
    id: String,
}
#[derive(Default)]
struct Payload {
    id: String,
}

struct Queuer {}

impl Queuer {
    fn new() -> Self {
        Self {}
    }

    async fn consume<F>(&self, fn_consume: F, _: Settings) -> Result<(), anyhow::Error>
    where
        F: Fn(ConsumeParams) -> futures::future::BoxFuture<'static, Result<(), anyhow::Error>>,
    {
        let mut counter = 0;
        loop {
            if counter > 0 {
                return Ok(());
            }
            counter = counter + 1;
            match fn_consume(ConsumeParams::default()).await {
                Ok(_) => println!("success"),
                Err(err) => return Err(err),
            }
        }
    }
}

struct Syncer {
    queuer: Queuer,
}

impl Syncer {
    fn new() -> Self {
        Self {
            queuer: Queuer::new(),
        }
    }
    pub async fn perform_actions<F, RespMsg>(
        &self,
        fn_processor: F,
        consume_settings: Settings,
    ) -> Result<(), anyhow::Error>
    where
        F: Fn(ConsumeParams) -> std::pin::Pin<Box<dyn futures::future::Future<Output = Result<RespMsg, anyhow::Error>>>>
            + Send,
        RespMsg: Into<Payload> + Send,
    {
        let shared_processor = std::sync::Arc::new(fn_processor);
        self.queuer
            .consume(
                |params: ConsumeParams| {
                    let shared_processor_clone = shared_processor.clone();
                    Box::pin(async move {
                        let fn_processor_future = shared_processor_clone(params);
                        let fn_process_result = fn_processor_future.await;
                        match fn_process_result {
                            Ok(_) => println!("Success"),
                            Err(err) => return Err(err),
                        };
                        Ok(())
                    })
                },
                consume_settings,
            )
            .await?;
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let queuer = Queuer::new();
    queuer
        .consume(
            |_: ConsumeParams| Box::pin(async move { panic!("testing consume") }),
            Settings::default(),
        )
        .await
        .unwrap();
    let syncer = Syncer::new();
    syncer
        .perform_actions(
            |_: ConsumeParams| {
                Box::new(async move {
                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                    Ok(Payload::default())
                })
            },
            Settings::default(),
        )
        .await
        .unwrap();
    println!("Hello, world!");
}

Between the version you originally posted + my changes, and the one you posted now, you made another change: from BoxFuture to Pin<Box<dyn Future.... In doing that you lost the bounds that BoxFuture provides:

pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a, Global>>;

Compared to BoxFuture<'static, Result<RespMsg, anyhow::Error>>, your std::pin::Pin<Box<dyn futures::future::Future<Output = Result<RespMsg, anyhow::Error>>>> is missing Send (which is needed for the future perform_actions passes to consume to be usable). Also, you had but removed F: 'static, which is needed, and I added F: Sync which is also needed (in order to share it with Arc).

This bound should work (but so will the version I posted earlier written with BoxFuture):

where
    F: Fn(ConsumeParams) -> Pin<Box<dyn Future<Output = Result<RespMsg, anyhow::Error>> + Send>>
            + Send + Sync + 'static

When I say “is needed”, I mean needed for the approach of using Arc to solve the original problem. If you use @cole-miller's approach then you will not need F: 'static but you will still need F: Sync (because it's potentially being used from multiple threads by the async executor).

I didn't realize I had dropped out the BoxedFuture requirement. Thank you for pointing that out. I am now able to get the code to compile. However, when I tried to implement the fix in my real application I am running into another problem because inside the async block, I need to invoke a method on self and that is causing my lifetime issues. I have updated the example to capture the problem.

Error:

error[E0759]: `self` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
  --> src/main.rs:53:9
   |
53 |           &self,
   |           ^^^^^
   |           |
   |           this data with an anonymous lifetime `'_`...
   |           ...is captured here...
...
66 | /                     Box::pin(async move {
67 | |                         self.util_function().await;
68 | |                         let fn_processor_future = shared_processor_clone(params);
69 | |                         let fn_process_result = fn_processor_future.await;
...  |
75 | |                         Ok(())
76 | |                     })
   | |______________________- ...and is required to live as long as `'static` here

Code

use futures::FutureExt;

#[derive(Default)]
struct Settings {
    field_a: String,
}

#[derive(Default)]
struct ConsumeParams {
    id: String,
}
#[derive(Default)]
struct Payload {
    id: String,
}

struct Queuer {}

impl Queuer {
    fn new() -> Self {
        Self {}
    }

    async fn consume<F>(&self, fn_consume: F, _: Settings) -> Result<(), anyhow::Error>
    where
        F: Fn(ConsumeParams) -> futures::future::BoxFuture<'static, Result<(), anyhow::Error>>,
    {
        let mut counter = 0;
        loop {
            if counter > 0 {
                return Ok(());
            }
            counter = counter + 1;
            match fn_consume(ConsumeParams::default()).await {
                Ok(_) => println!("success"),
                Err(err) => return Err(err),
            }
        }
    }
}

struct Syncer {
    queuer: Queuer,
}

impl Syncer {
    fn new() -> Self {
        Self {
            queuer: Queuer::new(),
        }
    }
    pub async fn perform_actions<F, RespMsg>(
        &self,
        fn_processor: F,
        consume_settings: Settings,
    ) -> Result<(), anyhow::Error>
    where
        F: Fn(ConsumeParams) -> futures::future::BoxFuture<'static, Result<RespMsg, anyhow::Error>> + Send + Sync + 'static,
        RespMsg: Into<Payload> + Send,
    {
        let shared_processor = std::sync::Arc::new(fn_processor);
        self.queuer
            .consume(
                |params: ConsumeParams| {
                    let shared_processor_clone = shared_processor.clone();
                    Box::pin(async move {
                        self.util_function().await;
                        let fn_processor_future = shared_processor_clone(params);
                        let fn_process_result = fn_processor_future.await;
                        match fn_process_result {
                            Ok(_) => println!("Success"),
                            Err(err) => return Err(err),
                        };
                        self.util_function().await;
                        Ok(())
                    })
                },
                consume_settings,
            )
            .await?;
        Ok(())
    }

    async fn util_function(&self) -> Result<(), anyhow::Error> {
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let queuer = Queuer::new();
    queuer
        .consume(
            |_: ConsumeParams| Box::pin(async move { panic!("testing consume") }),
            Settings::default(),
        )
        .await
        .unwrap();
    let syncer = Syncer::new();
    syncer
        .perform_actions(
            |_: ConsumeParams| {
                async move {
                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                    Ok(Payload::default())
                }.boxed()
            },
            Settings::default(),
        )
        .await
        .unwrap();
    println!("Hello, world!");
}

An update, I was able to get the sample application code to compile by cloning self - see below. Thanks for everyone's help.

use futures::FutureExt;

#[derive(Default)]
struct Settings {
    field_a: String,
}

#[derive(Default)]
struct ConsumeParams {
    id: String,
}
#[derive(Default)]
struct Payload {
    id: String,
}

#[derive(Clone)]
struct Queuer {}

impl Queuer {
    fn new() -> Self {
        Self {}
    }

    async fn consume<F>(&self, fn_consume: F, _: Settings) -> Result<(), anyhow::Error>
    where
        F: Fn(ConsumeParams) -> futures::future::BoxFuture<'static, Result<(), anyhow::Error>>,
    {
        let mut counter = 0;
        loop {
            if counter > 0 {
                return Ok(());
            }
            counter = counter + 1;
            match fn_consume(ConsumeParams::default()).await {
                Ok(_) => println!("success"),
                Err(err) => return Err(err),
            }
        }
    }
}

#[derive(Clone)]
struct Syncer {
    queuer: Queuer,
}

impl Syncer {
    fn new() -> Self {
        Self {
            queuer: Queuer::new(),
        }
    }
    pub async fn perform_actions<F, RespMsg>(
        &self,
        fn_processor: F,
        consume_settings: Settings,
    ) -> Result<(), anyhow::Error>
    where
        F: Fn(ConsumeParams) -> futures::future::BoxFuture<'static, Result<RespMsg, anyhow::Error>> + Send + Sync + 'static,
        RespMsg: Into<Payload> + Send,
    {
        let shared_processor = std::sync::Arc::new(fn_processor);
        let clone_queuer = self.queuer.clone();
        clone_queuer
            .consume(
                |params: ConsumeParams| {
                    let shared_processor_clone = shared_processor.clone();
                    let syncer_clone = self.clone();
                    Box::pin(async move {
                        syncer_clone.util_function().await.unwrap();
                        let fn_processor_future = shared_processor_clone(params);
                        let fn_process_result = fn_processor_future.await;
                        match fn_process_result {
                            Ok(_) => println!("Success"),
                            Err(err) => return Err(err),
                        };
                        syncer_clone.util_function().await.unwrap();
                        Ok(())
                    })
                },
                consume_settings,
            )
            .await?;
        Ok(())
    }

    async fn util_function(&self) -> Result<(), anyhow::Error> {
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let queuer = Queuer::new();
    queuer
        .consume(
            |_: ConsumeParams| Box::pin(async move { panic!("testing consume") }),
            Settings::default(),
        )
        .await
        .unwrap();
    let syncer = Syncer::new();
    syncer
        .perform_actions(
            |_: ConsumeParams| {
                async move {
                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                    Ok(Payload::default())
                }.boxed()
            },
            Settings::default(),
        )
        .await
        .unwrap();
    println!("Hello, world!");
}
1 Like

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.