Hello
I have hit a wall with my Rust knowledge. After two days of research with little progress, I am asking for guidance on solving this issue (and hoping to learn in the process).
Question
How can I pass any async function that takes the same three arguments to my ActorHandle::new() method, so that the underlying Actor can run it in a spawned Tokio task?
Overview
- I am currently working on implementing an actor to run an async function on an interval.
- The actor is based on the fantastic article "Actors with Tokio" by Alice Ryhl (thank you, Alice!).
- The async function passed to the actor should run as a background task spawned on the Tokio runtime.
- The async function takes three arguments: a reference to a Client struct, a mutable reference to a State struct, and a usize.
- The Actor is spawned via an ActorHandle struct, which is the public-facing API for controlling the underlying Actor.
Info
Demo Code
I tried my best to recreate a demo of the code, since I cannot show the real code:
use std::{
    sync::{atomic::AtomicBool, Arc},
    time::Duration,
};
use futures::{future::BoxFuture, Future};
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio_stream::{wrappers::IntervalStream, StreamExt};
// Status that gets sent out by the Actor.
#[derive(Clone)]
pub enum ActorStatus {
    Started,
    Stopped,
    Running,
    Aborted,
}
// Responses from the actor after we send it a message.
pub enum ActorResponseMessage {
    ProcessStarted,
    ProcessStopped,
    AlreadyRunning,
    AlreadyStopped,
    ProcessAborted,
}
// Messages we can send to the actor.
pub enum ActorMessage {
    Start {
        respond_to: oneshot::Sender<ActorResponseMessage>,
    },
    Stop {
        respond_to: oneshot::Sender<ActorResponseMessage>,
    },
    UrgentAbort {
        respond_to: oneshot::Sender<ActorResponseMessage>,
    },
}
// Config to pass to actor through ActorHandle
pub struct ActorConfig {
    pub start_time: chrono::NaiveTime,
    pub start_day: chrono::Weekday,
}
// A demo state that mimics the functionality of the original codebase.
pub struct DemoFunctionState {
    pub count: i32,
}
// this might be wrong?
type DemoFn = Box<
    dyn Fn(&DemoClient, &mut DemoFunctionState, usize) -> BoxFuture<'static, ()>
        + Send
        + Sync
        + 'static,
>;
pub struct Actor {
    // the fn that I need to run in a loop on a separate thread
    demo_fn: DemoFn,
    // an atomic value to control the loop.
    event_loop_state: Arc<AtomicBool>,
    // Message receiver for starting/stopping etc.
    pub message_receiver: mpsc::Receiver<ActorMessage>,
}
// For demo to show what client would look like.
#[derive(Default)]
pub struct DemoClient {
    pub foo: String,
}
impl Actor {
    pub fn new<F>(
        demo_fn: fn(&DemoClient, &mut DemoFunctionState, usize) -> F,
        message_receiver: mpsc::Receiver<ActorMessage>,
        status_update_tx: broadcast::Sender<ActorStatus>,
    ) -> Self
    where
        F: Future<Output = ()> + Send + 'static,
    {
        let event_loop_state = Arc::new(AtomicBool::new(true));
        Actor {
            // tried boxing the demo_fn, is this correct?
            demo_fn: Box::new(move |client, state, size| Box::pin(demo_fn(client, state, size))),
            event_loop_state,
            message_receiver,
        }
    }
}
// We process the passed async fn in an interval, mutating variables outside of the event loop.
pub async fn call_actor_event_loop(event_loop_state: Arc<AtomicBool>, demo_fn: DemoFn) {
    let local_event_loop_state = event_loop_state.clone();
    // Background process event loop here
    tokio::task::spawn(async move {
        let mut state = DemoFunctionState { count: 0 };
        let client = DemoClient::default();
        let mut stream_interval =
            IntervalStream::new(tokio::time::interval(Duration::from_secs(60)));
        while let Some(_ts) = stream_interval.next().await {
            if local_event_loop_state.load(std::sync::atomic::Ordering::Acquire) {
                // Here is where I need my async fn to be passed, taking in a mutable variable;
                demo_fn(&client, &mut state, 1).await;
            }
        }
    });
}
// Public facing ActorHandle to interact with the private actor.
pub struct ActorHandle;
impl ActorHandle {
    pub fn new<F>(demo_fn: fn(&DemoClient, &mut DemoFunctionState, usize) -> F) -> Self
    where
        F: Future<Output = ()> + Send + 'static,
    {
        let (_, message_recv) = mpsc::channel::<ActorMessage>(128);
        let (status_sender, _) = broadcast::channel::<ActorStatus>(128);
        let mut actor = Actor::new(demo_fn, message_recv, status_sender);
        // here is where I pass in my function down to the actor.
        tokio::spawn(run_my_actor(actor));
        ActorHandle {
            /* message_sender etc would be here */
        }
    }
}
async fn run_my_actor(mut actor: Actor) {
    call_actor_event_loop(actor.event_loop_state, actor.demo_fn).await;
    while let Some(_msg) = actor.message_receiver.recv().await {
        /* handle actor messages here ... */
    }
}
// This is a demo function to show what I am trying to pass in.
// Any function we want to pass in will take the three same arguments.
pub async fn demo_function(_client: &DemoClient, _state: &mut DemoFunctionState, _demo_num: usize) {
    unimplemented!("some async functionality here");
}
// ... so we can pass in something like this
pub async fn demo_function_two(
    _client: &DemoClient,
    _state: &mut DemoFunctionState,
    _demo_num: usize,
) {
    unimplemented!("The logic in here would be different");
}
#[tokio::main]
async fn main() {
    // Get an error here
    let actor = ActorHandle::new(demo_function);
    let actor_two = ActorHandle::new(demo_function_two);
}
Currently, the error I see when I pass in the async function is:
error[E0308]: mismatched types
   --> src/main.rs:159:34
    |
159 |     let actor = ActorHandle::new(demo_function);
    |                 ---------------- ^^^^^^^^^^^^^ one type is more general than the other
    |                 |
    |                 arguments to this function are incorrect
    |
    = note: expected fn pointer `for<'a, 'b> fn(&'a DemoClient, &'b mut DemoFunctionState, _) -> _`
                  found fn item `for<'a, 'b> fn(&'a DemoClient, &'b mut DemoFunctionState, _) -> impl futures::Future<Output = ()> {demo_function}`
    = note: when the arguments and return types match, functions can be coerced to function pointers
note: associated function defined here
   --> src/main.rs:118:12
    |
118 |     pub fn new<F>(demo_fn: fn(&DemoClient, &mut DemoFunctionState, usize) -> F) -> Self
    |            ^^^    ------------------------------------------------------------
I am not sure how to solve that, or what is causing the mismatched types.
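A possible explanation, offered as a sketch rather than a definitive answer: an async fn that takes references returns a future that borrows from those references, so its return type depends on the lifetimes 'a and 'b shown in the error. A single type parameter F is chosen once and cannot name a type that varies with those lifetimes, which appears to be what "one type is more general than the other" is pointing at. One workaround that may fit is to box the future with a lifetime tied to the arguments instead of 'static. The sketch below uses only std so it stands alone: the BoxFuture alias mirrors the one in the futures crate, and demo_function_boxed, block_on, run_times, and run_demo are hypothetical helpers that are not part of the demo code above.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Same shape as `futures::future::BoxFuture`, redefined so the sketch needs only std.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

#[derive(Default)]
pub struct DemoClient {
    pub foo: String,
}

pub struct DemoFunctionState {
    pub count: i32,
}

// The returned future may borrow from both arguments for `'a` instead of being `'static`.
type DemoFn = Box<
    dyn for<'a> Fn(&'a DemoClient, &'a mut DemoFunctionState, usize) -> BoxFuture<'a, ()>
        + Send
        + Sync
        + 'static,
>;

pub async fn demo_function(_client: &DemoClient, state: &mut DemoFunctionState, demo_num: usize) {
    state.count += demo_num as i32;
}

// Hypothetical adapter: a plain fn with an explicit lifetime that boxes the async call.
fn demo_function_boxed<'a>(
    client: &'a DemoClient,
    state: &'a mut DemoFunctionState,
    demo_num: usize,
) -> BoxFuture<'a, ()> {
    Box::pin(demo_function(client, state, demo_num))
}

// Waker that does nothing; enough for futures that never actually suspend.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical stand-in for a runtime: polls a future to completion on the current thread.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Stand-in for the interval loop: calls the stored function `ticks` times on the same state.
async fn run_times(demo_fn: DemoFn, ticks: usize) -> i32 {
    let client = DemoClient::default();
    let mut state = DemoFunctionState { count: 0 };
    for _ in 0..ticks {
        demo_fn(&client, &mut state, 1).await;
    }
    state.count
}

pub fn run_demo() -> i32 {
    let demo_fn: DemoFn = Box::new(demo_function_boxed);
    block_on(run_times(demo_fn, 3))
}
```

Calling run_demo() here yields 3 (three ticks, each adding 1). With the real crates, the same DemoFn shape should be usable inside the tokio::task::spawn loop, with ActorHandle::new accepting a DemoFn (or an equivalent generic bound) rather than a bare fn pointer returning F. The cost is one small adapter function per async fn.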
Resources that I have looked at that didn't quite work for this (but were informative):
- how can one await a result of a boxed future?
- how to pass an async function as a parameter to another function
- how to pass async function which accepts parameter with lifetime to other functions?
Thank you in advance for any help, tips, or guidance. I am very open to learning and making mistakes. Let me know if there is anything I can clarify!