Async_stream to buffer_unordered: type annotations needed

Context:
I'm not sure this is even possible, but I got something like this to work for a Hyper client and now I'm trying to do the same but using a Hyper HTTP connection.

I am trying to set up a Stream, or concurrent iterator if I understand correctly, that uses a Hyper HTTP connection.

I've got to the point where I can generate the stream using the async_stream crate; specifically, server_stream below seems to compile.

However, I'm having considerable trouble working out how to pass the resulting stream to buffer_unordered(...) in make_server_stream below.

fn server_stream<'a, F: 'a>(
    session: std::sync::Arc<std::sync::Mutex<<F as futures_util::Future>::Output>>,
    count: usize,
) -> impl futures::stream::Stream<Item = F> + 'a
where
    F: futures_util::Future + futures_util::Future<Output = F> + std::fmt::Debug,
{
    stream! {
        for i in 0..count {
            let sc = session.clone();
            let s = std::sync::Arc::try_unwrap(sc).unwrap();
            let s = s.into_inner().unwrap();
            yield s.await;
        }
    }
}

async fn make_server_stream<'a, F: 'a, S: 'a>(
    session: std::sync::Arc<std::sync::Mutex<F>>,
    count: usize,
) -> impl futures_util::Future + futures_util::Stream<Item = S> + 'a
where
    F: futures_util::Future + futures_util::Stream + std::fmt::Debug,
    S: futures_util::Stream + std::iter::Iterator,
{
    let concurrency_limit = 512;
    let sc = session.clone();
    let s = server_stream(sc, count);
    pin_mut!(s); // needed for iteration
    while let Some(f) = s.next().await {
        f.buffer_unordered(concurrency_limit);
    }
}

I won't recount the last few nights trying to get this working, but in case it helps, the cargo check error that was the last straw and brought me here is:

error[E0282]: type annotations needed               
   --> regatta/src/calibrate.rs:147:9                                                                   
    |                                       
144 |     let s = server_stream(sc, count);                                                             
    |         - consider giving `s` a type
...                                                           
147 |         f.buffer_unordered(concurrency_limit);
    |         ^ cannot infer type       
    |                                                                      
    = note: type must be known at this point

Appreciate any hints or tips.

It's not immediately obvious what's wrong, but I have various comments on your code:

  1. Calling .clone() on an Arc, then trying to unwrap it with try_unwrap is guaranteed to fail. The try_unwrap method only works if there are no other clones of that Arc, which there are in your case.
  2. The type annotations on F are mostly nonsensical: you say that it's a Future twice, and that its output is itself. It doesn't make sense for a future to produce itself when .awaited.
  3. Putting a future in an Arc/Mutex doesn't make sense.
  4. You require S to be both a Stream and Iterator. Don't do that. Just require one of them.
  5. The make_server_stream function returns something that is both a future, and a stream. Furthermore, the item type of the stream is another stream. This doesn't make any sense either.
  6. The F argument to make_server_stream is required to be both a Future and a Stream, and is in an Arc/Mutex. Neither of those make any sense.

Please try to explain what you are trying to do in words.


Thanks for the feedback and comments - much appreciated. I'll fix those.

Please try to explain what you are trying to do in words.

I'm trying to have incoming connections handled via a Hyper HTTP connection, with the following characteristics/features (some of which are in place):

  • A single Tokio runtime is started (working).
  • A system thread is started per cpu/core, and passed the Tokio RT handle to work/spawn with (working).
  • Set up a Stream for connections to the Hyper HTTP connection (current issue).
  • Use buffer_unordered to control the level of concurrency (current issue).
  • Restrict stream etc to current thread (next).

Constraints:

  • No more system threads created than cpu/core count (ignoring the Tokio RT thread pool)
  • Do not use Tokio spawn more than once in each system thread.
  • Do not use channels.

I have something like the above program working for Hyper client connections.

The Tokio RT will be used for the client and server code (a calibration exercise).
The reason for the constraints on threads/spawns and focus on Streams/current-thread is that I have seen this can improve performance by at least an order of magnitude

Right now, in addition to the mistakes you identified, I think I have the Stream built around the wrong object. I think I need to bring the TCP stream/address pair into the scope of the stream.
I had initially thought to have two sets of streams. Now, I think not.

I hope I've described what I am trying to do in a way that is understandable?

If you want to limit the number of connections being handled at the same time, I would probably prefer Tokio's Semaphore above the buffer_unordered combinator.

I also don't really understand why you talk about spawning extra threads.

Thanks for the tip. I'll look into that.

I likely have a misunderstanding: right now my working setup runs using one Hyper server (set up and run per the examples, i.e. not using the HTTP connection 'object'); however, it leaves approx 5% of the CPU idle.

I am trying to work out/learn what is the point of congestion (as well as what is performant and what is not).
AFAICT, restricting futures to the current thread has improved throughput in the client case (my benchmark is not tracking latency for the moment).
If I remove the one line that makes the client connection I see approx 5M "pseudo-requests" per second and the CPU is maxed (client futures are run in current thread).

This makes me think the reason the CPU is not maxed out when I allow the client connect to the server is that the server is the bottleneck.

So, I am now trying to improve the server element of the setup.

Of course, the other option is to set up the Tokio RT so that its thread pool is 2*CPU count, and that is on my list of things to explore - however, that changes the whole Tokio setup just for a one-off calibration exercise, which I really would like to avoid.

Hopefully that does not seem too crazy.

Generally I would probably just try to write an ordinary loop rather than go through the Stream machinery.

This combinator is designed for when you want to aggregate the outputs in some way. In your case it's better to tokio::spawn each connection, with Tokio's Semaphore for concurrency-limiting.

I would not spawn any threads on my own at all, and use only the Tokio thread pool.

If you have only a single async task on each system thread, that completely negates the entire point of async/await. This sounds very counter-productive. I would tokio::spawn once per connection and have many spawned tasks per system thread.

I don't think channels are necessary for this, so that's fine.


OK, I'll try that.
I do seem to have had a misunderstanding of what Stream etc. are.

OK, I will try that. Again my misunderstanding, I had in mind 'complete concurrently' as the aggregation motive.

Point taken. Thanks.

Thanks for the tips. Again my misunderstanding - I was thinking the number of async tasks that would be running (concurrently) on the thread would be the count passed to buffer_unordered(count).

My reason for steering away from generating many async tasks using Tokio spawn, and preferring futures generated by a Stream & buffer_unordered(count) was the performance improvement reported here: Scalable Benchmarking with Rust Streams

If I understand correctly, proper use of the Tokio spawn & Semaphore should perform no worse than Stream & buffer_unordered, and should result in writing more idiomatic code.

Again I appreciate you sharing your insights and experience, I'll report back the end results to aid future fellow travelers.

Yes, spawning new async tasks is fast. It's possible that buffer_unordered can beat it for many very small tasks, but tokio::spawn should work great for handling web server connections.

