Using async/await with Streams

I'm trying out async/await on the latest nightly compiler with the latest Tokio alpha and the latest futures-preview, using its compat feature for backwards compatibility with Futures 0.1.

However, even though I've imported the trait for backwards compatibility with Stream, it still yells at me when I call concat2 saying I need to import futures::stream::Stream. I've added that but cargo just tells me it's an unused import and then says I should import it.

Any thoughts on what is going on?

Also, as a side question, is it fairly trivial to use Futures / async await in conjunction with Rayon?

Thank you

Can you please show us some of your code?

Sure @nologik! Here are the relevant parts:

[dependencies]
...
tokio = { version = "=0.2.0-alpha.2" }
futures01 = { package = "futures", version = "0.1"}

[dependencies.futures-preview]
version = "=0.3.0-alpha.18"
features = ["async-await", "nightly", "compat"]

[dependencies.reqwest]
version = "0.9.20"

[dependencies.rusoto_core]
version = "0.40.0"

[dependencies.rusoto_s3]
version = "0.40.0"

src/bin/test.rs:


use my_async_test::*;

#[tokio::main]
async fn main() {
...
    MyAsyncTest.run().await
}

src/lib.rs defines MyAsyncTest.run() as async, and further down the async/await chain I have two functions that are actually awaiting Futures/Streams from reqwest and rusoto.

The rusoto function is:


    pub async fn get_object(&self, key: &str) -> RawFile {
        let get_req = GetObjectRequest {
            bucket: self.0.to_string(),
            key: key.to_string(),
            ..Default::default()
        };

        let body = S3_CLIENT.get_object(get_req)
            .compat()
            .await
            .expect("Couldn't send GetObjectRequest.")
            .body
            .expect("Couldn't parse the S3 response body.")
            .concat2()
            .await
            .expect("Couldn't concat chunks of response.");

        RawFile {
            key: format!("{}/{}", self.0, key),
            buf: body.to_vec()
        }
    }
}

Compilation error:


error[E0277]: the trait bound `futures::stream::concat::Concat2<rusoto_core::stream::ByteStream>: core::future::future::Future` is not satisfied
   --> src/s3.rs:183:20
    |
183 |           let body = S3_CLIENT.get_object(get_req)
    |  ____________________^
184 | |             .compat()
185 | |             .await
186 | |             .expect("Couldn't send GetObjectRequest.")
...   |
189 | |             .concat2()
190 | |             .await
    | |__________________^ the trait `core::future::future::Future` is not implemented for `futures::stream::concat::Concat2<rusoto_core::stream::ByteStream>`
    |
    = note: required by `std::future::poll_with_tls_context`

error: aborting due to previous error

For more information about this error, try `rustc --explain E0277`.

Relevant imports I'm making in src/s3.rs:


use futures::compat::{Future01CompatExt, Stream01CompatExt};
use futures01::{future::Future, stream::Stream};

Is this the function you are calling?

As far as I can tell, that returns a future, not a stream.

Sorry, I misread the compiler error. When it says a type does not implement Future, it's almost always because futures 0.1 and 0.3 are being mixed up. The error message doesn't make it obvious which version's trait is required, so you just have to recheck your types at each step.

OK, so this is it. Concat2 is a futures 0.1 future, but you are trying to await it directly, and .await only works on std (0.3-style) futures.
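
To make the mismatch concrete, here is a minimal std-only sketch of what a compat adapter does. Everything in it is a toy stand-in rather than the real futures API: `Future01`, `Concat01`, `Compat` and `block_on` are hypothetical names, and the real futures 0.1 trait has a different `poll` signature. In the snippet from the question, the equivalent change would presumably be a second `.compat()` between `.concat2()` and `.await`.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Toy stand-in for the futures 0.1 `Future` trait (hypothetical, simplified).
// `Ok(None)` plays the role of "not ready yet".
trait Future01 {
    type Item;
    type Error;
    fn poll01(&mut self) -> Result<Option<Self::Item>, Self::Error>;
}

// A 0.1-style future that appends one chunk to its buffer per poll.
struct Concat01 {
    chunks: Vec<Vec<u8>>,
    buf: Vec<u8>,
}

impl Future01 for Concat01 {
    type Item = Vec<u8>;
    type Error = String;

    fn poll01(&mut self) -> Result<Option<Vec<u8>>, String> {
        if self.chunks.is_empty() {
            // All chunks consumed: hand out the concatenated buffer.
            return Ok(Some(std::mem::take(&mut self.buf)));
        }
        let chunk = self.chunks.remove(0);
        self.buf.extend_from_slice(&chunk);
        Ok(None)
    }
}

// Adapter that makes any toy 0.1 future usable with `.await`.
struct Compat<F>(F);

impl<F: Future01 + Unpin> Future for Compat<F> {
    type Output = Result<F::Item, F::Error>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.0.poll01() {
            Ok(Some(item)) => Poll::Ready(Ok(item)),
            Ok(None) => {
                // Ask the executor to poll again.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
            Err(e) => Poll::Ready(Err(e)),
        }
    }
}

// Waker that does nothing; enough for the busy-polling executor below.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Tiny busy-polling executor, for illustration only.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

async fn get_body() -> Vec<u8> {
    let old = Concat01 {
        chunks: vec![b"he".to_vec(), b"llo".to_vec()],
        buf: Vec::new(),
    };
    // `old.await` would not compile: `Concat01` only implements the toy 0.1 trait.
    Compat(old).await.expect("Couldn't concat chunks of response.")
}
```

Calling `block_on(get_body())` from a `main` function drives the wrapped future to completion. The point is only that the old-style value needs its own adapter before `.await` accepts it, even when an earlier step in the same chain was already adapted.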

Okay, now after I call compat() on that and await it, it compiled, but at runtime I get:

thread 'main' panicked at 'Couldn't send GetObjectRequest.: HttpDispatch(HttpDispatchError { message: "executor failed to spawn task: tokio::spawn failed (is a tokio runtime running this future?)" })', src/libcore/result.rs:1084:5
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace.
[Finished running. Exit status: 101]

which is interesting because that is the exact error I'm also getting from reqwest after converting it to async.

Are the libraries you are using doing tokio::spawn whilst depending on tokio 0.1?

I imagine that's what's happening: there is no tokio 0.1 runtime running, since you use tokio 0.2.

I don't know the tokio runtime well enough to suggest an answer here.

Have you confirmed you don't have several versions of the same library in use?

How can I check? Are you referring to my custom lib.rs or the crates I'm using?

Try deleting target and rebuilding it and checking the log.

But which library are you saying I should check for?

The one with the trait. Sometimes the version you depend on is different from the version one of your dependencies depends on.

That is true, I am depending on alphas of both Tokio and Futures. But isn't that what compat and the other traits in futures-preview are for?

I may be wrong since I'm relatively new to this stuff, but I am thinking the issue is something with the Tokio Executor used in rusoto and reqwest and that I need the new one since my runtime is the newest Tokio alpha, but I'm not sure how to do that...

Does that seem like a likely explanation?

I think that's what's happening, but I don't know if there's a solution to that.

You could start by turning on backtrace, to see which instruction causes the panic. If it's in your code, you can solve it; if it's in one of your dependencies, I'm not sure...

You could have a look to see if the git repos of those libraries have already updated to async/await and the latest tokio. If not, you could check the issue trackers of those libraries or of tokio, and ask there if nobody else has raised the question.

I think eventually the libraries have to update to the latest tokio. In the meantime I don't know if it's possible to use them with a tokio 0.2 runtime, but that's the thing: tokio is a bit special in that respect. If the libraries return futures and streams, you can use compat, await and eventually spawn them, but if they use tokio::spawn, they really require that the right tokio runtime is running.
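
As a rough illustration of why a library that spawns internally depends on the right runtime being active, here is a toy std-only model. It is not tokio's actual implementation: `CURRENT`, `spawn`, `run`, `library_call` and `SpawnError` are all hypothetical names, and the sketch simply assumes that a global-style spawn looks up a per-thread runtime context and fails when none is set.

```rust
use std::cell::RefCell;

// A "task" in this toy model is just a boxed closure that returns a log line.
type Task = Box<dyn FnOnce() -> String>;

thread_local! {
    // Task queue of the toy runtime entered on this thread, if any.
    static CURRENT: RefCell<Option<Vec<Task>>> = RefCell::new(None);
}

#[derive(Debug, PartialEq)]
struct SpawnError(&'static str);

// Global-style spawn: it only works if a runtime context is set on this thread.
fn spawn(task: Task) -> Result<(), SpawnError> {
    CURRENT.with(|cur| {
        let mut slot = cur.borrow_mut();
        match slot.as_mut() {
            Some(queue) => {
                queue.push(task);
                Ok(())
            }
            None => Err(SpawnError("spawn failed (is a runtime running?)")),
        }
    })
}

// Enter the toy runtime, call `f`, then run every task that `f` spawned.
fn run<F>(f: F) -> Result<Vec<String>, SpawnError>
where
    F: FnOnce() -> Result<(), SpawnError>,
{
    CURRENT.with(|cur| *cur.borrow_mut() = Some(Vec::new()));
    let res = f();
    // Leave the runtime again, taking the queued tasks with us.
    let tasks = CURRENT.with(|cur| cur.borrow_mut().take()).unwrap_or_default();
    res?;
    Ok(tasks.into_iter().map(|task| task()).collect())
}

// Hypothetical library function that spawns work internally.
fn library_call() -> Result<(), SpawnError> {
    spawn(Box::new(|| "request sent".to_string()))
}
```

Calling `library_call()` on its own returns the spawn error, while `run(library_call)` succeeds. A different runtime that keeps its context somewhere else would not help the library, which is roughly the situation described above with a tokio 0.1 spawn inside a tokio 0.2 runtime.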