Lifetime errors with Futures

#1

I am writing future-heavy code and I am new to Rust. Here is what I have so far:

/// Phase of the fetch/wait cycle that `WeatherStream::poll` is currently in.
///
/// The derives make the state cheap to copy, comparable, and printable in
/// debug output.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TimePass {
    /// A chunk of requests may be started (or is being polled) now.
    Ready,
    /// A chunk has completed; the stream is waiting for the interval to tick.
    Waiting,
}

/// Storage backend that writes weather records into a MongoDB collection.
struct MongoStore {
    /// Handle to the target collection. Despite the field name this is a
    /// `Collection`, not the database client itself.
    client: Collection,
}

/// A sink that can persist one raw weather record at a time.
trait StoreWeather {
    /// Stores `raw` and reports the outcome as a human-readable message:
    /// `Ok(message)` on success, `Err(message)` on failure.
    fn insert_day(&mut self, raw: Value) -> Result<String, String>;
}

impl StoreWeather for MongoStore {
    /// Wraps `raw` in a document under the key `"raw"` and inserts it into the
    /// collection.
    ///
    /// The driver's result and error details are discarded; only a fixed
    /// success or failure message is returned.
    fn insert_day(&mut self, raw: Value) -> Result<String, String> {
        let mut record = Document::new();
        record.insert("raw", Bson::from(raw));

        self.client
            .insert_one(record, None)
            .map(|_| String::from("mongo: success"))
            .map_err(|_| String::from("mongo: failed to insert"))
    }
}

/// State for a stream that requests URIs in batches, pausing between batches.
///
/// `S` is the storage backend and `I` is the source of request URIs.
struct WeatherStream<S, I>
where
    S: StoreWeather + 'static ,
    I: Iterator + Send + 'static,
{
    /// Source of the URIs to request.
    uri_iter: I,
    /// Number of requests issued per batch (`fetch_chunk` pairs `0..chunk`
    /// with the URIs).
    chunk: u64,
    /// Timer that is polled while the stream is in `TimePass::Waiting`.
    interval: Interval,
    /// The batch currently in flight, if any; reset to `None` once it completes.
    fetch_future: Option<Box<Future<Item = bool, Error = ()> + 'static + Send>>,
    /// Shared handle to the storage backend.
    /// NOTE(review): nothing in this listing reads it yet.
    storage_client: Arc<Mutex<Box<S>>>,
    /// Which phase of the fetch/wait cycle `poll` is in.
    time_pass: TimePass,
}

impl<S, I> WeatherStream<S, I>
where
    S: StoreWeather + 'static,
    I: Iterator<Item = Uri> + Send + 'static,
{
    fn new(uri_iter: I, chunk: u64, duration: Duration, storage_client: S) -> Self {
        Self {
            uri_iter,
            chunk,
            interval: Interval::new_interval(duration),
            fetch_future: None,
            storage_client: Arc::new(Mutex::new(Box::new(storage_client))),
            time_pass: TimePass::Ready,
        }
    }

    //change number of requests to a parameter.
    pub fn fetch_chunk(&self) -> impl Future<Item = bool, Error = ()> {
        let client = build_https_client().unwrap(); //TODO catch error
        let requests: Vec<_> = (0..self.chunk)
            .zip(self.clone().uri_iter)
            .map(move |(_, uri)| {
                client
                    .clone()
                    .get(uri)
                    .map(|res| {
                        println!("Response: {}", res.status());
                        println!("Headers: {:#?}", res.headers());
                    })
                    .map_err(|_| ())
            })
            .collect();

        future::join_all(requests)
            .map(|res: Vec<()>| res.len() as u64 == self.clone().chunk)
            .map_err(|_| ())
    }
}

impl<S, I> Stream for WeatherStream<S, I>
where
    S: StoreWeather + 'static,
    I: Iterator<Item = Uri> + Send + 'static,
{
    type Item = bool;
    type Error = ();

    /// Drives the fetch/wait cycle.
    ///
    /// * `TimePass::Ready`: start a batch if none is in flight and poll it.
    ///   If the batch was not full (the URI iterator is exhausted) the stream
    ///   ends with `None`; otherwise the state switches to `Waiting`.
    /// * `TimePass::Waiting`: poll the interval. When it ticks, switch back
    ///   to `Ready` and yield `Some(false)`.
    ///
    /// Any failed batch or timer error is reported as `Err(())`.
    fn poll(&mut self) -> Poll<Option<Self::Item>, ()> {
        loop {
            match self.time_pass {
                TimePass::Ready => {
                    if self.fetch_future.is_none() {
                        self.fetch_future = Some(Box::new(self.fetch_chunk()));
                    }

                    // In futures 0.1 `Option<F>` is itself a future that
                    // resolves to `Option<F::Item>`.
                    let fetched = match self.fetch_future.poll() {
                        Ok(Async::Ready(fetched)) => fetched,
                        Ok(Async::NotReady) => return Ok(Async::NotReady),
                        Err(()) => {
                            // Drop the failed batch so it is never polled again.
                            self.fetch_future = None;
                            return Err(());
                        }
                    };
                    self.fetch_future = None;

                    // A short batch means the URI iterator has run dry: end
                    // the stream now instead of waiting out another interval.
                    if fetched != Some(true) {
                        return Ok(Async::Ready(None));
                    }
                    self.time_pass = TimePass::Waiting;
                }
                TimePass::Waiting => {
                    //TODO do a peek on iter to not waste a day of waiting at end.
                    match self.interval.poll() {
                        Ok(Async::Ready(_)) => {
                            self.time_pass = TimePass::Ready;
                            return Ok(Async::Ready(Some(false)));
                        }
                        Ok(Async::NotReady) => return Ok(Async::NotReady),
                        Err(_) => return Err(()),
                    }
                }
            };
        }
    }
}



//TODO create an iterator over URLs. Store the iterator in weather_stream. Make it generic over iterators of type
//hyper::Uri.

//taken from https://github.com/drager/httper/blob/master/src/client/mod.rs#L260, which was given by the
//author "drager" himself through gitter chat in hyperium/hyper on Jan 15 2019 23:29 Pacific Standard Time

// Build a HTTPS client.
// Returns a Result that contains the client on success.

fn build_https_client() -> Result<
    hyper::client::Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>, hyper::Body>,
    Box<std::error::Error>,
> {
    let tls_connector = native_tls::TlsConnector::builder().build()?;

    let mut http_connector = hyper::client::HttpConnector::new(4);
    http_connector.enforce_http(false);

    let https_connector = hyper_tls::HttpsConnector::from((http_connector, tls_connector));

    let client = hyper::client::Client::builder().build(https_connector);

    Ok(client)
}

/// Connects to the local MongoDB instance, builds the Dark Sky URI iterator
/// and runs the weather stream on the hyper runtime for three items.
///
/// # Panics
///
/// Panics if the MongoDB client cannot be initialised.
pub fn start_fetch_job() {
    let raw_collection = mongodb::Client::connect("localhost", 27017)
        .expect("Failed to initialize standalone client.")
        .db("weather_data")
        .collection("raw");
    let store = MongoStore {
        client: raw_collection,
    };

    // The iterator walks backwards in time: from now to 60 weeks ago.
    let newest = Local::now();
    let oldest = Local::now() - chrono::Duration::weeks(60);
    let uris = fetch_sources::DarkSky::new(
        "https://api.darksky.net/forecast/39d0ddfakedeb86",
        "37.8267,-122.4233",
        newest,
        oldest,
    );

    let pause = std::time::Duration::from_secs(3);
    let job = WeatherStream::new(uris, 100, pause, store)
        .take(3)
        .for_each(|_| Ok(()));
    hyper::rt::run(job);
}

I am receiving lifetime errors:

error: cannot infer an appropriate lifetime
   --> src/fetch/fetch_job.rs:105:18
    |
88  |     pub fn fetch_chunk(&self) -> impl Future<Item = bool, Error = ()> {
    |                                  ------------------------------------ this return type evaluates to the `'static` lifetime...
...
105 |             .map(|res: Vec<()>| res.len() as u64 == self.clone().chunk)
    |                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ...but this borrow...
    |
note: ...can't outlive the anonymous lifetime #1 defined on the method body at 88:5
   --> src/fetch/fetch_job.rs:88:5
    |
88  | /     pub fn fetch_chunk(&self) -> impl Future<Item = bool, Error = ()> {
89  | |         let client = build_https_client().unwrap(); //TODO catch error
90  | |         let requests: Vec<_> = (0..self.chunk)
91  | |             .zip(self.clone().uri_iter)
...   |
106 | |             .map_err(|_| ())
107 | |     }
    | |_____^
help: you can add a constraint to the return type to make it last less than `'static` and match the anonymous lifetime #1 defined on the method body at 88:5
    |
88  |     pub fn fetch_chunk(&self) -> impl Future<Item = bool, Error = ()> + '_ {
    |                                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

I am not sure how to fix this. I have heard about RwLock but have never used it before. Is anyone willing to give a couple of hints on how to write this code properly so that I will not have these lifetime issues?

0 Likes

#2

What I am trying to accomplish overall is to gather weather data from a weather API that has a request limit for each day. The API limit is self.chunk, and self.interval is the future that waits a day or longer. self.uri_iter iterates through the URIs for the API requests. The reason I have .map(|res: Vec<()>| res.len() as u64 == self.clone().chunk) is to determine whether the iterator has ended; if it has, I need to stop the tokio stream. I will also need help with how to stop this stream properly. Currently I just have weather_stream.take(3), but I need the stream to continue until the iterator has finished generating URIs.

0 Likes

#3

Here is my iterator:

/// Iterator state for generating Dark Sky request URIs, one per day.
#[derive(Clone,Copy)]
pub struct DarkSky {
    /// First part of every generated URI; `next` joins it to the coordinates
    /// with a `/`.
    pub base: &'static str,
    /// Coordinates segment inserted after `base`.
    pub coordinates: &'static str,
    /// Timestamp the iteration starts from.
    pub start: DateTime<Local>,
    /// Timestamp that `next` compares each candidate date against.
    pub end: DateTime<Local>,
    /// Date of the most recently generated URI (initially `start`).
    pub current: DateTime<Local>,
}


impl DarkSky {
    /// Creates an iterator positioned at `start`.
    ///
    /// `base` and `coordinates` must be `'static` (for example string
    /// literals) because they are stored in `&'static str` fields; a plain
    /// `&str` parameter cannot be assigned to those fields.
    pub fn new(
        base: &'static str,
        coordinates: &'static str,
        start: DateTime<Local>,
        end: DateTime<Local>,
    ) -> Self {
        Self {
            base,
            coordinates,
            start,
            end,
            current: start,
        }
    }
}

impl Iterator for DarkSky {
    type Item = hyper::Uri;

    /// Steps one day back from `current` and yields the request URI for that
    /// day, in the form `<base>/<coordinates>,<unix timestamp>`.
    ///
    /// Iteration runs backwards in time and stops once the next date would be
    /// earlier than `end`. The first URI is for `start - 1 day`; `start`
    /// itself is not requested.
    ///
    /// # Panics
    ///
    /// Panics if `base` and `coordinates` do not form a valid URI.
    fn next(&mut self) -> Option<Self::Item> {
        let date = self.current - chrono::Duration::days(1);
        // Walking backwards, so the range is exhausted when we pass `end`.
        if date < self.end {
            return None;
        }
        self.current = date;

        // The timestamp is appended with a comma: a space is not a legal URI
        // character and would make the parse fail.
        Some(
            format!("{}/{},{}", self.base, self.coordinates, date.timestamp())
                .parse::<hyper::Uri>()
                .expect("base and coordinates must form a valid URI"),
        )
    }
}
0 Likes

#4

Again, I am not asking how to fix this particular code, but rather how to approach Rust futures properly and how to handle lifetimes. I have read the book and the docs, but I am still confused by lifetimes in situations such as using futures.

0 Likes

#5

You generally can’t use references in closures of futures. That includes &self.

You must use owned or copyable values and move || closures. If you clone data from self, clone it before putting it in any closure.

All that is because futures aren’t executed where you define them. They will be executed later, elsewhere, after your function returns and all its arguments and variables are destroyed.

0 Likes

#6
        let mut lock = Arc::new(self.clone());
        let mut cloned = lock.clone();
        let requests: Vec<_> = (0..self.chunk)
            .zip(cloned.uri_iter)

now I get this

error[E0507]: cannot move out of an `Arc`
  --> src/fetch/fetch_job.rs:95:18
   |
95 |             .zip(cloned.uri_iter)
   |                  ^^^^^^^^^^^^^^^ cannot move out of an `Arc`
0 Likes

#7

Try the following version of fetch_chunk:

pub fn fetch_chunk(&self) -> impl Future<Item = bool, Error = ()> {
        let client = build_https_client().unwrap(); //TODO catch error
        let requests: Vec<_> = (0..self.chunk)
            .zip(self.clone().uri_iter)
            .map(move |(_, uri)| {
                client
                    .get(uri)
                    .map(|res| {
                        println!("Response: {}", res.status());
                        println!("Headers: {:#?}", res.headers());
                    })
                    .map_err(|_| ())
            })
            .collect();

        let chunk = self.chunk;
        future::join_all(requests)
            .map(move |res: Vec<()>| res.len() as u64 == chunk)
            .map_err(|_| ())
    }

I don’t see a need for Arc from the error in your original post.

Also, you’ll get faster and better help if you create a minimal repro on the Rust playground - if some types/crates aren’t available there, just stub them out with something similar.

0 Likes