The Rust code below implements a simple "database" that stores a single `u32` which can be written to and read from asynchronously. In order to share access to this database between tasks, there are two `DatabaseHandle` implementations: one based on `mpsc` and `oneshot` channels, and one based on `Arc<Mutex<_>>`. I would like to better understand the trade-offs between the two, since they both provide exactly the same interface (see the tests).
```rust
pub struct Database(u32);

impl Database {
    pub async fn write(&mut self, value: u32) {
        self.0 = value;
    }

    pub async fn read(&self) -> u32 {
        self.0
    }
}

pub mod mutex {
    use std::sync::Arc;
    use tokio::sync::Mutex;

    #[derive(Clone)]
    pub struct DatabaseHandle(Arc<Mutex<super::Database>>);

    impl DatabaseHandle {
        pub fn new(database: super::Database) -> Self {
            Self(Arc::new(Mutex::new(database)))
        }

        pub async fn write(&self, value: u32) {
            self.0.lock().await.write(value).await
        }

        pub async fn read(&self) -> u32 {
            self.0.lock().await.read().await
        }
    }
}

pub mod channel {
    use tokio::sync::mpsc;
    use tokio::sync::oneshot;

    enum Call {
        Write {
            value: u32,
            return_tx: oneshot::Sender<()>,
        },
        Read {
            return_tx: oneshot::Sender<u32>,
        },
    }

    #[derive(Clone)]
    pub struct DatabaseHandle {
        call_tx: mpsc::Sender<Call>,
    }

    impl DatabaseHandle {
        pub fn new(database: super::Database) -> Self {
            let (call_tx, call_rx) = mpsc::channel(1);
            tokio::spawn(task(database, call_rx));
            DatabaseHandle { call_tx }
        }

        pub async fn write(&self, value: u32) {
            let (return_tx, return_rx) = oneshot::channel();
            self.call_tx
                .send(Call::Write { value, return_tx })
                .await
                .unwrap();
            return_rx.await.unwrap()
        }

        pub async fn read(&self) -> u32 {
            let (return_tx, return_rx) = oneshot::channel();
            self.call_tx.send(Call::Read { return_tx }).await.unwrap();
            return_rx.await.unwrap()
        }
    }

    async fn task(mut database: super::Database, mut call_rx: mpsc::Receiver<Call>) {
        while let Some(message) = call_rx.recv().await {
            match message {
                Call::Write { value, return_tx } => {
                    #[allow(clippy::unit_arg)]
                    return_tx.send(database.write(value).await).unwrap();
                }
                Call::Read { return_tx } => {
                    #[allow(clippy::unit_arg)]
                    return_tx.send(database.read().await).unwrap();
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    macro_rules! impl_handle_test {
        ($Handle:ty) => {
            let database = Database(0);
            let handle = <$Handle>::new(database);
            assert_eq!(handle.read().await, 0);
            tokio::spawn({
                let handle = handle.clone();
                async move {
                    handle.write(1).await;
                }
            })
            .await
            .unwrap();
            assert_eq!(handle.read().await, 1);
        };
    }

    #[tokio::test]
    async fn mutex_handle_works() {
        impl_handle_test! {mutex::DatabaseHandle}
    }

    #[tokio::test]
    async fn channel_handle_works() {
        impl_handle_test! {channel::DatabaseHandle}
    }
}
```
My gut feeling is that since the `mpsc` channel implementation internally also uses an `Arc` and a `Mutex` (inside the `Semaphore` type), there is no point in using channels, especially if the channel capacity is 1.
Aside: It would be nice if we could generate the channel implementation from the `impl Database` block.
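A `macro_rules!` macro cannot inspect an existing `impl` block, so a declarative sketch has to restate the method signatures (a proc macro could parse the `impl` itself). Here is a hypothetical, synchronous std-only sketch of what such generation could look like — the `actor_handle!` name and shape are invented for illustration, and a tokio version would generate `async fn`s with `mpsc`/`oneshot` instead:

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical macro: generates a request enum, a cloneable handle, and a
// worker thread from a list of method signatures. Note that invoking it twice
// in one module would collide on the generated `Call` type.
macro_rules! actor_handle {
    ($Handle:ident for $State:ty {
        $( fn $method:ident(&mut self $(, $arg:ident : $ArgTy:ty)*) -> $Ret:ty; )*
    }) => {
        // Method names double as variant names, hence the lint allow.
        #[allow(non_camel_case_types)]
        enum Call {
            $( $method { $($arg: $ArgTy,)* return_tx: mpsc::Sender<$Ret> }, )*
        }

        #[derive(Clone)]
        pub struct $Handle {
            call_tx: mpsc::Sender<Call>,
        }

        impl $Handle {
            pub fn new(mut state: $State) -> Self {
                let (call_tx, call_rx) = mpsc::channel::<Call>();
                thread::spawn(move || {
                    for call in call_rx {
                        match call {
                            $( Call::$method { $($arg,)* return_tx } => {
                                let _ = return_tx.send(state.$method($($arg),*));
                            } )*
                        }
                    }
                });
                Self { call_tx }
            }

            $( pub fn $method(&self $(, $arg: $ArgTy)*) -> $Ret {
                let (return_tx, return_rx) = mpsc::channel();
                self.call_tx
                    .send(Call::$method { $($arg,)* return_tx })
                    .unwrap();
                return_rx.recv().unwrap()
            } )*
        }
    };
}

pub struct Database(u32);

impl Database {
    pub fn write(&mut self, value: u32) {
        self.0 = value;
    }
    pub fn read(&mut self) -> u32 {
        self.0
    }
}

actor_handle!(DatabaseHandle for Database {
    fn write(&mut self, value: u32) -> ();
    fn read(&mut self) -> u32;
});

fn main() {
    let handle = DatabaseHandle::new(Database(0));
    assert_eq!(handle.read(), 0);
    handle.write(1);
    assert_eq!(handle.read(), 1);
}
```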
Aside: The resource being called `Database` is just a coincidence. I needed a quickly understandable example to demonstrate the pattern I wanted to discuss.