Context:
I'm trying to develop a minimal reproducible example of some Hyper/Tokio/futures hang behavior.
So far I can reproduce the hang using Criterion to generate multiple samples/runs, but the hang only occurs in roughly 10 out of 100 samples. A key feature of the hang (also present in earlier reports) is that the components must be under high req/sec load.
I'm trying to generate a very high req/sec count for a trivial hello-world server. However, I am struggling to saturate the CPUs on my desktop.
The current performance profile of a long run looks like this:
Approximately 5-10% of the CPU (blue) is idle; green is memory, and the remaining panel is network.
Some print statements ("Run Stream. Thread: 1" in the code below) and the time interval between them indicate that my use of Tokio task::spawn and await results in two streams that run in series on the same system thread, not in parallel.
Notes:
- My understanding is that currently Rust Streams are best described as a ConcurrentIterator; hence my use of the async-stream crate is only for its helper macro, and does not alter the situation compared to using futures Streams.
- I suspected that running within Criterion might be causing issues, so I have extracted the code reported here outside of Criterion.
- I have tried running the streams within a Tokio LocalSet, in case that helped Tokio decide to spawn each task on a separate thread. No change.
- I have tried to set up system threads, pass the Tokio runtime in, and then spawn a Tokio task, but that got horribly tangled and I would not say I ever got the approach to work.
What would be the idiomatic Tokio way to run these two tasks in parallel:
async fn capacity(client: std::sync::Arc<Client<String>>) {
let benchmark_start = tokio::time::Instant::now();
let t1 = run_stream(client.clone());
let t2 = run_stream(client.clone());
t1.await;
t2.await;
let elapsed = benchmark_start.elapsed().as_micros() as f64;
println!(
"Throughput: {:.1} request/s [{} in {}]",
1000000.0 * 2. * client.nrequests as f64 / elapsed,
client.nrequests, elapsed
);
}
where:
async fn run_stream(client: std::sync::Arc<Client<String>>) {
println!("Run Stream. Thread: {:?}", std::thread::current().id());
let task = tokio::spawn(async move {
let client = client.as_ref();
let stream = make_stream(client);
let mut counted = 0;
futures_util::pin_mut!(stream);
while let Some(_item) = stream.next().await {
counted += 1;
debug!("Stream next polled. Thread: {:?}", std::thread::current().id());
}
debug!("Cumulative requests: {:?}", counted);
});
task.await.expect("Streamed requests");
}
fn make_stream<'client>(
client: &'client Client<String>,
) -> impl futures::Stream<Item = usize> + 'client {
let concurrency = client.concurrency;
let stream = async_stream::stream! {
// Allocate each stream to one of the servers started
let urls = client.addresses.iter()
    .cycle()
    .take(client.nstreamed)
    .map(|addr| format!("http://{}", addr).parse::<hyper::Uri>().unwrap())
    .collect::<Vec<hyper::Uri>>();
for url in urls.iter() {
let query_start = tokio::time::Instant::now();
debug!("URL: {}", url);
let mut response = client.session.get(url.clone()).await.expect("Hyper response");
let body = hyper::body::to_bytes(response.body_mut()).await.expect("Body");
// let (parts, body) = response.into_parts();
// This is for Surf client use case
//let mut response = session.get("/").await.expect("Surf response");
//let body = response.body_string().await.expect("Surf body");
debug!("\nSTATUS:{:?}\nBODY:\n{:?}", response.status(), body);
yield futures::future::ready(1);
}
};
stream.buffer_unordered(concurrency)
}