Help: Simple actor library

Hey all, new to rust :slight_smile: Trying to play with generics and thought to write a simple actor library just to figure out how things work, but I got stuck.

So here's the code I managed to write so far, but it is wrong (made comments in code).

// this needs to be cloneable via Arc and should use
// channel for communication rather than actor reference
struct Address<A: Actor> {
    act: A, // <-- this is wrong
}

impl<A: Actor + Sized> Address<A> {
    // this should not be &mut self, but &self
    pub fn send<M>(&mut self, msg: M) -> <M as Message>::Return
    where
        M: Message + Send + 'static,
        A: Handler<M>,
    {
        self.act.handle(msg) // <-- also this is wrong
    }
}

trait Actor
where
    Self: Sized,
{
    fn start(self) -> Address<Self> {
        Address { act: self }
    }
}

trait Message {
    type Return;
}

trait Handler<M>
where
    Self: Actor,
    M: Message,
{
    fn handle(&mut self, message: M) -> M::Return;
}

fn main() {
    struct MyActor;
    struct Ping;
    struct Pong;

    impl Actor for MyActor {}

    impl Message for Ping {
        type Return = usize;
    }

    impl Message for Pong {
        type Return = usize;
    }

    impl Handler<Ping> for MyActor {
        fn handle(&mut self, _message: Ping) -> <Ping as Message>::Return {
            123usize
        }
    }

    impl Handler<Pong> for MyActor {
        fn handle(&mut self, _message: Pong) -> <Pong as Message>::Return {
            321usize
        }
    }

    let mut act = MyActor.start();

    assert!(act.send(Ping) == 123);
    assert!(act.send(Pong) == 321);

    println!("OK");
}

I've seen actor libraries are using channels to communicate, but the one I wrote here directly references the Actor inside Address and calls the handle method directly, which is wrong.

Can someone help me out with how to make this work with channels (mpsc should be fine)?

Thank you!

Check out "Actors with Tokio" for a great overview of the pattern in rust. Here's an example of what I usually do:

use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio::sync::oneshot;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let a = spawn_actor();
    for _ in 0..=5 {
        println!("{}", a.ping().await);
    }
}

fn spawn_actor() -> ActorHandle {
    let (tx, mut rx) = unbounded_channel::<Command>();
    tokio::spawn(async move {
        let mut i = 47;
        while let Some(Command::Ping(pong)) = rx.recv().await {
            pong.send(format!("pong {i}")).unwrap();
            i -= 1;
        }
    });
    ActorHandle { tx }
}

#[derive(Debug)]
enum Command {
    Ping(oneshot::Sender<String>),
}

struct ActorHandle {
    tx: UnboundedSender<Command>,
}

impl ActorHandle {
    async fn ping(&self) -> String {
        let (tx_once, rx_once) = oneshot::channel();
        self.tx.send(Command::Ping(tx_once)).unwrap();
        rx_once.await.unwrap()
    }
}

You will need the following in Cargo.toml:

[dependencies]
tokio = { version = "*", features = ["rt", "macros", "sync"] }

I usually extend the pattern by writing:

fn spawn_actor(alert: UnboundedSender<Alert>)

Then I can clone a send channel for each actor, and in the main function I'll spawn a background task to iterate through the aggregated alert messages. That way, actors can send alerts back to the main program whenever they need to. For example, a keyboard-input actor might need to communicate "Quit the program" (Alert::Quit) if the user presses the escape key.

2 Likes

Thanks for the reply! I've seen actors with tokio, and basically trying to do the same thing, but with less boilerplate code. That's why I'm trying to introduce traits/structs to manage all that (less boilerplate code) so I'm curious about how to do that :slight_smile:

I'm the author of "Actors with Tokio". I wrote the article because I find that all existing attempts to write such an actor trait have failed in one way or another.

2 Likes

Thanks for the reply, had no idea:) Big fan btw. I'll mark toad's answer as a solution then ;p

Out of curiosity, how bad this implementation is (using Arc/Async Mutex instead of channels)?:

use std::sync::Arc;

use async_trait::async_trait;
use tokio::sync::Mutex;

struct Addr<A> {
    inner: Arc<Mutex<A>>,
}

impl<A> Addr<A> {
    async fn send<M>(&mut self, message: M) -> M::Return
    where
        A: Handler<M>,
        M: Message,
    {
        self.inner.lock().await.handle(message).await
    }
}

impl<A> Clone for Addr<A> {
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
        }
    }
}

trait Actor
where
    Self: Sized,
{
    fn create(self) -> Addr<Self> {
        Addr {
            inner: Arc::new(Mutex::new(self)),
        }
    }
}

trait Message {
    type Return;
}

#[async_trait]
trait Handler<M>
where
    Self: Actor,
    M: Message,
{
    async fn handle(&mut self, message: M) -> M::Return;
}

A Mutex is fine for some use-cases. However it can have some interesting behavior when cancelation gets into the picture.

Thank you! It's all clear now.

Another useful thing that mutexes can't do is run things in the background. For example, it is sometimes useful to have the actor do something when:

  1. it receives a message, or
  2. a timer triggers.

This can only be done with an actual background task.

I did a basic performance tests with Arc/Mutex vs actor frameworks, the Arc/Mutex was much faster (like 10x more) than those. I guess it's because of the dynamic dispatches (they use Box dyn for every message).

So if you have so much performance decrease, why is it worth it?

The performance difference has to do with latency. The throughput of the solutions should be comparable. The reason is that there's some delay between sending the message and the message starting to be processed.

Thank you so much, things are more clear now. I'll go and study more.

Thanks again!