Hi, I'm new to Rust. I've googled many sources and read the documentation, but I can't find a basic solution. I'm trying to make some API requests asynchronously, but I can't get the results out of the for_each closure (or push them into a vector from inside it). How can I fix this code?
let mut results = vec![];
let _work = &request // request is a stream of many async hyper client results
.for_each(|b| {
async {
match b {
Ok(b) => {
results.push(b)
},
Err(e) => eprintln!("Error: {}", e),
}
}
})
.await;
println!("All: {:?}", results); // I need the results here; println is just for testing
I also want to ask: almost all replies to questions like this one show solutions that just call println. (As I said, I'm new to Rust and I have googled many times for many errors.)
println is not a real-world solution. For example, in my case, if I use println inside the inner scope there is no problem. Am I missing something?
Sorry, but where do you get request from? To help, I need to know more about the type of request, since I'm not familiar with most of the Rust async ecosystem and its crates. Also, what specific error are you getting, or what unexpected behaviour are you seeing?
For now I will proceed on the assumption that your code uses async_std::stream::Stream::for_each, or at least a very similar API.
What error do you have here? What is the type of b? In general this code should work: once you have finished awaiting the result, the closure should no longer be alive, so the borrow of results should no longer be held. But you gave us very little context, so either I'm missing something or your problem is not actually related to your example. Could you provide a minimal working (failing) example?
What you can actually do with your code is to use try_collect like:
use futures::stream::TryStreamExt;
let results: Result<Vec<String>, _> = request.try_collect().await;
However, in this code you are actually losing all the benefits of async code: it is in fact just a synchronous collection of messages, and there is no reason to use tokio/hyper over a blocking API (which may be much simpler to debug) for this purpose.
Regarding the for_each, it actually depends on which crate they're using to get their for_each from. E.g. the futures crate provides a different version with this signature:
Thanks for your reply, and sorry for the missing information. Here is the minimal example:
use futures::{stream, StreamExt};
use tokio;
use reqwest::Client;
use hyper::{header, Body, Request, Method, Response, Server, StatusCode};
use hyper::service::{make_service_fn, service_fn};
async fn get_results(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
match (req.method(), req.uri().path()) {
(&Method::GET, "/get") => {
let addresses = vec!["http://httpbin.org/ip".to_string(),"http://httpbin.org/user-agent".to_string()];
let worker = addresses.len();
let client = Client::new();
let mut results: Vec<String> = vec![];
let request = stream::iter(addresses)
.map(|address| {
let client = &client;
async move {
let resp = client.get(&address).send().await?;
resp.text().await
}
})
.buffer_unordered(worker);
let _work = request
.for_each(|b| {
async {
match b {
Ok(b) => {
results.push(b)
},
Err(e) => eprintln!("Error: {}", e),
}
}
})
.await;
println!("All: {:?}", results); // I need the results here; println is just for testing
let body = format!("Hello");
Ok(Response::builder().status(StatusCode::OK).header(header::CONTENT_TYPE, "text/html; charset=utf-8").body(body.into()).unwrap())
}
_ => Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::empty())
.unwrap()),
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let addr = ([127, 0, 0, 1], 3000).into();
let server = Server::bind(&addr).serve(make_service_fn(|_| {
async { Ok::<_, hyper::Error>(service_fn(get_results)) }
}));
println!("Listening on http://{}", addr);
let _ret = server.await;
Ok(())
}
Dependencies are:
[dependencies]
tokio = { version = "0.2", features = ["full"] }
reqwest = { version = "0.10" }
hyper = "0.13"
futures = "0.3"
Error is:
error: captured variable cannot escape FnMut closure body
--> src\main.rs:29:21
|
28 | .for_each(|b| {
| - inferred to be a FnMut closure
29 | / async {
30 | | match b {
31 | | Ok(b) => {
32 | | results.push(b)
... |
35 | | }
36 | | }
| |_____________________^ returns a reference to a captured variable which escapes the closure body
|
= note: FnMut closures only have access to their captured variables while they are executing...
= note: ...therefore, they cannot allow references to captured variables to escape
The issue is that, because of the async block, all the closure below does is immediately construct a future that holds a reference to results, and then return that future.
let _work = request
.for_each(|b| {
async {
match b {
Ok(b) => {
results.push(b)
},
Err(e) => eprintln!("Error: {}", e),
}
}
})
.await;
The error is caused by the fact that you're now returning something that contains a reference to results, and an FnMut closure is not allowed to do this. (b is not part of the problem: the match consumes it, so the async block takes ownership of it.) You can instead do this:
let _work = request
.for_each(|b| {
match b {
Ok(b) => {
results.push(b)
},
Err(e) => eprintln!("Error: {}", e),
}
async { () }
})
.await;
In this case the value b is pushed into the vector immediately, and afterwards the closure returns a future that does nothing and immediately resolves to ().
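To try this pattern without tokio, hyper, or the futures crate, here is a minimal std-only sketch. The names block_on, for_each_item, and collect_ok are hypothetical helpers written for this illustration, not part of any library: for_each_item only mimics the shape of a stream's for_each (an FnMut closure that returns a future), and block_on is a toy executor that is only suitable for futures that never really wait.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for futures that never actually wait.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical toy executor: polls the future in a loop until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Hypothetical stand-in for a stream's `for_each`: the closure is `FnMut`
/// and returns a future, which is awaited before the next item is handled.
async fn for_each_item<I, F, Fut>(items: I, mut f: F)
where
    I: IntoIterator,
    F: FnMut(I::Item) -> Fut,
    Fut: Future<Output = ()>,
{
    for item in items {
        f(item).await;
    }
}

/// Collects the `Ok` values and reports the errors on stderr.
fn collect_ok(responses: Vec<Result<String, String>>) -> Vec<String> {
    let mut results = Vec::new();
    block_on(for_each_item(responses, |b| {
        // The push happens while the closure itself runs, so no borrow of
        // `results` ends up inside the future that is returned.
        match b {
            Ok(body) => results.push(body),
            Err(e) => eprintln!("Error: {}", e),
        }
        async {}
    }));
    // The closure is gone by now, so `results` can be used freely.
    results
}

fn main() {
    let responses = vec![
        Ok("first".to_string()),
        Err("timeout".to_string()),
        Ok("second".to_string()),
    ];
    println!("All: {:?}", collect_ok(responses));
}
```

Replacing the closure body with an async block that calls results.push inside it should bring back the same "captured variable cannot escape FnMut closure body" error, since the returned future would then hold the borrow.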
Thanks for your detailed answer @alice, I understand now. Your solution is very simple and nice. I was thinking I would need to implement some functions, add extra dependencies, etc.
The tokio crate also provides a StreamExt trait, which has a collect method. Using this you could also write it like this:
let _work = request
.map(|b| {
match b {
Ok(b) => {
results.push(b);
Ok(())
},
Err(e) => Err(e),
}
// or b.map(|b| { results.push(b); () })
})
.collect::<Result<(), _>>()
.await?;
Note that collecting a stream of () results in just a (). It then relies on the fact that you can collect into Result<T, E> for any type T you can collect into, so you can collect into Result<(), E>. This stops on the first error and lets you pass it along to the caller of get_results instead of just printing it to the screen and ignoring it.
Of course you can also just collect directly into a vector:
let results = request.collect::<Result<Vec<String>, _>>().await?;
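The same collecting rules exist for ordinary iterators in the standard library, so they can be tried without any async runtime. The following is only a std-only analogue of the stream versions, not the tokio API itself; push_all and collect_all are hypothetical names, and the String error type is a placeholder for whatever error the real requests produce.

```rust
/// Pushes every successful body into `results` and stops at the first error.
/// Collecting `()` items yields `()`, so the overall type is `Result<(), String>`.
fn push_all(
    responses: Vec<Result<String, String>>,
    results: &mut Vec<String>,
) -> Result<(), String> {
    responses
        .into_iter()
        .map(|b| b.map(|body| results.push(body)))
        .collect()
}

/// Collects straight into a vector, again stopping at the first error.
fn collect_all(responses: Vec<Result<String, String>>) -> Result<Vec<String>, String> {
    responses.into_iter().collect()
}

fn main() {
    let ok = vec![Ok("first".to_string()), Ok("second".to_string())];
    let failing = vec![
        Ok("first".to_string()),
        Err("timeout".to_string()),
        Ok("never reached".to_string()),
    ];

    let mut results = Vec::new();
    println!("{:?}", push_all(failing.clone(), &mut results));
    println!("pushed before the error: {:?}", results);
    println!("{:?}", collect_all(ok));
    println!("{:?}", collect_all(failing));
}
```

In both functions the items after the first Err are never visited, which is why the caller can simply propagate the error with ? instead of printing it.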