Setting a hyper::Body from a Stream


#1

I’m trying to set a hyper::Body to the results of tokio_postgres stream like this:

    fn stream_ads(&self, req: Request) -> ResponseFuture {
        let connector = OpenSsl::new().unwrap();
        let notifications = Connection::connect(
            self.database_url.clone(),
            TlsMode::Require(Box::new(connector)),
            &self.handle.clone(),
        ).then(|c| c.unwrap().batch_execute("listen ad_update"))
            .map(|c| {
                let mut resp = Response::new()
                    .with_header(ContentType("text/event-stream".parse().unwrap()))
                    .with_header(CacheControl(
                        vec![CacheDirective::NoStore, CacheDirective::Private],
                    ));

                let stream = c.notifications().map(|n| n.payload).map_err(|e| {
                    hyper::Error::Io(StdIoError::new(std::io::ErrorKind::Other, e.description()))
                });
                resp.set_body(Box::new(stream));
                resp
            })
            .map_err(|(e, _)| {
                hyper::Error::Io(StdIoError::new(std::io::ErrorKind::Other, e.description()))
            });


        Box::new(notifications)
    }

However I can’t seem to make it work, the hyper docs seem to imply that any stream can be a body, but my call to set_body errors out at:

    error[E0277]: the trait bound `hyper::Body: std::convert::From<std::boxed::Box<futures::stream::MapErr<futures::stream::Map<tokio_postgres::Notifications, [closure@src/server.rs:294:52: 294:65]>, [closure@src/server.rs:294:75: 296:18]>>>` is not satisfied
     --> src/server.rs:297:22
        |
    297 |                 resp.set_body(Box::new(stream));
        |                      ^^^^^^^^ the trait `std::convert::From<std::boxed::Box<futures::stream::MapErr<futures::stream::Map<tokio_postgres::Notifications, [closure@src/server.rs:294:52: 294:65]>, [closure@src/server.rs:294:75: 296:18]>>>` is not implemented for `hyper::Body`
        |
        = help: the following implementations were found:
              <hyper::Body as std::convert::From<std::borrow::Cow<'static, [u8]>>>
              <hyper::Body as std::convert::From<&'static [u8]>>
              <hyper::Body as std::convert::From<std::string::String>>
              <hyper::Body as std::convert::From<&'static str>>
            and 7 others
        = note: required because of the requirements on the impl of `std::convert::Into<hyper::Body>` for `std::boxed::Box<futures::stream::MapErr<futures::stream::Map<tokio_postgres::Notifications, [closure@src/server.rs:294:52: 294:65]>, [closure@src/server.rs:294:75: 296:18]>>`

Is there something I’m not understanding? Or a simpler way to go about this?


Follow up: Setting a hyper::Body from a Stream
#2

I don’t think so - where in the docs does it imply that?

What’s the type of n.payload? You probably have to flatten the stream into a single byte buffer (eg Vec<u8>) and then set that as the body. There are a few From impls for Body (some of them in the error message there) so pick one that makes the most sense.


#3

Hm. I’m confused about the Body Mapping section here because it looks like it should be possible to have a different body type. I can’t collect the notifications into a Vec<u8> because I’m trying to use server sent events and a long running connection.

I suppose another way to put this is: is there a way to connect the Hyper body’s stream to another type?


#4

Ok, I think I see what you’re getting at.

You need to specify the B type in Response<B> - it defaults to Body if not specified, which is what’s happening in your case. So you probably want to say let resp = Response::<Box<Stream<Item=...>>>::new().with_header(...) or let resp: Response<Box<Stream<...>>> = Response::new()....


#5

Ah ok. I see. Thanks! Rather than trying to change all of by response bodies a box, I noticed that Body allows you to use mpsc with Body::pair() and went with:

    fn stream_ads(&self) -> ResponseFuture {
        let handle = self.handle.clone();
        let notifications = Connection::connect(
            self.database_url.clone(),
            TlsMode::None,
            &self.handle.clone(),
        ).then(|c| c.unwrap().batch_execute("listen ad_update"))
            .map(move |c| {
                let notifications = c.notifications()
                    .map(|n| Ok(Chunk::from(n.payload)))
                    .map_err(|_| unimplemented!());
                let (sender, body) = Body::pair();
                let resp = Response::new()
                    .with_header(ContentType("text/event-stream".parse().unwrap()))
                    .with_header(CacheControl(
                        vec![CacheDirective::NoStore, CacheDirective::Private],
                    ))
                    .with_body(body);
                handle.spawn(sender.send_all(notifications).map(|_| ()).map_err(|_| ()));
                resp
            })
            .map_err(|(e, _)| {
                hyper::Error::Io(StdIoError::new(std::io::ErrorKind::Other, e.description()))
            });

        Box::new(notifications)
    }

I’m a little bit worried about the unimplemented! there, but I couldn’t create a SendError due to this issue, and that’s the recommended workaround. Are there any other warning bells here that trigger for you?


Follow up: Setting a hyper::Body from a Stream
#6

Ah, I didn’t see pair() and with_body() - good catch. It is unfortunate about the SendError but it sounds like that should be resolved at some point; an unimplemented! or panic! stub there is probably ok.