use tokio::sync::{mpsc, oneshot};
type Responder<T> = oneshot::Sender<T>;
pub struct Actor<T, D> {
pub data: D,
receiver: mpsc::Receiver<T>,
}
impl<T, D> Actor<T, D> {
fn new(receiver: mpsc::Receiver<T>, data: D) -> Self {
Self {
receiver,
data
}
}
}
#[derive(Clone)]
pub struct ActorHandle<T> {
sender: mpsc::Sender<T>
}
impl<T> ActorHandle<T>
where
T : std::marker::Send + 'static
{
pub async fn new<F, Fut, D>(buffer_size: usize, handle_actor_msg_fn: F, data: D) -> Self
where
F: FnMut(T, &mut D) -> Fut + std::marker::Send + 'static,
Fut: std::future::Future<Output=bool>,
D : std::marker::Send + 'static
{
let (sender, receiver) = mpsc::channel(buffer_size);
let actor = Actor::new(receiver, data);
tokio::spawn(async move {run_actor::<T, D, F, Fut>(actor, handle_actor_msg_fn).await});
Self {
sender
}
}
}
async fn run_actor<T, D, F, Fut>(mut actor: Actor<T, D>, mut handle_actor_msg_fn: F)
where
F: FnMut(T, &mut D) -> Fut,
Fut: std::future::Future<Output=bool>
{
tracing::info!("Starting comm actor");
while let Some(msg) = actor.receiver.recv().await {
if !handle_actor_msg_fn(msg, &mut actor.data).await {
break;
}
}
tracing::info!("Closing comm actor");
}
#[derive(Clone)]
pub enum ActorMessage {
}
struct Data {
}
#[derive(Clone)]
pub struct Comm {
pub actor: ActorHandle<ActorMessage>,
}
impl Comm {
pub async fn new(actor_buffer_size: usize) -> Self {
Self {
actor: ActorHandle::new(actor_buffer_size, handle_msg, Data {}).await
}
}
}
async fn handle_msg(msg: ActorMessage, data: &mut Data) -> bool {
false
}
#[tokio::main]
async fn main() {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.init();
}
[package]
name = "main-test"
version = "0.1.0"
edition = "2021"
[dependencies]
tracing = "0.1"
tracing-subscriber = "0.3"
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
error[E0308]: mismatched types
--> tests/main-test/src/main.rs:74:20
|
74 | actor: ActorHandle::new(actor_buffer_size, handle_msg, Data {}).await
| ^^^^^^^^^^^^^^^^ one type is more general than the other
|
= note: expected trait `for<'a> <for<'a> fn(ActorMessage, &'a mut Data) -> impl Future<Output = bool> {handle_msg} as FnMut<(ActorMessage, &'a mut Data)>>`
found trait `for<'a> <for<'a> fn(ActorMessage, &'a mut Data) -> impl Future<Output = bool> {handle_msg} as FnMut<(ActorMessage, &'a mut Data)>>`
The idea is to pass a handling function into the actor in order to define the handling code. For some reason I don't understand, it does not compile and the error does not make sense to me (expected trait and found are the same). What am I missing ? Thanks!