[Future-curl] Dispatch requests from inside resolution of prior request?

I'm attempting to execute an API request which returns an array of JSON objects. I need to iterate over each object and fire off a secondary request to finish processing for that object.

I'm looking at the download_rust_lang test as boilerplate: https://github.com/alexcrichton/futures-rs/blob/master/futures-curl/tests/smoke.rs#L17

It looks like the response body needs to be saved into a LoopData during the write_function() callback. Based on the description of write_function, it sounds like this can be called many times as data becomes available and so you have to store it somewhere until the promise is fully resolved.

Given that, I have a few questions:

  • If there are multiple requests in flight at the same time, it seems that I would need to allocate multiple LoopData objects to hold them? Or equivalently, a map keyed by some kind of token so that all the in-flight requests can write their responses? The map doesn't need to be thread-safe since the loop is on one thread, right?

  • Is there any example or guidance about how to actually execute a secondary round of requests from inside the map() of the first promise resolution? I tried to queue them up into a vector of promises... but was unsure of what to do next. Do I collect() that vector and return that? Turn into a stream and for_each()?

I tried something basically like this (many details elided for brevity, but I think it conveys the idea):

let requests = session.perform(req).map(move |(mut resp, err)| {
    // ... deserialize finalized response ...

    let mut futures = Vec::new();
    for c in deserialized {
        // ... queue up secondary request based on response ...
        let mut req = Easy::new();
        req.get(true).unwrap();
        req.url(url).unwrap();
        req.write_function(move |data| {
            // ... save responses
        }).unwrap();

        let session_clone = session.clone();
        futures.push(session_clone.perform(req));
    }

    // Now what?  collect(futures)?  turn into a stream and for_each?
});

lp.run(requests).unwrap();

I'm sure it's pretty simple, but I've confused myself into a corner :slight_smile:

/cc @alexcrichton

Hello! Hopefully I can help shed some light on this as well.

Perhaps! That's one strategy, yeah. Right now the curl crate requires that the write_function callback be Send + 'static, which is unfortunately fairly limiting. One way to handle that is a LoopData per request. You could also share an Arc<LoopData> among all the requests and have them all put their responses in there. Finally, you could also just use Arc<Mutex<T>> if you'd like.

For this I'd probably recommend Arc<Mutex<T>>: everything runs on a single thread, so the lock is never contended and the synchronization overhead is negligible. I probably need to make a non-Send easy handle to pair with the Send one!
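
As a rough illustration of the Arc<Mutex<T>> approach, here is a minimal std-only sketch. `FakeTransfer`, `transfer_into`, and `collect_body` are hypothetical stand-ins invented for this example, not part of the curl crate; the only thing borrowed from the discussion above is the constraint that the write callback is Send + 'static and may be called several times per response.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a transfer handle (NOT the curl crate's `Easy`).
/// Like the callback discussed above, `on_write` must be `Send + 'static`.
struct FakeTransfer {
    on_write: Box<dyn FnMut(&[u8]) -> usize + Send + 'static>,
}

impl FakeTransfer {
    /// Simulates the body arriving in several chunks.
    fn deliver(&mut self, chunks: &[&[u8]]) {
        for &chunk in chunks {
            (self.on_write)(chunk);
        }
    }
}

/// Builds a transfer whose callback appends every chunk to `buf`.
fn transfer_into(buf: Arc<Mutex<Vec<u8>>>) -> FakeTransfer {
    FakeTransfer {
        on_write: Box::new(move |data: &[u8]| {
            // The lock is uncontended when everything runs on one thread.
            buf.lock().unwrap().extend_from_slice(data);
            data.len()
        }),
    }
}

/// One buffer per request: the caller keeps one clone of the `Arc`,
/// the callback owns the other.
fn collect_body(chunks: &[&[u8]]) -> Vec<u8> {
    let body = Arc::new(Mutex::new(Vec::new()));
    let mut transfer = transfer_into(body.clone());
    transfer.deliver(chunks);
    let out = body.lock().unwrap().clone();
    out
}

fn main() {
    let body = collect_body(&["[{\"id\":".as_bytes(), "1}]".as_bytes()]);
    println!("{}", String::from_utf8_lossy(&body));
}
```

Because each request gets its own buffer, no token-keyed map is needed to tell the in-flight responses apart.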

Certainly! Basically everything you've said here makes sense, but there are some subtle interactions that are important to keep in mind. First, a Stream in general doesn't allow for concurrent processing of each element. The elements are just pulled off the stream one at a time. Similarly, the collect combinator in general doesn't allow this either, as it only pulls one at a time.

Here, though, this won't actually matter. All the HTTP requests are performed in the "background" as libcurl drives them to completion, and the handle you get, Perform, is just a proxy to that computation happening elsewhere. So for that reason, in this specific case, using either collect or Stream will allow all requests to complete in parallel.

With that in mind, you could do something like:

let requests = session.perform(req).and_then(move |(mut resp, err)| {
    // ... deserialize finalized response ...

    // Perform all requests, collecting to a temporary vector
    // to ensure that they're all spawned concurrently
    let futures = deserialized.into_iter().map(|c| {
        // ... queue up secondary request based on response ...
        let mut req = Easy::new();
        req.get(true).unwrap();
        req.url(url).unwrap();
        req.write_function(move |data| {
            // ... save responses
        }).unwrap();

        session.perform(req)
    }).collect::<Vec<_>>();

    // Use `stream::iter` to convert an iterator to a stream, flatten
    // the stream of futures to a stream of items, and then collect
    // the responses.
    stream::iter(futures).flatten().collect()
});

let responses = lp.run(requests).unwrap();
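
The reason for collecting into a temporary vector first can be shown with a std-only analogy. This sketch uses threads rather than futures-curl, and `start_request` and `fetch_all` are hypothetical names; the assumption it illustrates is only that the work starts when the handle is created, and the handle is a proxy for work happening elsewhere.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for one secondary request: the work starts as soon
/// as this is called, and the returned handle only waits for the result.
fn start_request(id: u64) -> thread::JoinHandle<u64> {
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(100)); // pretend network latency
        id * 10
    })
}

fn fetch_all(ids: Vec<u64>) -> Vec<u64> {
    // Collect first: iterator adaptors are lazy, so without this `collect`
    // each request would only start when its handle is pulled below.
    let handles: Vec<_> = ids.into_iter().map(start_request).collect();

    // Wait for the handles one at a time; the work is already running,
    // so waiting in sequence does not serialize the requests.
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    let started = Instant::now();
    let results = fetch_all(vec![1, 2, 3, 4]);
    println!("{:?} in {:?}", results, started.elapsed());
}
```

Dropping the intermediate `collect` and chaining the two `map` calls directly would start each piece of work only when its handle is consumed, which is the one-at-a-time behaviour described above.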

Ahh, I see. Thanks for the detailed explanation!

Makes sense given how curl works. I'm assuming this is using curl's multi_* functionality and basically batches them all up to give to curl to run at once?

If I were doing something non-curl related (just plain futures, futures-mio, etc.) and wanted to run things in parallel, is that where the forget() method comes into play? E.g. set it all up, then "fire" forget() and let it run to completion in parallel with other futures?

Thanks again, and the futures_* libraries are really great, I'm enjoying learning them, and excited to see what they can do! :slight_smile:

Yes, curl does indeed use the multi interfaces! You can see the Rust bindings here.

Also yes, forget() or otherwise independently spawning a future is a source of concurrency. You can also achieve this through combinators like select, join, or writing your own Future implementation that just polls a bunch of futures.
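
To make the last option concrete, here is a minimal sketch of a future that just polls a bunch of futures. It is written against today's `std::future::Future` trait, which differs from the futures crate API used earlier in this thread, so treat it as an illustration of the idea only. `Countdown`, `PollAll`, `NoopWake`, and the busy-looping `block_on` are all hypothetical helpers for this example.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

type BoxedFuture = Pin<Box<dyn Future<Output = u32>>>;

/// Hypothetical leaf future: becomes ready after `remaining` more polls.
struct Countdown { id: u32, remaining: u32 }

impl Future for Countdown {
    type Output = u32;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        if self.remaining == 0 {
            return Poll::Ready(self.id);
        }
        self.remaining -= 1;
        cx.waker().wake_by_ref(); // a real future would register with an event source
        Poll::Pending
    }
}

fn countdown(id: u32, remaining: u32) -> BoxedFuture {
    Box::pin(Countdown { id, remaining })
}

/// Polls every unfinished child on each wake-up, so all of them make
/// progress together. Must not be polled again after it returns `Ready`.
struct PollAll { children: Vec<Option<BoxedFuture>>, results: Vec<Option<u32>> }

impl PollAll {
    fn new(futures: Vec<BoxedFuture>) -> Self {
        let results = vec![None; futures.len()];
        let children = futures.into_iter().map(Some).collect();
        PollAll { children, results }
    }
}

impl Future for PollAll {
    type Output = Vec<u32>;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Vec<u32>> {
        let this = &mut *self; // fine: every field of PollAll is Unpin
        let mut pending = false;
        for (slot, result) in this.children.iter_mut().zip(this.results.iter_mut()) {
            if let Some(child) = slot.as_mut() {
                match child.as_mut().poll(cx) {
                    Poll::Ready(value) => {
                        *result = Some(value);
                        *slot = None; // never poll a completed child again
                    }
                    Poll::Pending => pending = true,
                }
            }
        }
        if pending {
            Poll::Pending
        } else {
            Poll::Ready(this.results.iter_mut().map(|r| r.take().unwrap()).collect())
        }
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor that simply re-polls in a loop; real event loops sleep until woken.
fn block_on<F: Future>(future: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut future = Box::pin(future);
    loop {
        if let Poll::Ready(value) = future.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

fn run_demo() -> Vec<u32> {
    block_on(PollAll::new(vec![countdown(1, 3), countdown(2, 0), countdown(3, 1)]))
}

fn main() {
    println!("{:?}", run_demo());
}
```

The children finish after different numbers of polls, but results are returned in input order because each child writes into its own slot.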

Great, thanks!