Using multiple `tokio::sync::mpsc::Receiver` in `tokio::select!`

Hi All,

I am trying to use multiple channel Receivers in a struct. The code looks something like the following -

This does not compile because mpsc:Receiver::recv takes a mutable reference and hence it cannot be used in a tokio::select! arm.

Is there a way to actual work around this? (One option that I can see is take the receivers out of the structure and pass them to the run function). I am wondering if there is something I am obviously missing.

Thanks

use std::sync::Arc;

use tokio::sync::{
    mpsc::{self, Receiver, Sender},
    Mutex,
};

struct Control; // Actual contents omitted for example
struct Data; // Actual contents omitted for example

struct Actor {
    data_rx: Receiver<Data>,
    ctrl_rx: Receiver<Control>,
}

impl Actor {
    async fn run(me: Arc<Mutex<Self>>) {
        let mut actor = me.lock().await;
        loop {
            let _ = tokio::select! {
                Some(ctrl) = (*actor).ctrl_rx.recv() => {
                    eprintln!("ctrl");
                }
                Some(data) = (*actor).data_rx.recv() => {
                    eprintln!("ctrl");
                }
            };
        }
    }
}

struct Manager {
    actor: Arc<Mutex<Actor>>,
    data_tx: Sender<Data>,
    ctrl_tx: Sender<Control>,
}

impl Manager {
    async fn run(&self) {
        let actor = self.actor.clone();
        let actor_task = tokio::spawn(async move { Actor::run(actor).await });

        actor_task.await;
    }
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let (data_tx, data_rx) = mpsc::channel::<Data>(10);
    let (ctrl_tx, ctrl_rx) = mpsc::channel::<Control>(10);

    let manager = Manager {
        data_tx,
        ctrl_tx,
        actor: Arc::new(Mutex::new(Actor { data_rx, ctrl_rx })),
    };
}

(Playground)

Errors:

   Compiling playground v0.0.1 (/playground)
error[E0499]: cannot borrow `actor` as mutable more than once at a time
  --> src/main.rs:24:32
   |
20 |               let _ = tokio::select! {
   |  _____________________-
21 | |                 Some(ctrl) = (*actor).ctrl_rx.recv() => {
   | |                                ----- first mutable borrow occurs here
22 | |                     eprintln!("ctrl");
23 | |                 }
24 | |                 Some(data) = (*actor).data_rx.recv() => {
   | |                                ^^^^^ second mutable borrow occurs here
25 | |                     eprintln!("ctrl");
26 | |                 }
27 | |             };
   | |_____________- first borrow later used here

For more information about this error, try `rustc --explain E0499`.
error: could not compile `playground` due to previous error

Here's one way to do it:


impl Actor {
    async fn run(me: Arc<Mutex<Self>>) {
        // We get the lock guard here
        let mut actor = me.lock().await;
        // We get through deref so we can split borrow
        let actor = &mut *actor;
        // Now the compiler recognize there's no overlap
        loop {
            let _ = tokio::select! {
                Some(ctrl) = actor.ctrl_rx.recv() => {
                    eprintln!("ctrl");
                }
                Some(data) = actor.data_rx.recv() => {
                    eprintln!("ctrl");
                }
            };
        }
    }
}

Putting the stuff owned by the actor behind a mutex is usually counter-productive. I recommend checking out this article on actors.

3 Likes

@alice Thanks! I have read that. The code posted above is a representation of what I am trying to do (and not exactly what I am trying to do). For example how can I call Manager::run without consuming the manager, if I have to keep the context of the Actor in the 'Manager`?

I agree - if there is a control channel to the Actor, why does one need to have Actor as a member of Manager? Something is wrong with that design perhaps. But will refactor one at a time.

But that is a valuable suggestion indeed.

I go into more details in the talk associated with the blog post on this, but the general answer is that you should split your struct. One part of it should be owned exclusively by the actor background task. You can then have another part that is owned by the manager. Don't attempt to merge those two things into a single struct.