Why does tokio::sync::RwLock::try_write_owned consume the Arc?

I noticed that tokio::sync::RwLock has a method try_write_owned, which takes self as Arc<Self>. Since this method consumes the Arc, I need to clone it just to "try" locking, even when the attempt fails (at least if I want to keep my Arc<RwLock> afterwards). Wouldn't it be better if this method took &Arc<Self> instead of Arc<Self> and performed the clone internally, only when the lock is actually acquired?

I suppose it’s a trade-off. If you already own an Arc, no longer need it after the call to try_write_owned, and don’t intend to retry on failure, then taking the Arc by value avoids the extra clone that an &Arc-accepting method would have to perform internally.
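To make the trade-off concrete, here is a minimal sketch using a toy lock. FlagLock, OwnedGuard, and try_lock_owned_ref are hypothetical names made up for illustration, and the lock is only a flag with no protected data; none of this is Tokio's implementation. The by-value method has the same shape as try_write_owned, while the by-reference variant is written as an associated function and clones the Arc only after the lock was acquired.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Toy stand-in for a lock: only a "locked" flag, no protected data.
/// (Hypothetical type for illustration, not Tokio's RwLock.)
pub struct FlagLock {
    locked: AtomicBool,
}

/// Owned guard: keeps the lock alive through its Arc and unlocks on drop.
pub struct OwnedGuard {
    lock: Arc<FlagLock>,
}

impl Drop for OwnedGuard {
    fn drop(&mut self) {
        self.lock.locked.store(false, Ordering::Release);
    }
}

impl FlagLock {
    pub fn new() -> Self {
        FlagLock { locked: AtomicBool::new(false) }
    }

    /// Returns true if the flag was flipped from unlocked to locked.
    fn try_acquire(&self) -> bool {
        self.locked
            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
    }

    /// By-value shape: the Arc is consumed whether or not locking succeeds.
    pub fn try_lock_owned(self: Arc<Self>) -> Option<OwnedGuard> {
        if self.try_acquire() {
            Some(OwnedGuard { lock: self })
        } else {
            None // the Arc passed in is dropped here
        }
    }

    /// By-reference shape (hypothetical): clones only on success.
    /// Called as `FlagLock::try_lock_owned_ref(&arc)`.
    pub fn try_lock_owned_ref(this: &Arc<Self>) -> Option<OwnedGuard> {
        if this.try_acquire() {
            Some(OwnedGuard { lock: Arc::clone(this) })
        } else {
            None
        }
    }
}
```

With the by-reference shape, a failed attempt leaves the reference count untouched. With the by-value shape, a caller that wants to keep its Arc has to call Arc::clone before every attempt, but a caller that is done with its Arc pays for no clone at all.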


I mean, changing anything about the method now seems hard in terms of API stability, especially since it uses the same error type as the non-_owned variants of the try_ methods. That rules out returning the Arc inside the error value, which might otherwise have been a nice approach.
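For illustration, this is roughly what "fitting the Arc into the error value" could look like. It is a sketch on a toy flag-only lock; FlagLock, OwnedGuard, and WouldBlock are hypothetical names and not part of Tokio. The pattern is the same one std uses for Arc::try_unwrap, which returns the Arc in the Err variant when it cannot unwrap.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Toy lock consisting only of a flag (hypothetical, for illustration).
pub struct FlagLock {
    locked: AtomicBool,
}

/// Owned guard that unlocks the flag when dropped.
pub struct OwnedGuard {
    lock: Arc<FlagLock>,
}

impl Drop for OwnedGuard {
    fn drop(&mut self) {
        self.lock.locked.store(false, Ordering::Release);
    }
}

/// Hypothetical error type that hands the Arc back to the caller.
pub struct WouldBlock(pub Arc<FlagLock>);

impl FlagLock {
    pub fn new() -> Self {
        FlagLock { locked: AtomicBool::new(false) }
    }

    /// Consumes the Arc, but returns it inside the error on failure,
    /// so the caller never needs to clone up front.
    pub fn try_lock_owned(self: Arc<Self>) -> Result<OwnedGuard, WouldBlock> {
        let acquired = self
            .locked
            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_ok();
        if acquired {
            Ok(OwnedGuard { lock: self })
        } else {
            Err(WouldBlock(self))
        }
    }
}
```

A caller would match on the result and pull the Arc back out of WouldBlock to retry later. This only works with a dedicated error type, which is exactly what sharing one error type across all try_ methods prevents.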


Looking through the API, perhaps it’s possible to offer a method that converts the non-owned RwLockWriteGuard into an OwnedRwLockWriteGuard when given an Arc<RwLock<…>> to the same underlying RwLock (panicking at run time if the Arc points to a different lock). This way, one could obtain an OwnedRwLockWriteGuard via try_write calls, without cloning the Arc on every attempt.
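A rough sketch of that conversion idea, again on a toy flag-only lock. All names here (FlagLock, Guard, OwnedGuard, into_owned) are hypothetical; Tokio does not offer such a method as far as this thread establishes. The borrowed guard is forgotten rather than dropped, so the lock stays held and the owned guard takes over the unlocking.

```rust
use std::mem;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Toy lock consisting only of a flag (hypothetical, for illustration).
pub struct FlagLock {
    locked: AtomicBool,
}

/// Borrowed guard, tied to the lifetime of the lock reference.
pub struct Guard<'a> {
    lock: &'a FlagLock,
}

impl Drop for Guard<'_> {
    fn drop(&mut self) {
        self.lock.locked.store(false, Ordering::Release);
    }
}

/// Owned guard, keeps the lock alive through an Arc.
pub struct OwnedGuard {
    lock: Arc<FlagLock>,
}

impl Drop for OwnedGuard {
    fn drop(&mut self) {
        self.lock.locked.store(false, Ordering::Release);
    }
}

impl FlagLock {
    pub fn new() -> Self {
        FlagLock { locked: AtomicBool::new(false) }
    }

    /// Non-owned try-lock: no Arc involved at all.
    pub fn try_lock(&self) -> Option<Guard<'_>> {
        self.locked
            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
            .ok()
            .map(|_| Guard { lock: self })
    }
}

impl<'a> Guard<'a> {
    /// Hypothetical conversion: panics if `arc` points to a different lock.
    pub fn into_owned(self, arc: Arc<FlagLock>) -> OwnedGuard {
        let same_lock = std::ptr::eq(self.lock as *const FlagLock, Arc::as_ptr(&arc));
        assert!(same_lock, "Arc points to a different lock");
        // Skip Guard's destructor so the flag stays set; OwnedGuard unlocks later.
        mem::forget(self);
        OwnedGuard { lock: arc }
    }
}
```

Because the borrowed guard borrows from the caller's Arc, the caller passes a clone, for example guard.into_owned(Arc::clone(&lock)). That clone happens only after try_lock has already succeeded.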


Even if the existing API could be changed arbitrarily without breaking tokio 1.0, note that:

  • IMO Arc<Self> is the correct type for write_owned: that method always ends up needing ownership of an Arc, so taking it by value avoids an unnecessary .clone() when the caller no longer needs its Arc. Additionally, because the Arc is owned, the future for the locking operation can itself be 'static without being wrapped in an async block.
  • Given that, Arc<Self> has a small advantage for try_write_owned, too, in that it makes the API look “more consistent”.

On an unrelated note, I’d be curious how often usage of try_write_owned comes up in practice in the first place. It’s probably less common than either of try_write or write_owned. If you came across this because you have a use-case for it, I’m somewhat interested as to what such a practical use-case looks like. (It’s not that I cannot come up with any good use-case at all myself, but I’m having a hard time imagining what a typical/natural use case might look like, right now.)

Ah, I was missing that. It makes sense!

Yes, I agree on that. I was particularly talking about the try_ case.

I experimented with re-using buffers for a software-defined radio application. The idea was to try to use an existing buffer if no reader holds a lock on it (and otherwise allocate a new buffer). But I finally decided RwLock isn't the best thing to use. So my use-case isn't a use-case anymore ;)

Instead, I'll be trying something like this:

type Buffer = Vec<Complex<f32>>;

#[derive(Clone, Debug)]
struct ReadGuard {
    buffer: Option<Arc<Buffer>>,
    count: usize,
    recycler: mpsc::UnboundedSender<Buffer>,
}

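impl Drop for ReadGuard {
    fn drop(&mut self) {
        // try_unwrap succeeds only for the last remaining clone of the Arc;
        // in that case the buffer is handed back for reuse.
        if let Ok(buffer) = Arc::try_unwrap(self.buffer.take().unwrap()) {
            // Ignore the error if the receiving side is already gone.
            let _ = self.recycler.send(buffer);
        }
    }
}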

impl Deref for ReadGuard {
    type Target = [Complex<f32>];
    fn deref(&self) -> &Self::Target {
        &self.buffer.as_ref().unwrap()[0..self.count]
    }
}

Full example (unfinished / work in progress)

Cargo.toml:

[dependencies]
soapysdr = "0.3.2"
num-complex = "0.4.2"
num-traits = "0.2.11"
tokio = { version = "1", features = ["full"] }

src/main.rs:

use num_complex::Complex;
use soapysdr::{self, Direction::Rx};
use std::fs::File;
use std::io::{BufWriter, Write};
use std::ops::Deref;
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc};
use tokio::task::{spawn, spawn_blocking};

type Buffer = Vec<Complex<f32>>;

#[derive(Clone, Debug)]
struct ReadGuard {
    buffer: Option<Arc<Buffer>>,
    count: usize,
    recycler: mpsc::UnboundedSender<Buffer>,
}

impl Drop for ReadGuard {
    fn drop(&mut self) {
        if let Ok(buffer) = Arc::try_unwrap(self.buffer.take().unwrap()) {
            let _ = self.recycler.send(buffer);
        }
    }
}

impl Deref for ReadGuard {
    type Target = [Complex<f32>];
    fn deref(&self) -> &Self::Target {
        &self.buffer.as_ref().unwrap()[0..self.count]
    }
}

#[tokio::main]
async fn main() {
    let dev = soapysdr::Device::new("").unwrap();
    dev.set_frequency(Rx, 0, 100e6, "").unwrap();
    dev.set_sample_rate(Rx, 0, 1024000.0).unwrap();
    dev.set_bandwidth(Rx, 0, 1024000.0).unwrap();
    let mut rx = dev.rx_stream::<Complex<f32>>(&[0]).unwrap();
    rx.activate(None).unwrap();
    let (send, mut recv) = broadcast::channel::<ReadGuard>(16);
    let (recycler, mut dispenser) = mpsc::unbounded_channel::<Buffer>();
    spawn(async move {
        let file = File::create("output.raw").unwrap();
        let mut writer = BufWriter::new(file);
        loop {
            let block = recv.recv().await.unwrap();
            for sample in &*block {
                writer.write_all(&sample.re.to_ne_bytes()).unwrap();
                writer.write_all(&sample.im.to_ne_bytes()).unwrap();
            }
        }
    });
    spawn_blocking(move || loop {
        let mut buffer: Buffer = dispenser.try_recv().unwrap_or_else(|_| {
            println!("Create new buffer");
            vec![Complex::<f32>::default(); rx.mtu().unwrap()]
        });
        let count = rx.read(&[&mut buffer], 1000000).unwrap();
        if send
            .send(ReadGuard {
                buffer: Some(Arc::new(buffer)),
                count,
                recycler: recycler.clone(),
            })
            .is_err()
        {
            panic!("could not send data");
        }
        println!("sent {count}");
    })
    .await
    .unwrap();
}

The idea is that ReadGuard can be cloned, and when the last ReadGuard is dropped, the buffer will be sent to recycler.

Side question: Do you know, by any chance, how to get rid of the Option? I can't move out of buffer because the drop method doesn't own self but only has a mutable reference to it. Figured it out: I can use std::mem::take in that case, because Vec implements Default and Arc<T> implements Default whenever T does.
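A minimal, std-only sketch of the mem::take approach. To keep it self-contained it swaps tokio's mpsc::UnboundedSender for std::sync::mpsc::Sender and uses Vec<u8> as the buffer type; both substitutions, as well as the new constructor, are assumptions made for the sake of a small example.

```rust
use std::mem::take;
use std::sync::mpsc::Sender;
use std::sync::Arc;

/// Cloneable handle to a shared buffer. When the last clone is dropped,
/// the buffer is sent back through `recycler`.
#[derive(Clone)]
pub struct BufGuard {
    buffer: Arc<Vec<u8>>,
    recycler: Sender<Vec<u8>>,
}

impl BufGuard {
    pub fn new(buffer: Vec<u8>, recycler: Sender<Vec<u8>>) -> Self {
        BufGuard { buffer: Arc::new(buffer), recycler }
    }
}

impl Drop for BufGuard {
    fn drop(&mut self) {
        // `take` swaps in `Arc::default()` (an Arc around an empty Vec) and
        // returns the real Arc by value, so no Option is needed.
        if let Ok(buffer) = Arc::try_unwrap(take(&mut self.buffer)) {
            // Only the last guard gets here. A closed channel is ignored.
            let _ = self.recycler.send(buffer);
        }
    }
}
```

The placeholder costs one small allocation per drop, since Arc::default() allocates even for an empty Vec. On newer toolchains, Arc::into_inner (stable since Rust 1.70) may be worth a look as well: its documentation describes how it avoids the case where two guards dropped at the same time both fail try_unwrap and the buffer is freed instead of recycled.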


P.S.: There are some other flaws in the example code as well. For example, I don't need to store a count; I could truncate the Vec to the number of samples read instead.
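A small sketch of the truncate idea, assuming f32 samples and a hypothetical read_block helper whose closure stands in for the device read. Vec::truncate and Vec::clear leave the allocated capacity alone, so a recycled buffer can be resized back to the MTU without reallocating.

```rust
/// Grows `buffer` to `mtu` samples, lets `read` fill a prefix, then
/// truncates to the number of samples actually read.
/// (`read_block` and the closure are hypothetical stand-ins for the device read.)
pub fn read_block(
    buffer: &mut Vec<f32>,
    mtu: usize,
    read: impl FnOnce(&mut [f32]) -> usize,
) {
    buffer.resize(mtu, 0.0);
    // Clamp defensively in case the reader reports more than it was given.
    let count = read(buffer.as_mut_slice()).min(mtu);
    // The length now carries the sample count; capacity is unchanged.
    buffer.truncate(count);
}
```

After this, buffer.len() is the sample count, so a guard wrapping the Vec can deref to the whole slice without a separate count field.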

Updated example (but still unfinished / work in progress)

Cargo.toml:

[dependencies]
soapysdr = "0.3.2"
num-complex = "0.4.2"
num-traits = "0.2.11"
tokio = { version = "1", features = ["full"] }

src/main.rs:

use num_complex::Complex;
use soapysdr::{self, Direction::Rx};
use std::fs::File;
use std::io::{BufWriter, Write};
use std::mem::{replace, take};
use std::ops::Deref;
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc};
use tokio::task::{spawn, spawn_blocking};

type Sample = Complex<f32>;

#[derive(Clone, Debug)]
struct BufGuard<T> {
    buffer: Arc<Vec<T>>,
    recycler: mpsc::UnboundedSender<Vec<T>>,
}

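impl<T> Drop for BufGuard<T> {
    fn drop(&mut self) {
        // take() leaves a default (empty) Arc<Vec<T>> behind so the real Arc
        // can be moved out; try_unwrap succeeds only for the last clone.
        if let Ok(buffer) = Arc::try_unwrap(take(&mut self.buffer)) {
            let _ = self.recycler.send(buffer);
        }
    }
}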

impl<T> Deref for BufGuard<T> {
    type Target = [T];
    fn deref(&self) -> &Self::Target {
        &self.buffer
    }
}

struct BufPool<T> {
    recycler: mpsc::UnboundedSender<Vec<T>>,
    dispenser: mpsc::UnboundedReceiver<Vec<T>>,
    current: Vec<T>,
}

impl<T> BufPool<T> {
    fn new() -> Self {
        let (recycler, dispenser) = mpsc::unbounded_channel::<Vec<T>>();
        Self {
            recycler,
            dispenser,
            current: Vec::new(),
        }
    }
    fn current(&mut self) -> &mut Vec<T> {
        &mut self.current
    }
    fn finish(&mut self) -> BufGuard<T> {
        let replacement = match self.dispenser.try_recv() {
            Ok(mut buffer) => {
                buffer.clear();
                buffer
            }
            Err(_) => {
                println!("Create new buffer");
                Vec::new()
            }
        };
        BufGuard {
            buffer: Arc::new(replace(&mut self.current, replacement)),
            recycler: self.recycler.clone(),
        }
    }
}

#[tokio::main]
async fn main() {
    let dev = soapysdr::Device::new("").unwrap();
    dev.set_frequency(Rx, 0, 100e6, "").unwrap();
    dev.set_sample_rate(Rx, 0, 1024000.0).unwrap();
    dev.set_bandwidth(Rx, 0, 1024000.0).unwrap();
    let mut rx = dev.rx_stream::<Sample>(&[0]).unwrap();
    rx.activate(None).unwrap();
    let (rx_rf_send, mut rx_rf_recv) = broadcast::channel::<BufGuard<Sample>>(16);
    spawn(async move {
        let file = File::create("output.raw").unwrap();
        let mut writer = BufWriter::new(file);
        loop {
            let block = rx_rf_recv.recv().await.unwrap();
            for sample in &*block {
                writer.write_all(&sample.re.to_ne_bytes()).unwrap();
                writer.write_all(&sample.im.to_ne_bytes()).unwrap();
            }
        }
    });
    spawn_blocking(move || {
        let mut buf_pool = BufPool::<Sample>::new();
        let mtu = rx.mtu().unwrap();
        loop {
            let mut buffer = buf_pool.current();
            buffer.resize_with(mtu, Default::default);
            let count = rx.read(&[&mut buffer], 1000000).unwrap();
            buffer.truncate(count);
            if rx_rf_send.send(buf_pool.finish()).is_err() {
                panic!("could not send data because all receivers died");
            }
            println!("sent {count} samples");
        }
    })
    .await
    .unwrap();
}

Taking self: &Arc<Self> as a method receiver is not possible on Tokio's minimum supported Rust version.
