Listening on multiple threads in Warp with Tokio

hey, I'm having a hard time tuning a web server to accept incoming websocket connections as fast as possible; hopefully someone here knows what I'm doing wrong. I'm using Warp's websocket example, and when I wasn't getting the performance I expected, I tried binding Warp to multiple ports:

    // one server future per port (3030..3059), all driven by join_all
    let mut futures = Vec::new();
    for i in 0..30 {
        let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 3030 + i);
        futures.push(warp::serve(routes.clone()).bind(socket));
    }
    futures::future::join_all(futures).await;

When I adjusted the stress-testing client to use multiple ports I got a massive speed improvement (opening 50k new connections took about 2 seconds instead of 20), which I guess means that Warp doesn't do any kind of socket multiplexing on its own.

As I don't want to configure a load balancer in front of the app for now, I tried to recreate the same setup on a single port with SO_REUSEPORT, but there's virtually no difference in performance:

    let mut handles = Vec::new();
    for _ in 0..30 {
        let routes_clone = routes.clone();
        // TcpBuilder comes from the net2 crate; reuse_port is provided by
        // net2::unix::UnixTcpBuilderExt
        let tcp = TcpBuilder::new_v4().unwrap();
        let listener = tcp.reuse_address(true).unwrap()
            .reuse_port(true).unwrap()
            .bind("0.0.0.0:3030").unwrap()
            .listen(1000).unwrap();
        // from_std expects a non-blocking socket
        listener.set_nonblocking(true).unwrap();

        let handle = tokio::task::spawn(async move {
            let mut listener = TcpListener::from_std(listener).unwrap();
            let stream = listener.incoming();
            warp::serve(routes_clone).run_incoming(stream).await
        });
        handles.push(handle);
    }
    futures::future::join_all(handles).await;

What am I doing wrong? I'm guessing I should run the tasks handling incoming connections on different threads (i.e. force each one onto its own thread), but I'm not sure there's a way to make tokio do that. I also tried block_in_place, but inside it I still need to spawn a task, which I guess ends up on another thread anyway.
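
What I have in mind is roughly the sketch below (untested, using the same tokio 0.2-era APIs as above plus the num_cpus crate for the thread count): one OS thread per listener, each driving its own single-threaded runtime, so every accept loop is guaranteed its own thread.

    let mut threads = Vec::new();
    for _ in 0..num_cpus::get() {
        let routes = routes.clone();
        threads.push(std::thread::spawn(move || {
            // one single-threaded ("basic scheduler") runtime per OS thread
            let mut rt = tokio::runtime::Builder::new()
                .basic_scheduler()
                .enable_all()
                .build()
                .unwrap();
            rt.block_on(async move {
                let tcp = TcpBuilder::new_v4().unwrap();
                let listener = tcp.reuse_address(true).unwrap()
                    .reuse_port(true).unwrap()
                    .bind("0.0.0.0:3030").unwrap()
                    .listen(1024).unwrap();
                listener.set_nonblocking(true).unwrap();
                let mut listener = TcpListener::from_std(listener).unwrap();
                warp::serve(routes).run_incoming(listener.incoming()).await
            });
        }));
    }
    for t in threads {
        t.join().unwrap();
    }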

Well, the first thing I would try is avoiding join_all, since it tends to perform poorly with a large number of futures. Spawn a bunch of tasks, and once you have a vector of JoinHandles you can just loop through it:

    for handle in handles {
        handle.await.expect("Panic in task");
    }

Another problem with join_all is that if the tasks are fallible, the returned Vec<Result<...>> doesn't trigger a #[must_use] warning, so it's easy to drop errors on the floor without noticing.
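
For example (made-up fallible tasks, nothing warp-specific):

    // each task returns a Result, so the output of join_all is
    // Vec<Result<Result<i32, &str>, JoinError>>
    let handles: Vec<_> = (0..4)
        .map(|i| tokio::task::spawn(async move {
            if i == 3 { Err("boom") } else { Ok(i) }
        }))
        .collect();

    // this compiles without a single warning, even though every error
    // (including panics surfaced as JoinError) is silently discarded
    futures::future::join_all(handles).await;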

I don't know what you're trying to achieve with block_in_place; it's generally a bad idea to use it. All it does is tell the runtime that the current worker thread is about to block, so other tasks can be moved off of it; it doesn't give you any control over which thread a task runs on.
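
To illustrate what it actually does (made-up blocking call, not from your code):

    // block_in_place migrates other tasks off the current worker thread and
    // then runs the blocking closure right there; it requires the threaded
    // runtime and doesn't pin anything to a particular thread
    let contents = tokio::task::block_in_place(|| {
        std::fs::read_to_string("some_big_file.txt")
    });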

Thanks for the reply!

Unfortunately it doesn't change the performance characteristics. With the code changed to spawn separate tasks and await them in a loop, performance is similar: opening 50k connections still takes about 20s.

A version that listens on multiple ports does the same in under 8s.

Sure, I'm aware; this isn't production code yet. I was just trying to benchmark it, so I don't care much about failures.

I agree. I was just trying to somehow force tokio to run each of the tasks on its own thread, but I guess it's irrelevant now, as it didn't work anyway.

For reference, this is the code I used to avoid join_all:

    let mut handles = Vec::new();
    for _ in 0..12 {
        let routes_clone = routes.clone();
        let handle = tokio::task::spawn(async move {
            let tcp = TcpBuilder::new_v4().unwrap();
            let listener = tcp.reuse_address(true).unwrap()
                .reuse_port(true).unwrap()
                .bind("0.0.0.0:3030").unwrap()
                .listen(10000).unwrap();

            // from_std expects a non-blocking socket
            listener.set_nonblocking(true).unwrap();

            let mut listener = TcpListener::from_std(listener).unwrap();
            let stream = listener.incoming();
            warp::serve(routes_clone).run_incoming(stream).await
        });

        handles.push(handle);
    }
    for handle in handles {
        handle.await.unwrap();
    }

I think I have a way to confirm that the problem is in how I set up the TCP socket: when I change the TcpListener::from_std version to bind on multiple ports, I still see the same poor performance:

    let mut handles = Vec::new();
    let port = 3030;
    for i in 0..12 {
        let routes_clone = routes.clone();
        let handle = tokio::task::spawn(async move {
            let tcp = TcpBuilder::new_v4().unwrap();
            let listener = tcp.reuse_address(true).unwrap()
                .reuse_port(true).unwrap()
                .bind(format!("0.0.0.0:{}", port + i)).unwrap()
                .listen(10000).unwrap();

            listener.set_nonblocking(true).unwrap();

            let mut listener = TcpListener::from_std(listener).unwrap();
            let stream = listener.incoming();
            warp::serve(routes_clone).run_incoming(stream).await
        });

        handles.push(handle);
    }
    for handle in handles {
        handle.await.unwrap();
    }

This would indicate that something about my listener setup differs from the way Warp itself binds to its ports.

If anyone is interested in running the full example, here is the code: https://github.com/drogus/warp-accept-benchmark

I'm sorry for wasting people's time, but it seems that I just didn't understand the problem thoroughly.

I initially started using multiple ports when testing locally in order to work around the limit on outbound connections to a single destination port (the client runs out of ephemeral source ports). Then I also noticed a big performance improvement, which got me started on this whole investigation. It turns out that the accept rate can also be slower when connecting to a single port from a single machine.

Over the weekend I wrote a tool for stress testing websockets from multiple machines (I know tools like this exist, but honestly I wasted more time trying to set up Gatling or Tsung to do what I wanted than I spent writing the tool, and at least I learned some new stuff along the way). When opening connections from a number of servers, there's virtually no difference between one port and multiple ports.

And btw, with a warmed-up server I was able to open 260k connections in less than 10 seconds using the Warp websockets example, which is pretty damn good (granted, it was a compute-optimized instance with 96 cores, but still). I think it could go higher than that, but I couldn't open more than 260k connections because of some limitation on the target server. I'll definitely keep working on this.

