I have a data structure that stores data that can get very big, too big to want to hold it all in memory. So whenever the data structure grows too large, it offloads its data to a log in the filesystem, append-only, one line per piece of data. Then it truncates itself to begin anew.
There are subscribers that would like to read this data: old data that's been written to the filesystem, and new data as it's stored into the in-memory data structure. Of course, there can be no overlap/duplication in the data sent to the subscribers.
These actions can happen concurrently within an async context. I've created some granular locking fences to allow efficient reading of the data from both sources, the filesystem log and the fresh in-memory data.
```rust
use std::io::SeekFrom;

use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader};

struct DataStream {
    log: File,
    buffer: Vec<String>,
    subscribers: Vec<tokio::sync::broadcast::Sender<String>>,
    trunc_lock: tokio::sync::Mutex<()>,
    bytes_lock: tokio::sync::Mutex<()>,
}

impl DataStream {
    async fn sync_and_subscribe(&mut self, sender: tokio::sync::broadcast::Sender<String>) -> anyhow::Result<()> {
        // Bind the guard: a bare `lock().await;` statement would drop it
        // (and release the lock) immediately.
        let _trunc_guard = self.trunc_lock.lock().await;
        let len = {
            let _bytes_guard = self.bytes_lock.lock().await;
            self.log.metadata().await?.len()
        };
        // The cursor sits at the end of the file after appends, so rewind
        // before reading; `take(len)` bounds us to the snapshot we measured.
        self.log.seek(SeekFrom::Start(0)).await?;
        let reader = BufReader::new(&mut self.log).take(len);
        let mut lines = reader.lines();
        while let Some(line) = lines.next_line().await? {
            sender.send(line)?;
        }
        self.subscribers.push(sender);
        Ok(())
    }

    async fn push(&mut self, item: String) -> anyhow::Result<()> {
        if self.buffer.len() >= self.buffer.capacity() {
            {
                let _bytes_guard = self.bytes_lock.lock().await;
                for item in &self.buffer {
                    // write_all, not write: write() may do a short write.
                    self.log.write_all(item.as_bytes()).await?;
                    self.log.write_all(b"\n").await?;
                }
                self.log.flush().await?;
                // sync_all covers data and metadata, so sync_data is redundant.
                self.log.sync_all().await?;
            }
            let _trunc_guard = self.trunc_lock.lock().await;
            self.buffer.clear();
        }
        self.buffer.push(item.clone());
        for subscriber in &self.subscribers {
            // send() only errors when there are no active receivers; ignore that.
            let _ = subscriber.send(item.clone());
        }
        Ok(())
    }
}
```
When a subscriber calls `sync_and_subscribe`, it first acquires the `trunc_lock`. This prevents the `DataStream` from truncating itself for the remainder of the function.
It then needs to read the log file's length. It acquires the `bytes_lock` to prevent any writing to the log file until it obtains the length, then promptly releases the lock. This guarantees the caller that the first `len` bytes of the log are a complete set of lines; no partially written line can fall inside that range.
Now the subscriber is free to read that data from the log file and push itself onto the `DataStream`'s subscriber list. Then it releases the `trunc_lock`.
For the `push` function, the `DataStream` holds only a set capacity in memory. When that capacity is exceeded, it needs to offload to the file. So if we are at capacity, it acquires the `bytes_lock`, writes all of its contents to the log, then releases the lock. Before it can truncate (clear) its buffer, it must acquire the `trunc_lock`. Finally, it appends the new item in memory and sends it to each subscriber in its list.
Reading and writing that log file can take a long time. A new subscriber has to first read the log file before it can receive any new data from the in-memory store, so the in-memory store can't truncate itself until the new subscriber finishes reading the log and finally picks up the in-memory data. While this is happening, we don't need to stop appending to the in-memory data structure. And we don't have to wait to append to the log file either, with one caveat: a new subscriber must never read an incomplete, partially written line of bytes. So we use the `bytes_lock` at that critical point.
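The "sample the length under `bytes_lock`, then read only that many bytes" idea can be sketched with std types. Bounding the reader with `take(len)` means only lines that were fully written before the length was sampled are ever seen, even if more data lands afterwards (names here are illustrative):

```rust
use std::io::{BufRead, BufReader, Cursor, Read};

// Read only the first `len` bytes of `data` as lines, mirroring how the
// snapshot length taken under bytes_lock bounds the log reader.
fn read_snapshot(data: &[u8], len: u64) -> Vec<String> {
    BufReader::new(Cursor::new(data))
        .take(len)
        .lines()
        .map(|l| l.unwrap())
        .collect()
}

fn main() {
    // Suppose the log held "a\nb\n" (4 bytes) when the length was sampled,
    // and "c\n" was appended while the subscriber was reading.
    let log = b"a\nb\nc\n";
    let lines = read_snapshot(log, 4);
    // The late append past the snapshot boundary is never observed.
    assert_eq!(lines, vec!["a".to_string(), "b".to_string()]);
    println!("ok");
}
```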
This works, but there is now an issue. Time to share this `DataStream`:
```rust
async fn accept_connection(/* ....... */ stream: Arc<DataStream>) {
    let stream = Arc::clone(&stream);
    tokio::spawn(async move {
        // ....
        // ....
        stream.push(line).await;
    });
    // ....
    // ....
}
```
> cannot borrow data in an `Arc` as mutable; trait `DerefMut` is required to modify through a dereference, but it is not implemented for `Arc<DataStream>`
So the compiler is going to make me wrap the whole `DataStream` in a mutex: `Arc<Mutex<DataStream>>`. That defeats the entire granular locking scheme I set up. New subscribers reading from that massive log would now halt the entire show until they release this mutex. I don't want to do this. What are my options?
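For context on why the compiler objects: `Arc` only hands out shared `&T` access, so methods taking `&mut self` can't be called through it; mutation through a shared handle requires interior mutability. A minimal std-only sketch of that shape, with each mutable field behind its own lock so methods take `&self` and the type shares cleanly as `Arc<T>` without an outer mutex (hypothetical names, not a drop-in for `DataStream`):

```rust
use std::sync::{Arc, Mutex};

// Illustrative type: the mutable field carries its own lock, so no
// &mut self (and no outer Mutex around the whole struct) is needed.
struct Shared {
    buffer: Mutex<Vec<String>>,
}

impl Shared {
    // &self, not &mut self: mutation goes through the field's lock,
    // so this is callable straight through an Arc.
    fn push(&self, item: String) {
        self.buffer.lock().unwrap().push(item);
    }

    fn len(&self) -> usize {
        self.buffer.lock().unwrap().len()
    }
}

fn main() {
    let s = Arc::new(Shared { buffer: Mutex::new(Vec::new()) });
    let s2 = Arc::clone(&s);
    s2.push("hello".to_string()); // compiles: no DerefMut required
    assert_eq!(s.len(), 1);
    println!("ok");
}
```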