I found an interesting code, which use mpsc pattern to use rocksdb.
Roughly, I have these problems:
&mut self
in Store member function impl is not neccssary.- compared to direct use
db
instance in function impl, what is the pros and cons of mpsc pattern?
background:
Even with
SingleThreaded
, almost all of RocksDB operations is multi-threaded unless the underlying RocksDB instance is specifically configured otherwise.SingleThreaded
only forces serialization of column family alternations by requring&mut self
of DB instance due to its wrapper implementation details.
// Copyright(C) Facebook, Inc. and its affiliates.
use std::collections::{HashMap, VecDeque};
use tokio::sync::mpsc::{channel, Sender};
use tokio::sync::oneshot;
#[cfg(test)]
#[path = "tests/store_tests.rs"]
pub mod store_tests;
pub type StoreError = rocksdb::Error;
type StoreResult<T> = Result<T, StoreError>;
type Key = Vec<u8>;
type Value = Vec<u8>;
pub enum StoreCommand {
Write(Key, Value),
Read(Key, oneshot::Sender<StoreResult<Option<Value>>>),
NotifyRead(Key, oneshot::Sender<StoreResult<Value>>),
}
#[derive(Clone)]
pub struct Store {
channel: Sender<StoreCommand>,
}
impl Store {
pub fn new(path: &str) -> StoreResult<Self> {
let db = rocksdb::DB::open_default(path)?;
let mut obligations = HashMap::<_, VecDeque<oneshot::Sender<_>>>::new();
let (tx, mut rx) = channel(100);
tokio::spawn(async move {
while let Some(command) = rx.recv().await {
match command {
StoreCommand::Write(key, value) => {
let _ = db.put(&key, &value);
if let Some(mut senders) = obligations.remove(&key) {
while let Some(s) = senders.pop_front() {
let _ = s.send(Ok(value.clone()));
}
}
}
StoreCommand::Read(key, sender) => {
let response = db.get(&key);
let _ = sender.send(response);
}
StoreCommand::NotifyRead(key, sender) => {
let response = db.get(&key);
match response {
Ok(None) => obligations
.entry(key)
.or_insert_with(VecDeque::new)
.push_back(sender),
_ => {
let _ = sender.send(response.map(|x| x.unwrap()));
}
}
}
}
}
});
Ok(Self { channel: tx })
}
pub async fn write(&mut self, key: Key, value: Value) {
if let Err(e) = self.channel.send(StoreCommand::Write(key, value)).await {
panic!("Failed to send Write command to store: {}", e);
}
}
pub async fn read(&mut self, key: Key) -> StoreResult<Option<Value>> {
let (sender, receiver) = oneshot::channel();
if let Err(e) = self.channel.send(StoreCommand::Read(key, sender)).await {
panic!("Failed to send Read command to store: {}", e);
}
receiver
.await
.expect("Failed to receive reply to Read command from store")
}
pub async fn notify_read(&mut self, key: Key) -> StoreResult<Value> {
let (sender, receiver) = oneshot::channel();
if let Err(e) = self
.channel
.send(StoreCommand::NotifyRead(key, sender))
.await
{
panic!("Failed to send NotifyRead command to store: {}", e);
}
receiver
.await
.expect("Failed to receive reply to NotifyRead command from store")
}
}