Async requests error when return a tuple

Hi all,

I use this code to request some URLs in asynchronous mode. It inspired from multiple resources found on Internet.

I understand that the 1st part send the request and the 2nd read the response. I use buffered to keep the same order between the two parts.

use log::*;
use futures::{stream, StreamExt};
use std::time::Duration;

// Just a generic Result type to ease error handling for us.
// Errors in multithreaded async contexts needs some extra restrictions.
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;



async fn app() -> Result<()> {
    info!("Starting program!");

    let max_parallel_requests = 2;
    let timeout = Duration::from_secs(5);

    let client = reqwest::Client::new();

    let urls = (9000..9010).map(|port| format!("http://127.0.0.1:{}", port));

    let bodies = stream::iter(urls)
        .enumerate()
        .map(|(i, url)| {
            let client = &client;
            async move {
                info!("#{} => Requesting {}", i, url);
                let resp = client.get(url).timeout(timeout).send().await?;
                resp.bytes().await
            }
        })
        .buffered(max_parallel_requests);

    bodies
        .enumerate()
        .for_each(|(i, b)| async move {
            match b {
                Ok(b) => info!("#{} => Got {} bytes", i, b.len()),
                Err(e) => error!("#{} => Got an error: {}", i, e),
            }
        })
        .await;

    Ok(())
}

fn main() {
    env_logger::init();

    let rt = tokio::runtime::Runtime::new().unwrap();
    match rt.block_on(app()) {
        Ok(_) => info!("Done"),
        Err(e) => error!("An error occurred: {}", e),
    };
}

I tried to return a tuple (i, resp.bytes().await) in the map function but there's an error :

the `?` operator can only be used in an async block that returns `Result` or `Option` (or another type that implements `Try`)

Do you have a explanation of this error ?
Thanks for you help.

Please post full errors.

The code above is working.
The code below does not work :

use futures::{stream, StreamExt};
use log::*;
use std::time::Duration;

// Just a generic Result type to ease error handling for us.
// Errors in multithreaded async contexts needs some extra restrictions.
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

async fn app() -> Result<()> {
    info!("Starting program!");

    let max_parallel_requests = 2;
    let timeout = Duration::from_secs(5);

    let client = reqwest::Client::new();

    let urls = (9000..9010).map(|port| format!("http://127.0.0.1:{}", port));

    let bodies = stream::iter(urls)
        .enumerate()
        .map(|(i, url)| {
            let client = &client;
            async move {
                info!("#{} => Requesting {}", i, url);
                let resp = client.get(url).timeout(timeout).send().await?;
                (i, resp.bytes().await)
            }
        })
        .buffered(max_parallel_requests);

    bodies
        .for_each(|(i, b)| async move {
            match b {
                Ok(b) => info!("#{} => Got {} bytes", i, b.len()),
                Err(e) => error!("#{} => Got an error: {}", i, e),
            }
        })
        .await;

    Ok(())
}

fn main() {
    env_logger::init();

    let rt = tokio::runtime::Runtime::new().unwrap();
    match rt.block_on(app()) {
        Ok(_) => info!("Done"),
        Err(e) => error!("An error occurred: {}", e),
    };
}

And the error :

error[E0277]: the `?` operator can only be used in an async block that returns `Result` or `Option` (or another type that implements `Try`)
  --> src/main.rs:25:28
   |
23 |               async move {
   |  ________________________-
24 | |                 info!("#{} => Requesting {}", i, url);
25 | |                 let resp = client.get(url).timeout(timeout).send().await?;
   | |                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ cannot use the `?` operator in an async block that returns `(usize, std::result::Result<bytes::bytes::Bytes, reqwest::Error>)`
26 | |                 (i, resp.bytes().await)
27 | |             }
   | |_____________- this function should return `Result` or `Option` to accept `?`
   |
   = help: the trait `Try` is not implemented for `(usize, std::result::Result<bytes::bytes::Bytes, reqwest::Error>)`
   = note: required by `from_error`

Ah, I see.

So resp.bytes().await is an Result<Bytes>, and therefore you can use the question mark when returning it.

However (i, resp.bytes().await) is an (usize, Result<Bytes>), and not a result, hence you can't use the question mark.

You could go for this instead:

resp.bytes().await.map(|bytes| (i, bytes))

This uses Result::map to produce an Result<(usize, Bytes)> instead.

1 Like

Thank you Alice !

I update the second part to :

    bodies
        .for_each(|b| async move {
            match b {
                Ok((i, b)) => info!("#{} => Got {} bytes", i, b.len()),
                Err(e) => error!("#? => Got an error: {}", e),
            }
        })
        .await;

So, I can print the index if the result is ok. But I cannot do the same way if there's an error.
Do you any advice ?

You can do this.

async move {
    info!("#{} => Requesting {}", i, url);
    let bytes = async {
        let resp = client.get(url).timeout(timeout).send().await?;
        resp.bytes().await
    }.await;
    (i, bytes)
}

By adding an extra async block, the question mark applies to the inner block, which does return a Result, but it is afterwards wrapped in a tuple.

1 Like

Thank you again. It's perfect !
I'm starting to love this language... Quite hard but fun.

1 Like