How do I pass an async function to a Tokio thread from a struct method?

Hello :slight_smile:
I have hit a wall in terms of my Rust knowledge, and after spending two days of researching with little progress, I am asking for some guidance to solve this issue (and learn in the process).

Question
How can I pass any async function that takes the same three arguments, to my ActorHandle new() method, in order for the underlying Actor to run it in a spawned Tokio thread?

Overview

  • I am currently working on implementing an actor to run an async function on an interval.
  • The actor is based off of this fantastic article Actors with Tokio – Alice Ryhl (thank you Alice!).
  • The async function passed to the actor should be run on a new tokio thread as a background task.
  • The async function takes three arguments: a reference to a Client struct, a mutable reference to a State struct, and a usize.
  • The Actor gets spawned via an ActorHandle struct, which is public facing to control the underlying Actor.

Info

Demo Code
I tried my best to recreate a demo of the code, since I cannot show the real code:

use std::{
    sync::{atomic::AtomicBool, Arc},
    time::Duration,
};

use futures::{future::BoxFuture, Future};
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio_stream::{wrappers::IntervalStream, StreamExt};

// Status that gets sent out by the Actor.
#[derive(Clone)]
pub enum ActorStatus {
    Started,
    Stopped,
    Running,
    Aborted,
}

// Responses from the actor after we send it a message.
pub enum ActorResponseMessage {
    ProcessStarted,
    ProcessStopped,
    AlreadyRunning,
    AlreadyStopped,
    ProcessAborted,
}

// Messages we can send to the actor.
pub enum ActorMessage {
    Start {
        respond_to: oneshot::Sender<ActorResponseMessage>,
    },
    Stop {
        respond_to: oneshot::Sender<ActorResponseMessage>,
    },
    UrgentAbort {
        respond_to: oneshot::Sender<ActorResponseMessage>,
    },
}

// Config to pass to actor through ActorHandle
pub struct ActorConfig {
    pub start_time: chrono::NaiveTime,
    pub start_day: chrono::Weekday,
}

// A demo state that mimics the functionality of the original codebase.
pub struct DemoFunctionState {
    pub count: i32,
}

// this might be wrong?
type DemoFn = Box<
    dyn Fn(&DemoClient, &mut DemoFunctionState, usize) -> BoxFuture<'static, ()>
        + Send
        + Sync
        + 'static,
>;

pub struct Actor {
    // the fn that I need to run in a loop on a seperate thread
    demo_fn: DemoFn,
    // an atomic value to control the loop.
    event_loop_state: Arc<AtomicBool>,
    // Message receiver for starting/stopping etc.
    pub message_receiver: mpsc::Receiver<ActorMessage>,
}

// For demo to show what client would look like.
#[derive(Default)]
pub struct DemoClient {
    pub foo: String,
}

impl Actor {
    pub fn new<F>(
        demo_fn: fn(&DemoClient, &mut DemoFunctionState, usize) -> F,
        message_receiver: mpsc::Receiver<ActorMessage>,
        status_update_tx: broadcast::Sender<ActorStatus>,
    ) -> Self
    where
        F: Future<Output = ()> + Send + 'static,
    {
        let event_loop_state = Arc::new(AtomicBool::new(true));

        Actor {
            // tried boxing the demo_fn, is this correct? 
            demo_fn: Box::new(move |client, state, size| Box::pin(demo_fn(client, state, size))),
            event_loop_state,
            message_receiver,
        }
    }
}

// We process the passed async fn in an interval, mutating variables outside of the event loop.
pub async fn call_actor_event_loop(event_loop_state: Arc<AtomicBool>, demo_fn: DemoFn) {
    let local_event_loop_state = event_loop_state.clone();
    // Background process event loop here
    tokio::task::spawn(async move {
        let mut state = DemoFunctionState { count: 0 };
        let client = DemoClient::default();


        let mut stream_interval =
            IntervalStream::new(tokio::time::interval(Duration::from_secs(60)));

        while let Some(_ts) = stream_interval.next().await {
            if local_event_loop_state.load(std::sync::atomic::Ordering::Acquire) {
                // Here is where I need my async fn to be passed, taking in a mutable variable;
                demo_fn(&client, &mut state, 1).await;
            }
        }
    });
}

// Public facing ActorHandle to interact with the private actor.
pub struct ActorHandle;
impl ActorHandle {
    pub fn new<F>(demo_fn: fn(&DemoClient, &mut DemoFunctionState, usize) -> F) -> Self
    where
        F: Future<Output = ()> + Send + 'static,
    {
        let (_, message_recv) = mpsc::channel::<ActorMessage>(128);
        let (status_sender, _) = broadcast::channel::<ActorStatus>(128);
        let mut actor = Actor::new(demo_fn, message_recv, status_sender);
        // here is where I pass in my function down to the actor.
        tokio::spawn(run_my_actor(actor));

        ActorHandle {
            /* message_sender etc would be here */
        }
    }
}

async fn run_my_actor(mut actor: Actor) {
    call_actor_event_loop(actor.event_loop_state, actor.demo_fn).await;
    while let Some(_msg) = actor.message_receiver.recv().await {
        /* handle actor messages here ... */
    }
}

//  This is a demo function to show what I am trying to pass in.
// Any function we want to pass in will take the three same arguments.
pub async fn demo_function(_client: &DemoClient, _state: &mut DemoFunctionState, _demo_num: usize) {
    unimplemented!("some async functionality here");
}

// ... so we can pass in something like this
pub async fn demo_function_two(
    _client: &DemoClient,
    _state: &mut DemoFunctionState,
    _demo_num: usize,
) {
    unimplemented!("The logic in here would be different");
}

#[tokio::main]
async fn main() {
    // Get an error here
    let actor = ActorHandle::new(demo_function);
    let actor_two = ActorHandle::new(demo_function_two);
}

Currently, the error I see when I pass in the async function is:

error[E0308]: mismatched types
   --> src/main.rs:159:34
    |
159 |     let actor = ActorHandle::new(demo_function);
    |                 ---------------- ^^^^^^^^^^^^^ one type is more general than the other
    |                 |
    |                 arguments to this function are incorrect
    |
    = note: expected fn pointer `for<'a, 'b> fn(&'a DemoClient, &'b mut DemoFunctionState, _) -> _`
                  found fn item `for<'a, 'b> fn(&'a DemoClient, &'b mut DemoFunctionState, _) -> impl futures::Future<Output = ()> {demo_function}`
    = note: when the arguments and return types match, functions can be coerced to function pointers
note: associated function defined here
   --> src/main.rs:118:12
    |
118 |     pub fn new<F>(demo_fn: fn(&DemoClient, &mut DemoFunctionState, usize) -> F) -> Self
    |            ^^^    ------------------------------------------------------------

I am not sure how to solve that, or what is causing the mismatched types.

Resources that I have looked at that didn't quite work for this (but were informative):

Thank you in advance for any help, tips, or guidance, I am very open to learning and making mistakes. Let me know if there is anything I can clarify!

Have you tried like this?

Link to playground.

Easiest is probably to just accept DemoFn directly.

Disadvantage is that caller needs to box themselves:

I haven't looked at the matter in detail but the error message looks pretty much as if async_fn_traits could also help here.

Sorry for the late response, been off in the world doing stuff. Thank you all for the great input, both ideas led me down the path to somewhat of a solution that I believe I understand what is fully happening (though it might not be the best/cleanest).

I tried moy2010's idea with impl Fn() for the argument and added the trait bounds for the impl Fn, and also wrapped both the DemoClient in Arc and DemoFunctionState in Arc<Mutex>. This seemed to make the compiler happy, and I believe I understand why this satisfied the compiler. Though, I am curious about the performance implications of wrapping DemoClient (which holds a reqwest client + a String identifier that could possibly be converted to something more efficient) in an Arc, as well as wrapping DemoFunctionState in an Arc. Now that it works though, I can optimize.

This is where I ended up:

impl Actor {
    pub fn new<F>(
        demo_fn: impl Fn(Arc<DemoClient>, Arc<Mutex<DemoFunctionState>>) -> F + Send + Sync + 'static,
        state_sender: mpsc::Sender<i32>,
    ) -> Self
    where
        F: Future<Output = ()> + Send + Sync + 'static,
    {
...

Calling the function in the ActorHandle becomes let actor = ActorHandle::new(demo_function, tx);

I am curious if I can clean up the F + Send + Sync + 'static in any way, but for now its working!
Thanks for the help, you all really saved me on this one and I learned a lot.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.