Scoped `spawn`, unsafe implementation hazards

Hi, to preface, I'm doing a project where it's generally async work with some compute-tasks, specifically compression/decompression.
My message-sizes have an upper bound, and the compression/decompression generally ranges between 10-300 microseconds, I'd like to offload these tasks out of the async runtime, and yield back with an await while waiting for the work to be done.

The caveat here is that I'm working with messages inside of a buffer, that's written to a separate buffer, and I do not want to allocate twice to accomodate the 'static bound required for spawn (in this case Rayon, although std::thread::spawn, (and tokio, although that's not really relevant here)).

To describe the real world case a bit more:
I'm reading bytes off of a stream, and serializing those directly off the stream, sometimes that serialization can be done zero-copy. Now I e.g. decompress off that stream, writing into a separate pre-allocated buffer (growble), then (de)serializing from that, similarly sometimes zero-copy.

Currently, I'm doing this safely by taking ownership of both buffers by core::mem::take(), passing them to the thread pool, and then returning them through a result, putting them back as they were through assignment.

Pseudo:

fn compress(stream_buffer: &mut Vec<u8>, range: Range<usize>, decompression_buffer: &mut Vec<u8>) {
    let stream_owned = core::mem::take(stream_buffer);
    let decompression_owned = core::mem::take(decompression_buffer);
    
    // Pseudocode, rayon doesn't actually return
    let (stream_ret, decompress_ret) = rayon::spawn(move || {
        do_compress(&mut stream_owned[range], &mut decompression_owned);
       (stream_owned, decompression_owned)
    };
    *stream_buffer = stream_ret;
    *decompression_buffer = decompress_ret;
)
}

This solution is mostly okay, but very messy considering error-handling etc.
It also suffers a bit from the move (maybe).

The alternative that I came up with looks like this, reproducible actual compiling code on rustc 1.83:

// Cargo.toml

[package]
name = "transmute-lt-test"
version = "0.1.0"
edition = "2021"

[dependencies]
rayon = "1.10.0"
tokio = { version = "1.42.0", features = ["sync", "macros", "rt-multi-thread"] }

// main.rs

use std::marker::PhantomData;

#[tokio::main]
async fn main() {
    run(b"hello!").await;

}

async fn run(bytes: &[u8]) {
    let mut func = || {
        std::thread::sleep(std::time::Duration::from_secs(2));
        println!("Bytes = {bytes:?}");
    };
    let jh = unsafe {
        spawn_scoped(&mut func)
    };
    // Compilation error if `func` is dropped here, as expected and wanted.
    jh.join().await;

}

struct JoinHandle<'scope> {
    _pd: std::marker::PhantomData<&'scope()>,
    handle: tokio::sync::oneshot::Receiver<()>
}

impl<'scope> JoinHandle<'scope> {
    pub async fn join(self) {
        self.handle.await.unwrap();
    }
}

unsafe fn spawn_scoped<'scope, F: FnMut() + Send + Sync + 'scope>(func: &'scope mut F) -> JoinHandle<'scope> {
    let fn_scoped: &'scope mut (dyn FnMut() + Send + Sync + 'scope) = func;
    let fn_static = upgrade_dyn_scope(fn_scoped);
    let (sender, receiver) = tokio::sync::oneshot::channel();
    rayon::spawn(move || {
        fn_static();
        let _ = sender.send(());
    });
    JoinHandle {
        _pd: PhantomData,
        handle: receiver,
    }
}

fn upgrade_dyn_scope<'scope>(func: &mut (dyn FnMut() + Send + Sync + 'scope)) -> &'static mut (dyn FnMut() + Send + Sync + 'static){
    unsafe {
        core::mem::transmute(func)
    }
}

This would allow me to just pass references to the buffers. As far as I can tell, the only real danger here is if using further lifetime-shennanigans like mem::forget, or more transmuting. This isn't some library that I'm publishing either so I don't have to take misuse into account more than keeping the unsafe fn unsafe, and commenting for my own sanity that mem::forget can cause UB here. What I'm wondering is if there is other inherent UB that I'm risking/causing doing this, and/or if I'm in territory where (known possible) compiler-changes risk making it UB.

Thanks a lot for reading.

Edit:

Ran a synthetic benchmark of the two strategies, unsafe vs move, and the difference was meaningless compared to the compression-time.

I might try a real benchmark if I find the time and get back with the results.

Have you considered using tokio::task::block_in_place ? It allows you to use the current thread to run a blocking function without lifetime issues.

Your spawn_scoped example would lead to use-after-free if the task is cancelled, which you can't prevent from the body of the function.

I might be mistaken, but I believe spawning and handling a thread will end up having more than 10-300 microseconds of overhead. You could have a persistent background thread, but that didn't seem to be your goal (though I may have misunderstood).

I agree. But you could keep a thread alive (or use a thread pool) for this purpose.

Ouch that's a killer, didn't consider the task getting cancelled, great input!

I might be misunderstanding you, but one of the requirements is not blocking the async task, that's currently running.

My pseudocode example up top actually did pretty well in practice. On task cancellation (or join error) it'll be a protocol-error instead of UB, and in my application that's acceptable!

Yeah, in the real example I'm using rayon so that overhead isn't problematic.