Tokio/Hyper Futures Question


#1

Hey! I’m trying to make a log parser that performs async HTTP requests when it gets a LogResult struct. I’m very new to Rust. I’ve tried a bunch of things. This is where I’m currently at: https://gist.github.com/brigand/f862c01374059e809741251d28760f22

I had everything working until I tried to add the http request. I could do rx.for_each and log the LogResult structs. Now I’m running into issues with the write_record function and the future it needs to return.

From what I’ve read, I’m supposed to return a Box of the future from write_record, but I can’t figure out how to pass it to core.run.

Any help would be appreciated!


#2

I think rx.for_each() is the way to go - this will be the future that you pass to Core, i.e. core.run(rx.for_each(...)). This future resolves when the stream is done (i.e. rx has no more log records to yield).

Inside for_each, you call your write_record function. This function should return a Future<Item=(), Error=WriteRecordError>. The Core will run this returned future to completion before the next item in the stream is consumed. So you don’t need to pass that future to Core::run() yourself; instead, Core::run() the “root” future, which will be your rx.for_each(), as mentioned above.


#3

Thanks for the response. The issue with for_each is that it runs with no concurrency in this case (or at least how I tried to use it). This compiles and runs, but it works strictly in sequence: receive log, send HTTP request, wait for response, receive next log, etc.

    let handle = core.handle();
    // and_then drives each returned future to completion before pulling the
    // next item from rx, so the requests run one at a time.
    let f2 = rx.map_err(|_| WriteRecordError::Silent).and_then(move |res| {
        match res {
            Ok(log) => {
                write_record(handle.clone(), log)
            }
            Err(_) => Box::new(::futures::future::err::<(), WriteRecordError>(WriteRecordError::Silent)),
        }
    }).for_each(|_| {
        Ok(())
    });

    core.run(f2);

(Got this functional after posting my question).


#4

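Ah, if you want them to run concurrently, then you need to build up the hyper chain and Handle::spawn it. Handle::spawn takes a future with Item=() and Error=(), so handle or map away the error before spawning. The for_each closure should then just return Ok(()) so that the next item from the stream is pulled as soon as it’s available.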
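To make the "spawn it and move on" idea concrete, here is a rough std-only sketch. It is a toy model, not tokio-core's API: ToyHandle, FakeRequest and run_concurrently are hypothetical names made up for illustration, the executor busy-polls instead of waiting on a reactor, and it uses std::future::Future rather than the futures 0.1 trait. The only point it tries to show is that each record's request is queued in the background while the loop immediately goes on to the next record.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Waker that does nothing; good enough for a busy-polling toy executor.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

type Task = Pin<Box<dyn Future<Output = ()>>>;

// Hypothetical stand-in for a reactor handle: a shared queue of spawned tasks.
#[derive(Clone, Default)]
struct ToyHandle {
    queue: Rc<RefCell<VecDeque<Task>>>,
}

impl ToyHandle {
    // Queue a future to run in the background and return immediately.
    fn spawn(&self, fut: impl Future<Output = ()> + 'static) {
        self.queue.borrow_mut().push_back(Box::pin(fut));
    }

    // Poll tasks round-robin until none are left (a real reactor would
    // sleep until a task is woken instead of spinning like this).
    fn run(&self) {
        let waker = Waker::from(Arc::new(NoopWake));
        let mut cx = Context::from_waker(&waker);
        loop {
            // Release the queue borrow before polling the task.
            let next = self.queue.borrow_mut().pop_front();
            let mut task = match next {
                Some(task) => task,
                None => break,
            };
            if task.as_mut().poll(&mut cx).is_pending() {
                self.queue.borrow_mut().push_back(task);
            }
        }
    }
}

// Hypothetical request: stays pending for `polls_left` polls, then records completion.
struct FakeRequest {
    id: u32,
    polls_left: u32,
    log: Rc<RefCell<Vec<String>>>,
}

impl Future for FakeRequest {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.polls_left == 0 {
            self.log.borrow_mut().push(format!("done {}", self.id));
            Poll::Ready(())
        } else {
            self.polls_left -= 1;
            Poll::Pending
        }
    }
}

// Plays the role of the for_each body: spawn the request, then move on.
fn run_concurrently(latencies: &[u32]) -> Vec<String> {
    let handle = ToyHandle::default();
    let log = Rc::new(RefCell::new(Vec::new()));
    for (id, &polls) in latencies.iter().enumerate() {
        log.borrow_mut().push(format!("start {}", id));
        handle.spawn(FakeRequest { id: id as u32, polls_left: polls, log: log.clone() });
    }
    handle.run();
    let out = log.borrow().clone();
    out
}
```

Calling run_concurrently(&[3, 1, 0]) records all three "start" entries before any "done" entry, and the slowest request (id 0) finishes last, which is the overlap the sequential and_then version doesn’t give you.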


#5

Should also mention that if you’re going with the concurrency approach, you’ll want to make sure the reactor doesn’t stop until all the background futures have completed. One approach would be for the Core to run a joined future, consisting of the rx.for_each() and a futures::unsync::oneshot::Receiver. If your code knows which log record is the last one, it can send() a message on that oneshot channel once its hyper request completes - that will complete the joined future, and the reactor will exit.

Alternatively, you can track the number of in-flight hyper requests, and do another iteration of Core::run (after the main one completes) against a future that resolves when this count is 0 (this would be a custom Future impl that you’d write).
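A minimal sketch of what such a counting future could look like, with some loud assumptions: it is written against std::future::Future (not the futures 0.1 trait with Item/Error that tokio-core uses), it is single-threaded, and InFlight, RequestGuard, Idle, CountingWake and drain_demo are all hypothetical names invented here. The idea is that each request holds a guard, dropping the guard decrements the count, and the Idle future is woken when the count reaches 0.

```rust
use std::cell::{Cell, RefCell};
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Shared state: number of requests in flight plus the waker of whoever waits.
#[derive(Default)]
struct Shared {
    count: Cell<usize>,
    waiter: RefCell<Option<Waker>>,
}

// Hypothetical tracker, cloned into whatever code starts requests.
#[derive(Clone, Default)]
struct InFlight(Rc<Shared>);

// Held for the lifetime of one request; dropping it marks the request finished.
struct RequestGuard(Rc<Shared>);

impl InFlight {
    fn start(&self) -> RequestGuard {
        self.0.count.set(self.0.count.get() + 1);
        RequestGuard(self.0.clone())
    }

    // Future that resolves once no requests are in flight.
    fn idle(&self) -> Idle {
        Idle(self.0.clone())
    }
}

impl Drop for RequestGuard {
    fn drop(&mut self) {
        let left = self.0.count.get() - 1;
        self.0.count.set(left);
        if left == 0 {
            // Take the waker out first so the RefCell borrow ends before waking.
            let waker = self.0.waiter.borrow_mut().take();
            if let Some(waker) = waker {
                waker.wake();
            }
        }
    }
}

struct Idle(Rc<Shared>);

impl Future for Idle {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0.count.get() == 0 {
            Poll::Ready(())
        } else {
            // Remember who to wake when the last guard is dropped.
            *self.0.waiter.borrow_mut() = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

// Test-only waker that counts how many times it was woken.
struct CountingWake(AtomicUsize);
impl Wake for CountingWake {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

// Polls Idle by hand: (pending while busy, ready once drained, number of wakeups).
fn drain_demo() -> (bool, bool, usize) {
    let wake = Arc::new(CountingWake(AtomicUsize::new(0)));
    let waker = Waker::from(wake.clone());
    let mut cx = Context::from_waker(&waker);

    let tracker = InFlight::default();
    let (a, b) = (tracker.start(), tracker.start());
    let mut idle = Box::pin(tracker.idle());

    let pending_while_busy = idle.as_mut().poll(&mut cx).is_pending();
    drop(a); // one request still in flight, no wakeup yet
    drop(b); // count hits 0, the stored waker fires
    let ready_once_drained = idle.as_mut().poll(&mut cx).is_ready();
    (pending_while_busy, ready_once_drained, wake.0.load(Ordering::SeqCst))
}
```

In a real program the guard would be moved into the spawned request future so it is dropped when the request completes or fails, and the Idle future would be what gets handed to the second Core::run.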


#6

I started researching the event loop ending early right before you posted that. This is very helpful, thanks!