Sharing a resource through `Arc<Mutex<_>>` or `mpsc` and `oneshot` channels

The Rust code below implements a simple "database" that stores a single `u32`, which can be written to and read from asynchronously. To share access to this database between tasks, there are two `DatabaseHandle` implementations: one based on `mpsc` and `oneshot` channels and one based on `Arc<Mutex<_>>`. I would like to better understand the trade-offs between the two, since they both provide exactly the same interface (see the tests).

pub struct Database(u32);

impl Database {
    pub async fn write(&mut self, value: u32) {
        self.0 = value;
    }

    pub async fn read(&self) -> u32 {
        self.0
    }
}

pub mod mutex {
    use std::sync::Arc;
    use tokio::sync::Mutex;

    #[derive(Clone)]
    pub struct DatabaseHandle(Arc<Mutex<super::Database>>);

    impl DatabaseHandle {
        pub fn new(database: super::Database) -> Self {
            Self(Arc::new(Mutex::new(database)))
        }

        pub async fn write(&self, value: u32) {
            self.0.lock().await.write(value).await
        }

        pub async fn read(&self) -> u32 {
            self.0.lock().await.read().await
        }
    }
}

pub mod channel {
    use tokio::sync::mpsc;
    use tokio::sync::oneshot;

    enum Call {
        Write {
            value: u32,
            return_tx: oneshot::Sender<()>,
        },
        Read {
            return_tx: oneshot::Sender<u32>,
        },
    }

    #[derive(Clone)]
    pub struct DatabaseHandle {
        call_tx: mpsc::Sender<Call>,
    }

    impl DatabaseHandle {
        pub fn new(database: super::Database) -> Self {
            let (call_tx, call_rx) = mpsc::channel(1);
            tokio::spawn(task(database, call_rx));
            DatabaseHandle { call_tx }
        }

        pub async fn write(&self, value: u32) {
            let (return_tx, return_rx) = oneshot::channel();
            self.call_tx
                .send(Call::Write { value, return_tx })
                .await
                .unwrap();
            return_rx.await.unwrap()
        }

        pub async fn read(&self) -> u32 {
            let (return_tx, return_rx) = oneshot::channel();
            self.call_tx.send(Call::Read { return_tx }).await.unwrap();
            return_rx.await.unwrap()
        }
    }

    async fn task(mut database: super::Database, mut call_rx: mpsc::Receiver<Call>) {
        while let Some(message) = call_rx.recv().await {
            match message {
                Call::Write { value, return_tx } => {
                    // Ignore send errors: a cancelled caller drops its receiver,
                    // and that should not panic the actor task.
                    #[allow(clippy::unit_arg)]
                    let _ = return_tx.send(database.write(value).await);
                }
                Call::Read { return_tx } => {
                    let _ = return_tx.send(database.read().await);
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
            
    macro_rules! impl_handle_test {
        ($Handle:ty) => {
            let database = Database(0);
            let handle = <$Handle>::new(database);

            assert_eq!(handle.read().await, 0);

            tokio::spawn({
                let handle = handle.clone();
                async move {
                    handle.write(1).await;
                }
            })
            .await
            .unwrap();

            assert_eq!(handle.read().await, 1);
        };
    }

    #[tokio::test]
    async fn mutex_handle_works() {
        impl_handle_test! {mutex::DatabaseHandle}
    }
    
    #[tokio::test]
    async fn channel_handle_works() {
        impl_handle_test! {channel::DatabaseHandle}
    }
}

My gut feeling is that since the mpsc channel implementation also uses an Arc and a Mutex inside of the Semaphore type, there is no point in using channels (especially if the channel capacity is 1).

Aside: It would be nice if we could generate the channel implementation from the impl Database block.

Aside: The resource being called Database is just a coincidence. I needed a quickly understandable example to demonstrate the pattern I wanted to discuss.

The mpsc approach looks similar to my actors blog post, and I give an answer to this question in the recorded talk during the questions at the end.

Anyway, let me put some pros and cons here as well:

Actors:

  • With an actor, it is easier to take action if the list of pending operations becomes too large. (E.g., you can use a bounded channel with try_send.)
  • With an actor, it becomes possible to start operations without waiting for them to finish.
  • With an actor, it becomes possible to trigger actions without outside influence. For example, you could perform some action if there are no messages for 1 minute.
  • With an actor, it is easy to support multiple connections. For example, you could open 10 connections to handle commands in parallel.
  • With an actor, you can use database implementations that don't support cancellation. For example, I recall that redis used to break if you cancel an operation while it's running. With an actor, it will continue running to completion, even if the handle is cancelled.
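
The first two points can be sketched without any dependencies. This is only an illustration, not the code from the blog post: it assumes `std::sync::mpsc::sync_channel` and a thread as stand-ins for `tokio::sync::mpsc` and a spawned task, and the names `Handle`, `Rejected`, `try_write`, and the `capacity` parameter are hypothetical. The write goes out through `try_send`, so the caller neither waits for the result nor blocks when the queue is full.

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};
use std::thread;

// Hypothetical message type: a fire-and-forget write and a read with a reply channel.
enum Call {
    Write { value: u32 },
    Read { return_tx: SyncSender<u32> },
}

/// Why a write was not queued.
#[derive(Debug, PartialEq)]
pub enum Rejected {
    QueueFull,
    ActorGone,
}

#[derive(Clone)]
pub struct Handle {
    call_tx: SyncSender<Call>,
}

impl Handle {
    /// Starts the actor on its own thread (a stand-in for `tokio::spawn`).
    pub fn spawn(capacity: usize) -> Self {
        let (call_tx, call_rx) = sync_channel::<Call>(capacity);
        thread::spawn(move || {
            let mut stored = 0u32;
            // The loop ends once every `Handle` has been dropped.
            for call in call_rx {
                match call {
                    Call::Write { value } => stored = value,
                    Call::Read { return_tx } => {
                        // The caller may have gone away; that is not an actor error.
                        let _ = return_tx.send(stored);
                    }
                }
            }
        });
        Handle { call_tx }
    }

    /// Queues a write without waiting for it to be applied.
    /// Fails immediately instead of blocking when the queue is full.
    pub fn try_write(&self, value: u32) -> Result<(), Rejected> {
        self.call_tx
            .try_send(Call::Write { value })
            .map_err(|error| match error {
                TrySendError::Full(_) => Rejected::QueueFull,
                TrySendError::Disconnected(_) => Rejected::ActorGone,
            })
    }

    /// Sends a read request and waits for the reply.
    pub fn read(&self) -> Option<u32> {
        let (return_tx, return_rx) = sync_channel(1);
        self.call_tx.send(Call::Read { return_tx }).ok()?;
        return_rx.recv().ok()
    }
}
```

Because the actor handles messages in the order they were queued, a `read` issued after a successful `try_write` on the same handle observes that write.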

Mutex:

  • Actors force you to allocate for every operation, but mutexes do not.
  • When there's no contention, mutexes have lower latency.
  • Mutexes work better with non-static values.
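
The last point can be illustrated with scoped threads. This is a sketch under assumptions: it uses `std::sync::Mutex` and `std::thread::scope` rather than tokio, a synchronous copy of `Database`, and a hypothetical helper `increment_concurrently`. The mutex guards a borrowed `&mut Database`, which an actor could not accept because a spawned task must own `'static` data.

```rust
use std::sync::Mutex;
use std::thread;

// Synchronous copy of the thread's `Database`, for illustration only.
pub struct Database(pub u32);

impl Database {
    pub fn write(&mut self, value: u32) {
        self.0 = value;
    }

    pub fn read(&self) -> u32 {
        self.0
    }
}

/// Increments a borrowed database from `workers` threads.
pub fn increment_concurrently(database: &mut Database, workers: u32) {
    // `Mutex<&mut Database>`: the guarded value is a borrow, not `'static`.
    let shared = Mutex::new(database);
    thread::scope(|scope| {
        for _ in 0..workers {
            scope.spawn(|| {
                // Read and write under one lock so the increment is atomic.
                let mut guard = shared.lock().unwrap();
                let current = guard.read();
                guard.write(current + 1);
            });
        }
    });
    // All scoped threads have finished here, so the borrow ends with this function.
}
```

After the call returns, the caller can use its `Database` directly again, with no handle or channel left behind.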

In general, my rule of thumb is that actors are better for IO resources, and that mutexes are better for in-memory data structures.

That said, for the specific case of databases, you probably want a database pool instead. See e.g. the deadpool crate.

Very useful, thanks @alice.

I was wondering also about the possibility of deadlocks. Perhaps I should do some experiments myself to try and understand if there is a difference between the two methods. My intuition tells me that channels may be able to "delay" when a deadlock occurs (depending on channel capacity) but they do not fundamentally prevent the issue.

Indeed. I cover prevention of deadlocks in the blog post, and I go into more details on it in the talk.

Right. However, as far as I could tell, it did not explicitly cover whether there are any fundamental differences between the actor and Arc-Mutex solutions. I suppose Sender::try_send and Mutex::try_lock are in some sense similar with respect to preventing deadlocks at run time.

Fundamentally, they are the same:

  • A bounded channel is just an unbounded channel together with a semaphore.
  • A mutex is just some data together with a semaphore.

Sure, the semaphores have different numbers of permits, but it turns out that theoretically the number of permits is almost always irrelevant to figuring out whether something can deadlock. (Usually it just affects how rare the deadlock is, not whether it's possible.)
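
A small sketch of that point, assuming `std::sync::mpsc::sync_channel` and `std::sync::Mutex` as stand-ins for the tokio types (the function names are hypothetical). With nobody receiving, a bounded channel accepts exactly as many messages as it has capacity and then refuses the next one, in the same way a held mutex refuses a second lock. A larger capacity moves the point where a blocking send would get stuck, but does not remove it.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::sync::{Mutex, TryLockError};

/// Counts how many messages are accepted before a send would block,
/// given that the receiver never drains the queue.
pub fn sends_before_blocking(capacity: usize) -> usize {
    // `_rx` is kept alive so the channel stays connected.
    let (tx, _rx) = sync_channel::<u32>(capacity);
    let mut accepted = 0;
    loop {
        match tx.try_send(0) {
            Ok(()) => accepted += 1,
            // A blocking `send` would wait here forever.
            Err(TrySendError::Full(_)) => return accepted,
            Err(TrySendError::Disconnected(_)) => unreachable!("receiver is still alive"),
        }
    }
}

/// Reports whether locking an already-held mutex would block.
pub fn relock_would_block() -> bool {
    let mutex = Mutex::new(0u32);
    let _guard = mutex.lock().unwrap();
    // `try_lock` reports the conflict instead of deadlocking.
    let blocked = matches!(mutex.try_lock(), Err(TryLockError::WouldBlock));
    blocked
}
```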

Here is an attempt at trying to collect the different trade-offs in an overview:

| | mpsc & oneshot | arc & mutex |
|---|---|---|
| can deadlock | yes | yes |
| can check if would deadlock | Sender::try_send | Mutex::try_lock |
| does not require spawning a thread or task | no | yes |
| can do work on dedicated thread | yes | not directly |
| can omit waiting for result | yes | by spawning a task |
| requires little code | no | yes |
| can take parameters by reference | no | yes |
| can do self initiated work | yes | not directly |
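
As one possible demonstration of the "can do self initiated work" row, here is a sketch in which the actor acts on its own whenever no message arrives within an idle period. It assumes `std::sync::mpsc` with `recv_timeout` as a stand-in for a tokio channel combined with a timer; `run_actor` and the idle counter are hypothetical.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Runs the actor loop until every sender has been dropped.
/// Returns the last written value and how often the idle action ran.
pub fn run_actor(call_rx: Receiver<u32>, idle: Duration) -> (u32, u32) {
    let mut value = 0;
    let mut idle_ticks = 0;
    loop {
        match call_rx.recv_timeout(idle) {
            // A caller asked for a write.
            Ok(new_value) => value = new_value,
            // Nobody asked for anything: do self-initiated work here
            // (flush, send a keep-alive, close a connection, ...).
            Err(RecvTimeoutError::Timeout) => idle_ticks += 1,
            // All handles are gone, so the actor shuts down.
            Err(RecvTimeoutError::Disconnected) => return (value, idle_ticks),
        }
    }
}
```

With a mutex there is no loop that owns the resource, so the same behavior would need a separate background task that takes the lock periodically.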

Maybe I'll take some time to demonstrate each row, and maybe that could be a good blog post.
