Utility of async for actors / tokio actors?

I wanted to try out using async for actors. I read Actors with Tokio by Alice Ryhl, which seemed sensible, and started coding. Except I need actors to call other actors. Fine, you get something like:

struct MyActor {
    // ...
}

impl MyActor {
    async fn handle_message(&mut self, msg: Request) {
        match msg {
            Request::SomeRequest(reply_channel) => {
                let sub_response = self.other_actor_handle.slow_call().await;
                self.some_state.update(sub_response);
                reply_channel.send(...);
            }
            // ...
        }
    }
}

async fn run_my_actor(mut actor: MyActor) {
    while let Some(msg) = actor.receiver.recv().await {
        actor.handle_message(msg).await;
    }
}

Except that doesn't actually work, because now MyActor is blocked on the slow_call and can't service other requests while waiting. So you need to throw out the nice call API on the actor handles and do full-on message passing between actors. At which point: why even bother with async in the first place?

My use case looks like the following diagram:

It seems that the actor concept as presented in that blog post doesn't fit at all. I need to be able to service other requests while the slow operation is in progress. What I would have done before would be to use pure message passing with sequence numbers in the messages, but is there a better option here? One that lets me make use of async in Rust to define my state machine rather than having to manually split it up into pieces with message handling? Because right now I don't see the utility of async for actors.

Keep in mind I do need &mut self and can't really hold mutexes over await points (and I don't want to pay the performance or complexity overhead of mutexes in the first place).

In order to avoid the XY problem: the slow operation is downloading debug info from a debuginfod server (or loading it from cache) and then parsing that debug info on a rayon thread pool. I also have operations where there may be many sub-requests for a given parent request. The inputs come from the debugger frontend, and I don't want to block user interactions just because a slow operation might be running in the background. Many operations can proceed just fine in my debugger even while it is doing slow things in the background. (I'm making a tracing debugger that never pauses the debuggee, to be able to debug programs with strict timing requirements; these tend to fail with gdb or lldb. This is possible thanks to the power of eBPF.)

It's entirely normal that actors process messages one at a time. That's what an actor is! If you want something more complex, then it's not a standard actor.

In your case, perhaps the subrequest should forward the top-level response channel to the sub-actor, and the sub-actor replies directly to the original sender? That way, the top-level actor does not need to block.


Fair enough. At my day job I have (in C++) used the pattern I described, where each task/object can listen to an event bus, timers, etc., with callbacks to implement those. The code used inversion of control, so each object did not have its own event loop.

I don't know whether this pattern has a name, or if there is any framework for it in Rust, but I would be interested in learning about it. I imagine an issue in Rust would be ownership: the event loop really has to own the objects for this to work.

As my background is in robotics / machine control, I would say that the pattern I'm describing is more common than the actor pattern you are describing. You really never want an actor blocked and unable to process the full multitude of events that come in.

The top-level actor does need to update its state based on the sub-response though, in this case to incorporate a database of new symbols/source files/etc. that exist in the debuggee. So it doesn't really work. There is also fan-out going on, where multiple debug files can be downloaded concurrently and processed in parallel. So there isn't even a 1:1 correspondence between sub-requests and parent requests; it depends on how many shared libraries we find when reading /proc/<pid>/maps.

@alice I tried to forge ahead with a design to figure out what I could and couldn't get working.
And I'm not sure if I'm missing something obvious, or if this is just not possible to do sanely in Rust.

struct DebugInfoStore {
    dwarf_map: HashMap<ElfPath, BinInfo>,
    // other fields...
}

struct BinInfo {
    // Various fields, doesn't matter for this question, but suffice
    // this say this is large and not Clone. It is in fact
    // self-referencing, holding both memory mapped files and references
    // into those mappings using ouroboros.
}

impl DebugInfoStore {
    // This function on its own works fine.
    pub async fn async_load_many(&mut self, binaries: impl IntoIterator<Item = ElfPath> + Send + 'static) -> Result<(), eyre::Error> {
        let debuginfod_client = self.debuginfod_client.clone();
        let loaded = execute_on_rayon_pool(move || -> Result<Vec<_>, eyre::Error> {
            // Download, memory map and parse debug info for binaries in parallel,
            // with logic to limit concurrency to debuginfod servers if needed.
        })
        .await?;

        for (binary_id, bin_info) in loaded {
            // Register with self. HashMap::insert returns the previous value
            // on a duplicate key, so check that rather than map_err.
            if let Some(dup) = self.dwarf_map.insert(binary_id.clone(), bin_info) {
                return Err(eyre::eyre!(
                    "Binary already loaded for {}: {:?}",
                    binary_id.as_str(),
                    dup
                ));
            }
        }
        Ok(())
    }
}


/// A tokio actor that coordinates everything.
struct Coordinator {
    user_receiver: mpsc::Receiver<Request>,
    user_sender: mpsc::Sender<EventOrResponse>,
    debug_info: DebugInfoStore,
    processes: HashMap<Pid, ProcProcess>,
    // other fields...
}

impl Coordinator {
    async fn handle_message(&mut self, msg: &messages::Request) -> Result<(), eyre::Error> {
        // Handle the message
        match &msg.payload {
            messages::RequestPayload::Attach(pid) => {
                let process =
                    ProcProcess::from_pid(*pid).wrap_err("Failed to get process from PID")?;
                let binaries = process.maps().wrap_err("Failed to get process maps")?;
                // Oops, now we can't handle other requests until all the debug info is loaded.
                self.debug_info.async_load_many(binaries).await?;
                self.processes.insert(*pid, process);
            }
            // ...
        }
        Ok(())
    }
}

async fn run_coordinator(mut actor: Coordinator) {
    while let Some(msg) = actor.user_receiver.recv().await {
        let _ = actor.handle_message(&msg).await;
    }
}

Given this design, and that I don't want to block the coordinator while debug info is
being loaded, it seems I have to lose the encapsulation of BinInfo inside DebugInfoStore.

Currently BinInfo is private to that module, and the only solution I can think of is to split
loading into two phases: one that fires off the background job and sends a message to the coordinator,
and a second one that is called from the coordinator with that response. Which seems quite ugly.

I tried using Rc<RefCell<Coordinator>> and LocalSet (I'm fine with Coordinator running
inside a single thread with multiple tasks, and farming out the heavy work to other tasks
running on other runtimes or to rayon) but could not come up with a working design.

What really annoys me is that this would be easy in C++, but with Rust's ownership model
and the general lack of cancellation safety of async Rust, I can't see how to make it work.
In particular I would probably have used some form of callback.

I could also have projected a shared_ptr<Coordinator> into a shared_ptr<DebugInfoStore> in
C++, but I don't think Rc or Arc can do that, since the reference counter is always stored together
with the data in Rust (unlike in C++).

There must be a better way to do this in Rust?

Hi, I am not an expert, but can't you just give the inbox channel of MyActor to your sub-actor, for example like self.other_actor_handle.slow_call(my_actor_send.clone()).await? Your run function could then contain a select and forward the responses.

To my understanding of actors, this seems incorrect. For me an actor is the owner of a resource, and if your resource is the bottleneck, your actor should/can block. It's then the obligation of the caller to send/call the actor with a timeout.


In that case what I want is clearly not an actor, but something adjacent, that I don't know the word for. I would like to know the correct terminology here.