How can I get variables outside of closure

Hi, I'm new on Rust, I google so many source and read documents but I can not found basic solution. I'm trying get some api requests as async but I can not get/push results from/to out of foreach. How can I fix this code.

let mut results = vec![];      
let _work = &request // request is result of many async hyper client results
    .for_each(|b| {
        async {
            match b {
                Ok(b) => {
                    results.push(b)
                },
                Err(e) => eprintln!("Error: {}", e),
            }
        }
    })
    .await;

println!("All: {:?}", results); // I need here, println just for test

And I want to ask, almost all replies for questions like this and similars showed solutions over println. ( I said, I'm new on Rust and I google many times for many errors )

println is not real world solution, for example for my question If I use println in scopes there is no problem. Am I missing something?

Thanks for your help.

I apologize, where do you get request from? I need to know more information about the type that request is to help; given I'm not familiar with the majority of the rust async ecosystem/crates. Also, what specific error are you getting/(perceived) unexpected behaviour are you experiencing?

But I will proceed to continue given the assumption your code is following async_std::stream::Stream::for_each, or at least a very similar api.

The signature is declared as follows:

fn for_each<F>(self, f: F) -> ImplFuture<()>
where 
    Self: Sized,
    F: FnMut(Self::Item);

So, you cannot provide async code inside the closure passed in, since it does not return anything (That includes a Future).

What error do you have here? What is a type of b? In general this code should work - when you are finished awaiting your result, the closure should no longer be alive, so the borrow or results should not be held - but you gave as really small context, so it is either me missing something, or your problem is not actually related to your example. What I would ask you is to provide minimal working (failing) example.

What you can actually do with your code is to use try_collect like:

use futures::stream::TryStreamExt;
let results = request.try_collect().await;

However in this code you are actually loosing all benefits of async code - it is in fact just synchronized collection of messages and there is no reason to use any tokio/hyper over some blocking API (which may be much simpler to debug) for this purpose.

Regarding the for_each, it actually depends on which crate they're using to get their for_each from. E.g. the futures crate provides a different version with this signature:

fn for_each<Fut, F>(self, f: F) -> ForEach<Self, Fut, F> where
    F: FnMut(Self::Item) -> Fut,
    Fut: Future<Output = ()>,

which does allow async code.

1 Like

Thanks for your reply, I'm sorry for missing information, here is the minimal working example;

use futures::{stream, StreamExt};
use tokio;
use reqwest::Client;
use hyper::{header, Body, Request, Method, Response, Server, StatusCode};
use hyper::service::{make_service_fn, service_fn};

async fn get_results(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {

    match (req.method(), req.uri().path()) {

        (&Method::GET, "/get") => {

            let addresses = vec!["http://httpbin.org/ip".to_string(),"http://httpbin.org/user-agent".to_string()];
            let worker = addresses.len();
            let client = Client::new();
            let mut results: Vec<String> = vec![];
            let request = stream::iter(addresses)
                .map(|address| {
                    let client = &client;
                    async move {
                        let resp = client.get(&address).send().await?;
                        resp.text().await
                    }
                })
                .buffer_unordered(worker);

            let _work = request
                .for_each(|b| {
                    async {
                        match b {
                            Ok(b) => {
                                results.push(b)
                            },
                            Err(e) => eprintln!("Error: {}", e),
                        }
                    }
                })
                .await;

            println!("All: {:?}", results); // I need here, println just for test
            let body = format!("Hello");

            Ok(Response::builder().status(StatusCode::OK).header(header::CONTENT_TYPE, "text/html; charset=utf-8").body(body.into()).unwrap())

        }
        _ => Ok(Response::builder()
            .status(StatusCode::NOT_FOUND)
            .body(Body::empty())
            .unwrap()),
    }

}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {

    let addr = ([127, 0, 0, 1], 3000).into();

    let server = Server::bind(&addr).serve(make_service_fn(|_| {
        async { Ok::<_, hyper::Error>(service_fn(get_results)) }
    }));

    println!("Listening on http://{}", addr);

    let _ret = server.await;

    Ok(())
}

Depencies are;

[dependencies]
tokio = { version = "0.2", features = ["full"] }
reqwest = { version = "0.10" }
hyper = "0.13"
futures = "0.3"

Error is:

error: captured variable cannot escape FnMut closure body
--> src\main.rs:29:21
|
28 | .for_each(|b| {
| - inferred to be a FnMut closure
29 | / async {
30 | | match b {
31 | | Ok(b) => {
32 | | results.push(b)
... |
35 | | }
36 | | }
| |_____________________^ returns a reference to a captured variable which escapes the closure body
|
= note: FnMut closures only have access to their captured variables while they are executing...
= note: ...therefore, they cannot allow references to captured variables to escape

The issue is that all the closure below does is that because of the async block, it immediately constructs a future, and this future is given a reference to both results and b, then returns this future.

let _work = request
    .for_each(|b| {
        async {
            match b {
                Ok(b) => {
                    results.push(b)
                },
                Err(e) => eprintln!("Error: {}", e),
            }
        }
    })
    .await;

The error is caused by the fact that you're now returning something that contains a reference to results, and you're not allowed to do this. Of course the return of b is also an issue (it should have been moved into the future to work), but it didn't get to that problem. You can instead do this:

let _work = request
    .for_each(|b| {
        match b {
            Ok(b) => {
                results.push(b)
            },
            Err(e) => eprintln!("Error: {}", e),
        }
        async { () }
    })
    .await;

In this case the value b is immediately pushed into the vector, and a future that does nothing and immedaitely returns () is returned afterwards.

3 Likes

Thanks for your detailed answer @alice , I understand now. Your solutions is very simple and nice, I was thinking about "I need implement some functions, some extra depencies etc. :slight_smile:"

And thanks to @hashedone, @OptimisticPeach, You all put in something to my rust knowledge.

The tokio crate also provides a StreamExt trait, which has a collect method. Using this you could also write it like this:

let _work = request
    .map(|b| {
        match b {
            Ok(b) => {
                results.push(b);
                Ok(())
            },
            Err(e) => Err(e),
        }
        // or b.map(|b| { results.push(b); () })
    })
    .collect()
    .await?;

Note that collecting a stream of () results in just a (). Then it uses that you can collect Result<T, E> for any type T you can collect. Thus you can collect Result<(), E>. This will stop on first error, and allow you to pass it along to the caller of get_results instead of just randomly printing it to the screen and ignoring it.

Of course you can also just collect directly into a vector:

let results: Vec<String> = request.collect().await?;
1 Like

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.