[SOLVED] Hyper and Tokio-core, I'm lost

Hey there!

I know Tokio-core is deprecated in favour of Tokio, but I'm working with Hyper 0.11.24. I would probably face the same issue with Tokio because I think I'm missing a core concept.

I can't make this code work. I want to download files in parallel using a throttle.

let future = lazy(|| {
    urls.iter().for_each(|url| {
        println!("Running the future"); // This is printed
        let fut = client
            .future_get(url.parse().unwrap()) // This returns a hyper::client::FutureResponse
            .and_then(|response| {
                println!("Got response!"); // This is never printed
                response.body().concat2()
            })
            .and_then(|_content| {
                println!("Got content!");
                Ok(())
            })
            .map_err(|_| ());

        handle.spawn(fut); // handle from the same tokio_core::reactor::Core I built my hyper client with
        thread::sleep(THROTTLE);
    });
    Ok(())
}).map_err(|_: ()| ());

core.run(future).unwrap(); // same tokio_core::reactor::Core I built my hyper client with

I do build the requests and spawn them on the executor, but it seems the hyper FutureResponse future is never executed. So first question is: what am I doing wrong here? I feel I'm missing a very important part of the puzzle.

Then, I also need to write the resulting bytes (_content) to a file in the same order as the requests are made. I did not illustrate this here, and I'm not sure what the best approach would be. Any idea how I could achieve this?

Once spawned on the event loop, requests become separate tasks with no connection to the original future, which resolves as soon as all requests are spawned. To resolve the requests, the event loop must continue turning. Put the following line after core.run(...):

loop { core.turn(None) }

You should see the content arriving.

Now, to actually achieve throttling, the code would almost certainly have to be rearranged (thread::sleep is probably not what you want). Writing the output in order will need some logic that tracks finished URLs and iterates over them in the original list order.


A few comments in addition to what @inejge said:

  1. You never want to sleep inside a future - this blocks the reactor, which can't do any work at all while it sleeps. In the new (reformed) tokio you may end up blocking a threadpool thread instead, which is less damaging but still not the best way to handle this. Consider using the loop_fn function to loop over the urls and spawn the hyper requests. The body of the loop should return a Timeout future that resolves after the THROTTLE period, and then the next iteration of the loop proceeds.
  2. Turning the reactor in a loop will never return control back to you, even after all the urls have been fetched. Instead, create a oneshot channel: the core waits on the receiver side of it, and the sender side is given to the loop_fn. After all the urls have been processed, you send a message (it can be just ()) on it, which makes the core exit the run() method. A sketch combining both points follows below.
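To make that concrete, here's a rough, untested sketch of the shape I mean, reusing the client, handle, core, urls, and THROTTLE from your original snippet. Plain client.get stands in for your future_get helper, and inner_handle is just a name I made up for a clone of the handle that the move closure can own:

use futures::{Future, Stream};
use futures::future::{loop_fn, Loop};
use futures::sync::oneshot;
use tokio_core::reactor::Timeout;

let (tx, rx) = oneshot::channel::<()>();

// The sender travels through the loop state so the last iteration can consume it.
let inner_handle = handle.clone();
let future_loop = loop_fn((urls, Some(tx)), move |(mut remaining, tx)| {
    let url = remaining.pop().unwrap(); // assumes urls is non-empty
    let request = client
        .get(url.parse().unwrap())
        .and_then(|response| response.body().concat2())
        .map(|_content| println!("Got content!"))
        .map_err(|_| ());
    inner_handle.spawn(request);

    // Timeout::new returns an io::Result<Timeout>; unwrap it to get the
    // future itself, then chain the next iteration after it resolves.
    Timeout::new(THROTTLE, &inner_handle)
        .unwrap()
        .map_err(|_| ())
        .and_then(move |_| {
            if remaining.is_empty() {
                // Tell the core we're done spawning. Caveat: the last request
                // may still be in flight here; for a fully clean shutdown you'd
                // send from that request's completion instead.
                let _ = tx.unwrap().send(());
                Ok(Loop::Break(()))
            } else {
                Ok(Loop::Continue((remaining, tx)))
            }
        })
});

handle.spawn(future_loop);
core.run(rx).unwrap(); // returns once the loop sends on the channel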

Thank you, this is very helpful!

I feel like I'm getting closer but still can't make it work.

Where should I use the oneshot channel you mentioned? And what if I want to have a list of receivers, ordered in the same way as the list of urls, so I can wait on the data one by one and append it to a file outside the loop_fn?

Also, it seems the Timeout callback gets called right away and that the throttle duration is not respected. Is this normal?

And the hyper requests are still not performed - the code only builds FutureResponses.

Thank you, and sorry for my beginner questions - doing a parallel async I/O download is not as trivial as I would have thought :smile:

let future_loop = loop_fn(urls, |mut remaining_urls| {
    let url = remaining_urls.pop().unwrap();
    let request_future =
        client
          .future_get(url.parse().unwrap())
          .and_then(|response| {
              println!("Got response!"); // never printed
              response.body().concat2()
          })
          .and_then(|_| {
              println!("Got content!"); // never printed
              Ok(())
          })
          .map_err(|_| ());

    handle.spawn(request_future);

    Timeout::new(throttle, &handle).and_then(|_| {
        if remaining_urls.is_empty() {
            Ok(Loop::Break(()))
        } else {
            Ok(Loop::Continue(remaining_urls))
        }
    })
});

core.run(future_loop).unwrap()

First, about the Timeout firing right away: Timeout::new returns an io::Result<Timeout>, not a future, so the and_then in your loop_fn body runs on the Result immediately; you need to unwrap() it first to get the future you can chain on. That's also why run() returns before any request completes - the loop spins through all the urls instantly. Now, looking at your overall requirements, particularly the order of the responses, you can do the following instead:

use std::time::Duration;
use futures::{Future, Stream};
use futures::future::Either;
use hyper::Client;
use tokio_core::reactor::{Core, Timeout};

let mut core = Core::new().unwrap();
let handle = core.handle();
let client = Client::new(&handle);

let mut futures = Vec::new();
for (idx, url) in ["http://www.google.com", "http://www.bing.com"].iter().enumerate() {
    let url_fut = client
        .get(url.parse().unwrap())
        .and_then(|response| {
            println!("Got response!");
            response.body().concat2()
        })
        .map(move |content| (url, content))
        .map_err(|_| ());
    let fut = if idx != 0 {
        // Duration::from_secs takes a u64; each request is delayed by 2s * index
        let delay = Timeout::new(Duration::from_secs(2 * idx as u64), &handle).unwrap();
        Either::A(delay.map_err(|_| ()).then(|_| url_fut))
    } else {
        Either::B(url_fut)
    };
    futures.push(fut);
}

let stream = futures::stream::futures_ordered(futures).for_each(|(url, content)| {
    println!("future for {} completed with content: {}", url, String::from_utf8_lossy(&content));
    Ok(())
});
core.run(stream).unwrap();

println!("All done");

Basically, you create a list of futures, each representing fetching the content of some url. All but the first of these futures have a timeout in front of them to add the throttle. The timeout is computed from the index of the url, which staggers their submission; you can substitute whatever scheduling logic you like.

This list of futures is given to futures_ordered, which returns a Stream representing the completion of these futures; the stream yields results in the order the futures were given, not in the order they complete.

There may be a better way to do this, but this one is relatively straightforward.


Thanks a lot! That seems to be exactly what I need indeed. Using futures_ordered in combination with Timeout is a very neat trick.

Thanks again for taking the time. One thing that made me interested in Rust in the first place is its community. Rustaceans are not only brilliant, they are helpful as well :+1:
