Async background initialisation

Greetings!

I'm trying to implement background initialisation. Something like this:

struct Storage<T> {
    datas: HashMap<usize, LateInit<T>>
}

impl<T> Storage<T> {
    pub fn insert<Fut>(&mut self, init: Fut) -> usize
    where
        T: Send + 'static,
        Fut: Future<Output = T> + Send + 'static,
    {
        static COUNTER: AtomicUsize = AtomicUsize::new(0);
        let id = COUNTER.fetch_add(1, Ordering::Relaxed);

        // Start the initialisation in the background right away.
        let data = LateInit::new(tokio::task::spawn(init));
        
        self.datas.insert(id, data);
        
        id
    }
    
    pub async fn get(&self, id: usize) -> &T {
        self.datas.get(&id).await
    }
}

I can't use tokio::sync::OnceCell::get_or_init.

I tried to use tokio::sync::{Notify, RwLock}, but there are two problems:

  1. I can't easily return a reference from the getter;
  2. between dropping the read guard and calling Notify::notified, a write() + notify_waiters can happen, and notified will then wait forever.

You can have the get_or_init call wait for the task to return its output like this:

use std::pin::Pin;
use std::sync::Mutex;
use std::future::Future;
use tokio::sync::OnceCell;
use tokio::task::JoinHandle;
use futures::future::poll_fn;

struct LateInit<T> {
    recv: Mutex<Option<JoinHandle<T>>>,
    value: OnceCell<Option<T>>,
}

impl<T> LateInit<T> {
    fn new(task: JoinHandle<T>) -> Self {
        Self {
            recv: Mutex::new(Some(task)),
            value: OnceCell::new(),
        }
    }

    async fn get(&self) -> Option<&T> {
        let recv = &self.recv;
        self.value.get_or_init(|| poll_fn(|cx| {
            let opt_recv = &mut *recv.lock().unwrap();
            let res = Pin::new(opt_recv.as_mut().unwrap()).poll(cx);
            if res.is_ready() {
                *opt_recv = None;
            }
            res.map(Result::ok)
        })).await.as_ref()
    }
}

The correctness of this relies on the fact that OnceCell will ensure that only one get_or_init initializer can run at any given time. The get method will return None if the task exits with an error (e.g. if it panics).
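
To make the "returns None on failure" behaviour easier to see in isolation, here is a blocking, std-only analogue of the same structure. It is only a sketch: it swaps the tokio task for a thread and tokio's OnceCell for std::sync::OnceLock, and the name BlockingLateInit is made up for this illustration. It is not a replacement for the async version above.

```rust
use std::sync::{Mutex, OnceLock};
use std::thread::{self, JoinHandle};

/// Hypothetical blocking counterpart of `LateInit`: a thread instead of a task.
struct BlockingLateInit<T> {
    handle: Mutex<Option<JoinHandle<T>>>,
    value: OnceLock<Option<T>>,
}

impl<T: Send + 'static> BlockingLateInit<T> {
    /// Starts the initialiser on a background thread immediately.
    fn new(init: impl FnOnce() -> T + Send + 'static) -> Self {
        Self {
            handle: Mutex::new(Some(thread::spawn(init))),
            value: OnceLock::new(),
        }
    }

    /// Blocks until the background thread has finished.
    /// Returns `None` if the initialiser panicked.
    fn get(&self) -> Option<&T> {
        self.value
            .get_or_init(|| {
                // Only one initialiser closure is ever executed by OnceLock,
                // so the handle is still in place when this runs.
                let handle = self
                    .handle
                    .lock()
                    .unwrap()
                    .take()
                    .expect("initialiser runs only once");
                // A panic in the thread surfaces as Err from join().
                handle.join().ok()
            })
            .as_ref()
    }
}
```

Calling get() a second time just returns the cached value, and a panicking initialiser yields None on every call, mirroring the JoinError case in the async version.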


Thanks!

I came up with this:

struct LateInit<T> {
    init: Mutex<Option<JoinHandleWrapper<T>>>,
    data: OnceCell<T>,
}

impl<T> LateInit<T>
where
    T: Send + 'static,
{
    #[inline(always)]
    pub fn new<F, Fut>(init: F) -> Self
    where
        F: FnOnce() -> Fut + Send + 'static,
        Fut: Future<Output = T> + Send + 'static,
    {
        let init = Some(JoinHandleWrapper(tokio::task::spawn(init()))).into();
        let data = OnceCell::default();

        Self { init, data }
    }

    #[inline(always)]
    async fn get(&self) -> &T {
        self.data
            .get_or_init(|| {
                self.init
                    .lock()
                    .expect("Unreachable")
                    .take()
                    .expect("Unreachable")
            })
            .await
    }
}

impl<'a, T> Future for &'a LateInit<T>
where
    T: Send + 'static,
{
    type Output = &'a T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Box::pin(self.get()).poll_unpin(cx)
    }
}

Your code will fail if any of the get calls are cancelled, because it doesn't put the JoinHandle back.
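
A small std-only sketch of the cancellation problem, in case it helps. Everything here is hypothetical scaffolding: Handle stands in for a JoinHandle (pending on the first poll, ready on the second), and poll_once_then_drop imitates a get call that is cancelled after one poll. take_then_await mirrors the take() approach, poll_in_place mirrors the poll_fn approach.

```rust
use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a JoinHandle: pending once, then ready with 7.
struct Handle {
    polled: bool,
}

impl Future for Handle {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> {
        if self.polled {
            Poll::Ready(7)
        } else {
            self.polled = true;
            Poll::Pending
        }
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls `fut` exactly once and then drops it, like a cancelled `get`.
fn poll_once_then_drop<F: Future>(fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    let res = fut.as_mut().poll(&mut cx);
    res
}

/// Moves the handle into the future: dropping the future drops the handle.
async fn take_then_await(slot: &Mutex<Option<Handle>>) -> Option<u32> {
    let handle = slot.lock().unwrap().take()?;
    Some(handle.await)
}

/// Polls the handle where it lives and clears the slot only once it is ready.
async fn poll_in_place(slot: &Mutex<Option<Handle>>) -> Option<u32> {
    poll_fn(|cx| {
        let mut guard = slot.lock().unwrap();
        let Some(handle) = guard.as_mut() else {
            return Poll::Ready(None);
        };
        let res = Pin::new(handle).poll(cx);
        if res.is_ready() {
            *guard = None;
        }
        res.map(Some)
    })
    .await
}
```

After one cancelled take_then_await the slot is empty and the value can never be retrieved. After one cancelled poll_in_place the handle is still in the slot, and a later call completes normally.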

You can't do this (implementing Future for &'a LateInit<T> by boxing self.get() inside poll). This kind of thing will generally not work: every call to poll creates a brand-new get() future, polls it once, and drops it, so its internal state never survives to the next poll. In your case the handle taken out of the mutex is dropped along with it.
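
If the goal is just to be able to write (&late_init).await, one option is std::future::IntoFuture instead of Future: the boxed future is then created once per .await and polled to completion, rather than recreated on every poll. A rough std-only sketch, where Lazy, YieldOnce and block_on are hypothetical helpers written only for this example:

```rust
use std::future::{Future, IntoFuture};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical future that returns Pending once before completing.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Hypothetical simplified cell; only the awaiting mechanics matter here.
struct Lazy {
    value: u32,
}

impl Lazy {
    async fn get(&self) -> &u32 {
        // A real implementation would wait for the background task here.
        YieldOnce(false).await;
        &self.value
    }
}

impl<'a> IntoFuture for &'a Lazy {
    type Output = &'a u32;
    type IntoFuture = Pin<Box<dyn Future<Output = &'a u32> + 'a>>;

    fn into_future(self) -> Self::IntoFuture {
        // Created once per `.await`; its state lives across Pending polls.
        Box::pin(self.get())
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal busy-polling executor, good enough for this demonstration only.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}
```

With this in place, block_on(async { *(&lazy).await }) resolves even though get() yields once in the middle, because the same boxed future is polled both times.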

Yeah, it froze for me =(

Okay, I see why you wrote it the way you did, thanks again!
Do I understand correctly (I'm not very familiar with Future/wake internals) that the mutex can be locked twice?

For the same reason, I can't use std::cell::Cell?

poll_fn(|cx| {
    let mut init = self.init.take().expect("Unreachable");
    let ret = init.poll_unpin(cx);
    // get can be cancelled, so put the handle back while it is still pending.
    if ret.is_pending() {
        self.init.set(Some(init));
    }
    ret
})

A mutex can certainly be locked more than once if that's your question. It just can't happen at the same time.

The Cell type is not thread safe (it is not Sync), so a LateInit containing one could not be shared between threads. You could use it if the runtime were single-threaded and the task ran on a LocalSet.
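
For the single-threaded case, the take-and-put-back idea could look roughly like this. It is a std-only sketch with hypothetical names: LocalInit plays the role of the init field, and Handle is a stand-in for a JoinHandle that is pending on the first poll. Because the take and the set both happen inside one poll call, dropping the future between polls cannot lose the handle.

```rust
use std::cell::Cell;
use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a JoinHandle: pending once, then ready with 7.
struct Handle {
    polled: bool,
}

impl Future for Handle {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> {
        if self.polled {
            Poll::Ready(7)
        } else {
            self.polled = true;
            Poll::Pending
        }
    }
}

/// Hypothetical single-threaded holder; the Cell makes this type !Sync.
struct LocalInit {
    init: Cell<Option<Handle>>,
}

impl LocalInit {
    /// Resolves to Some(value) once, and to None after the handle is consumed.
    async fn wait(&self) -> Option<u32> {
        poll_fn(|cx| {
            let Some(mut init) = self.init.take() else {
                return Poll::Ready(None);
            };
            let ret = Pin::new(&mut init).poll(cx);
            // Put the handle back unless it has finished.
            if ret.is_pending() {
                self.init.set(Some(init));
            }
            ret.map(Some)
        })
        .await
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls `fut` exactly once and then drops it, like a cancelled call.
fn poll_once_then_drop<F: Future>(fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    let res = fut.as_mut().poll(&mut cx);
    res
}
```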
