Hi! To preface: I'm working on a project that is mostly async work with some compute-bound tasks, specifically compression and decompression.
My message sizes have an upper bound, and compression/decompression generally takes between 10 and 300 microseconds. I'd like to offload these tasks from the async runtime and `.await` their completion, yielding while the work is being done.
The caveat is that I'm working with messages inside one buffer that get written into a separate buffer, and I don't want to allocate twice just to satisfy the `'static` bound required by `spawn` (Rayon's in this case, though `std::thread::spawn` has the same bound, as does tokio's, which isn't really relevant here).
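For comparison, std's scoped threads avoid the `'static` bound by guaranteeing the worker is joined before the borrows end. The catch is that `std::thread::scope` blocks the calling thread until then, which on an async runtime worker is exactly what offloading is meant to avoid. A std-only sketch; `decompress_scoped` is a hypothetical name, and it copies bytes in place of real decompression:

```rust
use std::thread;

/// Hypothetical helper: borrows both buffers directly (no `mem::take`, no
/// `'static` bound) by running the work on a scoped thread.
/// Downside: `thread::scope` blocks the caller until the worker finishes.
fn decompress_scoped(stream: &[u8], out: &mut Vec<u8>) {
    thread::scope(|s| {
        s.spawn(|| {
            // Stand-in for real decompression: copy the input through.
            out.extend_from_slice(stream);
        });
    }); // every thread spawned in the scope is joined here
}
```

Calling `decompress_scoped(b"hello!", &mut out)` leaves `out` containing `hello!`, with no ownership transfer of either buffer.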
To describe the real-world case in a bit more detail:
I'm reading bytes off a stream and (de)serializing them directly from the stream buffer; sometimes that can be done zero-copy. With compression, I decompress from that stream buffer into a separate pre-allocated (growable) buffer and then (de)serialize from that, again sometimes zero-copy.
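To make the borrowing concrete, here is a std-only sketch of that flow. `decode_into` and the length-prefixed `parse_str` format are hypothetical stand-ins for the real decompressor and deserializer; the point is that the zero-copy result borrows the reused scratch buffer, which is why none of this data is `'static`:

```rust
/// Hypothetical zero-copy "deserializer": a 1-byte length prefix followed by
/// UTF-8 bytes. The returned `&str` borrows from `buf`; nothing is allocated.
fn parse_str(buf: &[u8]) -> Option<&str> {
    let (&len, rest) = buf.split_first()?;
    let bytes = rest.get(..len as usize)?;
    std::str::from_utf8(bytes).ok()
}

/// Hypothetical decode step: reuses `scratch` across messages. `clear()` keeps
/// the capacity, so once the buffer has grown to the largest message size,
/// later messages don't allocate.
fn decode_into<'a>(compressed: &[u8], scratch: &'a mut Vec<u8>) -> Option<&'a str> {
    scratch.clear();
    // Stand-in for real decompression: copy the bytes through.
    scratch.extend_from_slice(compressed);
    parse_str(scratch)
}
```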
Currently, I do this safely by taking ownership of both buffers with `core::mem::take()`, moving them into the thread-pool task, returning them through the result, and assigning them back into place.
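Simplified (`do_compress` stands in for the actual work, and a tokio oneshot channel carries the buffers back, since `rayon::spawn` doesn't return a value):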
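```rust
use std::ops::Range;

async fn compress(
    stream_buffer: &mut Vec<u8>,
    range: Range<usize>,
    decompression_buffer: &mut Vec<u8>,
) {
    // Take ownership so the closure is 'static; empty Vecs are left behind.
    let mut stream_owned = core::mem::take(stream_buffer);
    let mut decompression_owned = core::mem::take(decompression_buffer);
    let (sender, receiver) = tokio::sync::oneshot::channel();
    rayon::spawn(move || {
        do_compress(&mut stream_owned[range], &mut decompression_owned);
        // Send both buffers back to the awaiting task.
        let _ = sender.send((stream_owned, decompression_owned));
    });
    // Fails only if the sender is dropped without sending.
    let (stream_ret, decompress_ret) = receiver.await.unwrap();
    // Put the buffers back where they came from.
    *stream_buffer = stream_ret;
    *decompression_buffer = decompress_ret;
}
```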
This solution is mostly okay, but it gets very messy once error handling etc. is involved.
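One way to contain some of that mess, sketched under assumptions: bundle both buffers into one struct so there is a single take and a single put-back, and have the worker always send the buffers back together with the outcome, catching panics with `std::panic::catch_unwind`. `Buffers` and `offload` are hypothetical names; `thread::spawn` + `join` stands in for `rayon::spawn` + the oneshot `.await` so the sketch runs with std alone. (With Rayon, an uncaught panic in a `spawn`ed job goes to the pool's panic handler, which by default aborts the process, so catching inside the job matters there too.)

```rust
use std::panic::{self, AssertUnwindSafe};
use std::thread;

/// Hypothetical bundle: both buffers travel together, so there is one
/// `mem::take` and one put-back instead of two of each.
#[derive(Default)]
struct Buffers {
    stream: Vec<u8>,
    scratch: Vec<u8>,
}

/// Hypothetical helper: moves `bufs` to a worker, runs `work`, and always
/// restores the buffers, whether `work` succeeds, returns an error, or panics.
fn offload<T, E, F>(bufs: &mut Buffers, work: F) -> Result<T, E>
where
    T: Send + 'static,
    E: Send + 'static + From<String>,
    F: FnOnce(&mut Buffers) -> Result<T, E> + Send + 'static,
{
    let mut owned = std::mem::take(bufs);
    let handle = thread::spawn(move || {
        // Catch panics inside the worker so the buffers are still sent back.
        let outcome = panic::catch_unwind(AssertUnwindSafe(|| work(&mut owned)));
        (owned, outcome)
    });
    // Panics from `work` are caught above, so this join should not fail.
    let (owned, outcome) = handle.join().expect("worker thread failed");
    *bufs = owned;
    match outcome {
        Ok(result) => result,
        Err(_) => Err(E::from(String::from("worker panicked"))),
    }
}
```

The caller passes the range (or any other `'static` parameters) into the closure by `move`, and after `offload` returns, both buffers are back in place regardless of the outcome.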
It may also pay a small cost for the moves (maybe).
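For what it's worth, moving a `Vec<u8>` only moves its pointer/capacity/length header, not the heap allocation it owns. A small std-only check of that; `round_trip` is a hypothetical helper, and `std::thread::spawn` stands in for Rayon so it runs without extra crates:

```rust
use std::thread;

/// Hypothetical helper: takes the buffer out of `slot`, moves it to another
/// thread, appends to it there, and puts it back. Returns the heap pointer
/// observed before and after the round trip.
fn round_trip(slot: &mut Vec<u8>) -> (*const u8, *const u8) {
    let before = slot.as_ptr();
    // Leaves an empty, non-allocating Vec behind in `slot`.
    let mut owned = std::mem::take(slot);
    let handle = thread::spawn(move || {
        // Appending within the existing capacity does not reallocate.
        owned.extend_from_slice(b"abc");
        owned
    });
    *slot = handle.join().expect("worker panicked");
    (before, slot.as_ptr())
}
```

With a buffer created via `Vec::with_capacity(1024)`, both returned pointers are equal: the same allocation came back, and only the three-word header was copied each way.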
The alternative I came up with looks like this (a reproducible example that actually compiles on rustc 1.83):
```toml
# Cargo.toml
[package]
name = "transmute-lt-test"
version = "0.1.0"
edition = "2021"

[dependencies]
rayon = "1.10.0"
tokio = { version = "1.42.0", features = ["sync", "macros", "rt-multi-thread"] }
```
```rust
// main.rs
use std::marker::PhantomData;

#[tokio::main]
async fn main() {
    run(b"hello!").await;
}

async fn run(bytes: &[u8]) {
    let mut func = || {
        std::thread::sleep(std::time::Duration::from_secs(2));
        println!("Bytes = {bytes:?}");
    };
    let jh = unsafe { spawn_scoped(&mut func) };
    // Compilation error if `func` is dropped here, as expected and wanted.
    jh.join().await;
}

struct JoinHandle<'scope> {
    _pd: std::marker::PhantomData<&'scope ()>,
    handle: tokio::sync::oneshot::Receiver<()>,
}

impl<'scope> JoinHandle<'scope> {
    pub async fn join(self) {
        self.handle.await.unwrap();
    }
}

unsafe fn spawn_scoped<'scope, F: FnMut() + Send + Sync + 'scope>(
    func: &'scope mut F,
) -> JoinHandle<'scope> {
    let fn_scoped: &'scope mut (dyn FnMut() + Send + Sync + 'scope) = func;
    let fn_static = upgrade_dyn_scope(fn_scoped);
    let (sender, receiver) = tokio::sync::oneshot::channel();
    rayon::spawn(move || {
        fn_static();
        let _ = sender.send(());
    });
    JoinHandle {
        _pd: PhantomData,
        handle: receiver,
    }
}

fn upgrade_dyn_scope<'scope>(
    func: &mut (dyn FnMut() + Send + Sync + 'scope),
) -> &'static mut (dyn FnMut() + Send + Sync + 'static) {
    unsafe { core::mem::transmute(func) }
}
```
This would let me pass plain references to the buffers. As far as I can tell, the only real danger here is further lifetime shenanigans such as `mem::forget`, or more transmuting. This isn't a library I'm publishing, so beyond keeping the `unsafe fn` unsafe and leaving a comment, for my own sanity, that `mem::forget` can cause UB here, I don't need to guard against misuse. What I'm wondering is whether there is any other inherent UB I'm risking or causing by doing this, and/or whether I'm in territory where (known, possible) compiler changes could make it UB.
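The `mem::forget` concern can be reproduced in safe code without any threads. This sketch mirrors the `JoinHandle` above with a hypothetical `Handle` and a hypothetical `borrow_for` standing in for `spawn_scoped`; once the handle is gone, nothing keeps the original value borrowed:

```rust
use std::marker::PhantomData;

/// Mirrors the post's `JoinHandle`: tied to `'scope` only through PhantomData.
struct Handle<'scope> {
    _pd: PhantomData<&'scope ()>,
}

/// Hypothetical stand-in for `spawn_scoped`: borrows `value` for `'scope`
/// but does nothing with it.
fn borrow_for<'scope, T>(_value: &'scope mut T) -> Handle<'scope> {
    Handle { _pd: PhantomData }
}

fn forget_then_reuse() -> i32 {
    let mut x = 5;
    let handle = borrow_for(&mut x);
    // Safe code: after the handle is forgotten, the borrow of `x` is over.
    std::mem::forget(handle);
    // Compiles, even though a real worker could still be using `x` here.
    x += 1;
    x
}
```

`forget_then_reuse()` returns `6`: the borrow checker accepts the mutation, which is exactly what a still-running worker would race with.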
Thanks a lot for reading.
Edit:
I ran a synthetic benchmark of the two strategies (unsafe vs. move), and the difference was negligible compared to the compression time.
I might try a real benchmark if I find the time, and will report back with the results.