Poll() not implemented for Future?

The rest of the function isn't relevant to any async stuff at all. I had built a custom ElasticIter struct that implemented Iterator; it takes an Elasticsearch query and uses the Elasticsearch Scroll API to get all documents of a search and iterate over them.

The problem I had was it was using blocking http requests to Elasticsearch. Since reqwest just released the stable async / await support, I updated to that (0.10.0) and changed impl Iterator to impl Stream so it could be non-blocking.

I'm just updating the one place in the impl that made the Elasticsearch http request to fetch the results.

Implementing Stream manually is more difficult than implementing Iterator. How did the blocking version look, and can you post the full impl Stream for MyType block?

Here is a full dummy example that gives the same issues. I actually found a couple of issues with it that I can't figure out.

use futures::{future::FutureExt, stream::{Stream, StreamExt}};
use std::pin::Pin;
use std::task::{Context, Poll};
use serde_json::{json, Value};
use lazy_static::lazy_static;

lazy_static! {
    static ref CLIENT: reqwest::Client = reqwest::Client::new();
}

static URI: &str = "https://jsonplaceholder.typicode.com/todos";

#[tokio::main]
async fn main() {
    let mut todos = TodoStream { item: 1 };

    while let Some(res) = todos.next().await {
        dbg!(res);
    }
}

struct TodoStream {
    item: u64
}

impl Stream for TodoStream {
    type Item = Value;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let uri = format!("{}/{}", URI, self.item);
        let mut fut = CLIENT.get(&uri).send();

        let res = futures::ready!(fut.poll_unpin(cx)).unwrap();
        let res: Value = futures::ready!(res.json().poll_unpin(cx)).unwrap();

        Poll::Ready(Some(res))
    }
}
  1. Even if I comment out the second res and just try propagating the Poll variant with ready! and returning a hardcoded Poll::Ready(Some(json!("test"))), the stream never yields any items, making me think the executor isn't continuing to poll anymore. Is that correct? How can I get it to keep checking if the HTTP response is ready?
  2. If the second res isn't commented out and you run the code as is, you get:
error[E0277]: the trait bound `std::future::GenFuture<[static generator@DefId(49:730 ~ reqwest[e889]::async_impl[0]::response[0]::{{impl}}[0]::json[0]::{{closure}}[0]) 0:reqwest::async_impl::response::Response {reqwest::async_impl::response::Response, reqwest::async_impl::response::Response, impl core::future::future::Future, impl core::future::future::Future, ()}]>: std::marker::Unpin` is not satisfied in `impl core::future::future::Future`
  --> src/main.rs:39:46
   |
39 |     let res: Value = futures::ready!(res.json().poll_unpin(cx)).unwrap();
   |                                                 ^^^^^^^^^^ within `impl core::future::future::Future`, the trait `std::marker::Unpin` is not implemented for `std::future::GenFuture<[static generator@DefId(49:730 ~ reqwest[e889]::async_impl[0]::response[0]::{{impl}}[0]::json[0]::{{closure}}[0]) 0:reqwest::async_impl::response::Response {reqwest::async_impl::response::Response, reqwest::async_impl::response::Response, impl core::future::future::Future, impl core::future::future::Future, ()}]>`
   |
   = help: the following implementations were found:
             <std::future::GenFuture<T> as std::marker::Unpin>
   = note: required because it appears within the type `impl core::future::future::Future`
   = note: required because it appears within the type `impl core::future::future::Future`

which I really don't understand.

At first I thought it's because reqwest's json() method (reqwest::Response - Rust) returns a generic Result<T> with no trait bounds for T: Unpin, but I tried changing it to let res: Value = futures::ready!(res.json::<Value>().poll_unpin(cx)).unwrap(); to tell Rust it's a Result<Value> so Rust could see that Value is Unpin, but that didn't change anything.

Any thoughts on what I'm doing wrong?

Thank you.

Alright, now that we have a full example, I can show you how to fix it. The basic issue is that you're using the ready! macro as if it was await, but it's not. It's a lot more similar to ?. Let me expand the macro for you:

impl Stream for TodoStream {
    type Item = Value;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {

        let uri = format!("{}/{}", URI, self.item);
        let mut fut = CLIENT.get(&uri).send();

        let res = match fut.poll_unpin(cx) {
            Poll::Ready(t) => t.unwrap(),
            Poll::Pending => return Poll::Pending,
        };
        let res: Value = match res.json().poll_unpin(cx) {
            Poll::Ready(t) => t.unwrap(),
            Poll::Pending => return Poll::Pending,
        };

        Poll::Ready(Some(res))
    }
}

So what happens when you poll this stream? First it initializes a connection to some URL and immediately polls this connection. Naturally the connection is not yet ready, as you just created it, so it returns pending. But since you now return from the function, the future you just created is dropped and the connection is canceled.

If you try to poll it again, it will reestablish a new connection and immediately cancel it again. To get around this you have to store the futures in the stream type and repeatedly call poll on them. Here's one way to do that:

use futures::stream::{Stream, StreamExt};
use futures::future::BoxFuture;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::future::Future;
use serde_json::Value;
use lazy_static::lazy_static;
use reqwest::{Response, Error};

lazy_static! {
    static ref CLIENT: reqwest::Client = reqwest::Client::new();
}

static URI: &str = "https://jsonplaceholder.typicode.com/todos";

#[tokio::main]
async fn main() {

    let mut todos = TodoStream::new(1);

    while let Some(res) = todos.next().await {
        dbg!(res);
    }

}

struct TodoStream {
    state: State,
}
impl TodoStream {
    pub fn new(item: u64) -> Self {
        TodoStream {
            state: State::NotStarted(item),
        }
    }
}

enum State {
    NotStarted(u64),
    Connecting(BoxFuture<'static, Result<Response, Error>>),
    ToJson(BoxFuture<'static, Result<Value, Error>>),
    Done(),
}

impl Stream for TodoStream {
    type Item = Result<Value, Error>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
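        // Reborrow the pinned reference. TodoStream is Unpin (its futures are boxed),
        // so its fields can be mutated through the reborrow.
        let mut this = Pin::as_mut(&mut self);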
        match &mut this.state {
            State::NotStarted(item) => {
                let uri = format!("{}/{}", URI, item);
                let fut = CLIENT.get(&uri).send();
                this.state = State::Connecting(Box::pin(fut));
                self.poll_next(cx)
            },
            State::Connecting(fut) => match Pin::new(fut).poll(cx) {
                Poll::Pending => Poll::Pending,
                Poll::Ready(result) => match result {
                    Ok(response) => {
                        let fut = response.json();
                        this.state = State::ToJson(Box::pin(fut));
                        self.poll_next(cx)
                    },
                    Err(err) => {
                        this.state = State::Done();
                        Poll::Ready(Some(Err(err)))
                    },
                },
            },
            State::ToJson(fut) => match Pin::new(fut).poll(cx) {
                Poll::Pending => Poll::Pending,
                Poll::Ready(json_result) => {
                    this.state = State::Done();
                    match json_result {
                        Ok(json) => Poll::Ready(Some(Ok(json))),
                        Err(err) => Poll::Ready(Some(Err(err))),
                    }
                },
            },
            State::Done() => Poll::Ready(None),
        }
    }
}

So the conclusion is that you probably want to change your code to use async functions instead of manually implementing stream or future.
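
To illustrate the point, here is a minimal std-only sketch of the same two-step flow written as an async fn. The helpers send and json are hypothetical stand-ins for CLIENT.get(&uri).send() and response.json(), YieldOnce is a made-up future that simulates waiting on the network, and block_on is a toy busy-polling executor used only so the snippet needs no external crates. In a real program tokio would drive the future.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical future: Pending on the first poll, Ready on the second.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref(); // ask the executor to poll again
        Poll::Pending
    }
}

// Hypothetical stand-in for `CLIENT.get(&uri).send()`.
async fn send(uri: String) -> Result<String, String> {
    YieldOnce(false).await;
    Ok(format!("response from {}", uri))
}

// Hypothetical stand-in for `response.json()`.
async fn json(response: String) -> Result<String, String> {
    YieldOnce(false).await;
    Ok(format!("json({})", response))
}

/// The Connecting and ToJson states become two `.await` points. The compiler
/// generates the state machine and keeps each in-flight future alive between polls.
async fn fetch_todo(item: u64) -> Result<String, String> {
    let response = send(format!("todos/{}", item)).await?;
    json(response).await
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor for the demo only: polls the same boxed future until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

Calling block_on(fetch_todo(1)) polls the same future several times, and the progress made by send is still there when json starts. That is exactly the bookkeeping the hand-written State enum does manually.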

Thanks for your example!

Did this behavior change in Tokio since its docs were written? Their docs show using the try_ready! macro which seems like the equivalent of ready!. Isn't their "getting asynchronous" example what I was doing?

https://tokio.rs/docs/futures/streams/

Or does that only work since they're using a loop from a Future to continuously poll the Stream? Would that be a viable alternative for my case too?

Nothing has changed with respect to the macro. It works in that example because it's repeatedly polling the same future by storing it in the type, like I did in my enum. The example I gave can also be written more simply using the macro, but I don't have time to do that right now.

I don't need a loop because I recursively call self when I transition from one state to the next. (So the looping is done with recursion)
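
A small std-only sketch of that difference, assuming a made-up SlowRequest future that needs a few polls before it completes. It stands in for the HTTP request, and none of these names come from reqwest or tokio. poll_fresh mirrors the original poll_next and builds a new future on every call, while poll_stored keeps the future in a field and applies ready! to it:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{ready, Context, Poll, Wake, Waker};

/// Hypothetical stand-in for an HTTP request: Pending for `polls_left` polls, then Ready.
struct SlowRequest {
    polls_left: u32,
}

impl Future for SlowRequest {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.polls_left == 0 {
            return Poll::Ready("response");
        }
        self.polls_left -= 1;
        cx.waker().wake_by_ref(); // ask to be polled again
        Poll::Pending
    }
}

/// Like the original poll_next: a brand-new future on every call.
fn poll_fresh(cx: &mut Context<'_>) -> Poll<&'static str> {
    let mut fut = SlowRequest { polls_left: 2 };
    // ready! returns Poll::Pending from this function, which drops `fut` and its progress.
    let res = ready!(Pin::new(&mut fut).poll(cx));
    Poll::Ready(res)
}

/// Keeps the in-flight future in a field so progress survives between polls.
struct Stored {
    fut: Option<SlowRequest>,
}

impl Stored {
    fn poll_stored(&mut self, cx: &mut Context<'_>) -> Poll<&'static str> {
        let fut = self.fut.get_or_insert_with(|| SlowRequest { polls_left: 2 });
        // Same macro, but the early return leaves the future stored in `self`.
        let res = ready!(Pin::new(fut).poll(cx));
        self.fut = None; // finished; a later call would start a new request
        Poll::Ready(res)
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Calls `f` up to `max` times and reports which call first returned Ready, if any.
fn polls_until_ready(
    max: u32,
    mut f: impl FnMut(&mut Context<'_>) -> Poll<&'static str>,
) -> Option<u32> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    (1..=max).find(|_| f(&mut cx).is_ready())
}
```

With a budget of 10 polls, polls_until_ready(10, poll_fresh) never sees Ready, whereas the stored version completes on the third poll. The macro behaves identically in both functions; only the lifetime of the future differs.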

Okay thank you!

So making these changes today brought me back to the original issue I'm trying to solve. I'm using reqwest 0.10.0 to make my HTTP requests async. The problem is that since Iterator isn't async, I can't use the async client in the Iterator anymore, so I now need to use reqwest::blocking. However, when I do that I get:

thread 'tokio-runtime-worker' panicked at 'Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks.'

This sounds to me like I either have to switch back to reqwest 0.9 and just have it block everything, or else choose to not impl Iterator at all and just inline that code in other areas, which doesn't sound like a good idea to me.

Do you have any ideas how I should fix this problem?

The error is caused by you trying to do something blocking inside an async function, which you really should not do, as the executor running that async function relies on it giving back control quickly so it can poll other futures in the meantime.

I understand, but I am not sure how to resolve this issue. I need to block for Iterator but I can't block because I'm using reqwest 0.10.0.

Even if you used an older reqwest that didn't panic when you tried to block, you would still be blocking inside an async-function which is a big no-no.

Sorry, I don't think I explained myself fully. I chose to not implement a custom Stream since it's more complex than I thought, so I'm using async / await in the rest of my program, but just not in the Iterator. I only want to block within the Iterator impl, which isn't async.

If you're referring to using an Iterator with blocking IO in the rest of my async program, I'm not sure how to resolve that issue. Should I just inline all the code and not use any Iterator methods so that I can use async / await everywhere?

You are clearly using the iterator from async code, so you will have to perform the blocking operations somewhere where blocking is acceptable.

Your executor probably has a feature to mark a section of async code as blocking. For tokio this is tokio::task::spawn_blocking, and I'm sure async_std has a similar tool. Alternatively you can spawn a new thread and wait for it to finish using a oneshot channel.

spawn_blocking gives me the same error about running a runtime from within a runtime. I'm guessing reqwest::blocking creates its own runtime, which is trying to run within the main one since I am using #[tokio::main].

In that case you could:

  1. Using non-blocking reqwest, obtain a future for the request.
  2. Push it to the tokio runtime using tokio::spawn.
  3. Use a completely normal mpsc channel (or crossbeam) to send the results from the task you just spawned back.
  4. Block on this channel in a spawn_blocking section.

This avoids starting a new runtime by reusing the one you're already inside.
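
The consumer side of that design can be sketched with std only. In this sketch std::thread::spawn merely stands in for the tokio::spawn task that awaits the requests, and ChannelIter and spawn_pages are hypothetical names invented for the illustration. In a tokio program the iterator would be consumed inside a spawn_blocking section, as in step 4.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;

/// Blocking iterator fed by a background producer through a channel.
struct ChannelIter<T> {
    rx: Receiver<T>,
}

impl<T> Iterator for ChannelIter<T> {
    type Item = T;

    fn next(&mut self) -> Option<T> {
        // recv() blocks until an item arrives. It returns Err once the sender
        // has been dropped, which ends the iteration.
        self.rx.recv().ok()
    }
}

/// Hypothetical producer. The thread stands in for a task spawned with
/// tokio::spawn that awaits each request and sends the result back.
fn spawn_pages(pages: u64) -> ChannelIter<String> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        for page in 1..=pages {
            // A real task would await the HTTP response here.
            if tx.send(format!("page {}", page)).is_err() {
                break; // the consumer dropped the iterator
            }
        }
        // `tx` is dropped here, so the iterator yields None afterwards.
    });
    ChannelIter { rx }
}
```

Because ChannelIter is an ordinary Iterator, all the usual adapters work on it, for example spawn_pages(3).collect::<Vec<_>>(). The only requirement is that the thread calling next() is one that is allowed to block.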

You may prefer tokio::task::block_in_place to spawn_blocking.

Note that even with this design, you may still find that other parts of the code are blocked by the operation. In particular, I am referring to other things happening in the same task. This happens if you have used futures::future::join to do other things at the same time as your iterator, but will not happen if they are separated by a call to tokio::spawn or tokio::task::spawn_blocking. It is most likely to come up if you're using block_in_place as recommended above instead of spawn_blocking.

Sometimes using block_in_place is much easier, because it can be used from non-async code, and if you are in this situation, you can add an extra tokio::spawn around the whole section that might enter blocking sections to separate any other jobs from the blocking code.

On the other hand, the spawn-and-channel approach above has the advantage that if you want to perform multiple requests, you can do them simultaneously by just spawning several tasks.

For cases where implementing Stream is particularly challenging, there's the async-stream crate, which uses macro-based black magic to allow writing Stream implementations with async/await syntax. I've used it with some degree of success in one of my own projects.

futures::stream::unfold also seems like a good fit for this situation. Not as magical as async-stream, but if you're familiar with iterator combinators, it may do a good job of getting you a stream of requests that you can then manipulate to get what you're looking for.

stream::unfold looks awesome, thanks! I actually tried the async-stream stream! macro with partial success, but it had some weird errors like making me set my crate's recursion level to 512 and the maximum type length to 1.3 million or something.

Yeah; if I'd discovered stream::unfold before async-stream, I probably would have used that instead. I might still try to switch over so I can get rid of a dependency and get rustfmt working for that section of the code; it doesn't seem to do much inside macro invocations.

I suspect that stream::poll_fn could also be used in cases like this, but I haven't managed to figure out exactly how yet.
