I'm dealing with a simple issue in a complex system. I've simplified my code base into the ~50 line program below (git clone and cargo run it!):
use std::time::Duration;
use tokio::time::timeout;
use tokio::time::sleep;
use rand::random;

async fn manage_lag(data: String) {
    // Example function that handles sending data to a webserver
    // A similar function is called after hard_process in the async function, but it
    dbg!(data);
}

async fn hard_process(data: String) {
    // Function that randomly waits 1-5 seconds to simulate issues I'm dealing with.
    sleep(Duration::from_secs(random::<u64>() % 5)).await;
    println!("{}", data);
    println!("Wow that was cool.");
}

#[tokio::main]
async fn main() {
    let lots_of_data = vec![
        "business".to_string(),
        "numbers1".to_string(),
        "numbers2".to_string(),
        "numbers3".to_string(),
        "is this working?".to_string(),
    ];
    let mut jobs = Vec::new();
    for d in lots_of_data {
        let a_data = d.clone();
        let new_job = tokio::spawn(async move {
            hard_process(a_data);
            // This data ends up getting sent via HTTP
            // If it takes too long, it never gets sent to this HTTP server (I need it to
            // eventually)
        });
    }
    for j in jobs {
        match timeout(Duration::from_secs(3), j).await {
            Ok(_) => (),
            Err(e) => {
                println!("Whoa, a job took too long!");
                manage_lag(data); // Getting this data here is the issue!!! I need to send the
                                  // data, but I'm not sure how to get it there.
            },
        }
    }
}
TL;DR: I need to get the data from the thread that fails, for error logging purposes. I was told Mutex and Arc types would do this, but after 2 hours of trying to implement them, I kept failing. I guess I don't fully understand how threads work.
Any recommendations on solving this will be greatly appreciated!
There are a couple of problems here:
- You are never actually collecting the job handles into the jobs Vec. (This is one of the reasons why it's better to use iterators and collect() rather than pushing/inserting in a loop.)
- You are never .awaiting several of your async calls, e.g. hard_process inside the spawned task.
After fixing these problems, the solution to your original question is easy (since you are cloning each piece of data anyway): just collect into a vec of (job, data) pairs, and iterate over those. Fixed playground.
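Roughly, that approach might look like the following (a minimal sketch, not the exact playground code; it just clones each String, since that's what the original loop was doing anyway):

use rand::random;
use std::time::Duration;
use tokio::time::{sleep, timeout};

async fn manage_lag(data: String) {
    // Stand-in for sending the failed item's data to the error-logging endpoint.
    dbg!(data);
}

async fn hard_process(data: String) {
    sleep(Duration::from_secs(random::<u64>() % 5)).await;
    println!("{}", data);
}

#[tokio::main]
async fn main() {
    let lots_of_data = vec!["business".to_string(), "numbers1".to_string()];
    // Collect (handle, data) pairs so the data is still around if the job times out.
    let jobs: Vec<_> = lots_of_data
        .into_iter()
        .map(|d| (tokio::spawn(hard_process(d.clone())), d))
        .collect();
    for (job, data) in jobs {
        match timeout(Duration::from_secs(3), job).await {
            Ok(_) => (),
            Err(_) => {
                println!("Whoa, a job took too long!");
                manage_lag(data).await;
            }
        }
    }
}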
It's a little hard to say what the best option is without more context, but you can definitely just use an Arc if you want to log what request data has taken too long.
It's important to note, though, that a timeout firing doesn't necessarily mean the remote HTTP server failed to handle the request; the request may have been processed and only the response came back too slowly.
use rand::random;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
use tokio::time::timeout;

async fn manage_lag(data: &str) {
    dbg!(data);
}

async fn hard_process(data: Arc<String>) {
    sleep(Duration::from_secs(random::<u64>() % 5)).await;
    println!("{}", data);
    println!("Wow that was cool.");
}

#[tokio::main]
async fn main() {
    let lots_of_data = vec![
        "business".to_string(),
        "numbers1".to_string(),
        "numbers2".to_string(),
        "numbers3".to_string(),
        "is this working?".to_string(),
    ]
    .into_iter()
    // Mapping the data into an `Arc<String>` so we don't have to clone a whole string
    .map(|s| Arc::new(s));
    let jobs: Vec<_> = lots_of_data
        .map(|d| {
            let a_data = d.clone();
            let new_job = tokio::spawn(async move {
                hard_process(a_data).await;
            });
            // Map the input data to include the job, and return the data as well.
            (new_job, d)
        })
        .collect();
    for (j, data) in jobs {
        match timeout(Duration::from_secs(3), j).await {
            Ok(_) => (),
            Err(e) => {
                // The request could have succeeded and timed out only on sending a response back.
                // So be careful what assumptions you make here
                println!("Whoa, a job took too long!");
                manage_lag(&data).await;
            }
        }
    }
}
Also worth noting: the way you wrote your timeouts means that later jobs get a much longer apparent timeout than 3 seconds, since you await each timeout future one at a time in the loop, so a job's 3 second clock doesn't even start until the loop reaches it. If you want the timeout to apply to each job from the point it was started, you need to use something like FuturesUnordered to make sure the timeout futures are all polled immediately after they're created.
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use futures::TryFutureExt;
use rand::random;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
use tokio::time::timeout;

async fn manage_lag(data: &str) {
    dbg!(data);
}

async fn hard_process(data: Arc<String>) {
    sleep(Duration::from_secs(random::<u64>() % 5)).await;
    println!("{}", data);
    println!("Wow that was cool.");
}

#[tokio::main]
async fn main() {
    let lots_of_data = vec![
        "business".to_string(),
        "numbers1".to_string(),
        "numbers2".to_string(),
        "numbers3".to_string(),
        "is this working?".to_string(),
    ]
    .into_iter()
    // Mapping the data into an `Arc<String>` so we don't have to clone a whole string
    .map(|s| Arc::new(s));
    let mut jobs: FuturesUnordered<_> = lots_of_data
        .map(|d| {
            let a_data = d.clone();
            let new_job = tokio::spawn(async move {
                hard_process(a_data).await;
            });
            // Start the timeout here where the job was defined. You could also do it
            // inside the spawn, though the difference will be minor most of the time.
            timeout(Duration::from_secs(3), new_job).map_err(|_| d)
        })
        // Collect into a `FuturesUnordered`. Every future added to this type is polled
        // so they can complete in any order. We don't have to worry about blocking the
        // timeout for subsequent tasks.
        .collect();
    // Wait for one of the jobs to finish. This will not necessarily yield the jobs in
    // the order they were in in the original Vec.
    while let Some(result) = jobs.next().await {
        match result {
            Ok(_) => (),
            Err(data) => {
                // The request could have succeeded and timed out only on sending a response back.
                // So be careful what assumptions you make here
                println!("Whoa, a job took too long!");
                manage_lag(&data).await;
            }
        }
    }
}
If you run both versions several times, you'll see that the first version times out far less often than this version, and when it does, it's almost always on the first or second item in the list.
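For completeness, here is a rough sketch of the "do it inside the spawn" variant mentioned in the comment above, assuming the same hard_process and imports as the example; spawn_with_timeout is just a hypothetical helper name:

// Sketch: run the timeout inside each spawned task, so the 3 second budget is
// measured from the moment the task starts executing rather than from when its
// handle happens to be awaited.
fn spawn_with_timeout(d: Arc<String>) -> tokio::task::JoinHandle<Result<(), Arc<String>>> {
    tokio::spawn(async move {
        match timeout(Duration::from_secs(3), hard_process(d.clone())).await {
            // The work finished in time.
            Ok(()) => Ok(()),
            // The work overran; hand the data back so the caller can log it.
            Err(_elapsed) => Err(d),
        }
    })
}

Awaiting the returned JoinHandle then yields Result<Result<(), Arc<String>>, JoinError>, so the caller can still see which item timed out without wrapping the handle in an outer timeout.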
Thank you @H2CO3 and @semicoleon, this has gotten me closer to the solution, I think.
I realized I left out a few more pieces of information that might be complicating this a bit more. Here is my current issue:
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use futures::TryFutureExt;
use rand::random;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
use tokio::time::timeout;

#[derive(Debug, Clone)]
struct Service {
    ip: String,
    port: u32,
}

async fn status(s: Arc<Service>, status: &bool, web_server: &String) {
    // Send status to HTTP server (both when it fails and not)
    println!("{s:?} is {status} -> Sent to {web_server}!!!");
}

async fn hard_process(s: &Arc<Service>) {
    println!("{s:?}");
    println!("Checking the given Service.");
    sleep(Duration::from_secs(random::<u64>() % 5)).await; // Simulating a complex task I don't want to share
}

#[tokio::main]
async fn main() {
    let web_server: String = "10.120.0.80".to_string(); // SIMPLIFIED! I have multiple other types I
                                                        // need to pass through the status at
                                                        // different points in this system.
    let lots_of_data = vec![
        Service {
            ip: "10.0.0.2".to_string(),
            port: 443u32,
        },
        Service {
            ip: "192.168.0.3".to_string(),
            port: 8080u32,
        },
    ]
    .into_iter()
    // Mapping the data into an `Arc<Service>` so we don't have to clone a whole string
    .map(|s| Arc::new(s));
    let mut jobs: FuturesUnordered<_> = lots_of_data
        .map(|sss| {
            let a_data = sss.clone();
            let new_job = tokio::spawn(async move {
                hard_process(&a_data).await;
                status(a_data, &true, &web_server).await;
            });
            // Start the timeout here where the job was defined. You could also do it
            // inside the spawn, though the difference will be minor most of the time.
            timeout(Duration::from_secs(3), new_job).map_err(|_| sss)
        })
        // Collect into a `FuturesUnordered`. Every future added to this type is polled
        // so they can complete in any order. We don't have to worry about blocking the
        // timeout for subsequent tasks.
        .collect();
    // Wait for one of the jobs to finish. This will not necessarily yield the jobs in
    // the order they were in in the original Vec.
    while let Some(result) = jobs.next().await {
        match result {
            Ok(_) => (),
            Err(data) => {
                // The request could have succeeded and timed out only on sending a response back.
                // So be careful what assumptions you make here
                println!("Whoa, a job took too long!");
                status(data, &false, &web_server).await;
            }
        }
    }
}
This is simulating passing the reqwest::Client type through to this status function. I also need to be able to pass some other types (like a cookie and a port) through, but I simplified it to get the point across.
Maybe this isn't the best way to handle my errors? I should probably restructure a few things. I liked the Vec of jobs and data, but I feel like FuturesUnordered fits this project better. Maybe I'm wrong.
You can fix those errors by throwing more Arcs at the problem:
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use futures::TryFutureExt;
use rand::random;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
use tokio::time::timeout;

#[derive(Debug, Clone)]
struct Service {
    ip: String,
    port: u32,
}

async fn status(s: Arc<Service>, status: &bool, web_server: &String) {
    // Send status to HTTP server (both when it fails and not)
    println!("{s:?} is {status} -> Sent to {web_server}!!!");
}

async fn hard_process(s: &Arc<Service>) {
    println!("{s:?}");
    println!("Checking the given Service.");
    sleep(Duration::from_secs(random::<u64>() % 5)).await; // Simulating a complex task I don't want to share
}

#[tokio::main]
async fn main() {
    let web_server = Arc::new("10.120.0.80".to_string()); // SIMPLIFIED! I have multiple other types I
                                                          // need to pass through the status at
                                                          // different points in this system.
    let lots_of_data = vec![
        Service {
            ip: "10.0.0.2".to_string(),
            port: 443u32,
        },
        Service {
            ip: "192.168.0.3".to_string(),
            port: 8080u32,
        },
    ]
    .into_iter()
    // Mapping the data into an `Arc<Service>` so we don't have to clone a whole string
    .map(|s| Arc::new(s));
    let mut jobs: FuturesUnordered<_> = lots_of_data
        .map(|sss| {
            let a_data = sss.clone();
            let web_server = web_server.clone();
            let new_job = tokio::spawn(async move {
                hard_process(&a_data).await;
                status(a_data, &true, &web_server).await;
            });
            // Start the timeout here where the job was defined. You could also do it
            // inside the spawn, though the difference will be minor most of the time.
            timeout(Duration::from_secs(3), new_job).map_err(|_| sss)
        })
        // Collect into a `FuturesUnordered`. Every future added to this type is polled
        // so they can complete in any order. We don't have to worry about blocking the
        // timeout for subsequent tasks.
        .collect();
    // Wait for one of the jobs to finish. This will not necessarily yield the jobs in
    // the order they were in in the original Vec.
    while let Some(result) = jobs.next().await {
        match result {
            Ok(_) => (),
            Err(data) => {
                // The request could have succeeded and timed out only on sending a response back.
                // So be careful what assumptions you make here
                println!("Whoa, a job took too long!");
                status(data, &false, &web_server).await;
            }
        }
    }
}
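One extra note on "I need to pass some other types (like a cookie and a port) through": a pattern that keeps the number of Arcs down is to bundle all of the shared values into one struct and put a single Arc around that. The following is only a sketch; Ctx and cookie are hypothetical names, and Service is the struct from the example above. (reqwest::Client is already cheap to clone since it uses an Arc internally, so it can go in the bundle or be cloned on its own.)

// Hypothetical bundle of everything the reporting code needs.
struct Ctx {
    web_server: String,
    cookie: String,
    port: u32,
}

async fn status(s: Arc<Service>, ok: &bool, ctx: &Ctx) {
    // Send status to the HTTP server, with every shared value available at once.
    println!(
        "{s:?} is {ok} -> Sent to {}:{} (cookie {})!!!",
        ctx.web_server, ctx.port, ctx.cookie
    );
}

Inside the .map closure you would then clone the Arc<Ctx> once per task (let ctx = ctx.clone();) and move that clone into the async move block, exactly the way web_server is handled above.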