Tokio spawn streams in parallel (async-stream crate)

I'm trying to develop a minimal reproducible example of some Hyper/Tokio/futures hang behavior.
So far I can reproduce the hang using Criterion to generate multiple samples/runs, but only in, say, 10 out of 100 samples. A key feature of the hang (also present in earlier reports) is that the components must be under high req/sec load.

I'm trying to generate a very high req/sec count for a trivial hello world. However, I am struggling to saturate the CPUs on my desktop.
The current performance profile of a long run looks like this:
[Screenshot from 2021-10-10 09-14-31]

Approximately 5-10% of the CPU (blue) is idle. Green is memory and network is the remaining panel.
Some print statements ("Run Stream. Thread: 1" in the code below) and the time interval between them indicate that my use of Tokio task::spawn and await results in two streams that run in series on the same system thread, not in parallel.


  1. My understanding is that Rust Streams are currently best described as concurrent iterators; my use of the async-stream crate is only for its helper macro and does not alter the situation compared with using futures Streams.
  2. I suspected that running within Criterion might be causing issues, so I have extracted the code reported here outside of Criterion.
  3. I have tried running the streams within a Tokio LocalSet - in case that helped Tokio decide to spawn each task on a separate thread. No change.
  4. I have tried to set up system threads, pass the Tokio runtime in, and then spawn a Tokio task - but it got horribly tangled, and I would not say I ever got that approach to work.

What would be the idiomatic Tokio way to run these two tasks in parallel:

async fn capacity(client: std::sync::Arc<Client<String>>) {
    let benchmark_start = tokio::time::Instant::now();
    let t1 = run_stream(client.clone());
    let t2 = run_stream(client.clone());
    t1.await;
    t2.await;
    let elapsed = benchmark_start.elapsed().as_micros() as f64;
    println!(
        "Throughput: {:.1} request/s [{} in {}]",
        1_000_000.0 * 2. * client.nrequests as f64 / elapsed,
        client.nrequests,
        elapsed
    );
}

async fn run_stream<'client>(client: std::sync::Arc<Client<String>>) -> () {
    use futures::StreamExt;

    println!("Run Stream. Thread: {:?}", std::thread::current().id());
    let task = tokio::spawn(async move {
        let client = client.as_ref();
        let stream = make_stream(client);
        futures::pin_mut!(stream);
        let mut counted = 0;
        while let Some(_) = stream.next().await {
            counted += 1;
            debug!("Stream next polled. Thread: {:?}", std::thread::current().id());
        }
        debug!("Cumulative requests: {:?}", counted);
    });
    task.await.expect("Streamed requests");
}

fn make_stream<'client>(
    client: &'client Client<String>,
) -> impl futures::Stream<Item = usize> + 'client {
    use futures::StreamExt;

    let concurrency = client.concurrency;

    let stream = async_stream::stream! {
        // Allocate each stream to one of the servers started
        let it = client.addresses.iter().cycle().take(client.nstreamed).cloned();
        let vec = it.collect::<Vec<String>>();
        let urls = vec!["http://".to_owned(); client.nstreamed];
        let urls = urls.into_iter()
            .zip(vec)
            .map(|(scheme, address)| scheme + &address)
            .map(|url| url.parse::<hyper::Uri>().expect("URL"))
            .collect::<Vec<hyper::Uri>>();
        for url in urls.iter() {
            let query_start = tokio::time::Instant::now();
            debug!("URL String: {} URL Parsed: {}", &url, url);
            let mut response = client.session.get(url.clone()).await.expect("Hyper response");
            let body = hyper::body::to_bytes(response.body_mut()).await.expect("Body");
            // let (parts, body) = response.into_parts();
            // This is for the Surf client use case:
            //let mut response = session.get("/").await.expect("Surf response");
            //let body = response.body_string().await.expect("Surf body");
            debug!("\nSTATUS:{:?}\nBODY:\n{:?}", response.status(), body);
            debug!("Query took: {:?}", query_start.elapsed());

            yield futures::future::ready(1);
        }
    };
    // Each yielded item is a future; buffer_unordered caps how many are
    // polled at once (though the GETs above still run sequentially
    // inside the stream! block itself).
    stream.buffer_unordered(concurrency)
}

You can spawn both, then await both.

let t1 = tokio::spawn(run_stream(client.clone()));
let t2 = tokio::spawn(run_stream(client.clone()));
t1.await.unwrap();
t2.await.unwrap();

Why does your run_stream have an unused lifetime?

So that is a spawn in addition to the one already inside run_stream?

A hang-over from one of the prior iterations. Could that be causing the serial execution of t1 and t2?

Oh I didn't even notice the spawn inside the method since you didn't indent its contents. Anyway, as written now your spawned task is immediately awaited, so it doesn't cause any parallelism.

The lifetime is not the cause, no.

Thanks @alice: when I add the additional tokio::spawn(..) as you show, I now see additional threads started (whereas before the thread ID was always the same):

Run Stream. Thread: ThreadId(9)
Run Stream. Thread: ThreadId(8)
Run Stream. Thread: ThreadId(10)
Run Stream. Thread: ThreadId(11)
Run Stream. Thread: ThreadId(12)
Run Stream. Thread: ThreadId(13)

However I don't see the CPU being saturated.
The profile appears largely as before:
[Screenshot from 2021-10-12 19-03-17]

I had expected two threads to saturate the CPU if one thread left only 5-10% idle.

In fact, as I increase the 'thread' count from 2 to 3, 4, 5, 6, I observe a slight decline in total throughput with each additional thread, and a corresponding slight increase in total time that matches the lower throughput.

This makes me think there is a bottleneck elsewhere.
The full "HTTP/1.1 200 .... hello world" response string is stored in a global constant, so there is no file reading.

That appears to leave memory and network connections as the bottleneck.

This is not a powerful CPU - an Intel Celeron(R) CPU G550T @ 2.20GHz × 2.

Is this type of CPU usage profile expected?

I'd appreciate any suggestions to try to fully saturate the CPU.

You had those threads before too! You just weren't using them. Tokio spawns all of its worker threads on startup.

I don't know what it takes to saturate your CPU.

