I want a background task (or thread, or something like that) to repeatedly read from a blocking function (rx_stream.read in the code below) and then feed the result into a radiorust::flow::Sender.
Currently, I do it as follows:

    let join_handle = spawn_blocking(move || {
        let rt = runtime::Handle::current();
        let mut buf_pool = ChunkBufPool::<Complex<f32>>::new();
        let mut result = Ok(());
        while !abort_recv.has_changed().unwrap_or(true) {
            let mut buffer = buf_pool.get();
            buffer.resize_with(mtu, Default::default);
            let count = match rx_stream.read(&[&mut buffer], 1000000) {
                Ok(x) => x,
                Err(err) => {
                    result = Err(err);
                    break;
                }
            };
            buffer.truncate(count);
            if let Err(_) = rt.block_on(sender.send(Samples {
                sample_rate,
                chunk: buffer.finalize(),
            })) {
                break;
            }
        }
        if let Err(err) = rx_stream.deactivate(None) {
            if result.is_ok() {
                result = Err(err);
            }
        }
        SoapySdrRxRetval { rx_stream, result }
    });

Notice that I'm in an
- async context of radiorust::blocks::io::rf::soapysdr::SoapySdrRx::activate, then use
- tokio::task::spawn_blocking to allow blocking code to execute (in a loop), but then use
- tokio::runtime::Handle::block_on to allow execution of async code, namely the sender.send(...) call.

So I go from async to sync, and then back to async. This feels a bit awkward. Is it the right way to go?
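
To make the control flow easier to discuss independently of the runtime, here is a minimal sketch of the same loop shape using only the standard library. It is not the tokio or radiorust code: std::thread::spawn stands in for spawn_blocking, a bounded std::sync::mpsc::sync_channel stands in for the radiorust Sender, an AtomicBool stands in for abort_recv, and FakeStream, Retval and run_reader are hypothetical names made up for this example.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{sync_channel, SyncSender};
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for the SDR stream: yields a fixed number of
/// chunks and then reports an error.
struct FakeStream {
    remaining: usize,
}

impl FakeStream {
    /// Blocking-style read: fills part of `buf`, returns the sample count.
    fn read(&mut self, buf: &mut [f32]) -> Result<usize, String> {
        if self.remaining == 0 {
            return Err("stream exhausted".to_string());
        }
        self.remaining -= 1;
        let count = buf.len() / 2;
        for (i, x) in buf[..count].iter_mut().enumerate() {
            *x = i as f32;
        }
        Ok(count)
    }
}

/// What the worker hands back: the stream itself plus the first error, if any.
struct Retval {
    stream: FakeStream,
    result: Result<(), String>,
}

/// Spawns a dedicated thread that reads until aborted, until a read fails,
/// or until the receiving side of the channel is dropped.
fn run_reader(
    mut stream: FakeStream,
    mtu: usize,
    abort: Arc<AtomicBool>,
    sender: SyncSender<Vec<f32>>,
) -> thread::JoinHandle<Retval> {
    thread::spawn(move || {
        let mut result = Ok(());
        while !abort.load(Ordering::Relaxed) {
            let mut buffer = vec![0.0f32; mtu];
            let count = match stream.read(&mut buffer) {
                Ok(n) => n,
                Err(err) => {
                    result = Err(err);
                    break;
                }
            };
            buffer.truncate(count);
            // Blocks while the channel is full; fails once the receiver is gone.
            if sender.send(buffer).is_err() {
                break;
            }
        }
        Retval { stream, result }
    })
}

fn main() {
    let (tx, rx) = sync_channel(2);
    let abort = Arc::new(AtomicBool::new(false)); // never set in this demo
    let handle = run_reader(FakeStream { remaining: 3 }, 8, abort, tx);
    // Iteration ends when the worker exits and drops its sender.
    let chunks: Vec<Vec<f32>> = rx.iter().collect();
    let retval = handle.join().unwrap();
    println!("{} chunks, result = {:?}", chunks.len(), retval.result);
}
```

In this sketch the send is itself a blocking call, so the worker never needs to re-enter async code. Whether the radiorust Sender offers a comparable blocking send is something I have not checked.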
I wondered if maybe it's better to do something like this:

    -let join_handle = spawn_blocking(move || {
    -    let rt = runtime::Handle::current();
    +let join_handle = spawn(async move {
         let mut buf_pool = ChunkBufPool::<Complex<f32>>::new();
         let mut result = Ok(());
         while !abort_recv.has_changed().unwrap_or(true) {
             let mut buffer = buf_pool.get();
             buffer.resize_with(mtu, Default::default);
    -        let count = match rx_stream.read(&[&mut buffer], 1000000) {
    +        let (read, tmp) = spawn_blocking(move || {
    +            let read = rx_stream.read(&[&mut buffer], 1000000);
    +            (read, (rx_stream, buffer))
    +        }).await.unwrap();
    +        rx_stream = tmp.0;
    +        buffer = tmp.1;
    +        let count = match read {
                 Ok(x) => x,
                 Err(err) => {
                     result = Err(err);
                     break;
                 }
             };
             buffer.truncate(count);
    -        if let Err(_) = rt.block_on(sender.send(Samples {
    +        if let Err(_) = sender.send(Samples {
                 sample_rate,
                 chunk: buffer.finalize(),
    -        })) {
    +        }).await {
                 break;
             }

This way, I would only go from async to sync, but not back. However, I wonder if this comes with more overhead because spawn_blocking is now called inside the while loop. How efficient is tokio at spawning threads for blocking operations?
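
The ownership round trip in the diff (moving rx_stream and buffer into the closure and getting both back through the return value) can be sketched in isolation as follows. This is an illustration only: std::thread::spawn followed by join stands in for spawn_blocking(...).await, and FakeStream, read_once and collect_all are hypothetical names. Since std::thread::spawn creates a fresh OS thread on every call, the sketch says nothing about what tokio's spawn_blocking costs per call.

```rust
use std::thread;

/// Hypothetical stand-in for a device whose `read` blocks.
struct FakeStream {
    next: u32,
}

impl FakeStream {
    /// Produces one value per call for three calls, then fails.
    fn read(&mut self, buf: &mut Vec<u32>) -> Result<usize, String> {
        if self.next >= 3 {
            return Err("no more data".to_string());
        }
        buf.clear();
        buf.push(self.next);
        self.next += 1;
        Ok(buf.len())
    }
}

/// Runs a single blocking read on a helper thread. The stream and buffer are
/// moved into the closure and handed back alongside the read result.
fn read_once(
    mut stream: FakeStream,
    mut buffer: Vec<u32>,
) -> (Result<usize, String>, FakeStream, Vec<u32>) {
    thread::spawn(move || {
        let read = stream.read(&mut buffer);
        (read, stream, buffer)
    })
    .join()
    .expect("reader thread panicked")
}

/// Loops over `read_once`, re-binding the stream after every call, and
/// returns everything read together with the error that ended the loop.
fn collect_all() -> (Vec<u32>, String) {
    let mut stream = FakeStream { next: 0 };
    let mut out = Vec::new();
    loop {
        let (read, returned_stream, buffer) = read_once(stream, Vec::new());
        stream = returned_stream; // take ownership back for the next iteration
        match read {
            Ok(count) => out.extend_from_slice(&buffer[..count]),
            Err(err) => return (out, err),
        }
    }
}

fn main() {
    let (values, err) = collect_all();
    println!("read {:?}, stopped on: {}", values, err);
}
```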
What's the best way to go?