Alright, now that we have a full example, I can show you how to fix it. The basic issue is that you're using the ready!
macro as if it was await, but it's not. It's a lot more similar to ?
. Let me expand the macro for you:
impl Stream for TodoStream {
type Item = Value;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let uri = format!("{}/{}", URI, self.item);
let mut fut = CLIENT.get(&uri).send();
let res = match fut.poll_unpin(cx) {
Poll::Ready(t) => t.unwrap(),
Poll::Pending => return Poll::Pending,
};
let res: Value = match res.json().poll_unpin(cx) {
Poll::Ready(t) => t.unwrap(),
Poll::Pending => return Poll::Pending,
};
Poll::Ready(Some(res))
}
}
So what happens when you poll this steam? Well first it initializes a connection to some url, immediately polls this connection, and naturally this connection is not yet ready as you just created it, so it returns pending. But since you now return from the function, the future you just created is dropped and the connection is canceled.
If you try to poll it again, it will reestablish a new connection and immediately cancel it again. To get around this you have to store the futures in the stream type and repeatedly call poll on them. Here's one way to do that:
use futures::stream::{Stream, StreamExt};
use futures::future::{FutureExt, BoxFuture};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::future::Future;
use serde_json::{json, Value};
use lazy_static::lazy_static;
use reqwest::{Response, Error};
lazy_static! {
static ref CLIENT: reqwest::Client = reqwest::Client::new();
}
static URI: &str = "https://jsonplaceholder.typicode.com/todos";
#[tokio::main]
async fn main() {
let mut todos = TodoStream::new(1);
while let Some(res) = todos.next().await {
dbg!(res);
}
}
struct TodoStream {
state: State,
}
impl TodoStream {
pub fn new(item: u64) -> Self {
TodoStream {
state: State::NotStarted(item),
}
}
}
enum State {
NotStarted(u64),
Connecting(BoxFuture<'static, Result<Response, Error>>),
ToJson(BoxFuture<'static, Result<Value, Error>>),
Done(),
}
impl Stream for TodoStream {
type Item = Result<Value, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let mut this = Pin::as_mut(&mut self); // remove the pin
match &mut this.state {
State::NotStarted(item) => {
let uri = format!("{}/{}", URI, item);
let fut = CLIENT.get(&uri).send();
this.state = State::Connecting(Box::pin(fut));
self.poll_next(cx)
},
State::Connecting(fut) => match Pin::new(fut).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(result) => match result {
Ok(response) => {
let fut = response.json();
this.state = State::ToJson(Box::pin(fut));
self.poll_next(cx)
},
Err(err) => {
this.state = State::Done();
Poll::Ready(Some(Err(err)))
},
},
},
State::ToJson(fut) => match Pin::new(fut).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(json_result) => {
this.state = State::Done();
match json_result {
Ok(json) => Poll::Ready(Some(Ok(json))),
Err(err) => Poll::Ready(Some(Err(err))),
}
},
},
State::Done() => Poll::Ready(None),
}
}
}
So the conclusion is that you probably want to change your code to use async functions instead of manually implementing stream or future.