Can mio events be consumed from more than one thread?
It appears that mio::net::TcpListener can not be cloned, which is unlike the normal std::net::TcpListener, so I couldn't find a way to give each thread its own independent clone.
Is that not possible?
Also, I tired using Arc<T> to pass a reference to the samemio::net::TcpListener to all threads. This gave an error at runtime. It appears only onePoll can register() a specific TcpListener
So, having multiple consumer threads is not possible, or is there a way ???
Thank you!
After all, mio::net::TcpListener and mio::Poll implement the Sync trait, so it should be possible to "share" the same instance between multiple threads, right? Just, how to accomplish this?
You can create multiple tcp listeners and bind them to the same port if you set the SO_REUSEPORT option on it. If you create one listener in each of your threads, then the OS will distribute new connections evenly among the listeners for you.
You can create multiple tcp listeners and bind them to the same port if you set the SO_REUSEPORT option on it. If you create one listener in each of your threads, then the OS will distribute new connections evenly among the listeners for you.
Any idea how to accomplish this with mio::net::TcpListener though?
Clone the std::net::TcpListener, then convert into mio::net::TcpListener
use std::io::ErrorKind;
use std::net::{TcpListener, SocketAddr};
use std::thread;
use mio::net::TcpListener as MioListener;
use mio::{Events, Interest, Poll, Token};
const INCOMING: Token = Token(0);
fn main() {
let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
let server = TcpListener::bind(addr).expect("Failed to create the server socket!");
let mut thread_handle = Vec::new();
for _n in 0..8 {
let my_server = server.try_clone().expect("Failed to clone the server!");
my_server.set_nonblocking(true).expect("Failed to set socket non-blocking mode!");
thread_handle.push(thread::spawn(move || {
let mut poll = Poll::new().expect("Failed to create the poll!");
let mut events = Events::with_capacity(128);
let mut thread_server = MioListener::from_std(my_server);
poll.registry().register(&mut thread_server, INCOMING, Interest::READABLE).expect("Failed to register event!");
loop {
match poll.poll(&mut events, None) {
Ok(()) => {
for event in events.iter() {
match event.token() {
INCOMING => {
loop {
match thread_server.accept() {
Ok((_stream, _addr)) => {
println!("Accepted connection! [{:?}]", thread::current().id());
},
Err(error) => {
match error.kind() {
ErrorKind::WouldBlock => break,
_ => println!("Error! [{:?}]", thread::current().id()),
}
},
}
}
},
_ => unreachable!(),
}
}
},
Err(err) => println!("Poll error: {}", err),
};
}
}));
}
thread_handle.drain(..).for_each(|handle| {
handle.join().expect("Failed to join the thread!")
});
}