Multi-Threaded Sever Event Loop with Mio and Lifetimes

Hello, I wrote a server using Mio with it's poll API. I containerized the two main concepts Server and Connection into structs and built methods off of it. I used a "new" struct method to return a new initialized struct and then have a struct method called "start_loop," which basically is the equivalent of "listen" in any TCP server API and runs an infinite loop.

The problem is the server struct doesn't live long enough to run the start_loop function. I've tried initializing the server as a static value but that didn't work. How can I get this server up and running? I'd also love overall code feedback. I'm a Node, Ruby, Python and Go engineer and still a student of Rust!



use std::collections::HashMap;
use std::{io, thread};
use std::hash::Hash;
use std::io::{Read, Write};
use mio::{Events, Interest, Poll, Token, Registry};
use mio::event::Source;
use std::net::{TcpListener};
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicI32, Ordering};
use mio::net::{TcpListener as MioListener, TcpStream};
#[derive()]
pub struct Server {
    token: Token,
    counter: Arc<AtomicI32>,
    poll: Arc<Poll>,
    events: Arc<Events>,
    listener: MioListener,
    worker_count: i32,

    requests: Arc<Mutex<HashMap<Token, Vec<u8>>>>,
    sockets: Arc<Mutex<HashMap<Token, Connection>>>
}

impl Server {
    pub fn new<'a>(listening_url: String, worker_count: i32) -> Self{
        let url: String = listening_url.parse().unwrap();
        let server = TcpListener::bind(url).unwrap();
        server.set_nonblocking(true).unwrap();
        let listener = MioListener::from_std(server);
        Server{
            token: Token(0),
            counter: Arc::new(AtomicI32::new(0)),
            poll: Arc::new(Poll::new().unwrap()),
            events: Arc::new(Events::with_capacity(1024)),
            requests: Arc::new(Mutex::new(HashMap::new())),
            sockets: Arc::new(Mutex::new(HashMap::new())),
            listener,
            worker_count,
        }
    }

    pub fn start_loop(&'static mut self) {
        let mut threads = Vec::new();
        for _n in 1..self.worker_count {
            let thread_handler = thread::spawn(|| {
                for event in self.events.iter() {
                    match event.token() {
                        Token(0) => loop {
                            match self.listener.accept() {
                                Ok((stream, _r)) => {
                                    self.counter.fetch_add(1, Ordering::Relaxed);
                                    let mut connection = Connection::new(stream, self.counter.clone());
                                    connection.register_r(self.poll.registry());
                                    self.sockets.clone().lock().unwrap().insert(self.token.clone(), connection);
                                    self.requests.clone().lock().unwrap().insert(self.token.clone(), Vec::with_capacity(128));
                                },
                                Err(error) => {
                                    match error.kind() {
                                        io::ErrorKind::WouldBlock => break,
                                        _ => println!("Error! [{:?}]", thread::current().id()),
                                    }
                                },
                            }
                        }
                        token if event.is_readable() => loop {
                                let mut mutex = self.sockets.clone();
                                let mut mutex_guard = mutex.lock().unwrap();
                                let mut connection = mutex_guard.get_mut(&token).unwrap();

                                // read request into buffer; returns number of bytes read
                                let bytes_read = connection.read();
                                match bytes_read {
                                    Ok(0) => {
                                        // Successful read of zero bytes means connection is closed
                                        self.sockets.clone().lock().unwrap().remove(&token);
                                        break;
                                    },
                                    Ok(_n) => {
                                        if connection.is_ready() {
                                            connection.register_w(self.poll.clone().registry());
                                        }
                                    }
                                    Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
                                    Err(e) => panic!("Unexpected error: {}", e)
                                }
                        },

                        token if event.is_writable() => {
                            let status_line = "HTTP/1.1 200 OK";
                            let contents = "<html><head></head><body><h1>Hello, World!</h1></body></html>";
                            let length = contents.len();
                            let response =
                                format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

                            let mut requests_mutex = self.requests.clone();
                            let mut requests_mutex_guard = requests_mutex.lock().unwrap();
                            let requests = requests_mutex_guard.get_mut(&token).unwrap();
                            requests.clear();

                            let mut sockets_mutex = self.sockets.clone();
                            let mut sockets_mutex_guard = sockets_mutex.lock().unwrap();
                            let socket = sockets_mutex_guard.get_mut(&token).unwrap();
                            socket.send_response(response);
                            socket.reregister_r(self.poll.clone().registry());
                        },

                        _ => unreachable!()
                    }
                }
            });

            threads.push(thread_handler);
        }
        for thread in threads {
            thread.join();
        }
    }
}

pub struct Connection {
    socket: TcpStream,
    token: Token,
    buffer: Vec<u8>
}
impl Connection {

    pub fn new(socket: TcpStream, id: Arc<AtomicI32>) -> Connection {
        Connection{
            token: Token(id.load(Ordering::Relaxed) as usize),
            buffer: Vec::with_capacity(1024),
            socket,
        }
    }

    pub fn register_r(&mut self, registry: &Registry) {
        self.socket.register(registry, self.token, Interest::READABLE).unwrap()
    }

    pub fn register_w(&mut self, registry: &Registry) {
        self.socket.register(registry, self.token, Interest::WRITABLE).unwrap()
    }

    pub fn register_rw(&mut self, registry: &Registry) {
        self.socket.register(registry, self.token, Interest::READABLE|Interest::WRITABLE).unwrap();
    }

    pub fn reregister_r(&mut self, registry: &Registry) {
        self.socket.reregister(registry, self.token, Interest::READABLE).unwrap()
    }

    pub fn reregister_w(&mut self, registry: &Registry) {
        self.socket.reregister(registry, self.token, Interest::WRITABLE).unwrap()
    }

    pub fn reregister_rw(&mut self, registry: &Registry) {
        self.socket.reregister(registry, self.token, Interest::READABLE|Interest::WRITABLE).unwrap()
    }

    pub fn write_bit(&mut self, bit: u8) {
        self.buffer.push(bit);
    }

    pub fn read(&mut self) -> std::io::Result<usize> {
        self.socket.read(self.buffer.as_mut_slice())
    }

    pub fn send_response(&mut self, response: String) {
        self.socket.write_all(response.as_bytes()).unwrap()
    }

    pub fn check_for_end_of_request(&mut self) -> bool {
        fn is_double_crnl(window: &[u8]) -> bool {
            window.len() >= 4 &&
                (window[0] == b'\r') &&
                (window[1] == b'\n') &&
                (window[2] == b'\r') &&
                (window[3] == b'\n')
        }

        self.buffer.windows(4)
            .any(is_double_crnl)
    }

    pub fn is_ready(&mut self) -> bool {
        self.check_for_end_of_request()
    }
}


fn main() {
    let mut server = Server::new("0.0.0.0:7878".to_string(), 10);
    server.start_loop()
}

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.