You can have the get_or_init call wait for the task to return its output like this:
use std::pin::Pin;
use std::sync::Mutex;
use std::future::Future;
use tokio::sync::OnceCell;
use tokio::task::JoinHandle;
use futures::future::poll_fn;
/// A value that becomes available once a spawned task finishes.
///
/// The task's output is fetched lazily on the first `get` call and cached,
/// so later calls return the stored value without touching the task again.
struct LateInit<T> {
/// Handle of the task producing the value. Set to `None` once the handle
/// has reported completion, so it is never polled again afterwards.
recv: Mutex<Option<JoinHandle<T>>>,
/// Cached outcome of the task: `Some(output)` on success, `None` if the
/// join failed (e.g. the task panicked).
value: OnceCell<Option<T>>,
}
impl<T> LateInit<T> {
    /// Wraps `task` so that its output can be awaited and cached later.
    fn new(task: JoinHandle<T>) -> Self {
        let recv = Mutex::new(Some(task));
        let value = OnceCell::new();
        Self { recv, value }
    }

    /// Waits for the task to finish (first call only) and returns a reference
    /// to its output.
    ///
    /// Returns `None` if joining the task failed, e.g. because it panicked.
    /// Once the outcome has been stored, subsequent calls return it directly.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    async fn get(&self) -> Option<&T> {
        let handle_slot = &self.recv;

        // The `OnceCell` runs at most one initializer at a time and never
        // re-runs it after a value has been stored, so the handle is always
        // present whenever this closure is polled.
        let wait_for_task = || {
            poll_fn(|cx| {
                let mut slot = handle_slot.lock().unwrap();
                let handle = slot.as_mut().unwrap();
                let outcome = Pin::new(handle).poll(cx).map(Result::ok);
                // Clear the handle only on completion: if this future is
                // dropped while still pending, a later call must be able to
                // resume polling the same handle.
                if outcome.is_ready() {
                    *slot = None;
                }
                outcome
            })
        };

        self.value.get_or_init(wait_for_task).await.as_ref()
    }
}
The correctness of this relies on the fact that OnceCell will ensure that only one get_or_init initializer can run at any given time. The get method will return None if the task exits with an error (e.g. if it panics).
Okay, I understand why you wrote it this way, thanks again!
Did I get it right (I'm not very familiar with Future/wake internals) that the mutex can be locked twice?
Is that also the reason I can't use std::cell::Cell, like this?
// Sketch of a Cell-based alternative: move the pending future out of
// `self.init`, poll it once, and put it back if it has not finished.
// NOTE(review): presumably `self.init` is a `Cell<Option<_>>` holding the
// not-yet-completed future — confirm against the surrounding type.
poll_fn(|cx| {
// Take the future out of the cell, leaving `None` behind while it is polled.
let mut init = self.init.take().expect("Unreachable");
let ret = init.poll_unpin(cx);
// `get` can be cancelled between polls, so the future is only discarded
// once it is ready; while pending it is stored back for the next poll.
if ret.is_pending() {
self.init.set(Some(init));
}
ret
})