Multiple requests with pause using Hyper client


#1

Hello, I would like to build a simple application to synchronize my photos with cloud storage.
For that I am going to use the storage's HTTP API.
I decided to use Hyper to build an HTTP client for uploading.
I would also like to have parallel uploads, say 10 photos at a time.

The idea is very simple:

  • There is a source path to a photos directory.
  • List that folder and split the photos into chunks of 10 each.
  • Upload the 1st chunk, take a little pause, then upload the 2nd one, and so on.

The problem is that I don’t understand how to enforce a pause between chunks.
I have tried to implement it, but it doesn’t work.
You can find the relevant code here.

As far as I understand, I can’t call tokio::run multiple times, because it blocks (at least I couldn’t manage to get it to work).
So, do I need to prepare a single future with chunk-sleep-chunk-sleep…?

Could you please help me to figure it out?


#2

You can probably make this work with loop_fn() - each iteration of the loop is an upload future join()'d with a timeout future. You then tokio::run the single future returned by loop_fn.


#3

@vitalyd thank you very much for the help!

It looks like I managed to get it working.
Though I am not sure the code is what you suggested :slight_smile:

let mut chunks = fetch(&dir).into_iter();

let f = futures::future::loop_fn((), move |_| {
    let chunk = chunks.next();
    futures::future::result(chunk.ok_or(()))
        .and_then(|chunk| {
            let timeout = tokio_timer::sleep(Duration::from_secs(10)).map_err(|_| ());
            chunk.join(timeout)
        })
        .and_then(|_| Ok(Loop::Continue(())))
});

rt::run(f);

Did you mean something like that?

I wonder: what is the right way to iterate over the chunks within loop_fn?


#4

Yeah, that’s sort of the gist of it.

You can move chunks into the loop itself (that’s the initial arg, for which you’re using () at the moment). This value is given to your closure on the first iteration. On subsequent iterations, your closure receives whatever Continue returned. So the idea is that you keep peeling one chunk off and running a fetch for it (with a timeout attached). If it’s the last chunk, you Loop::Break to signal you’re done. Otherwise, you Loop::Continue with the rest of the chunks.


#5

By the way, do you want to wait a fixed time between consecutive fetches without taking into account how long the upload itself took? In other words, if your pause is 10 seconds and an upload took, say, 12 seconds, do you want to start the next one immediately or wait 10 seconds from the time the previous one finished?

If it’s a fixed schedule (i.e. completion time isn’t taken into account), you can create all the futures upfront and attach the corresponding timeout to each of them based on the schedule.
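The schedule arithmetic itself is trivial: on a fixed schedule, chunk i starts i × pause after the first one. A tiny std-only illustration (`start_offsets` is my own hypothetical helper, not from the thread):

```rust
use std::time::Duration;

// For a fixed schedule, the i-th chunk starts i * pause after the first one,
// regardless of how long each individual upload takes.
fn start_offsets(n_chunks: usize, pause: Duration) -> Vec<Duration> {
    (0..n_chunks).map(|i| pause * i as u32).collect()
}

fn main() {
    let offsets = start_offsets(3, Duration::from_secs(10));
    // Chunks would start at t = 0s, 10s and 20s.
    assert_eq!(offsets[2], Duration::from_secs(20));
    println!("{:?}", offsets);
}
```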


#6

Yeah, I thought about it. I just couldn’t get it working on the first try, and I thought it was not the right way (there were errors about mutable borrowing).
Now I managed to make it like this:

pub fn call(dir: PathBuf) -> Result<()> {
    let f = future::loop_fn(fetch(&dir), |mut chunks| {
        if let Some(chunk) = chunks.pop() {
            let timeout = tokio_timer::sleep(Duration::from_secs(5)).map_err(|_| ());
            let a = chunk.join(timeout).and_then(|_| Ok(Loop::Continue(chunks)));
            Either::A(a)
        } else {
            Either::B(future::ok(Loop::Break(())))
        }
    });

    rt::run(f);

    Ok(())
}


#7

To be honest I didn’t think about that.
As you can see, I am stumbling over more basic things :slight_smile:

I guess a fixed schedule would be enough for now.
Though I am not sure where I should attach the timeout, and to which futures.
I tried to make it this way, but it seems to have different semantics: it just adds 10 seconds to each future, and all of them still run fully concurrently.


#8

See if this helps on how to do the staggering.


#9

Thanks, it is a really great example for me.
Today I tried to use futures_ordered, but I wasn’t sure it was what I needed.
rt::run requires something that implements Future, so I thought into_future had to be called on the stream.