Threadpool performance really bad

Hi,

I am new to Rust and started experimenting with thread pools.

I have written a simple TCP server and client:

  1. The client continuously sends 4K buffers (whenever the socket is writable).
  2. The server reads the data and writes it back.

I am using mio for events.

When I don't use any threads I get 750 MB/s on my Mac (i7), with one core fully consumed by the server and one core by the client.

I started experimenting with a thread pool on the server side so that it can handle more clients.

I tried the following:

  1. Create a thread pool with 4 threads.

  2. Spawn a function running a loop on each thread.

  3. Clone the server TcpListener socket.

  4. All the threads listen on this socket.

  5. Whenever there is a client, one of the threads picks it up and adds it to its own mio event loop.

The performance for a single client went down to 400 MB/s and fluctuates very badly.

I must be doing something really wrong here. Could someone help me figure out what it is?

I tried a rayon pool, a thread pool, and a scoped pool.

My CPU utilization went very high; all cores are busy. I have 2 physical cores, each with 4 threads (8 total).

Hey! Sorry you're having this trouble. A couple of questions:

  1. Can you share the code here?
  2. Are you compiling with optimizations? (that's cargo build --release or rustc -O)

I am running it as cargo run --release, so it's compiled with release optimizations.

Here is what I'm doing: the main just creates the TcpListener, clones it for each spawn, creates a thread pool, and calls spawn 4 times if there are 4 threads in the pool.
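Roughly like this (a simplified sketch; I'm showing the threadpool crate here, and the bind address is just a placeholder):

use std::net::SocketAddr;

use mio::net::TcpListener;
use threadpool::ThreadPool;

fn main() {
    // Placeholder address, not the real one.
    let addr: SocketAddr = "0.0.0.0:9000".parse().unwrap();
    let server = TcpListener::bind(&addr).unwrap();

    // 4 worker threads; each one gets its own clone of the listener.
    let pool = ThreadPool::new(4);
    for _ in 0..4 {
        let server = server.try_clone().unwrap();
        pool.execute(move || handle_client(server));
    }
    pool.join();
}

Each thread then runs this loop: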

use std::collections::HashMap;
use std::io::Read;

use mio::net::{TcpListener, TcpStream};
use mio::{Events, Poll, PollOpt, Ready, Token};

// Token used for the listener itself (assumed to be Token(0) here).
const SERVER: Token = Token(0);

fn handle_client(server: TcpListener) {
    let mut clients: HashMap<Token, TcpStream> = HashMap::new();
    let poll = Poll::new().unwrap();
    let mut events = Events::with_capacity(1024);
    let mut buf = [0; 4096];

    poll.register(&server, SERVER, Ready::readable(), PollOpt::edge())
        .unwrap();

    loop {
        poll.poll(&mut events, None).unwrap();

        for event in events.iter() {
            match event.token() {
                SERVER => {
                    let client_con = match server.accept() {
                        Err(e) => {
                            println!("Accept error: {}", e);
                            return;
                        }
                        Ok((sock, _)) => sock,
                    };
                    println!("Adding a client to my list");
                    let tok = Token(1);
                    clients.insert(tok, client_con);
                    poll.register(
                        &clients[&tok],
                        tok,
                        Ready::readable(),
                        PollOpt::level(),
                    ).unwrap();
                }
                token => {
                    let ready = event.readiness();
                    if ready.is_readable() {
                        let mut client =
                            clients.get_mut(&token).expect("Failed to get client stream");
                        match client.read(&mut buf) {
                            // wire_results writes the data back to the client (not shown here).
                            Ok(_) => wire_results(&mut buf, &mut client),
                            _ => println!("Problem reading data from client.."),
                        }
                    }
                }
            }
        }
    }
}

How many client connections do you spawn? Is it the same amount for the single-thread and the thread-pool version?

Just 2

Also, if I create multiple threads with

use std::thread;

thread::spawn(move || {
    // some work here
});

is there a way to specify that each thread should execute on a new CPU, if one is available? It seems all of them are running on a single CPU or two.

A Mac is probably not the best hardware to run these experiments, but ...

You have a single quadcore CPU with 8 logical cores (hyperthreading). Given the synthetic nature of the benchmark, you should not have more than 4 runnable tasks at a time since hyperthreading is likely to make the results worse here. Are you spawning 2 client threads? Or is there a single client thread sending over 2 connections?

Since you're using edge-triggered notifications, your server should keep reading from the client until you hit a WouldBlock error. Right now it looks like you write to stdout (does that trigger, by the way?) and then loop back around to polling. poll in edge mode will not return until the underlying socket changes state, which is probably ok in this benchmark because the client is constantly sending. In a "real" case, you may end up blocking in poll even though the socket has bytes ready to be read (if you didn't consume them all previously, which is what reading until WouldBlock achieves).
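Roughly like this, just as a sketch (your actual read/echo path will differ):

use std::io::{ErrorKind, Read};

// Keep reading until the socket reports WouldBlock; otherwise unread bytes
// stay in the socket and an edge-triggered poll won't wake you up for them again.
fn drain<R: Read>(sock: &mut R, buf: &mut [u8]) -> std::io::Result<usize> {
    let mut total = 0;
    loop {
        match sock.read(buf) {
            Ok(0) => return Ok(total), // peer closed the connection
            Ok(n) => total += n,       // got some bytes; handle them, then read again
            Err(ref e) if e.kind() == ErrorKind::WouldBlock => return Ok(total),
            Err(e) => return Err(e),
        }
    }
}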

Also, given the server isn't doing anything except echoing the bytes back (right?), adding more threads isn't really going to help - a single core doing nothing but evented socket I/O can probably saturate a 1Gb link. You would need a lot more clients (that don't share execution resources themselves, such as everything running on the local machine) to generate enough load to warrant adding additional pollers.

First of all, I am using a 10G link, so there is a need for more threads.

I don't understand your argument about edge triggering.

The TcpListener is in edge mode and the client connection is in level mode; I thought this was best.

Also, what does stdout have to do with this? I am not seeing a flood of messages on the console.

Oh, my mistake - I missed the level triggering for the client. However, that's an inefficient mode. You should use edge triggering but drain the socket until WouldBlock. Level will keep your selector (i.e. Poll) busy if you don't read everything out during a single iteration, and that might contribute to the high CPU load you're seeing.

As for 10G link, I'm not sure what you're trying to say. Is your real deployment a server with a 10G link? You're testing on a single (weak and unrepresentative of a server) mac, so it's not clear what you're trying to see exactly from that setup.

The stdout comment was from when I thought you were using edge triggering, so I wanted to see if you were actually seeing WouldBlock errors. But you're using level, so ignore that part.

My bad, I should have given more details on the setup.

I am just testing on the Mac to see if my code is correct.

The actual numbers will come from a setup with a 10G link, and that's Linux.

If I switch the client connection to edge as well, I get very low performance.

You wrote originally that you were getting 750 MB/s without threads and around 400 MB/s with the thread pool.

So are those Mac numbers, or numbers from your Linux + 10G networking? How about we start semi-from-scratch and you try to describe the setup again :slight_smile:.

So you're polling the client socket with edge and you're seeing low performance? Are you looping until you see WouldBlock when reading out of it? How are you doing the writing part on the server? How are you measuring performance?

As mentioned, it'll be more productive if you provide more information about what you're testing and how.

In general, I think a good design for a server is to have (1) an event loop that just accept()s client connections and (2) a step that moves those connections to a worker thread. The workers own their connections end-to-end. They have their own event (I/O) loop that handles read/write readiness for just those connections.
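A rough sketch of that hand-off, purely illustrative (blocking std::net acceptor, plain mpsc channels; a real worker would run its own mio Poll over the connections it receives):

use std::net::{TcpListener, TcpStream};
use std::sync::mpsc;
use std::thread;

// One blocking acceptor thread hands each new connection to one of N workers
// over a channel; each worker owns its connections end-to-end.
fn run(server: TcpListener, workers: usize) {
    let mut senders: Vec<mpsc::Sender<TcpStream>> = Vec::new();

    for _ in 0..workers {
        let (tx, rx) = mpsc::channel::<TcpStream>();
        senders.push(tx);
        thread::spawn(move || {
            for conn in rx {
                // A real worker would set `conn` nonblocking, register it
                // with its own Poll, and service readiness events here.
                drop(conn);
            }
        });
    }

    // Acceptor loop: round-robin new connections across the workers.
    let mut next = 0;
    for conn in server.incoming() {
        if let Ok(conn) = conn {
            let _ = senders[next].send(conn);
            next = (next + 1) % senders.len();
        }
    }
}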

Ok, will do that.

But you mentioned that a good design has only one thread accepting and distributing. Could you please tell me why?

Do we also need a load monitor on each thread-pool thread so that we don't overload a single thread?

Assuming the connections are load balanced across worker threads (cores) more or less equally, there's no reason for a connection to be serviced in a ping-pong manner across the workers. Keeping it confined to a single worker allows skipping synchronization/locking that may otherwise be needed. If you pin the workers to their own dedicated CPUs, then you can also avoid costly CPU migration of the threads (e.g. CPU caches are cold on the CPU the worker migrated to). As mentioned, a single CPU that's doing nothing but evented I/O should be able to do several Gb/s of traffic generation (on the kernel networking stack). The "trick" to making this work is to prevent the worker from blocking so it can continue servicing the connections.
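If you do decide to pin workers, a minimal sketch using the core_affinity crate (just an illustration, not something your current numbers call for):

use std::thread;

// Spawn one worker per available core and pin it there; each worker would
// then run its own event loop for the connections it owns.
fn spawn_pinned_workers() -> Vec<thread::JoinHandle<()>> {
    let core_ids = core_affinity::get_core_ids().expect("failed to enumerate cores");

    core_ids
        .into_iter()
        .map(|core_id| {
            thread::spawn(move || {
                core_affinity::set_for_current(core_id);
                // ... this worker's mio Poll loop would go here ...
            })
        })
        .collect()
}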

This is, by the way, roughly the nginx threading/connection handling model. Another open source software that uses a similar model is the seastar C++ library (used by ScyllaDB).

For a real server, yeah you'd want that. Whether you use a threadpool that multiplexes over connections or bind a connection to a specific worker, you'll want to keep (and expose) load statistics.

There must be some gap in my understanding here.

The code only shares the TcpListener across multiple threads; when there is a new client connection, only one of them picks it up. So a client connection (TcpStream) should only be serviced by one thread. Is this not happening?

Also, this allows workers to pick up clients themselves rather than another thread pushing them through a channel, assuming the less loaded threads mostly pick them up.

I think that part is ok in your example code above - I was merely making a general statement. I'm not sure why you associate Token(1) with every connection that's created, though. That surely won't work and may indeed cause you to burn CPU, since you won't be draining the 2nd connection and its level triggering will keep firing. Maybe I'm missing something though.

It's just example code for 1 client; I have fixed that in my local code.

Ok. It's hard to discuss like this. As mentioned, I suggest you come back with full (relevant) code that you're actually running (e.g. the server write path isn't mentioned but it'll play a role in performance), your setup (hardware, how load is generated, etc), how you're measuring, and what numbers you see.

If you're sure you need threads for the new-connection handling and I/O part, then you likely don't want to test something that will run on Linux with MIO on macOS. MIO abstracts away the underlying system facilities that provide async I/O: on Linux that is epoll and on macOS it is kqueue. MIO's public API resembles epoll more than anything, so what you're testing against will behave quite differently from what will actually be used.

Since you're looking at running on Linux, all info will be specific to it. Unless you know you need a new-connection receiver and event loop per core, you shouldn't set it up that way. The whole point of asynchronous I/O is that many things can be done on one thread, because most of your operations are wait operations. Until actual implementation and measurements prove otherwise, you will want your setup to have 2 threads: one a listener, and one for I/O:

fn thread_one() {
    // 1. Wait for new connections
    
    // 2. Pass to event loop on success
}

fn thread_two() {
    // 1. Wait for event loop
    
    // 2. Read / Write data
    
    // 3. Handle I/O errors/disconnects

    // 4. Do stuff with data

    // 5. Re-arm socket
}

The usual place threading comes into play is step 4. Maybe you are making db calls, or doing heavy computation with the data, etc. That is where you want multiple threads to be used, when they are actually needed. The main reason this should be your default setup on Linux is that your ethernet interrupts are all assigned to one core by default. You will pay more in context switching and cache misses than you gain from spreading the work across multiple cores while the interrupts are all bound to one of them. You'll need a lot more kernel configuration to support your current architecture plan if that is definitely the path you need to go down.
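If you do get to the point where step 4 is heavy, a sketch of handing that work off to a pool (the threadpool crate and the names here are just illustrative):

use std::sync::mpsc::Sender;

use threadpool::ThreadPool;

// Sketch of "step 4" only: the I/O thread hands the bytes it just read to a
// worker pool and gets the processed result back over a channel, so the
// event loop itself never blocks on the heavy work.
fn process_off_loop(pool: &ThreadPool, data: Vec<u8>, done: Sender<Vec<u8>>) {
    pool.execute(move || {
        // A db call / heavy computation on `data` would happen here.
        let result = data;
        let _ = done.send(result);
    });
}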

Your TcpListener shouldn't be set up in either edge or level mode. It should be blocking. I'm not sure if MIO lets you specify a backlog queue for the socket, but if so, use it. All your accepted sockets should be edge triggered by default. Edge-triggered mode puts you in control of your performance. This means your read section per socket needs to change to something like this:

use std::io::{ErrorKind, Read};

use mio::net::TcpStream;

fn read(client: &mut TcpStream) -> Result<Vec<u8>, ()> {
    let mut buf = Vec::<u8>::with_capacity(4096);
    let mut tmp_buf = [0u8; 4096];
    loop {
        match client.read(&mut tmp_buf) {
            // The socket is drained; hand back whatever we collected.
            Err(ref e) if e.kind() == ErrorKind::WouldBlock => return Ok(buf),
            Err(_) => return Err(()),
            // Peer closed the connection.
            Ok(0) => return Ok(buf),
            Ok(n_read) => buf.extend_from_slice(&tmp_buf[0..n_read]),
        }
    }
}

Try working your setup into that system and see where it gets you. I believe this is also the approach @vitalyd was recommending you move to. I just added some example code to help.
