Sending to a oneshot channel in an implementation of `Body` in Hyper fails

Hi. I want to use oneshot channels to generate response bodies in Hyper. However, these oneshot channels fail. Below is the minimal example.

Start the program and make any HTTP request to the address mentioned in the program. Response generation freezes because MyBody::call never completes, since sender.send('g') returns an error:

[src/main.rs:16:17] "sent" = "sent"
[src/main.rs:57:13] sender.send('g') = Err(
    'g',
)

I read that this may be because the receiver is dropped. I don't understand how this is possible: "sent" is printed, and then we go to receiver.await.

If you replace code0 with code1, you'll get a response.

use std::future::Future as _;
use std::task::Poll;
use bytes::Bytes;
use tokio::sync::{mpsc, oneshot};
use hyper::{server::conn::http1, service::service_fn};
use http::{response::Response, status::StatusCode};

struct MyBody(mpsc::Sender<oneshot::Sender<char>>, bool);

impl MyBody {
    async fn call(&self) -> char {
        let (sender, receiver) = oneshot::channel();
        match self.0.send(sender).await {
            Err(error) => { dbg!(error); 's' },
            Ok(()) => {
                dbg!("sent");
                match receiver.await {
                    Err(error) => { dbg!(error); 'r' },
                    Ok(a) => { dbg!(a); a },
                }
            },
        }
    }
}

impl http_body::Body for MyBody {
    type Data = Bytes;
    type Error = std::convert::Infallible;

    fn poll_frame(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
        /* code0 */
        {
            let mut pinned = Box::pin(self.call());
            pinned.as_mut().poll(cx).map(|a|
                Some(Ok(http_body::Frame::data(vec![a as u8].into())))
            )
        }
        /* code1
        {
            if self.1 { Poll::Ready(None) }
            else {
                self.1 = true;
                Poll::Ready(Some(Ok(http_body::Frame::data("here".into()))))
            }
        } */
    }
}

#[tokio::main]
async fn main() {
    let (sender, mut receiver) = mpsc::channel::<oneshot::Sender<char>>(1);
    tokio::spawn(async move {
        while let Some(sender) = receiver.recv().await {
            dbg!(sender.send('g'));
        }
    });
    let tcp_listener = tokio::net::TcpListener::bind(
        std::net::SocketAddr::from(([127, 0, 0, 1], 17680))
    ).await.unwrap();
    let (stream, _) = tcp_listener.accept().await.unwrap();
    let conn = http1::Builder::new().serve_connection(
        hyper_util::rt::TokioIo::new(stream),
        service_fn(move |_| {
            let sender = sender.clone();
            async move {
                Response::builder()
                    .status(StatusCode::OK)
                    .body(MyBody(sender, false))
            }
        }),
    );
    conn.await.unwrap();
}

“Cargo.toml”:

[package]
name = "package"
version = "0.1.0"
edition = "2021"

[dependencies]
bytes = "1.10.1"
tokio = { version = "1.43.0", features = ["rt-multi-thread", "macros", "sync"] }
http = "1.3.1"
http-body = "1.0.1"
http-body-util = "0.1"
hyper = { version = "1", features = ["full"] }
hyper-util = { version = "0.1", features = ["full"] }

So this code

    fn poll_frame(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
        /* code0 */
        {
            let mut pinned = Box::pin(self.call());
            pinned.as_mut().poll(cx).map(|a|
                Some(Ok(http_body::Frame::data(vec![a as u8].into())))
            )
        }
    }

implements the poll method with code that

  • constructs a new future for the self.call() call
  • then polls this newly constructed future

The issue here is probably that you’re re-starting a new self.call() execution on every single poll, even though for correct behavior, you should probably be resuming an in-flight execution of self.call().

For suggestions on how to “properly” fix this, I’ll have to look up more context on the relevant traits here; I’m not personally very familiar with the APIs in question, such as the http_body::Body trait.


Edit: Indeed, the poll method being written this way means that the future for self.call() is immediately dropped once it starts waiting for a response (by returning Pending), which is why the sender.send('g') will always fail.
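
The drop described above can be reproduced without Hyper or Tokio. Here is a minimal sketch using only the standard library, under some assumptions: std::sync::mpsc stands in for the oneshot channel, the future does not register a waker, and Reply, NoopWake, poll_like_code0 and demo are hypothetical names that are not part of the original program.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough to poll a future by hand.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Simplified stand-in for `receiver.await`: ready once a value has arrived.
struct Reply(Receiver<char>);

impl Future for Reply {
    type Output = char;
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<char> {
        match self.0.try_recv() {
            Ok(c) => Poll::Ready(c),
            Err(TryRecvError::Empty) => Poll::Pending,
            Err(TryRecvError::Disconnected) => Poll::Ready('r'),
        }
    }
}

/// Same shape as code0: a fresh future per poll, dropped on return.
fn poll_like_code0(cx: &mut Context<'_>) -> (Poll<char>, Sender<char>) {
    let (tx, rx) = channel();
    let mut fut = Box::pin(Reply(rx));
    let polled = fut.as_mut().poll(cx);
    // `fut`, and the receiver inside it, are dropped when this function returns.
    (polled, tx)
}

/// Returns (the poll was Pending, the later send failed).
fn demo() -> (bool, bool) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let (polled, tx) = poll_like_code0(&mut cx);
    // The "worker" replies only after the poll returned, so the receiver is gone.
    (polled.is_pending(), tx.send('g').is_err())
}
```

Calling demo() from a main function shows the same pattern as the dbg! output in the question: the poll is pending, and the send that follows fails because its receiver no longer exists.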

The http_body::Body trait seems to be intentionally very low-level; it’s probably easier to find an appropriate existing implementor instead of writing an implementation manually. Maybe you could use StreamBody from http_body_util and then build up a Stream for it using the API from futures_util::stream.

Yes, the issue is that calling poll does not drive the Future to completion: it polls once, and then the future is thrown away. You need to implement the full Future state machine in Body::poll_frame.
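
As a rough illustration of what "keeping the state machine" can look like, here is a std-only sketch, not a Body implementation. It assumes a hypothetical TwoStep future that needs two polls, and a hypothetical Producer whose poll_next has the same shape as poll_frame (None means end of stream). The point is only that the in-flight future is stored in a field and resumed on the next poll.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough to poll by hand.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical future: Pending on the first poll, Ready('g') on the second.
struct TwoStep {
    polled: bool,
}

impl Future for TwoStep {
    type Output = char;
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<char> {
        if self.polled {
            Poll::Ready('g')
        } else {
            self.polled = true;
            Poll::Pending
        }
    }
}

/// Hypothetical producer that keeps its in-flight future between polls.
struct Producer {
    in_flight: Option<Pin<Box<TwoStep>>>,
    done: bool,
}

impl Producer {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<char>> {
        if self.done {
            return Poll::Ready(None);
        }
        // Start an operation only if none is in flight; otherwise resume it.
        let fut = self
            .in_flight
            .get_or_insert_with(|| Box::pin(TwoStep { polled: false }));
        match fut.as_mut().poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(c) => {
                // The operation finished: clear the slot and emit one item.
                self.in_flight = None;
                self.done = true;
                Poll::Ready(Some(c))
            }
        }
    }
}

/// Polls the producer three times and collects the results.
fn demo() -> Vec<Poll<Option<char>>> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut p = Producer { in_flight: None, done: false };
    (0..3).map(|_| p.poll_next(&mut cx)).collect()
}
```

Here the second poll resumes the future created by the first one, so the value arrives; with a fresh future on every poll it never would.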

Or, maybe better, implement the async fn as a generator (uhhhm, there's the genawaiter crate, at least).

use bytes::Bytes;
use futures_core::Stream as _;
use genawaiter::sync::{r#gen, Gen};
use genawaiter::yield_;
use http::{response::Response, status::StatusCode};
use hyper::{server::conn::http1, service::service_fn};
use std::future::Future; // needed before edition 2024, where `Future` is not in the prelude
use std::pin::Pin;
use std::task::Poll;
use tokio::sync::{mpsc, oneshot};

struct MyBody(mpsc::Sender<oneshot::Sender<char>>);

impl MyBody {
    fn call(self) -> Gen<char, (), impl Future<Output = ()>> {
        r#gen!({
            loop {
                let (sender, receiver) = oneshot::channel();
                match self.0.send(sender).await {
                    Err(error) => {
                        dbg!(error);
                        yield_!('s');
                    }
                    Ok(()) => {
                        dbg!("sent");
                        match receiver.await {
                            Err(error) => {
                                dbg!(error);
                                yield_!('r');
                            }
                            Ok(a) => {
                                dbg!(a);
                                yield_!(a);
                            }
                        }
                    }
                }
            }
        })
    }
}

struct MyBodyProducer<F>(Pin<Box<Gen<char, (), F>>>)
where
    F: Future<Output = ()>;

impl<F> http_body::Body for MyBodyProducer<F>
where
    F: Future<Output = ()>,
{
    type Data = Bytes;
    type Error = std::convert::Infallible;

    fn poll_frame(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
        self.0
            .as_mut()
            .poll_next(cx)
            .map(|a| a.map(|a| Ok(http_body::Frame::data(vec![a as u8].into()))))
    }
}

#[tokio::main]
async fn main() {
    let (sender, mut receiver) = mpsc::channel::<oneshot::Sender<char>>(1);
    tokio::spawn(async move {
        while let Some(sender) = receiver.recv().await {
            dbg!(sender.send('g'));
        }
    });
    let tcp_listener =
        tokio::net::TcpListener::bind(std::net::SocketAddr::from(([127, 0, 0, 1], 17680)))
            .await
            .unwrap();
    let (stream, _) = tcp_listener.accept().await.unwrap();
    let conn = http1::Builder::new().serve_connection(
        hyper_util::rt::TokioIo::new(stream),
        service_fn(move |_| {
            let sender = sender.clone();
            let my_body = Box::pin(MyBody(sender).call());
            async move {
                Response::builder()
                    .status(StatusCode::OK)
                    .body(MyBodyProducer(my_body))
            }
        }),
    );
    conn.await.unwrap();
}

FWIW, this is a very bad implementation with a ton of overhead. Streaming one byte at a time is almost certainly not what you want.
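
One possible way to reduce the per-frame overhead is to group bytes into larger chunks before turning each chunk into a frame. A small sketch of that grouping step only, with a hypothetical batch helper and an arbitrary chunk size (neither comes from the code above):

```rust
/// Hypothetical helper: groups `bytes` into chunks of at most `chunk_size` bytes.
/// A `chunk_size` of 0 is treated as 1, because `chunks(0)` would panic.
fn batch(bytes: &[u8], chunk_size: usize) -> Vec<Vec<u8>> {
    bytes
        .chunks(chunk_size.max(1))
        .map(|chunk| chunk.to_vec())
        .collect()
}
```

Each resulting chunk could then become one data frame instead of one frame per byte.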

To demonstrate the feasibility of these ideas, here’s something that works with minimal changes:

Add a conversion method that turns your MyBody into an actual Body implementor (which is a different type), using StreamBody and futures_util::stream as noted:

impl MyBody {
    fn into_body(self) -> impl http_body::Body<Data = Bytes, Error = std::convert::Infallible> {
        http_body_util::StreamBody::new(futures_util::stream::once(async move {
            let a = self.call().await;
            Ok(http_body::Frame::data(vec![a as u8].into()))
        }))
    }
}

Update the use site to call this method

-                   .body(MyBody(sender, false))
+                   .body(MyBody(sender, false).into_body())

and add the futures-util crate to Cargo.toml.

Thanks. That was a stupid mistake 😳. I went with the proposal of writing my own state machine. Below is the real code.

type ReadOutput = Result<Bytes, CallDiskError>;

/// Transfers torrent content in a slice of a torrent file
/// from [`vincenzo::disk::Disk`] to Hyper.
/// It takes content via [`ContentBody::slice_reader`].
/// Taking content reduces the slice.
/// It gives content by implementing [`http_body::Body`].
#[pin_project]
struct ContentBody {
    slice_reader: SliceReader,

    /// This is `Some(future)` iff there is an outstanding request
    /// to [`vincenzo::disk::Disk`]. In this case,
    /// the output of `future` will be the response to this request.
    /// See `<ContentBody as http_body::Body>::poll_frame`
    /// to understand how this is used.
    #[pin] read_future: Option<<SliceReader as SliceReaderI>::ReadFuture>,

    /// The remaining size of the slice.
    rem_size: u64,
}

impl ContentBody {
    async fn new(
        cs: ContentServer,
        file_id: FileId,
        range_in_file: Range<u64>,
    ) -> Result<Self, CallDiskError> {
        let rem_size = range_from::range_len(range_in_file.clone());
        cs.new_slice(file_id, range_in_file).await.map(|slice_reader| {
            Self { slice_reader, read_future: None, rem_size }
        })
    }

    fn handle_resp(
        self: Pin<&mut Self>,
        resp: Poll<ReadOutput>,
    ) ->
    Poll<
        Option<
            Result<
                http_body::Frame<<ContentBody as http_body::Body>::Data>,
                <ContentBody as http_body::Body>::Error,
            >
        >
    >
    {
        resp.map(|resp| {
            let mut selfp = self.project();
            selfp.read_future.set(None);
            Some(resp.map(|mut chunk| {
                let size = u64::try_from(chunk.len()).unwrap();
                let rem_size = selfp.rem_size;
                match rem_size.checked_sub(size) {
                    None => {
                        warn!(
                            size,
                            rem_size,
                            "the HTTP server received a chunk of content \
                                larger than the remaining amount of content",
                        );
                        chunk.truncate((*rem_size).try_into().unwrap());
                        *rem_size = 0;
                    },
                    Some(new_rem_size) => {
                        debug!(size, new_rem_size, "ContentBody received content");
                        *rem_size = new_rem_size;
                    },
                }
                http_body::Frame::data(chunk)
            }))
        })
    }
}

impl http_body::Body for ContentBody {
    type Data = Bytes;
    type Error = CallDiskError;

    /// Removes and returns a prefix of the slice
    /// by calling the [`SliceReaderI::read`] method
    /// of the [`ContentBody::slice_reader`] field.
    fn poll_frame(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
        let selfp = self.as_mut().project();
        let mut read_future = selfp.read_future;
        match read_future.as_mut().as_pin_mut() {
            None => if *selfp.rem_size == 0 {
                Poll::Ready(None)
            } else {
                read_future.as_mut().set(Some(selfp.slice_reader.read()));
                let resp = read_future.as_pin_mut().unwrap().poll(cx);
                self.handle_resp(resp)
            },
            Some(read_future) => {
                let resp = read_future.poll(cx);
                self.handle_resp(resp)
            },
        }
    }

    fn size_hint(&self) -> http_body::SizeHint {
        http_body::SizeHint::with_exact(self.rem_size)
    }
}
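
The remaining-size bookkeeping in handle_resp can be looked at in isolation. This is a simplified sketch with a hypothetical account function, using Vec<u8> instead of Bytes and leaving out the logging; it is not the code above, only the same arithmetic.

```rust
use std::convert::TryFrom;

/// Hypothetical helper mirroring the arithmetic in `handle_resp`:
/// clamps `chunk` to at most `rem_size` bytes and returns the new remaining size.
fn account(chunk: &mut Vec<u8>, rem_size: u64) -> u64 {
    let size = u64::try_from(chunk.len()).unwrap();
    match rem_size.checked_sub(size) {
        // The chunk fits into what is left of the slice.
        Some(new_rem_size) => new_rem_size,
        // The chunk is larger than what is left: cut it down to `rem_size` bytes.
        None => {
            chunk.truncate(usize::try_from(rem_size).unwrap());
            0
        }
    }
}
```

For example, a 4-byte chunk with 10 bytes remaining leaves 6, while a 4-byte chunk with 3 bytes remaining is cut to 3 bytes and leaves 0.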