Can 'mio' Events be consumed from more than one thread?

Can mio events be consumed from more than one thread?

It appears that mio::net::TcpListener can not be cloned, which is unlike the normal std::net::TcpListener, so I couldn't find a way to give each thread its own independent clone.

Is that not possible?

Also, I tired using Arc<T> to pass a reference to the same mio::net::TcpListener to all threads. This gave an error at runtime. It appears only one Poll can register() a specific TcpListener :weary:

So, having multiple consumer threads is not possible, or is there a way ???

Thank you!


After all, mio::net::TcpListener and mio::Poll implement the Sync trait, so it should be possible to "share" the same instance between multiple threads, right? Just, how to accomplish this?

You can create multiple tcp listeners and bind them to the same port if you set the SO_REUSEPORT option on it. If you create one listener in each of your threads, then the OS will distribute new connections evenly among the listeners for you.

1 Like

You can create multiple tcp listeners and bind them to the same port if you set the SO_REUSEPORT option on it. If you create one listener in each of your threads, then the OS will distribute new connections evenly among the listeners for you.

Any idea how to accomplish this with mio::net::TcpListener though? :thinking:

You might need to use socket2 to set the option on the socket when creating it.

1 Like

Well, I think I found a way:

Clone the std::net::TcpListener, then convert into mio::net::TcpListener :bulb:

use std::io::ErrorKind;
use std::net::{TcpListener, SocketAddr};
use std::thread;
use mio::net::TcpListener as MioListener;
use mio::{Events, Interest, Poll, Token};

const INCOMING: Token = Token(0);

fn main() {
    let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
    let server = TcpListener::bind(addr).expect("Failed to create the server socket!");

    let mut thread_handle = Vec::new();
    for _n in 0..8 {
        let my_server = server.try_clone().expect("Failed to clone the server!");
        my_server.set_nonblocking(true).expect("Failed to set socket non-blocking mode!");

        thread_handle.push(thread::spawn(move || {
            let mut poll = Poll::new().expect("Failed to create the poll!");
            let mut events = Events::with_capacity(128);
            let mut thread_server = MioListener::from_std(my_server);

            poll.registry().register(&mut thread_server, INCOMING, Interest::READABLE).expect("Failed to register event!");

            loop {
                match poll.poll(&mut events, None) {
                    Ok(()) => {
                        for event in events.iter() {
                            match event.token() {
                                INCOMING => {
                                    loop {
                                        match thread_server.accept() {
                                            Ok((_stream, _addr)) => {
                                                println!("Accepted connection! [{:?}]", thread::current().id());
                                            },
                                            Err(error) => {
                                                match error.kind() {
                                                    ErrorKind::WouldBlock => break,
                                                    _ => println!("Error! [{:?}]", thread::current().id()),
                                                }
                                            },
                                        }
                                    }
                                },
                                _ => unreachable!(),
                            }
                        }
                    },
                    Err(err) => println!("Poll error: {}", err),
                };
            }
        }));
    }

    thread_handle.drain(..).for_each(|handle| {
        handle.join().expect("Failed to join the thread!")
    });
}

My understanding is that cloned file descriptors don't work with mio on all platforms, which is why I suggested creating separate sockets per thread.

1 Like

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.