[Tokio/Hyper/Futures] Parallelize Futures with Result

I am downloading an m3u8 stream and it's quite slow with hyper (even slower than Python + requests).
I simply followed the hyper examples and ended up with the code below. It fetches a chunk each iteration, awaits it and then writes the decrypted data to a file descriptor.

for (media_sequence, segment) in playlist.segments().iter().enumerate() {
    // send request to the server
    let res = client
        .borrow() // borrow from AtomicRefCell
        .get(segment.uri().parse::<Uri>()?)
        .await?;
    // collect the response body into a Vec<u8> that contains the data
    let body = res.into_body().try_concat().await?.to_vec();
    // decrypt the body
    let decrypted_segment = decrypt_segment(
            &body,
            &encryption_key,
            segment.iv(),
            media_sequence
        )?;
    // write to the file:
    fd.write_all(&decrypted_segment)?;
}

Currently everything is done sequentially, and I think this loop can be optimized to run in parallel, but I encountered several problems:

  1. How? I couldn't find anything beyond tokio::spawn, and that doesn't allow returning errors: every "?" has to be replaced with .expect("<insert something went wrong text>"), which makes the code very messy, and a single HTTPS error could crash the entire program!
  2. What's the best way to write to a file? (I tried Arc<AtomicRefCell<File>>, but this ended in total chaos.)
  3. Is this even possible with tokio or should I look for another framework?

First things first: you're actually sending requests one by one and doing all the processing in each iteration before moving on to the next one.

So what would be better is to first create a number of requests and have them processed in parallel (consider using join or select); see the sketch below.
Then use blocking from tokio to run the processing, if it is CPU-intensive.

Do you need to write all results into a single file?
I would suggest storing the results of all requests in some struct, and only serializing that struct to the file once you're finished.
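
A minimal, self-contained sketch of that shape, written against current futures/tokio APIs rather than the alphas used in this thread (fetch_and_decrypt and decrypt are hypothetical stand-ins for the original helpers, and tokio::task::spawn_blocking is the assumed blocking facility):

use futures::future;

type BoxError = Box<dyn std::error::Error + Send + Sync>;

// hypothetical stand-in for the CPU-heavy decryption step
fn decrypt(body: Vec<u8>) -> Result<Vec<u8>, BoxError> {
    Ok(body)
}

// hypothetical per-segment future: fetch one chunk, then decrypt it on
// tokio's blocking pool so it does not stall the other futures
async fn fetch_and_decrypt(uri: String) -> Result<Vec<u8>, BoxError> {
    // ...the hyper GET + body concat from the original loop would go here...
    let body = uri.into_bytes();
    tokio::task::spawn_blocking(move || decrypt(body)).await?
}

async fn download_all(uris: Vec<String>) -> Result<Vec<Vec<u8>>, BoxError> {
    // build all the futures first; nothing runs until they are polled...
    let requests: Vec<_> = uris.into_iter().map(fetch_and_decrypt).collect();
    // ...then drive them concurrently: try_join_all short-circuits on the
    // first error, so `?` keeps working instead of needing .expect()
    future::try_join_all(requests).await
}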


Thanks so much for your answer. I didn't know that you can use the futures crate with tokio.
Because the files can become quite large, I decided to wrap the File in a Mutex and pre-calculate the offset at which each chunk belongs in the file. I ended up with this code:

// TODO: use tokio::fs::File once https://github.com/tokio-rs/tokio/issues/1356
//       is resolved!
let fd = Arc::new(Mutex::new(File::create(&filepath)?));

let progress_bar = Arc::new(Mutex::new(self.pb.clone()));
// the queue will be filled with futures
let mut queue = vec![];

for (media_sequence, segment) in playlist.segments().iter().enumerate() {
    let mpb = progress_bar.clone();
    let mclient = self.client.clone();
    let muri = segment.uri().clone();
    let miv = segment.iv().clone();
    let mchunk_map = chunk_map.clone();
    let chunk_number = media_sequence.clone();
    let mencryption_key = encryption_key.clone();
    let mfd = fd.clone();

    let future = async move {
        // TODO: implement some kind of retry mechanism, in case the request fails
        let res = mclient
            .borrow()
            .get(muri.parse::<Uri>()?)
            .await?;
        let body = res.into_body().try_concat().await?.to_vec();
        // advance the progress bar by the number of bytes received
        // (the response itself has no len(); count the concatenated body)
        mpb.lock().unwrap().inc(body.len() as u64);
        // TODO: might make this async too?
        let decrypted_segment = decrypt_segment(
                &body,
                &mencryption_key,
                miv,
                chunk_number
            )?;

        let result: Result<()> = {
            let mut fd = mfd.lock().unwrap();
            let offset = *mchunk_map.get(&chunk_number).unwrap() as u64;
            fd.seek(SeekFrom::Start(offset))?;
            fd.write_all(&decrypted_segment)?;
            Ok(())
        };
        result
    };
    queue.push(future);
    if queue.len() == 10 {
        future::try_join_all(queue).await?;
        queue = vec![];
    }
}

// await all remaining futures in parallel and return any error
// that occurred in any of them!
future::try_join_all(queue).await?;
self.pb.finish();

Sadly, parallelizing the code didn't bring any real speed improvement (maybe ~4 seconds faster). I also tried more parallel tasks, but that only increased the chance of an SSL error or a server error. So I think there's some kind of underlying issue with hyper (or my code, I don't know). The IO/decryption isn't the issue either :confused: (I commented it out and benchmarked solely the hyper part).

One thing that catches my eye is that you have a lot of clones, which likely implies allocations.
I know you have to fight the borrow checker, but since playlist is some sort of collection, I'd suggest iterating it manually and avoiding the clones, so that you can move values into the async block.

Another thing is that if all that locking and file access runs on the same thread your futures are executed on, you're not going to be very concurrent, as it will just block the other futures.
Since you mentioned the tokio issue with blocking, I guess you cannot really use it right now.
Then it would be better not to write the file inside your futures: process all the results, and only write the file after that.
Note that, from what I gather, with async/await and tokio you basically use a single thread to process all these futures, so as another option you could manually create a runtime and spawn these futures onto the tokio runtime; this way you can use multi-threading to make it actually parallel.

In addition to that, you do not need mclient.borrow() if it is a hyper client: you can use it as it is, without the need for runtime borrow checks; see the sketch below.
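
A sketch of that spawn-on-the-runtime idea, written against current tokio/hyper APIs (hyper 0.14's body::to_bytes stands in for the older try_concat; nothing here is from the original code). hyper's Client is cheaply cloneable and all clones share one connection pool, so each spawned task can own its own clone instead of going through AtomicRefCell::borrow():

use hyper::{Client, Uri};

type BoxError = Box<dyn std::error::Error + Send + Sync>;

async fn fetch_all(uris: Vec<Uri>) -> Result<Vec<Vec<u8>>, BoxError> {
    let client = Client::new();

    let handles: Vec<_> = uris
        .into_iter()
        .map(|uri| {
            let client = client.clone(); // cheap handle clone, shared pool
            // spawn() hands the future to the runtime's worker threads,
            // so the requests actually run in parallel
            tokio::spawn(async move {
                let res = client.get(uri).await?;
                let body = hyper::body::to_bytes(res.into_body()).await?;
                Ok::<_, hyper::Error>(body.to_vec())
            })
        })
        .collect();

    let mut bodies = Vec::with_capacity(handles.len());
    for handle in handles {
        // outer ? surfaces a panicked task, inner ? a hyper error, so
        // failures still propagate through Result instead of .expect()
        bodies.push(handle.await??);
    }
    Ok(bodies)
}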


Most of those clones are of values wrapped in Arcs, because yes, I don't want to fight the borrow checker. (Futures are especially hard because they require 'static lifetimes, and I really don't want a &'static mut self in my code.)

That was the problem! I just changed the last part to

if queue.len() == 20 {
    for future in queue {
        tokio::spawn(async move {
            future.await.expect("failed to get chunk!");
        });
    }
    //future::try_join_all(queue).await?;
    queue = vec![];
}

and the time got down from ~264 seconds to ~100 :heart:.
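
For reference, a sketch of how such a batch can keep propagating errors through Result instead of .expect() inside the detached tasks (run_batch is a hypothetical helper, written against a tokio version where spawn returns a JoinHandle):

use std::future::Future;

type BoxError = Box<dyn std::error::Error + Send + Sync>;

// drain a batch of pending futures, spawn them onto the runtime's worker
// threads, and surface the first error instead of crashing a detached task
async fn run_batch<F>(queue: &mut Vec<F>) -> Result<(), BoxError>
where
    F: Future<Output = Result<(), BoxError>> + Send + 'static,
{
    let handles: Vec<_> = queue.drain(..).map(tokio::spawn).collect();
    for handle in handles {
        handle.await??; // outer ?: panicked task, inner ?: future's error
    }
    Ok(())
}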

Thanks :blush:

async/await should mitigate that, as you no longer need to care about 'static.
That was a problem only with futures 0.1.
So ideally you could get rid of every unnecessary shared pointer too :slight_smile:

Other than that, good luck getting around with the new futures.