How to share an immutable reference between async tokio tasks executed inside a map function?

I'm trying to use reqwest+tokio to asynchronously download and save an image using the URL. There's a variable called data_directory that contains the location to save the image to and it needs to be an immutable shared state variable. How do I share this immutable reference passed to the function between tokio tasks? (See error message at the end)

This is what I'm trying to run:

pub async fn get_images_parallel(saved: &UserSaved, data_directory: &str) -> Result<(), ReddSaverError> {
    let tasks: Vec<_> = saved
        .data
        .children
        .clone()
        .into_iter()
        // filter out the posts where a URL is present
        // note that this application cannot download URLs linked within the text of the post
        .filter(|item| item.data.url.is_some())
        .filter(|item| {
            let url_unwrapped = item.data.url.as_ref().unwrap();
            // If the URLs end with jpg/png it is assumed to be an image
            url_unwrapped.ends_with("jpg") || url_unwrapped.ends_with("png")
        })
        .map(|item| {
            let directory = data_directory.clone();
            // since the latency for downloading an image from the network is unpredictable
            // we spawn a new async task using tokio for each of the images to be downloaded
            tokio::spawn(async {
                let url = item.data.url.unwrap();
                let extension = String::from(url.split('.').last().unwrap_or("unknown"));
                let subreddit = item.data.subreddit;
                info!("Downloading image from URL: {}", url);


                let file_name = generate_file_name(&url, &directory, &subreddit, &extension);
                if check_path_present(&file_name) {
                    info!("Image already downloaded. Skipping...");
                } else {
                    let image_bytes = reqwest::get(&url).await?.bytes().await?;
                    let image = match image::load_from_memory(&image_bytes) {
                        Ok(image) => image,
                        Err(_e) => return Err(ReddSaverError::CouldNotCreateImageError),
                    };
                    save_image(&image, &subreddit, &file_name)?;
                    info!("Successfully saved image: {}", file_name);
                }

                Ok::<(), ReddSaverError>(())
            })
        })
        .collect();

    // wait for all the images to be downloaded and saved to disk before exiting the method
    for task in tasks {
        if let Err(e) = task.await? {
            return Err(e);
        }
    }

    Ok(())
}

I tried to use Arc with clone (not shown here) but I'm still encountering the same error.

Error message:

error[E0759]: `data_directory` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
  --> src/utils.rs:68:53
   |
68 | pub async fn get_images_parallel(saved: &UserSaved, data_directory: &str) -> Result<(), ReddSaverError> {
   |                                                     ^^^^^^^^^^^^^^  ---- this data with an anonymous lifetime `'_`...
   |                                                     |
   |                                                     ...is captured here...
...
87 |             tokio::spawn(async {
   |             ------------ ...and is required to live as long as `'static` here

I'm new to Rust and this is my first time writing an application with it, so any help here would be appreciated. Thanks a lot!

Welcome to Rust!

tokio::spawn requires that everything the spawned future captures is 'static, so any reference it holds must be valid for the rest of the program. The compiler is telling you that data_directory is not guaranteed to live that long, so it could be dropped before the spawned task finishes executing. If you use tokio::spawn, each task has to own its data or hold only 'static references. Here, one way to make it work is to turn the &str into an owned, heap-allocated String. You can either change the function parameter, or convert it inside the function before calling tokio::spawn. Either way, each task needs its own clone, moved in with async move.

For example:

pub async fn get_images_parallel(saved: &UserSaved, data_directory: String)
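
The ownership idea above can be sketched without any dependencies. This is only an illustration under stated assumptions: it swaps in std::thread::spawn, which has a 'static bound on what it captures much like tokio::spawn does, and file_name_for is a hypothetical stand-in for generate_file_name from the question. It uses Arc<str> rather than a String per task so the directory is allocated once and shared.

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for the question's `generate_file_name` helper.
fn file_name_for(directory: &str, url: &str) -> String {
    let base = url.rsplit('/').next().unwrap_or("unknown");
    format!("{}/{}", directory, base)
}

// `thread::spawn` only accepts closures whose captures are `'static`,
// so a borrowed `&str` cannot be captured directly.
fn build_names_in_tasks(data_directory: &str, urls: Vec<String>) -> Vec<String> {
    // Copy the borrowed string into one reference-counted allocation.
    let shared: Arc<str> = Arc::from(data_directory);

    let handles: Vec<_> = urls
        .into_iter()
        .map(|url| {
            // Each task gets its own handle to the shared string.
            let directory = Arc::clone(&shared);
            // `move` transfers ownership of `directory` and `url` into the
            // task, so nothing borrowed from this function is captured.
            thread::spawn(move || file_name_for(&directory, &url))
        })
        .collect();

    // Join in spawn order so the results line up with the input.
    handles
        .into_iter()
        .map(|handle| handle.join().expect("task panicked"))
        .collect()
}

fn main() {
    let urls = vec![
        "https://example.com/a.png".to_string(),
        "https://example.com/b.jpg".to_string(),
    ];
    println!("{:?}", build_names_in_tasks("data", urls));
}
```

With Tokio the shape should be the same: clone the owned value (or the Arc) inside the map closure and pass `async move { ... }` to tokio::spawn. Cloning alone is not enough if the block is a plain `async { ... }`, because the block would then borrow the clone instead of owning it.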

I would recommend using FuturesUnordered for this task.

use futures::stream::{TryStreamExt, FuturesUnordered};

pub async fn get_images_parallel(saved: &UserSaved, data_directory: &str) -> Result<(), ReddSaverError> {
    saved
        .data
        .children
        .clone()
        .into_iter()
        // filter out the posts where a URL is present
        // note that this application cannot download URLs linked within the text of the post
        .filter(|item| item.data.url.is_some())
        .filter(|item| {
            let url_unwrapped = item.data.url.as_ref().unwrap();
            // If the URLs end with jpg/png it is assumed to be an image
            url_unwrapped.ends_with("jpg") || url_unwrapped.ends_with("png")
        })
        .map(|item| {
            let directory = data_directory;
            // since the latency for downloading an image from the network is unpredictable
            // we create one future per image and let FuturesUnordered drive them concurrently
            async move {
                let url = item.data.url.unwrap();
                let extension = String::from(url.split('.').last().unwrap_or("unknown"));
                let subreddit = item.data.subreddit;
                info!("Downloading image from URL: {}", url);


                let file_name = generate_file_name(&url, &directory, &subreddit, &extension);
                if check_path_present(&file_name) {
                    info!("Image already downloaded. Skipping...");
                } else {
                    let image_bytes = reqwest::get(&url).await?.bytes().await?;
                    let image = match image::load_from_memory(&image_bytes) {
                        Ok(image) => image,
                        Err(_e) => return Err(ReddSaverError::CouldNotCreateImageError),
                    };
                    save_image(&image, &subreddit, &file_name)?;
                    info!("Successfully saved image: {}", file_name);
                }

                Ok::<(), ReddSaverError>(())
            }
        })
        .collect::<FuturesUnordered<_>>()
        .try_collect::<()>()
        .await?;
    Ok(())
}

Your code will also be executor-independent this way.

Thanks @Kestrer, I really appreciate your help! I was able to get it working with FuturesUnordered.

But is it possible to get this to work with tokio alone? I'm using tokio elsewhere and was hoping to avoid adding futures as a dependency.

Almost every async crate, including tokio itself, relies on the futures crate. You can think of it as a standard library for async code.

Thanks! I will read about the futures crate and how it ties into the async ecosystem.

I'm a maintainer of Tokio. The reason we have not yet added an equivalent to FuturesUnordered in Tokio proper is that it's a complicated data structure, and everyone already uses the futures crate anyway.

Thanks @alice! I'm still learning and trying to understand the Rust ecosystem. Is there a best-practice guide or rule of thumb for when to implement something with tokio directly and when to use futures?

When it comes to running many things concurrently, I think you should distinguish between a "parallel for loop" like yours, and a separate detached task that handles something. In the former case, you probably want one of futures' utilities, and in the latter case you probably want tokio::spawn.

An example of a "handler" would be the handler of a connection in a web server.

This isn't a perfect rule of thumb, though, and it can sometimes make sense to do one or the other, or even a mix. For example, if the number of tasks is small and fixed, you probably want tokio::join! over something in futures.
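
The distinction between a "parallel for loop" and a detached task can be sketched with the standard library alone. This is an analogy, not the Tokio or futures API: std::thread::scope plays the role of work that is finished before the caller returns (so it may borrow, as the FuturesUnordered version does), and std::thread::spawn plays the role of a detached task (so it must own its data, as with tokio::spawn). The names join_path, paths_scoped and path_detached are made up for the illustration.

```rust
use std::thread;

fn join_path(directory: &str, name: &str) -> String {
    format!("{}/{}", directory, name)
}

// "Parallel for loop": every worker is joined before `scope` returns,
// so the workers are allowed to borrow `directory` and `names`.
fn paths_scoped(directory: &str, names: &[String]) -> Vec<String> {
    thread::scope(|s| {
        let handles: Vec<_> = names
            .iter()
            .map(|name| s.spawn(move || join_path(directory, name)))
            .collect();
        handles
            .into_iter()
            .map(|handle| handle.join().expect("worker panicked"))
            .collect()
    })
}

// Detached task: it may outlive the caller, so it has to own its inputs.
fn path_detached(directory: &str, name: &str) -> thread::JoinHandle<String> {
    let directory = directory.to_owned();
    let name = name.to_owned();
    thread::spawn(move || join_path(&directory, &name))
}

fn main() {
    let names = vec!["a.png".to_string(), "b.jpg".to_string()];
    println!("{:?}", paths_scoped("data", &names));
    println!("{}", path_detached("data", "c.png").join().expect("task panicked"));
}
```

The first function needs no clones or Arc because nothing outlives the call. The second pays for owned copies in exchange for being free to run after the caller has returned.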

In general, you can use either. There are some things to be aware of though:

  • Tokio's IO traits are different to Futures' IO traits. If you're writing a library that uses async I/O, you'll have to make a decision as to which one to support.
  • Futures is runtime independent, while many features of Tokio only work on the Tokio runtime, so use Futures if you can.
  • Tokio is sometimes better than Futures (e.g. its mutex and channel implementations), but if you want to be sure you stay runtime independent, there are external crates such as async-lock and async-channel that are also better than Futures.

When it comes to things other than running tasks concurrently, if both Tokio and futures have the utility, the one in Tokio is typically best. For example, this applies to the select! macro and anything in tokio::sync.

In some cases this is just because the implementation in futures is poor. In other cases, it's because the one in tokio::sync can talk to the Tokio runtime in a way that utilities in futures simply can't. See this blog post for the details.

Thanks @Kestrer and @alice for the detailed explanations!