I am trying to write a Rust program that has multiple levels of async `Fn` callbacks. The real code in my application is substantially more complicated, but I have been able to reproduce my issue with the code below. I have spent multiple hours trying to find the right combination to make it work, including wrapping the `Fn` in an `Arc`, etc., and haven't been able to find a workable solution yet.
Error:
error[E0507]: cannot move out of `fn_processor`, a captured variable in an `Fn` closure
--> src/main.rs:62:41
|
52 | fn_processor: F,
| ------------ captured outer variable
...
62 | Box::pin(async move {
| _________________________________________^
63 | | let fn_processor_future = fn_processor(params);
| | ------------
| | |
| | move occurs because `fn_processor` has type `F`, which does not implement the `Copy` trait
| | move occurs due to use in generator
64 | | let fn_process_result = fn_processor_future.await;
65 | | match fn_process_result {
... |
69 | | Ok(())
70 | | })
| |_____________________^ move out of `fn_processor` occurs here
For more information about this error, try `rustc --explain E0507`.
error: could not compile `asynccallbacks` due to previous error
main.rs
/// Options accepted by `Queuer::consume` and `Syncer::perform_actions`.
///
/// `Queuer::consume` takes a value of this type but does not read it.
#[derive(Default)]
struct Settings {
    /// Free-form setting; nothing in this example reads it.
    field_a: String,
}
/// Argument handed to every consume/processor callback.
///
/// `Queuer::consume` always builds it with `ConsumeParams::default()`.
#[derive(Default)]
struct ConsumeParams {
    /// Identifier carried by the parameters; empty when defaulted.
    id: String,
}
/// Message type that processor responses must be convertible into
/// (see the `RespMsg: Into<Payload>` bound on `Syncer::perform_actions`).
#[derive(Default)]
struct Payload {
    /// Identifier carried by the payload; empty when defaulted.
    id: String,
}
/// Stateless stand-in for a queue consumer that drives an async callback.
struct Queuer {}

impl Queuer {
    /// Creates a new `Queuer`.
    fn new() -> Self {
        Queuer {}
    }

    /// Calls `fn_consume` with default `ConsumeParams` and awaits the future
    /// it returns.
    ///
    /// The callback is invoked exactly once. When its future resolves to
    /// `Ok`, `success` is printed and `Ok(())` is returned; when it resolves
    /// to `Err`, that error is returned unchanged. The `Settings` argument is
    /// accepted but ignored.
    async fn consume<F>(&self, fn_consume: F, _: Settings) -> Result<(), anyhow::Error>
    where
        F: Fn(ConsumeParams) -> futures::future::BoxFuture<'static, Result<(), anyhow::Error>>,
    {
        // Countdown form of the single-pass loop: one invocation, then done.
        let mut remaining = 1;
        while remaining > 0 {
            remaining -= 1;
            fn_consume(ConsumeParams::default()).await?;
            println!("success");
        }
        Ok(())
    }
}
struct Syncer {
queuer: Queuer,
}
impl Syncer {
fn new() -> Self {
Self {
queuer: Queuer::new(),
}
}
pub async fn perform_actions<F, RespMsg>(
&self,
fn_processor: F,
consume_settings: Settings,
) -> Result<(), anyhow::Error>
where
F: Fn(ConsumeParams) -> futures::future::BoxFuture<'static, Result<RespMsg, anyhow::Error>> + Send + 'static,
RespMsg: Into<Payload> + Send,
{
self.queuer
.consume(
|params: ConsumeParams| {
Box::pin(async move {
let fn_processor_future = fn_processor(params);
let fn_process_result = fn_processor_future.await;
match fn_process_result {
Ok(_) => println!("Success"),
Err(err) => return Err(err),
};
Ok(())
})
},
consume_settings,
)
.await?;
Ok(())
}
}
/// Entry point: exercises `Queuer::consume` directly, then via `Syncer`.
#[tokio::main]
async fn main() {
    // Direct use of the queuer.
    // NOTE: this callback's future panics with "testing consume" as soon as
    // `consume` awaits it, so the statements below it are not reached at
    // run time.
    let direct_queuer = Queuer::new();
    let direct_outcome = direct_queuer
        .consume(
            |_: ConsumeParams| Box::pin(async move { panic!("testing consume") }),
            Settings::default(),
        )
        .await;
    direct_outcome.unwrap();

    // Two-level callback: the processor sleeps for a second, then yields a
    // default `Payload`.
    let syncer = Syncer::new();
    let sync_outcome = syncer
        .perform_actions(
            |_: ConsumeParams| {
                Box::pin(async move {
                    let pause = std::time::Duration::from_secs(1);
                    tokio::time::sleep(pause).await;
                    Ok(Payload::default())
                })
            },
            Settings::default(),
        )
        .await;
    sync_outcome.unwrap();

    println!("Hello, world!");
}
Cargo.toml
[package]
name = "asynccallbacks"
version = "0.1.0"
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0.41"
futures = "0.3.17"
tokio = { version = "1", features = ["full"] }