Mpsc pattern vs direct invoke

I found an interesting code, which use mpsc pattern to use rocksdb.
Roughly, I have these problems:

  1. &mut self in Store member function impl is not neccssary.
  2. compared to direct use db instance in function impl, what is the pros and cons of mpsc pattern?

background:

Even with SingleThreaded, almost all of RocksDB operations is multi-threaded unless the underlying RocksDB instance is specifically configured otherwise. SingleThreaded only forces serialization of column family alternations by requring &mut self of DB instance due to its wrapper implementation details.

// Copyright(C) Facebook, Inc. and its affiliates.
use std::collections::{HashMap, VecDeque};
use tokio::sync::mpsc::{channel, Sender};
use tokio::sync::oneshot;

#[cfg(test)]
#[path = "tests/store_tests.rs"]
pub mod store_tests;

pub type StoreError = rocksdb::Error;
type StoreResult<T> = Result<T, StoreError>;

type Key = Vec<u8>;
type Value = Vec<u8>;

pub enum StoreCommand {
    Write(Key, Value),
    Read(Key, oneshot::Sender<StoreResult<Option<Value>>>),
    NotifyRead(Key, oneshot::Sender<StoreResult<Value>>),
}

#[derive(Clone)]
pub struct Store {
    channel: Sender<StoreCommand>,
}

impl Store {
    pub fn new(path: &str) -> StoreResult<Self> {
        let db = rocksdb::DB::open_default(path)?;
        let mut obligations = HashMap::<_, VecDeque<oneshot::Sender<_>>>::new();
        let (tx, mut rx) = channel(100);
        tokio::spawn(async move {
            while let Some(command) = rx.recv().await {
                match command {
                    StoreCommand::Write(key, value) => {
                        let _ = db.put(&key, &value);
                        if let Some(mut senders) = obligations.remove(&key) {
                            while let Some(s) = senders.pop_front() {
                                let _ = s.send(Ok(value.clone()));
                            }
                        }
                    }
                    StoreCommand::Read(key, sender) => {
                        let response = db.get(&key);
                        let _ = sender.send(response);
                    }
                    StoreCommand::NotifyRead(key, sender) => {
                        let response = db.get(&key);
                        match response {
                            Ok(None) => obligations
                                .entry(key)
                                .or_insert_with(VecDeque::new)
                                .push_back(sender),
                            _ => {
                                let _ = sender.send(response.map(|x| x.unwrap()));
                            }
                        }
                    }
                }
            }
        });
        Ok(Self { channel: tx })
    }

    pub async fn write(&mut self, key: Key, value: Value) {
        if let Err(e) = self.channel.send(StoreCommand::Write(key, value)).await {
            panic!("Failed to send Write command to store: {}", e);
        }
    }

    pub async fn read(&mut self, key: Key) -> StoreResult<Option<Value>> {
        let (sender, receiver) = oneshot::channel();
        if let Err(e) = self.channel.send(StoreCommand::Read(key, sender)).await {
            panic!("Failed to send Read command to store: {}", e);
        }
        receiver
            .await
            .expect("Failed to receive reply to Read command from store")
    }

    pub async fn notify_read(&mut self, key: Key) -> StoreResult<Value> {
        let (sender, receiver) = oneshot::channel();
        if let Err(e) = self
            .channel
            .send(StoreCommand::NotifyRead(key, sender))
            .await
        {
            panic!("Failed to send NotifyRead command to store: {}", e);
        }
        receiver
            .await
            .expect("Failed to receive reply to NotifyRead command from store")
    }
}

The official name for the mpsc pattern you are seeing is the Actor Model.

The general idea is to create some sort of self-contained object and make sure the only way the outside world can modify its state is by sending messages to this object. Dong things this way gives the actor full control over how it is manipulated and makes things like asynchrony and concurrency really easy to do. You can send a message and can go to sleep until it is delivered, and it's trivial to give multiple components the ability to talk to the actor by just cloning the channel's Sender.

You also get nice things like backpressure for free. For example, if the actor can't handle messages fast enough, a fixed number of messages will be buffered before sends block. That force the message producer to either slow down or shed load, preventing you from overloading the rest of the system. You see this pattern all the time in high performance applications - buffer a bit so you can deal with spikes in load, then start rejecting new work until you've got capacity again.

The downside is that sending messages requires you to create an explicit message type for each operation and then implement some sort of handler for that message - something that can get pretty tedious if your object has a large public API (i.e. it wants to support lots of types of messages). It also means you aren't guaranteed that the database is still open (e.g. maybe the actor triggered a panic and the receiving channel was closed when the actor died) and the async-by-default nature of actors can be a pain to work with.

It's totally possible to use something like a Arc<Mutex<...>> to give multiple components access to the same database and then call methods on it, but that comes with tradeoffs of its own. For example,
now multiple components are able to modify the database so it's harder to figure out when something was changed or what sequence of calls triggers an edge case. It also makes the ownership story of your app a bit more complex.

2 Likes

This blog post discusses the actor model in the context of Rust: Actors with Tokio – Alice Ryhl

4 Likes