Granular locking without the nedd for total locking

Fundamentally, you have a problem with trying to read and write to the same File handle in parallel. It doesn't matter how fine your mutex is if the file itself is blocking concurrent reads at the OS level! (Tokio and other async IO just run blocking operations on a thread).

The fix is a bit silly, just open the file twice: once for reading and once for writing. Apparently the default options for File::create() and File::open() are OK here for allowing file sharing on Windows, even.

Then, the proximate issue is that you need to call methods on a &self receiver, punting the problem of mutating the fields. The simple option here is to wrap every field in a Mutex to access it, though you will get better performance with some of the other suggestions here.

All together, this looks like (with some other small fixes and cleanup):

use std::io::SeekFrom;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncBufReadExt, AsyncWriteExt, BufReader, AsyncSeekExt};
use tokio::sync::broadcast::Sender;
use tokio::sync::Mutex;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let stream = std::sync::Arc::new(DataStream::new("output.log").await?);

    tokio::spawn({
        let stream = stream.clone();
        async move {
            stream.push("foo".into()).await;

            stream.flush().await;
        }
    }).await?;

    Ok(())
}

struct DataStream {
    log_write: Mutex<File>,
    log_read: Mutex<File>,
    buffer: Mutex<Vec<String>>,
    subscribers: Mutex<Vec<Sender<String>>>,
}

impl DataStream {
    async fn new(path: &str) -> anyhow::Result<Self> {
        let log_write = File::create(path).await?;
        let log_read = File::open(path).await?;

        Ok(Self {
            log_write: Mutex::new(log_write),
            log_read: Mutex::new(log_read),
            buffer: Mutex::new(Vec::new()),
            subscribers: Mutex::new(Vec::new()),
        })
    }

    async fn sync_and_subscribe(&self, sender: Sender<String>) -> anyhow::Result<()> {
        // use log_write lock here to avoid splitting a line
        let len = self.log_write.lock().await.metadata().await?.len();

        let mut log = self.log_read.lock().await;
        log.seek(SeekFrom::Start(0)).await?;
        let mut reader = BufReader::new(&mut *log).take(len);
        let mut lines = reader.lines();
        while let Some(line) = lines.next_line().await? {
            sender.send(line)?;
        }

        self.subscribers.lock().await.push(sender);

        Ok(())
    }

    async fn push(&self, item: String) {
        let mut buffer = self.buffer.lock().await;
        if buffer.len() >= buffer.capacity() {
            // Don't hold lock on buffer while we are dumping the log file
            drop(buffer);

            self.flush();

            buffer = self.buffer.lock().await;
            buffer.clear();
        }

        buffer.push(item.clone());
        drop(buffer);

        for subscriber in &*self.subscribers.lock().await {
            subscriber.send(item.clone());
        }
    }

    async fn flush(&self) {
        let mut log = self.log_write.lock().await;

        for item in &*self.buffer.lock().await {
            log.write(item.as_bytes()).await;
            log.write(b"\n").await;
        }

        log.flush().await;
        log.sync_data().await;
        log.sync_all().await;
    }
}