How to concatenate chunks of hyper body?

Can you help me with this, please? The task is to join the chunks of a returned hyper body so that I can actually use it. I do not want to trivially echo it back in another stream; I need to assemble the whole body, parse it as JSON, and use the result further.

I have searched high and low for the futures version of concat, but the compiler does not seem to recognise that the body satisfies:
hyper::body::body::Body : futures_util::stream::StreamExt

Here is the code and the compilation error:

use futures_util::stream::StreamExt;
use hyper::Client;
use hyper::http::Request;
use hyper_tls::{ HttpsConnector};
use hyper::body::Body;

--> src/main.rs:178:20
|
178 | let reply = body.concat().await;
| ^^^^^^ method not found in &mut hyper::body::body::Body
|
= note: the method concat exists but the following trait bounds were not satisfied:
&&mut hyper::body::body::Body : futures_util::stream::StreamExt
&hyper::body::body::Body : futures_util::stream::StreamExt
&mut &mut hyper::body::body::Body : futures_util::stream::StreamExt
&mut hyper::body::body::Body : futures_util::stream::StreamExt
hyper::body::body::Body : futures_util::stream::StreamExt

There is also this problem: how do I parse this (or anything) as JSON if its size is not known at compile time?

--> src/main.rs:179:50
|
179 | let price:PriceStruct = serde_json::from_slice(&reply)
| ^^^^^^ doesn't have a size known at compile-time

You can do something like this:

// Body is a stream of chunks of bytes.
let body = response.body_mut();
let mut output = Vec::new();

while let Some(chunk) = body.next().await {
    let bytes = chunk?.into_bytes();
    // Append this chunk to the vector.
    output.extend(&bytes[..]);
}
// Now use serde on the vector of bytes.
let parsed = serde_json::from_slice(&output[..]);

You shouldn't run into issues with unsized types because Vec<u8> is sized. The unknown-size part is stored on the heap and dynamically resized.
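
As a rough illustration of the same accumulation pattern without hyper or serde, here is a minimal standard-library sketch. It is a synchronous stand-in, not hyper's API: `concat_chunks` and `parse_price` are hypothetical names, the chunks come from an ordinary iterator rather than an async stream, and `parse_price` only plays the role that `serde_json::from_slice` plays above.

```rust
use std::io;

/// Joins fallible chunks into one contiguous, growable buffer.
/// Synchronous stand-in for the `while let Some(chunk) = body.next().await` loop.
fn concat_chunks<I, C>(chunks: I) -> io::Result<Vec<u8>>
where
    I: IntoIterator<Item = io::Result<C>>,
    C: AsRef<[u8]>,
{
    let mut output = Vec::new();
    for chunk in chunks {
        // `?` returns early on the first failed chunk.
        output.extend_from_slice(chunk?.as_ref());
    }
    Ok(output)
}

/// Hypothetical parser standing in for `serde_json::from_slice`.
/// It takes `&[u8]`: the slice itself is unsized, but a reference to it is sized.
fn parse_price(bytes: &[u8]) -> Option<f64> {
    std::str::from_utf8(bytes).ok()?.trim().parse().ok()
}
```

Given a collected buffer `body`, calling `parse_price(&body)` compiles because `&Vec<u8>` dereferences to `&[u8]`; the "size not known at compile time" error typically appears only when a bare `[u8]` is used by value.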

If you still have issues with unknown sizes, please post the definition of PriceStruct.

As for why it fails to find the StreamExt trait: which version of futures_util are you using? At the top of the hyper documentation, you can hover over the crate name to see more information, including which crates hyper depends on. In this case it lists futures_util version 0.3.0-alpha.18.

Yes, that is the one I am using; it is the latest one that does not conflict with everything else.
I have in my Cargo.toml:

futures-preview = { version = "0.3.0-alpha.18", features = ["nightly","async-await"] }
futures-util-preview = { version = "0.3.0-alpha.18", features = ["nightly","async-await"] }

PriceStruct is a nested struct with Vecs in it.

Thank you so much for the "extend" suggestion, my list of compilation errors is thereby diminished!
It seems such a simple thing once you know the right function. I have been searching and trying all sorts of things, as above. If there were a simple example anywhere for this common task, it could have saved me a lot of useless effort. I thought it would require some special function tied up with all this async stuff.

See the documentation on impl Stream for Body:

Unstable

This function requires enabling the unstable-stream feature in your Cargo.toml .

(I'm assuming that hyper::rt::Stream == futures_core::stream::Stream, not really sure why it's re-exported like that).

@alice I still get a whole lot of errors from your earlier suggestion to use join!, which I implemented as follows:

fn main() {
    task::block_on(async {
        let a = gethtprice().await.expect("main:getprice() failed");
        let b = areadaccount().await.expect("main:readaccount() failed");
        let (dats, acc) = join!(a,b);
        trade(&dats,&acc).await.expect("trade failed");
    });
}

where gethtprice and areadaccount return structs but join! seems to expect some futures or something, viz:

error[E0277]: the trait bound DatStruct: core::future::future::Future is not satisfied
--> src/main.rs:454:21
|
454 | let (dats, acc) = join!(a,b);
| ^^^^^^^^^^ the trait core::future::future::Future is not implemented for DatStruct
|
= note: required by futures_util::future::maybe_done::MaybeDone
= note: this error originates in a macro outside of the current crate (in Nightly builds, run with -Z external-macro-backtrace for more info)

You shouldn't manually await the futures before passing them to join!. The macro awaits them for you, so pass it the futures themselves, not their outputs.
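
To make the difference concrete, here is a minimal sketch that uses only the standard library. The assumptions are loud ones: `join2` is a hand-rolled stand-in for what `join!` does with two futures, `block_on` is a toy busy-polling executor rather than `task::block_on`, and `fetch_price` and `read_account` are hypothetical functions in place of yours.

```rust
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical fallible async functions standing in for the real ones.
async fn fetch_price() -> Result<u32, String> { Ok(42) }
async fn read_account() -> Result<u32, String> { Ok(7) }

/// Hand-rolled two-future join: polls both until both have finished.
async fn join2<A: Future, B: Future>(a: A, b: B) -> (A::Output, B::Output) {
    let mut a = pin!(a);
    let mut b = pin!(b);
    let (mut a_out, mut b_out) = (None, None);
    poll_fn(move |cx| {
        if a_out.is_none() {
            if let Poll::Ready(v) = a.as_mut().poll(cx) { a_out = Some(v); }
        }
        if b_out.is_none() {
            if let Poll::Ready(v) = b.as_mut().poll(cx) { b_out = Some(v); }
        }
        if a_out.is_some() && b_out.is_some() {
            Poll::Ready((a_out.take().unwrap(), b_out.take().unwrap()))
        } else {
            Poll::Pending
        }
    })
    .await
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor that polls in a loop; only suitable for a demo.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        std::thread::yield_now();
    }
}

async fn run() -> (u32, u32) {
    // Create the futures; do not `.await` them here.
    let a = fetch_price();
    let b = read_account();
    // The join drives both and yields their outputs (here, two `Result`s).
    let (price, account) = join2(a, b).await;
    (price.expect("fetch_price failed"), account.expect("read_account failed"))
}
```

The point of the sketch is the shape of `run`: the values handed to the join are futures, and the `Result`s are unwrapped only after the join has completed.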

Please use code blocks for your code like this:

```rust
// your code here
```

Thanks to @alice, I was able to remove the dependency on:
futures-util-preview = { version = "0.3.0-alpha.18", features = ["nightly","async-await"] }
and that blind-alley approach.

Oh wow, after removing those awaits and adding .unwrap() calls, I have a clean compilation at last!

You may find the try_join! macro useful. It works just like join!, except that it is designed for fallible computations: it returns early with the first error instead of waiting for every future, e.g.

let a_fut = fallible_1();
let b_fut = fallible_2();
let (a, b) = try_join!(a_fut, b_fut).unwrap(); // or use the ? operator