I'm writing an implementation of hyper::body::Body that receives data loaded from disk. The data is transferred over tokio::sync::mpsc::channel, something like:
struct FileBody {
    chunk_req_sender: tokio::sync::mpsc::Sender<(u64, u64)>,
    chunk_data_receiver: tokio::sync::mpsc::Receiver<Vec<u8>>,
    //some other stuff ...
}
However, I find it extremely difficult to call Sender::send() and Receiver::recv() in hyper's poll_frame function. In the end I have to store the futures in an Option field of FileBody and get creative by wrapping the async functions that I need to call into an async "bundle", like this:
struct AsyncBundle {
    chunk_req_sender: tokio::sync::mpsc::Sender<(u64, u64)>,
    chunk_data_receiver: tokio::sync::mpsc::Receiver<Vec<u8>>,
}
impl AsyncBundle {
    pub async fn get_next_chunk(
        mut self,
        start: u64,
        end: u64,
    ) -> Result<(Vec<u8>, AsyncBundle), anyhow::Error> {
        if self.chunk_req_sender.send((start, end)).await.is_err() {
            anyhow::bail!("send error");
        }
        let data = match self.chunk_data_receiver.recv().await {
            Some(data) => data,
            _ => anyhow::bail!("recv error"),
        };
        Ok((data, self))
    }
}
The modified FileBody would become:
struct FileBody {
    bundle: Option<AsyncBundle>,
    bundle_future: Option<BundleFuture>,
    //some other stuff ...
}
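BundleFuture is not spelled out above, so here is a minimal, std-only sketch of the shape I have in mind: a pinned, boxed future that yields the chunk and hands the bundle back. To keep it self-contained, the channels are left out of AsyncBundle, String stands in for anyhow::Error, and NoopWaker and poll_once are hypothetical helpers that exist only so the future can be polled by hand.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for the AsyncBundle above (channels omitted).
pub struct AsyncBundle;

// Assumed shape of BundleFuture: a boxed, pinned future that resolves to the
// chunk plus the bundle. String replaces anyhow::Error to stay std-only.
pub type BundleFuture =
    Pin<Box<dyn Future<Output = Result<(Vec<u8>, AsyncBundle), String>> + Send>>;

impl AsyncBundle {
    // Stand-in for get_next_chunk: takes ownership of self and returns it again.
    pub async fn get_next_chunk(
        self,
        start: u64,
        end: u64,
    ) -> Result<(Vec<u8>, AsyncBundle), String> {
        if end < start {
            return Err("invalid range".to_string());
        }
        Ok((vec![0u8; (end - start) as usize], self))
    }
}

// A waker that does nothing; it is only needed to build a Context here.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Box the future, poll it once, and return the chunk length if it was ready.
pub fn poll_once(start: u64, end: u64) -> Option<usize> {
    let mut fut: BundleFuture = Box::pin(AsyncBundle.get_next_chunk(start, end));
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(Ok((data, _bundle))) => Some(data.len()),
        _ => None,
    }
}
```

In the real code the future would stay in the Option field between polls instead of being created and polled in one function, and every chunk costs one Box allocation for the future.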
and in poll_frame, I'd do:
impl FileBody {
    fn poll_frame(&mut self, cx: &mut std::task::Context) -> std::task::Poll<Option<Result<hyper::body::Frame<hyper::body::Bytes>, Error>>> {
        // Start a new request whenever the bundle is idle.
        if let Some(bundle) = self.bundle.take() {
            let bundle_future = bundle.get_next_chunk(self.next_start, self.next_end);
            self.bundle_future = Some(Box::pin(bundle_future));
        }
        if let Some(bundle_future) = self.bundle_future.as_mut() {
            match bundle_future.poll_unpin(cx) {
                Poll::Ready(bundle_result) => {
                    // Drop the finished future so it is never polled again.
                    self.bundle_future = None;
                    if let Ok((data, bundle)) = bundle_result {
                        self.bundle = Some(bundle);
                        //some other stuff
                        Poll::Ready(Some(Ok(hyper::body::Frame::data(hyper::body::Bytes::from(data)))))
                    } else {
                        Poll::Ready(Some(Err(anyhow!("error"))))
                    }
                }
                Poll::Pending => Poll::Pending,
            }
        } else {
            Poll::Ready(None)
        }
    }
}
To deal with the lifetime issue, I have to let get_next_chunk take ownership of itself and then return itself back. Anyway, this works, but there is a lot of data moving around frequently. I wonder if this hurts performance and if there is a better design. Surely it can't be that complicated to load data into a hyper::body::Body?
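To make the "data moving around" concern concrete, here is a small std-only sketch of the same ownership round trip. It assumes the two channel ends are small handle-sized values, which is why Handles (a hypothetical stand-in for AsyncBundle) holds two Arcs. round_trip and buffer_is_reused are made-up names for illustration. It only checks that the Vec's heap buffer keeps the same address after being moved in and out, so the move itself does not copy the chunk bytes.

```rust
use std::sync::Arc;

// Hypothetical stand-in for AsyncBundle: two handle-sized fields,
// assumed to be comparable to the channel ends.
pub struct Handles {
    pub a: Arc<()>,
    pub b: Arc<()>,
}

// Mirrors the ownership round trip of get_next_chunk:
// take the bundle by value, return the data together with the bundle.
pub fn round_trip(h: Handles, data: Vec<u8>) -> (Vec<u8>, Handles) {
    (data, h)
}

// Returns true if the chunk's heap buffer has the same address after the move.
pub fn buffer_is_reused() -> bool {
    let data = vec![7u8; 64 * 1024];
    let before = data.as_ptr();
    let h = Handles { a: Arc::new(()), b: Arc::new(()) };
    let (data, _h) = round_trip(h, data);
    before == data.as_ptr()
}
```

This says nothing about the cost of boxing a new future per chunk, which is a separate allocation I have not measured.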