I noticed that tokio::sync::RwLock
has a method try_write_owned
, which operates on Arc<Self>
. But since this method consumes the Arc
, I need to clone it just to "try" locking it, even if it can't be locked by the method. (At least if I want to retain the Arc<RwLock>
and not lose it.) Wouldn't it be better if this method worked on &Arc<Self>
instead of Arc<Self>
and performs the clone internally and only in case of successfully locking the RwLock
?
I suppose it’s a trade-off. If your use case would be that you already own an Arc
, no longer need it anymore after the call to try_write_owned
and don’t want to try multiple times either in case of failure, then try_write_owned
taking the Arc
by-value can avoid an additional cloning step that would otherwise be necessary if it accepted &Arc
instead.
I mean, changing anything about the method now seems hard in terms of API stability. Especially since it uses the same error type as the non-_owned
variants of the try_
-methods, so it’s not even possible to neatly fit the Arc
into the error value, which would’ve otherwise been a possibly nice approach.
Looking through the API, perhaps it’s possible to offer a method for converting the non-owned RwLockWriteGuard
into an OwnedRwLockWriteGuard
by providing an Arc<RwLock<…>>
to the same underlying RwLock
(which would panic at run-time if an Arc
to a different lock was provided). This way, one could obtain an OwnedRwLockWriteGuard
using try_write
-method-calls that don’t need the Arc
-cloning step for every call.
Even if the existing API could be changed arbitrarily without breaking tokio 1.0, note that:
- IMO
Arc<Self>
is the correct type forwrite_owned
since that method will always eventually need ownership of anArc
, so taking it owned can avoid an unnecessary.clone()
in case theArc
isn’t needed anymore on the caller side; and additionally, by taking an ownedArc
, the future itself for the locking operation can be'static
, too, without needing to be wrapped into anasync
block. - So then, the
Arc<Self>
does have a small advantage fortry_write_owned
, too, in that it makes the API look “more consistent”.
On an unrelated note, I’d be curious how often usage of try_write_owned
comes up in practice in the first place. It’s probably less common than either of try_write
or write_owned
. If you came across this because you have a use-case for it, I’m somewhat interested as to what such a practical use-case looks like. (It’s not that I cannot come up with any good use-case at all myself, but I’m having a hard time imagining what a typical/natural use case might look like, right now.)
Ah, I was missing that. It makes sense!
Yes, I agree on that. I was particularly talking about the try_
case.
I experimented with re-using buffers for a software defined radio application. The idea was to try to use an existing buffer if no reader holds a lock on it (and otherwise allocate a new buffer). But I finally decided RwLock
isn't the best thing to use. So my use-case isn't a use-case anymore
Instead, I'll be trying something like this:
type Buffer = Vec<Complex<f32>>;
#[derive(Clone, Debug)]
struct ReadGuard {
buffer: Option<Arc<Buffer>>,
count: usize,
recycler: mpsc::UnboundedSender<Buffer>,
}
impl Drop for ReadGuard {
fn drop(&mut self) {
if let Ok(buffer) = Arc::try_unwrap(self.buffer.take().unwrap()) {
let _ = self.recycler.send(buffer);
}
}
}
impl Deref for ReadGuard {
type Target = [Complex<f32>];
fn deref(&self) -> &Self::Target {
&self.buffer.as_ref().unwrap()[0..self.count]
}
}
Full example (unfinished / work in progress)
[dependencies]
soapysdr = "0.3.2"
num-complex = "0.4.2"
num-traits = "0.2.11"
tokio = { version = "1", features = ["full"] }
use num_complex::Complex;
use soapysdr::{self, Direction::Rx};
use std::fs::File;
use std::io::{BufWriter, Write};
use std::ops::Deref;
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc};
use tokio::task::{spawn, spawn_blocking};
type Buffer = Vec<Complex<f32>>;
#[derive(Clone, Debug)]
struct ReadGuard {
buffer: Option<Arc<Buffer>>,
count: usize,
recycler: mpsc::UnboundedSender<Buffer>,
}
impl Drop for ReadGuard {
fn drop(&mut self) {
if let Ok(buffer) = Arc::try_unwrap(self.buffer.take().unwrap()) {
let _ = self.recycler.send(buffer);
}
}
}
impl Deref for ReadGuard {
type Target = [Complex<f32>];
fn deref(&self) -> &Self::Target {
&self.buffer.as_ref().unwrap()[0..self.count]
}
}
#[tokio::main]
async fn main() {
let dev = soapysdr::Device::new("").unwrap();
dev.set_frequency(Rx, 0, 100e6, "").unwrap();
dev.set_sample_rate(Rx, 0, 1024000.0).unwrap();
dev.set_bandwidth(Rx, 0, 1024000.0).unwrap();
let mut rx = dev.rx_stream::<Complex<f32>>(&[0]).unwrap();
rx.activate(None).unwrap();
let (send, mut recv) = broadcast::channel::<ReadGuard>(16);
let (recycler, mut dispenser) = mpsc::unbounded_channel::<Buffer>();
spawn(async move {
let file = File::create("output.raw").unwrap();
let mut writer = BufWriter::new(file);
loop {
let block = recv.recv().await.unwrap();
for sample in &*block {
writer.write_all(&sample.re.to_ne_bytes()).unwrap();
writer.write_all(&sample.im.to_ne_bytes()).unwrap();
}
}
});
spawn_blocking(move || loop {
let mut buffer: Buffer = dispenser.try_recv().unwrap_or_else(|_| {
println!("Create new buffer");
vec![Complex::<f32>::default(); rx.mtu().unwrap()]
});
let count = rx.read(&[&mut buffer], 1000000).unwrap();
if send
.send(ReadGuard {
buffer: Some(Arc::new(buffer)),
count,
recycler: recycler.clone(),
})
.is_err()
{
panic!("could not send data");
}
println!("sent {count}");
})
.await
.unwrap();
}
The idea is that ReadGuard
can be cloned, and when the last ReadGuard
is dropped, the buffer will be sent to recycler
.
Side question: Do you know, by any chance, how to get rid of the Figured it out: I can use Option
? I can't move out of buffer
because the drop
method doesn't own self
but only has a mutable reference to it.std::mem::take
in that case, because Vec
implements Default
and there is a blanket implementation of Default
for Arc
as well.
P.S.: There are some other flaws in the example code as well. For example, I don't need to store a count
; I could do that by shortening the Vec
.
Updated example (but still unfinished / work in progress)
[dependencies]
soapysdr = "0.3.2"
num-complex = "0.4.2"
num-traits = "0.2.11"
tokio = { version = "1", features = ["full"] }
use num_complex::Complex;
use soapysdr::{self, Direction::Rx};
use std::fs::File;
use std::io::{BufWriter, Write};
use std::mem::{replace, take};
use std::ops::Deref;
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc};
use tokio::task::{spawn, spawn_blocking};
type Sample = Complex<f32>;
#[derive(Clone, Debug)]
struct BufGuard<T> {
buffer: Arc<Vec<T>>,
recycler: mpsc::UnboundedSender<Vec<T>>,
}
impl<T> Drop for BufGuard<T> {
fn drop(&mut self) {
if let Ok(buffer) = Arc::try_unwrap(take(&mut self.buffer)) {
let _ = self.recycler.send(buffer);
}
}
}
impl<T> Deref for BufGuard<T> {
type Target = [T];
fn deref(&self) -> &Self::Target {
&self.buffer
}
}
struct BufPool<T> {
recycler: mpsc::UnboundedSender<Vec<T>>,
dispenser: mpsc::UnboundedReceiver<Vec<T>>,
current: Vec<T>,
}
impl<T> BufPool<T> {
fn new() -> Self {
let (recycler, dispenser) = mpsc::unbounded_channel::<Vec<T>>();
Self {
recycler,
dispenser,
current: Vec::new(),
}
}
fn current(&mut self) -> &mut Vec<T> {
&mut self.current
}
fn finish(&mut self) -> BufGuard<T> {
let replacement = match self.dispenser.try_recv() {
Ok(mut buffer) => {
buffer.clear();
buffer
}
Err(_) => {
println!("Create new buffer");
Vec::new()
}
};
BufGuard {
buffer: Arc::new(replace(&mut self.current, replacement)),
recycler: self.recycler.clone(),
}
}
}
#[tokio::main]
async fn main() {
let dev = soapysdr::Device::new("").unwrap();
dev.set_frequency(Rx, 0, 100e6, "").unwrap();
dev.set_sample_rate(Rx, 0, 1024000.0).unwrap();
dev.set_bandwidth(Rx, 0, 1024000.0).unwrap();
let mut rx = dev.rx_stream::<Sample>(&[0]).unwrap();
rx.activate(None).unwrap();
let (rx_rf_send, mut rx_rf_recv) = broadcast::channel::<BufGuard<Sample>>(16);
spawn(async move {
let file = File::create("output.raw").unwrap();
let mut writer = BufWriter::new(file);
loop {
let block = rx_rf_recv.recv().await.unwrap();
for sample in &*block {
writer.write_all(&sample.re.to_ne_bytes()).unwrap();
writer.write_all(&sample.im.to_ne_bytes()).unwrap();
}
}
});
spawn_blocking(move || {
let mut buf_pool = BufPool::<Sample>::new();
let mtu = rx.mtu().unwrap();
loop {
let mut buffer = buf_pool.current();
buffer.resize_with(mtu, Default::default);
let count = rx.read(&[&mut buffer], 1000000).unwrap();
buffer.truncate(count);
if rx_rf_send.send(buf_pool.finish()).is_err() {
panic!("could not send data because all receivers died");
}
println!("sent {count} samples");
}
})
.await
.unwrap();
}
Taking self: &Arc<Self>
is not possible on Tokio's minimum supported Rust version.
This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.