Hi. I want to use oneshot channels to generate response bodies in Hyper. However, these oneshot channels fail. Below is the minimal example.
Start the program. Make any HTTP request to the address mentioned in the program. Response generation freezes becauseMyBody::call
freezes because sender.send('g')
returns an error:
[src/main.rs:16:17] "sent" = "sent"
[src/main.rs:57:13] sender.send('g') = Err(
'g',
)
I read that this may be because the receiver is dropped. I don't understand how this is possible: "sent"
is printed, and then we go to receiver.await
.
If you replace code0
with code1
, you'll get a response.
use std::future::Future as _;
use std::task::Poll;
use bytes::Bytes;
use tokio::sync::{mpsc, oneshot};
use hyper::{server::conn::http1, service::service_fn};
use http::{response::Response, status::StatusCode};
struct MyBody(mpsc::Sender<oneshot::Sender<char>>, bool);
impl MyBody {
async fn call(&self) -> char {
let (sender, receiver) = oneshot::channel();
match self.0.send(sender).await {
Err(error) => { dbg!(error); 's' },
Ok(()) => {
dbg!("sent");
match receiver.await {
Err(error) => { dbg!(error); 'r' },
Ok(a) => { dbg!(a); a },
}
},
}
}
}
impl http_body::Body for MyBody {
type Data = Bytes;
type Error = std::convert::Infallible;
fn poll_frame(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
/* code0 */
{
let mut pinned = Box::pin(self.call());
pinned.as_mut().poll(cx).map(|a|
Some(Ok(http_body::Frame::data(vec![a as u8].into())))
)
}
/* code1
{
if self.1 { Poll::Ready(None) }
else {
self.1 = true;
Poll::Ready(Some(Ok(http_body::Frame::data("here".into()))))
}
} */
}
}
#[tokio::main]
async fn main() {
let (sender, mut receiver) = mpsc::channel::<oneshot::Sender<char>>(1);
tokio::spawn(async move {
while let Some(sender) = receiver.recv().await {
dbg!(sender.send('g'));
}
});
let tcp_listener = tokio::net::TcpListener::bind(
std::net::SocketAddr::from(([127, 0, 0, 1], 17680))
).await.unwrap();
let (stream, _) = tcp_listener.accept().await.unwrap();
let conn = http1::Builder::new().serve_connection(
hyper_util::rt::TokioIo::new(stream),
service_fn(move |_| {
let sender = sender.clone();
async move {
Response::builder()
.status(StatusCode::OK)
.body(MyBody(sender, false))
}
}),
);
conn.await.unwrap();
}
“Cargo.toml”:
[package]
name = "package"
version = "0.1.0"
edition = "2021"
[dependencies]
bytes = "1.10.1"
tokio = { version = "1.43.0", features = ["rt-multi-thread", "macros", "sync"] }
http = "1.3.1"
http-body = "1.0.1"
http-body-util = "0.1"
hyper = { version = "1", features = ["full"] }
hyper-util = { version = "0.1", features = ["full"] }