Granular locking without the need for total locking

I have a data structure that stores data that can grow very large, too large to hold entirely in memory. Whenever the data structure gets too big, it offloads its data to an append-only log in the filesystem, one line per piece of data. Then it truncates itself to begin anew.

There are subscribers that want to read this data: old data that has already been written to the filesystem, and new data as it arrives in the in-memory data structure. Of course, there can be no overlap or duplication in the data sent to subscribers.

These actions can happen concurrently within an async context. I've created some granular locking fences to allow efficient reading of the data from both sources: the filesystem log and the fresh in-memory data.

struct DataStream {
    log: File,
    buffer: Vec<String>,
    subscribers: Vec<tokio::sync::broadcast::Sender<String>>,
    trunc_lock: tokio::sync::Mutex<()>,
    bytes_lock: tokio::sync::Mutex<()>,
}

impl DataStream {
    async fn sync_and_subscribe(&mut self, sender: tokio::sync::broadcast::Sender<String>) -> anyhow::Result<()> {
        self.trunc_lock.lock().await;

        let mut len = 0;
        {
            self.bytes_lock.lock().await;
            len = self.log.metadata().await.unwrap().len();
        }

        let mut reader = BufReader::new(&mut self.log).take(len);
        let mut lines = reader.lines();
        while let Some(line) = lines.next_line().await? {
            sender.send(line)?;
        }

        self.subscribers.push(sender);

        Ok(())
    }

    async fn push(&mut self, item: String) {
        if self.buffer.len() >= self.buffer.capacity() {
            {
                self.bytes_lock.lock().await;

                for item in &self.buffer {
                    self.log.write(item.as_bytes()).await;
                    self.log.write(b"\n").await;
                }

                self.log.flush().await;
                self.log.sync_data().await;
                self.log.sync_all().await;
            }

            self.trunc_lock.lock().await;
            self.buffer.clear();
        }

        self.buffer.push(item.clone());
        for subscriber in &self.subscribers {
            subscriber.send(item.clone());
        }
    }
}

When a subscriber calls sync_and_subscribe, it first acquires a trunc_lock. This prevents the DataStream from truncating itself for the remainder of the function.

It then needs to read the log file's length in bytes. First it acquires the bytes_lock to prevent any writing to the log file until it has obtained the len, then promptly releases the lock. This gives the subscribing caller a way to read the log file line by line, sure that the len it holds covers a complete set of lines: no partially written lines can be observed.

Now the subscriber is free to read the data from the log file and push itself onto the DataStream's subscriber list. Then it releases the trunc_lock.


For the push function, the DataStream holds only a set capacity in-memory. When it exceeds this capacity, it needs to offload to the file. So if we are at capacity, it acquires the bytes_lock, writes all of its contents to the log, then releases the lock.

Before it can truncate itself, it needs to acquire the trunc_lock. Finally, it pushes the new item and sends it to each of its subscribers.

Reading and writing that log file can take a long time. A new subscriber has to read from the log file before it can receive any new data from the in-memory store, so the in-memory store can't truncate itself until the new subscriber finishes reading from the log and finally obtains the in-memory data. While this is happening, we don't need to stop appending to the in-memory data structure, and we don't have to wait to append to the log file, with one caveat: a new subscriber must never read incomplete lines of bytes. So we use the bytes_lock at the critical point.


This works, but there is now an issue: time to share this DataStream.

async fn accept_connection(......., stream: Arc<DataStream>) {
    let stream = Arc::clone(&stream);
    tokio::spawn(async move {
        ....
        ....
        stream.push(line).await;
    });
    ....
    ....
}

cannot borrow data in an Arc as mutable: trait DerefMut is required to modify through a dereference, but it is not implemented for Arc<DataStream>

So the compiler is going to make me wrap the whole DataStream in a Mutex: Arc<Mutex<DataStream>>. This defeats the entire granular locking scheme I set up. New subscribers reading from that massive log will now halt the entire show until they release the Mutex. I don't want to do this. What are my options?

While I don't quite understand the problem you're trying to solve, the first thing that comes to mind is that you should probably use tokio::sync::mpsc between your producer task and the consumer (which is DataStream). This will unblock you, and then you can use tokio::sync::oneshot to notify the producer when the data has been synced/flushed. Does that make sense?

In other words, instead of passing stream: Arc<DataStream> to your accept_connection(), just give it an mpsc channel and push each line with a oneshot channel (which you can then await).

The tokio docs have some good examples on how to use oneshot for this kind of thing: tokio::sync::oneshot - Rust
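To sketch the shape I mean (PushCmd is a made-up name, not something from your code): one task owns the data structure outright, and producers just send it lines over mpsc, then await a oneshot acknowledgement once the line has been stored.

use tokio::sync::{mpsc, oneshot};

// Hypothetical command type, for illustration only.
struct PushCmd {
    line: String,
    synced: oneshot::Sender<()>, // fired once the line has been stored
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<PushCmd>(64);

    // The single consumer task owns the state: no Arc, no Mutex needed.
    let consumer = tokio::spawn(async move {
        while let Some(cmd) = rx.recv().await {
            // ... push cmd.line into the buffer / flush to the log here ...
            let _ = cmd.synced.send(()); // notify the producer
        }
    });

    // A producer pushes one line and waits for the ack.
    let (ack_tx, ack_rx) = oneshot::channel();
    tx.send(PushCmd { line: "foo".into(), synced: ack_tx }).await.unwrap();
    ack_rx.await.unwrap();

    drop(tx); // closing the channel lets the consumer exit
    consumer.await.unwrap();
}

Since only one task ever touches the state, &mut access falls out naturally and the whole Arc<Mutex<DataStream>> question disappears.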

One more thing: tokio also provides its own Mutex, which is async and doesn't block the thread while waiting: Mutex in tokio::sync - Rust

Thank you, but I already know about channels and they don't help my goal. The quintessential feature of this code is that there be a backing filesystem store for data that overflows a set capacity. I've implemented a granular locking scheme to handle simultaneous subscribers to fresh and old data. The code I posted is long with the explanations, but it has to be looked at to understand my goal.

Your root problem is that buffer and log don't have any means of synchronizing data accesses, so you need &mut self to mutate them. If they were inside locks of their own, the methods could take &self and you could just use Arc on the whole struct.

Whether or not just throwing more granular locks at the problem would deliver the performance characteristics you're looking for, I can't really say.


Putting locks on both buffer and log would effectively do the same thing as putting one lock on the whole DataStream structure, eliminating the granular locking I've set up. Reading and writing the log can take a long time; the granular locking exists precisely to allow simultaneous reading and writing of the log file. RwLock won't help here, because we are writing to a file where each logical item of data is a line. If we allowed simultaneous reading and writing with only an RwLock, a reader could read bytes that form incomplete lines. Yet we can allow simultaneous reading and writing: the writing of lines just has to be atomic.

I am effectively accomplishing the same goal with the bytes_lock: I read the len of bytes marking a complete data set, use that len to read up to exactly that point in the append-only file, then release the lock and allow more bytes to be written. From there, simultaneous reading and writing can occur freely.

Two threads could both try to insert into buffer at the same time, which would produce a data race. It looks like you're trying to guard against the reallocation of the Vec with the check in push but I'm not sure if that check is sufficient to ensure the underlying memory never moves. Even if it is, you could still have two threads insert into the same slot in the Vec. You'd need some sort of concurrent data structure to make that safe.

Overall, the thing you're trying to do is going to require unsafe code. The compiler can't reason about how your Mutex<()> locks protect the log value.

Actually there will only ever be one thread pushing to the buffer. There are no locks preventing this, but I guarantee it by the very design of the program. It wouldn't make sense at all to ever do this now or in the future, so I allowed myself one less lock.

As a side note, is there a way to panic the program if there is ever concurrent write access to buffer? That's how serious I am about there never being a concurrent write to buffer. This program can be thought of as a single-producer, multi-consumer queue with in-memory overflow protection to the filesystem.
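For illustration, this kind of re-entrancy assertion is what I have in mind (hypothetical sketch; SingleWriterGuard and the flag are made-up names):

use std::sync::atomic::{AtomicBool, Ordering};

// Panics if two writers ever overlap; the flag would live next to buffer.
struct SingleWriterGuard<'a>(&'a AtomicBool);

impl<'a> SingleWriterGuard<'a> {
    fn enter(flag: &'a AtomicBool) -> Self {
        // swap returns the previous value: true means someone is already writing.
        assert!(!flag.swap(true, Ordering::Acquire), "concurrent write to buffer");
        SingleWriterGuard(flag)
    }
}

impl Drop for SingleWriterGuard<'_> {
    fn drop(&mut self) {
        self.0.store(false, Ordering::Release);
    }
}

fn main() {
    let writing = AtomicBool::new(false);
    let _guard = SingleWriterGuard::enter(&writing); // first line of push()
    // a second, overlapping enter(&writing) would panic here
}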

It also seems like a data race could happen if one thread is writing to self.log while another thread is reading from it. The write is guarded by a lock, but the read in sync_and_subscribe isn't, because the lock goes out of scope after reading the metadata.


Fundamentally, you have a problem with trying to read and write the same File handle in parallel. It doesn't matter how fine-grained your mutex is if the file itself is blocking concurrent reads at the OS level! (Tokio and other async IO just run blocking file operations on a thread pool.)

The fix is a bit silly: just open the file twice, once for reading and once for writing. Apparently the default options for File::create() and File::open() are OK for allowing file sharing, even on Windows.

Then, the proximate issue is that you need to call methods on a &self receiver, punting the problem of mutating the fields. The simple option here is to wrap every field in a Mutex to access it, though you will get better performance with some of the other suggestions here.

All together, this looks like (with some other small fixes and cleanup):

use std::io::SeekFrom;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncBufReadExt, AsyncWriteExt, BufReader, AsyncSeekExt};
use tokio::sync::broadcast::Sender;
use tokio::sync::Mutex;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let stream = std::sync::Arc::new(DataStream::new("output.log").await?);

    tokio::spawn({
        let stream = stream.clone();
        async move {
            stream.push("foo".into()).await;

            stream.flush().await;
        }
    }).await?;

    Ok(())
}

struct DataStream {
    log_write: Mutex<File>,
    log_read: Mutex<File>,
    buffer: Mutex<Vec<String>>,
    subscribers: Mutex<Vec<Sender<String>>>,
}

impl DataStream {
    async fn new(path: &str) -> anyhow::Result<Self> {
        let log_write = File::create(path).await?;
        let log_read = File::open(path).await?;

        Ok(Self {
            log_write: Mutex::new(log_write),
            log_read: Mutex::new(log_read),
            buffer: Mutex::new(Vec::new()),
            subscribers: Mutex::new(Vec::new()),
        })
    }

    async fn sync_and_subscribe(&self, sender: Sender<String>) -> anyhow::Result<()> {
        // use log_write lock here to avoid splitting a line
        let len = self.log_write.lock().await.metadata().await?.len();

        let mut log = self.log_read.lock().await;
        log.seek(SeekFrom::Start(0)).await?;
        let mut reader = BufReader::new(&mut *log).take(len);
        let mut lines = reader.lines();
        while let Some(line) = lines.next_line().await? {
            sender.send(line)?;
        }

        self.subscribers.lock().await.push(sender);

        Ok(())
    }

    async fn push(&self, item: String) {
        let mut buffer = self.buffer.lock().await;
        if buffer.len() >= buffer.capacity() {
            // Don't hold lock on buffer while we are dumping the log file
            drop(buffer);

            self.flush().await;

            buffer = self.buffer.lock().await;
            buffer.clear();
        }

        buffer.push(item.clone());
        drop(buffer);

        for subscriber in &*self.subscribers.lock().await {
            subscriber.send(item.clone());
        }
    }

    async fn flush(&self) {
        let mut log = self.log_write.lock().await;

        for item in &*self.buffer.lock().await {
            log.write_all(item.as_bytes()).await;
            log.write_all(b"\n").await;
        }

        log.flush().await;
        log.sync_data().await;
        log.sync_all().await;
    }
}

No data race can occur as far as I am aware. What you are describing is not a data race per my requirements. self.log is an append-only file. We acquire a lock to read how many bytes are in the file, say 1024 bytes. The lock is then released. Thread A reads bytes 0 through 1023. During this read, Thread B writes to the append-only file, which now holds 1025 bytes. No data race has occurred, and Thread A does not care: it is still going to get the same bytes 0 through 1023.

Mostly in that you have to open the file twice, rather than, say, getting two independent cursors by duplicating the handle (dup ends up sharing the file pointer between both handles, since they still refer to the same kernel object).

I would probably instead use the read_at/write_at extension methods on std's File (seek_read/seek_write on Windows), but that's a bit much to get into at this level of discussion.
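Roughly this shape on Unix (sketch only): read_at takes &self and never touches the shared cursor, so any number of threads can read through one handle.

use std::fs::File;
use std::io;
use std::os::unix::fs::FileExt; // on Windows: std::os::windows::fs::FileExt and seek_read

fn read_chunk(file: &File, offset: u64, len: usize) -> io::Result<Vec<u8>> {
    let mut buf = vec![0u8; len];
    let n = file.read_at(&mut buf, offset)?; // positional read, no seek involved
    buf.truncate(n); // read_at may return fewer bytes than requested
    Ok(buf)
}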

I think the logic there is that you can get independent file pointers just by opening the file twice, while a shared file pointer needs support from the kernel.

And thanks for pointing out that using dup is dangerous, since the file pointer would still be shared.

On Linux you can just open /proc/self/fd/<N> to get independent pointers.
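Something like this (Linux-only sketch):

use std::fs::File;
use std::io;
use std::os::unix::io::AsRawFd;

// Reopening through /proc creates a new open file description,
// i.e. an independent cursor onto the same file.
fn reopen_independent(f: &File) -> io::Result<File> {
    File::open(format!("/proc/self/fd/{}", f.as_raw_fd()))
}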

Others have written with various degrees of courtesy, but I'll add my quick comments:

  1. &mut self is basically a compiler-enforced, mutex-like lock around the function. Until you can rewrite to take just &self, this is serialized.
  2. The OS doesn't guarantee serialization when multiple threads write/append to a file. An OS may, for example, write a 4096B buffer atomically but then interleave it 4K at a time with other threads/processes. When you write in buffered mode, this happens at the library level (not the OS level), so the problem can be even worse. Either use positionally independent, zero-padded, block-aligned writes (e.g. exact multiples of 512 bytes on 512-byte boundaries, or better, 4096), or use a single writer thread (actor model): one thread writes, but it accepts write requests from an async queue/channel.
  3. As others have said, in Rust it's probably best to just dup the file handle (in Java I typically do pread/pwrite via hundreds of threads to the same file handle, but I haven't found how to do that in Rust yet; I think it's a least-common-denominator issue). In fact, if you want N writers, you should have N dups.
  4. For parallel reads from the file, you might want a sharded lock bucket: have an array of N-cpus entries, each a mutex around a read-only file handle (dupped as in 3), and perform your streaming read through one of them, i.e. Vec<Mutex<MyFileStruct>> (see the sketch after this list). I'm sure you can find a 3rd-party library for this; I'm just not completely sure whether std supports anything else.
  5. Lots of little writes are less efficient than one big buffered write. So either use a mutexed, streaming write buffer (which flushes to the OS when full), or a dedicated actor thread (as in 2). (I'm not following your code well enough to know how much of this you are already doing.)
  6. Personally I shy away from the async model for stuff like this. I'm not comfortable with the latency implications (a task can be starved 20x longer than new entrants, for example), in contrast to a handful of dedicated threads working through channel/queued instructions, where FIFO operation is all but guaranteed. In general a mutex around a critical section is faster than an actor thread with communication channels, but for IO that's not necessarily true. Most of the big databases use a handful of dedicated threads for all IO (having 2,000 async IO requests in flight in the kernel isn't necessarily a good thing; maybe it's better with NVMe, but certainly not with HDD).
  7. I think tokio async IO is excellent for socket and whole-file read/write operations, but there this sort of synchronization isn't necessary, so it's somewhat outside my comfort zone to say what's best here.
  8. This does seem a lot like Kafka; you might find some well-crafted libraries to take the guesswork out of it.
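For point 4, a minimal sketch of the sharded bucket idea (ReadPool and its names are made up): each shard owns its own read-only handle, so readers only contend within a shard rather than on one global lock.

use tokio::fs::File;
use tokio::sync::{Mutex, MutexGuard};

struct ReadPool {
    shards: Vec<Mutex<File>>,
}

impl ReadPool {
    async fn open(path: &str, n_shards: usize) -> std::io::Result<Self> {
        let mut shards = Vec::with_capacity(n_shards);
        for _ in 0..n_shards {
            // Each shard gets its own handle, hence its own file cursor.
            shards.push(Mutex::new(File::open(path).await?));
        }
        Ok(Self { shards })
    }

    // Lock one shard (keyed e.g. by subscriber id) for a streaming read.
    async fn lock_shard(&self, key: usize) -> MutexGuard<'_, File> {
        self.shards[key % self.shards.len()].lock().await
    }
}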

I may be wrong, and we may also be past this point, but doesn't this entire setup violate the aliasing rules? Two different threads would both need a &mut self on the DataStream to call sync_and_subscribe and push simultaneously, but &mut self must be unique, otherwise it's UB.

There are some good points here, but I'd like to note at least:

  • The file is append-only, and the lock (in theory; it wasn't actually held in OP's code) guards both the write (including a full flush) and the reading of the length, so there aren't going to be any torn reads or other races on the file data.
  • You can't just dup the file handle: that gives you two handles to the same kernel object, which is explicitly documented as sharing the file pointer too. If you avoid the file pointer by using pread (called read_at or seek_read depending on platform in Rust), then you can share the same handle.
  • Sharding the reads seems like overkill here, would need careful handling to get full lines, and might not even be faster due to prefetch, overhead, and sender back pressure.
  • It would be pretty easy to buffer the writes just by swapping the buffer to a single String rather than a Vec (see the sketch below).
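E.g., a sketch of that last point (LineBuffer is a made-up name):

// Accumulate newline-delimited lines in one String so the eventual
// flush is a single large write instead of many small ones.
struct LineBuffer {
    buf: String,
}

impl LineBuffer {
    fn new() -> Self {
        Self { buf: String::new() }
    }

    fn push(&mut self, line: &str) {
        self.buf.push_str(line);
        self.buf.push('\n');
    }

    // Hand the whole batch to the writer in one call and reset the buffer.
    fn take(&mut self) -> String {
        std::mem::take(&mut self.buf)
    }
}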

    Others have written with various degrees of courtesy

I wouldn't call this

    Are you even reading what people are writing? I'll try to explain again.

    Because currently you show so little understanding of what you are dealing with that it's not even funny. Like a guy who has always ridden the bus and has now decided he has enough skill to sit in the driver's seat and take the highway. Because that would be faster.

any form of courtesy I would recognize.

    The OS doesn't guarantee serialization when multiple threads write/append to a file. An OS may, for example, write a 4096B buffer atomically but then interleave it 4K at a time with other threads/processes. When you write in buffered mode, this happens at the library level (not the OS level), so the problem can be even worse. Either use positionally independent, zero-padded, block-aligned writes, or use a single writer thread (actor model): one thread writes, but it accepts write requests from an async queue/channel.

Does this apply when there is only a single writer? If only a single thread is ever writing and then flushing? I've stated that that's the case.

    • For parallel reads from the file, you might want a sharded lock bucket: have an array of N-cpus entries, each a mutex around a read-only file handle (dupped as in 3), and perform your streaming read through one of them, i.e. Vec<Mutex<MyFileStruct>>. I'm sure you can find a 3rd-party library for this; I'm just not completely sure whether std supports anything else.

For efficiency or correctness? I view an instance of DataStream as the one path where readers need to contend for the read locks, namely bytes_lock and trunc_lock.

You make other points that seem good, but I can't tell whether you're addressing optimization or correctness. I want to make sure correctness is achieved with the granular locking first, before delving into platform-specific filesystem details.

Kafka, or really just Redis, would accomplish my goal. But I'm not trying to build a product as fast as possible right now; I'm just messing around with Rust in my free time.

Hey folks, the forums are a safe place for ideas and exploration. Criticizing people instead of their ideas makes this place less safe for everyone and discourages the sharing of ideas in the future.

Please be kind to each other.

Okay, so far this is what I've come up with:

struct DataStream<const S: usize> {
    log: String,
    buffer: [String; S],
    head: usize,
    trunc_lock: tokio::sync::RwLock<()>,
    bytes_lock: tokio::sync::RwLock<()>,
    sender: tokio::sync::broadcast::Sender<String>,
}

impl<const S: usize> DataStream<S> {
    async fn subscribe(&self) -> anyhow::Result<tokio::sync::broadcast::Receiver<String>> {
        self.trunc_lock.read().await;

        let mut len = 0;
        {
            let mut file = tokio::fs::File::open(self.log.as_str()).await?;
            self.bytes_lock.read().await;
            len = file.metadata().await.unwrap().len();
        }

        let file_sender = self.sender.clone();
        let mut file = tokio::fs::File::open(self.log.as_str()).await?;
        tokio::spawn(async move {
            let mut reader = BufReader::new(&mut file).take(len);
            let mut lines = reader.lines();
            while let Some(line) = lines.next_line().await.unwrap() {
                file_sender.send(line);
            }

        });

        Ok(self.sender.subscribe())
    }

    async fn push(&mut self, item: String) -> anyhow::Result<()> {
        assert!(self.head <= self.buffer.len());
        if self.head == self.buffer.len() {
            {
                self.bytes_lock.write().await;

                let mut file = tokio::fs::OpenOptions::new()
                    .append(true)
                    .create(true)
                    .open(self.log.as_str())
                    .await?;
                for item in &self.buffer[0..self.head] {
                    file.write(item.as_bytes()).await;
                    file.write(b"\n").await;
                }

                file.flush().await;
                file.sync_data().await;
                file.sync_all().await;
            }

            self.trunc_lock.write().await;
            self.head = 0;
        }

        self.buffer[self.head] = item.clone();
        self.head += 1;
        self.sender.send(item);

        Ok(())
    }
}

I took the advice to find some way to make subscribe() take &self. I'm leveraging tokio's multi-producer, multi-consumer broadcast channel.

I can now wrap the granular locks in proper RwLocks, because that matches how I intend to use DataStream: only one thread will ever call push(), while multiple threads can call subscribe().

I think I can now focus on the filesystem caveats being brought up. So I don't want the same file pointer being concurrently accessed? I assume the reason read functions take &mut self is that reads are only reads linguistically? Semantically, something is updating a shared cursor that advances through the file's bytes as you read or seek?

Right now I'm just... opening the damn file each time I need to use it. What's the better way?

Did you see my earlier post with code?

I'm not saying it's perfect, or heck, that it even works properly (deadlocks are always a risk with several mutexes), but it addresses that point and several more subtle details. You could combine it with feeding buffers via mpsc to reduce the number of mutexes, too.