Setting a hyper::Body from a Stream

I'm trying to set a hyper::Body to the results of tokio_postgres stream like this:

    fn stream_ads(&self, req: Request) -> ResponseFuture {
        let connector = OpenSsl::new().unwrap();
        let notifications = Connection::connect(
            self.database_url.clone(),
            TlsMode::Require(Box::new(connector)),
            &self.handle.clone(),
        ).then(|c| c.unwrap().batch_execute("listen ad_update"))
            .map(|c| {
                let mut resp = Response::new()
                    .with_header(ContentType("text/event-stream".parse().unwrap()))
                    .with_header(CacheControl(
                        vec![CacheDirective::NoStore, CacheDirective::Private],
                    ));

                let stream = c.notifications().map(|n| n.payload).map_err(|e| {
                    hyper::Error::Io(StdIoError::new(std::io::ErrorKind::Other, e.description()))
                });
                resp.set_body(Box::new(stream));
                resp
            })
            .map_err(|(e, _)| {
                hyper::Error::Io(StdIoError::new(std::io::ErrorKind::Other, e.description()))
            });


        Box::new(notifications)
    }

However I can't seem to make it work, the hyper docs seem to imply that any stream can be a body, but my call to set_body errors out at:

    error[E0277]: the trait bound `hyper::Body: std::convert::From<std::boxed::Box<futures::stream::MapErr<futures::stream::Map<tokio_postgres::Notifications, [closure@src/server.rs:294:52: 294:65]>, [closure@src/server.rs:294:75: 296:18]>>>` is not satisfied
     --> src/server.rs:297:22
        |
    297 |                 resp.set_body(Box::new(stream));
        |                      ^^^^^^^^ the trait `std::convert::From<std::boxed::Box<futures::stream::MapErr<futures::stream::Map<tokio_postgres::Notifications, [closure@src/server.rs:294:52: 294:65]>, [closure@src/server.rs:294:75: 296:18]>>>` is not implemented for `hyper::Body`
        |
        = help: the following implementations were found:
              <hyper::Body as std::convert::From<std::borrow::Cow<'static, [u8]>>>
              <hyper::Body as std::convert::From<&'static [u8]>>
              <hyper::Body as std::convert::From<std::string::String>>
              <hyper::Body as std::convert::From<&'static str>>
            and 7 others
        = note: required because of the requirements on the impl of `std::convert::Into<hyper::Body>` for `std::boxed::Box<futures::stream::MapErr<futures::stream::Map<tokio_postgres::Notifications, [closure@src/server.rs:294:52: 294:65]>, [closure@src/server.rs:294:75: 296:18]>>`

Is there something I'm not understanding? Or a simpler way to go about this?

1 Like

I don’t think so - where in the docs does it imply that?

What’s the type of n.payload? You probably have to flatten the stream into a single byte buffer (eg Vec<u8>) and then set that as the body. There are a few From impls for Body (some of them in the error message there) so pick one that makes the most sense.

Hm. I'm confused about the Body Mapping section here because it looks like it should be possible to have a different body type. I can't collect the notifications into a Vec<u8> because I'm trying to use server sent events and a long running connection.

I suppose another way to put this is: is there a way to connect the Hyper body's stream to another type?

Ok, I think I see what you’re getting at.

You need to specify the B type in Response<B> - it defaults to Body if not specified, which is what’s happening in your case. So you probably want to say let resp = Response::<Box<Stream<Item=...>>>::new().with_header(...) or let resp: Response<Box<Stream<...>>> = Response::new()....

Ah ok. I see. Thanks! Rather than trying to change all of by response bodies a box, I noticed that Body allows you to use mpsc with Body::pair() and went with:

    fn stream_ads(&self) -> ResponseFuture {
        let handle = self.handle.clone();
        let notifications = Connection::connect(
            self.database_url.clone(),
            TlsMode::None,
            &self.handle.clone(),
        ).then(|c| c.unwrap().batch_execute("listen ad_update"))
            .map(move |c| {
                let notifications = c.notifications()
                    .map(|n| Ok(Chunk::from(n.payload)))
                    .map_err(|_| unimplemented!());
                let (sender, body) = Body::pair();
                let resp = Response::new()
                    .with_header(ContentType("text/event-stream".parse().unwrap()))
                    .with_header(CacheControl(
                        vec![CacheDirective::NoStore, CacheDirective::Private],
                    ))
                    .with_body(body);
                handle.spawn(sender.send_all(notifications).map(|_| ()).map_err(|_| ()));
                resp
            })
            .map_err(|(e, _)| {
                hyper::Error::Io(StdIoError::new(std::io::ErrorKind::Other, e.description()))
            });

        Box::new(notifications)
    }

I'm a little bit worried about the unimplemented! there, but I couldn't create a SendError due to this issue, and that's the recommended workaround. Are there any other warning bells here that trigger for you?

Ah, I didn't see pair() and with_body() - good catch. It is unfortunate about the SendError but it sounds like that should be resolved at some point; an unimplemented! or panic! stub there is probably ok.