General Async Function Pointer

I'm trying to create a general async function pointer for processing messages, but I haven't had any success so far.

This is my function type:

pub type MessageProcessor = fn(&str, mpsc::Sender<String>);

With an example implementation being:

pub fn process_heartbeat(message: &str, tx_to_send: mpsc::Sender<String>) {
    if message.contains("~h~") {
        let ping = format_ws_ping(message.replace("~h~", "").parse().unwrap());
        tx_to_send.blocking_send(ping).unwrap();
    }
}

It is used in a struct field, so I can store these functions in it:

processors: Vec<MessageProcessor>,

I am currently using blocking_send because the function isn't async, but I want to switch to send. However, I'm not sure how to make this change while keeping the function signature the same. Any help or suggestions would be greatly appreciated.

You won't be able to keep it the same, because send() is an async function: calling it only produces a future, and that future can only be awaited inside an async context (meaning process_heartbeat() must also become async, or return a future).

The async version would be:

use futures::future::BoxFuture;

pub type MessageProcessor = fn(&str, mpsc::Sender<String>) -> BoxFuture<'static, ()>;

and each function satisfying this type must not be an async fn but a regular fn which returns Box::pin(async move { ... }). You can't directly use a fn pointer type for async fns because every async fn has a distinct return type (its specific Future implementation), but BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>> erases those distinct types.
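
To make the pattern concrete, here is a minimal std-only sketch of a processor written this way. It is not the original poster's code: BoxFuture is spelled out by hand rather than imported from futures, std::sync::mpsc::Sender stands in for tokio's sender, and send_async, format_ping, and block_on are hypothetical helpers made up for illustration. One detail worth noticing: because the returned future is 'static, the function has to copy message into an owned String before entering the async block.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc::Sender;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Same shape as futures::future::BoxFuture, written out with std only.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

type MessageProcessor = fn(&str, Sender<String>) -> BoxFuture<'static, ()>;

// Hypothetical stand-in for an async send such as tokio's Sender::send.
async fn send_async(tx: Sender<String>, msg: String) {
    tx.send(msg).expect("receiver dropped");
}

// Hypothetical stand-in for the thread's format_ws_ping helper.
fn format_ping(n: u32) -> String {
    format!("~h~{n}")
}

// A regular fn (not an async fn) that returns a boxed future.
fn process_heartbeat(message: &str, tx: Sender<String>) -> BoxFuture<'static, ()> {
    // The future must be 'static, so it cannot borrow `message`.
    let message = message.to_owned();
    Box::pin(async move {
        if message.contains("~h~") {
            if let Ok(n) = message.replace("~h~", "").parse::<u32>() {
                send_async(tx, format_ping(n)).await;
            }
        }
    })
}

// Tiny busy-polling executor, only suitable for this demonstration.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
        std::thread::yield_now();
    }
}
```

Because process_heartbeat is an ordinary fn item, it coerces to MessageProcessor and can be stored in a Vec<MessageProcessor> next to other processors with the same signature.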

You can keep your functions async by adding a second layer of boxing:

use futures::future::BoxFuture;
use std::future::Future;

async fn add(a: i32, b: i32) -> i32 {
    a + b
}

async fn sub(a: i32, b: i32) -> i32 {
    a - b
}

type StoredFn = Box<dyn Fn(i32, i32) -> BoxFuture<'static, i32>>;

fn convert_fn<Fut: Future<Output = i32> + Send + 'static>(
    f: impl Fn(i32, i32) -> Fut + 'static,
) -> StoredFn {
    Box::new(move |a, b| Box::pin(f(a, b)))
}

#[tokio::main]
async fn main() {
    let async_fns: Vec<StoredFn> = vec![convert_fn(add), convert_fn(sub)];
    for f in async_fns.iter() {
        println!("{}", f(1, 2).await);
    }
}

The outer box could be eliminated if Rust added a new (fourth!) function trait. I'll use Fn2(...) -> ... as a placeholder name. Rust already gives every function its own zero-sized type. Each of these types would implement Fn2, which would mean "zero-sized and callable". A closure that captures only such a value would then still be convertible to fn.
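
For what it's worth, the parts of this that describe today's Rust can be checked with a small std-only sketch. The names item_sizes, as_pointer, and wrap are made up for this illustration. It shows that function items are zero-sized, that a closure which only names a function by path captures nothing and so coerces to fn, and that a closure capturing a generic f (even a zero-sized one) currently has to be boxed instead.

```rust
use std::mem::size_of_val;

fn add(a: i32, b: i32) -> i32 {
    a + b
}

fn sub(a: i32, b: i32) -> i32 {
    a - b
}

// Each function item has its own unique, zero-sized type.
fn item_sizes() -> (usize, usize) {
    (size_of_val(&add), size_of_val(&sub))
}

// This closure names `add` by path and captures nothing,
// so it coerces to a plain fn pointer.
fn as_pointer() -> fn(i32, i32) -> i32 {
    |a, b| add(a, b) + 1
}

// Here `f` is a captured value, so the closure cannot coerce to
// fn(i32, i32) -> i32 even when F is zero-sized; boxing is needed.
fn wrap<F: Fn(i32, i32) -> i32 + 'static>(f: F) -> Box<dyn Fn(i32, i32) -> i32> {
    Box::new(move |a, b| f(a, b) + 1)
}
```

The hypothetical Fn2 trait would let something like wrap return a fn pointer whenever F is zero-sized, which is not possible in current Rust.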

Thank you for all your help. I was able to implement an async function using @tbfleming's code.

Now I am having trouble cloning the vector of boxed trait objects so that it can be moved into an async block:

self.read.take().expect("rx_to_send is None").for_each(
                |message: Result<Message, tokio_tungstenite::tungstenite::Error>| {
                    // Clone the sender
                    let tx_to_send = self.tx_to_send.clone();
                    let processors = self.processors.clone();
                    async move {
                        // Unwrap the message
                        let data = message
                            .expect("Message is an invalid format")
                            .into_text()
                            .expect("Could not turn into text");
                        // Parse the message
                        let parsed_data = parse_ws_packet(&data);

                        // For each parsed message
                        for d in parsed_data {
                            // If the message is a heartbeat, send a heartbeat back
                            for processor in processors.iter() {
                                {
                                    tokio::task::spawn({
                                        let d = d.clone();
                                        let tx_to_send = tx_to_send.clone();
                                        let processor = *processor;
                                        async move {
                                            processor(d.to_string(), tx_to_send.clone()).await;
                                        }
                                    });
                                }
                            }
                        }
                    }
                },
            )

The error says:

error[E0599]: the method `clone` exists for struct `Vec<Box<dyn Fn(String, Sender<String>) -> Pin<Box<dyn Future<Output = ()> + Send>>>>`, but its trait bounds were not satisfied
   --> src/quote/session.rs:263:54
    |
263 |                       let processors = self.processors.clone();
    |                                                        ^^^^^ method cannot be called due to unsatisfied trait bounds
    |
   ::: /home/.../.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/vec/mod.rs:396:1
    |
396 |   pub struct Vec<T, #[unstable(feature = "allocator_api", issue = "32838")] A: Allocator = Global> {
    |   ------------------------------------------------------------------------------------------------ doesn't satisfy `_: Clone`
    |
   ::: /home/.../.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/boxed.rs:195:1
    |
195 | / pub struct Box<
196 | |     T: ?Sized,
197 | |     #[unstable(feature = "allocator_api", issue = "32838")] A: Allocator = Global,
198 | | >(Unique<T>, A);
    | |_- doesn't satisfy `_: Clone`

Any more help would be greatly appreciated.

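Unfortunately dyn Fn(...) -> ... + Clone doesn't work: a trait object can name only one non-auto trait, and neither Fn nor Clone is an auto trait. We can fall back from Box<dyn Fn...> to fn..., but this makes convert_fn impossible to define as a function, because the closure it returns would capture f and only non-capturing closures coerce to fn pointers. It can be a macro though: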

use futures::future::BoxFuture;

async fn add(a: i32, b: i32) -> i32 {
    a + b
}

async fn sub(a: i32, b: i32) -> i32 {
    a - b
}

type StoredFn = fn(i32, i32) -> BoxFuture<'static, i32>;

macro_rules! convert_fn {
    ($f:expr) => {
        |a, b| Box::pin($f(a, b))
    };
}

#[tokio::main]
async fn main() {
    let async_fns: Vec<StoredFn> = vec![convert_fn!(add), convert_fn!(sub)];
    let cloned = async_fns.clone();
    for f in async_fns.iter() {
        println!("{}", f(1, 2).await);
    }
    for f in cloned.iter() {
        println!("{}", f(1, 2).await);
    }
}
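
Another option, if you would rather keep convert_fn as an ordinary function, might be to store the closures in Arc instead of Box, since cloning an Arc only bumps a reference count. The following is a std-only sketch under assumptions, not something taken from the answers above: SharedFn, run_all, and the hand-written block_on executor are invented for illustration, BoxFuture is spelled out by hand, and std threads stand in for tokio::task::spawn.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Same shape as futures::future::BoxFuture, written out with std only.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

// Arc makes the stored closure cloneable; Send + Sync lets clones cross threads.
type SharedFn = Arc<dyn Fn(i32, i32) -> BoxFuture<'static, i32> + Send + Sync>;

async fn add(a: i32, b: i32) -> i32 {
    a + b
}

async fn sub(a: i32, b: i32) -> i32 {
    a - b
}

fn convert_fn<Fut>(f: impl Fn(i32, i32) -> Fut + Send + Sync + 'static) -> SharedFn
where
    Fut: Future<Output = i32> + Send + 'static,
{
    Arc::new(move |a: i32, b: i32| -> BoxFuture<'static, i32> { Box::pin(f(a, b)) })
}

// Tiny busy-polling executor, only suitable for this demonstration.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
        std::thread::yield_now();
    }
}

fn run_all() -> Vec<i32> {
    let fns: Vec<SharedFn> = vec![convert_fn(add), convert_fn(sub)];
    // Cloning the Vec clones the Arcs, not the closures behind them.
    let cloned = fns.clone();
    cloned
        .into_iter()
        // Each Arc is moved into its own thread, much like moving it into a spawned task.
        .map(|f| std::thread::spawn(move || block_on(f(1, 2))).join().unwrap())
        .collect()
}
```

The trade-off compared to the macro version is one heap allocation per stored function and an atomic reference count on each clone, in exchange for being able to store capturing closures as well.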