Call async function that references self in a poll function

I'm writing an implementation of hyper::body::Body that receives data loaded from disk. The data is transferred over tokio::sync::mpsc::channel, something like:

struct FileBody {
    chunk_req_sender: tokio::sync::mpsc::Sender<(u64, u64)>,
    chunk_data_receiver: tokio::sync::mpsc::Receiver<Vec<u8>>,
    //some other stuff ...
}

however, I find it extremely difficult to call Sender::send() and Receiver::recv() in hyper's poll_frame function. In the end I have to store the futures in an Option field of FileBody and get creative by wrapping the async functions that I need to call into an async "bundle", like this:

struct AsyncBundle {
    chunk_req_sender: tokio::sync::mpsc::Sender<(u64, u64)>,
    chunk_data_receiver: tokio::sync::mpsc::Receiver<Vec<u8>>,
}

impl AsyncBundle {
    pub async fn get_next_chunk(
        mut self,
        start: u64,
        end: u64,
    ) -> Result<(Vec<u8>, AsyncBundle), anyhow::Error> {
        if let Err(_) = self.chunk_req_sender.send((start, end)).await {
            anyhow::bail!("send error");
        }
        let data = match self.chunk_data_receiver.recv().await {
            Some(data) => data,
            _ => anyhow::bail!("recv error"),
        };
        Ok((data, self))
    }
}

The modified FileBody would become:

struct FileBody {
    bundle: Option<AsyncBundle>,
    bundle_future: Option<BundleFuture>
    //some other stuff ...
}

and in poll_frame, I'd do:

impl FileBody {
    fn poll_frame(&mut self, cx: &mut std::task::Context) -> std::task::Poll<Option<Result<hyper::body::Frame<hyper::body::Bytes>, Error>>> {
        if let Some(bundle) = self.bundle.take() {            
            let bundle_future = bundle.get_next_chunk(self.next_start, self.next_end);
            self.bundle_future = Some(Box::pin(bundle_future));                        
        } 
        if let Some(bundle_future) = self.bundle_future.as_mut() {
            match bundle_future.poll_unpin(cx) {
                Poll::Ready(bundle_result)=> {
                    if let Ok((data, bundle)) = bundle_result{
                        self.bundle_future.take();
                        self.bundle = Some(bundle);
                        //some other stuff
                        Poll::Ready(Some(data))
                    } else {
                        Poll::Ready(anyhow!("error"))
                    }
                }
                Poll::Pending => Poll::Pending
            }
        } else {
            Poll::Ready(None)
        }
    }
}

To deal with the lifetime issue, I have to let get_next_chunk take ownership of itself and then return itself back. Anyway, this works, but there is a lot of data moving around frequently. I wonder if this hurts performance and if there is a better design. Surely it can't be that complicated to load data into a hyper::body::Body?

If your goal is just to create a hyper body, then I strongly recommend that you do not attempt do so using poll functions. They are extremely inconvenient.

Instead, I recommend the async-stream crate. Another possibility is to use futures::stream::unfold.

2 Likes

for the receiver, you can poll the receiver directly, see:

for the sender, it cannot be polled directly, but you can use the PollSender utility:

1 Like

Thank you, Alice and Nerd, those are some expert advice there. With the mpsc::Sender wrapper, I think I'll be able to simplify the poll_frame function. While I haven't figured out how to make use of async-stream and unfold yet, they definitely look interesting, even magical. Need to research on them.

Here's an example using unfold.

use futures::Stream;

struct MyState {}

impl MyState {
    /// None = end of Stream
    async fn next_chunk(&mut self) -> Option<String> {
        todo!()
    }
}

fn get_stream(state: MyState) -> impl Stream<Item = String> {
    futures::stream::unfold(state, |mut state| async move { Some((state.next_chunk().await?, state)) })
}

Then you use a StreamBody to convert the Stream into a hyper body.

2 Likes

This is indeed a clear solution. I couldn't come up with it on my own. Thanks for the tip!

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.