How to share a struct (impl Handler) between multiple threads?

I am trying to implement a multi-threaded web server. Consider the following code:

pub trait Handler: Sync {
    fn handle_request(&self, request: &Request) -> Response;

    fn handle_bad_request(&self, e: &ParseError) -> Response {
        error!("Failed to parse request: {}", e);
        Response::new(StatusCode::BadRequest, None)
    }
}

pub struct Server {
    addr: IpAddr,
    port: u16,
}

impl Server {
    pub fn new(addr: IpAddr, port: u16) -> Self {
        Self { addr, port }
    }

    pub fn run(self, handler: impl Handler) {
        let socket_addr = SocketAddr::new(self.addr, self.port);
        info!("Listening on {}", socket_addr);
        let listener = TcpListener::bind(socket_addr).unwrap();

        let mut handles = Vec::new();
        for _n in 0..16 {
            if let Ok(thread_listener) = listener.try_clone() {
                handles.push(thread::spawn(|| {
                    loop {
                        match listener.accept() {
                            Ok((stream, remote_addr)) => Self::process_request(&handler, stream, remote_addr),
                            Err(e) => error!("Failed to establish the connection: {}", e),
                        }
                    }
                }));
            }
        }

        for handle in handles.drain(..) {
            drop(handle.join());
        }
    }

    fn process_request(handler: &impl Handler, mut stream: TcpStream, remote_addr: SocketAddr) {
        /* ... */
    }
}

...how can I get the compiler to understand that the Handler does live long enough, as we join() all threads before the function returns? Currently I get this error:

error[E0373]: closure may outlive the current function, but it borrows `handler`, which is owned by the current function

I also tried to give each thread a separate clone of the handler - which I think should not be necessary, as Handler is totally "stateless". Anyways, the clone approach also doesn't work for me:

let mut handles = Vec::new();
for _n in 0..16 {
	if let Ok(thread_listener) = listener.try_clone() {
		handles.push(thread::spawn(move || {
			let thread_handler = handler.clone();
			loop {
				match listener.accept() {
					Ok((stream, remote_addr)) => Self::process_request(&thread_handler, stream, remote_addr),
					Err(e) => error!("Failed to establish the connection: {}", e),
				}
			}
		}));
	}
}

Error is:

error[E0277]: `impl Handler` cannot be shared between threads safely

But it's not "shared", really. Each thread has its own clone that is moved into the thread/closure :exploding_head:

Thanks for any suggestions!

To move a Handler (or a clone of it) into a spawned thread, it must be Send. Sync on its own only says that references to it (&Handler) may be shared between threads; the two traits are independent (read more on that here).

Inside the closure, listener.accept() should be thread_listener.accept(); otherwise the clone from try_clone() is never used.

You can avoid errors like this by making your closure take ownership with the move keyword, so captured variables are taken by value, not reference:

thread::spawn(move || { ... })

Here is a minimal example that compiles on the playground:

use std::thread;
use std::net::*;

pub struct Request;
pub struct Response;
pub struct ParseError;

pub trait Handler: Send + Sync {
    fn handle_request(&self, request: &Request) -> Response;

    fn handle_bad_request(&self, e: &ParseError) -> Response;
}

pub struct Server {
    addr: IpAddr,
    port: u16,
}

impl Server {
    pub fn new(addr: IpAddr, port: u16) -> Self {
        Self { addr, port }
    }

    pub fn run<H>(self, handler: H) where H: Handler + Clone + 'static {
        let socket_addr = SocketAddr::new(self.addr, self.port);
        let listener = TcpListener::bind(socket_addr).unwrap();

        let mut handles = Vec::new();
        for _n in 0..16 {
            if let Ok(thread_listener) = listener.try_clone() {
                let thread_handler = handler.clone();
                handles.push(thread::spawn(move || {
                    loop {
                        match thread_listener.accept() {
                            Ok((stream, remote_addr)) => Self::process_request(&thread_handler, stream, remote_addr),
                            Err(_) => (),
                        }
                    }
                }));
            }
        }

        for handle in handles.drain(..) {
            drop(handle.join());
        }
    }

    fn process_request(_handler: &impl Handler, mut _stream: TcpStream, _remote_addr: SocketAddr)
    {
        /* ... */
    }
}

Thanks !!!

Got it working like this now:

pub trait Handler: Clone + Send {
    fn handle_request(&self, request: &Request) -> Response;

    fn handle_bad_request(&self, e: &ParseError) -> Response {
        error!("Failed to parse request: {}", e);
        Response::new(StatusCode::BadRequest, None)
    }
}

pub struct Server {
    addr: IpAddr,
    port: u16,
}

impl Server {
    pub fn new(addr: IpAddr, port: u16) -> Self {
        Self { addr, port }
    }

    pub fn run(self, handler: impl Handler + 'static) {
        let socket_addr = SocketAddr::new(self.addr, self.port);
        info!("Listening on {}", socket_addr);
        let listener = TcpListener::bind(socket_addr).unwrap();

        let mut handles = Vec::new();
        for _n in 0..16 {
            if let Ok(thread_listener) = listener.try_clone() {
                let thread_handler = handler.clone();
                handles.push(thread::spawn(move || {
                    loop {
                        match thread_listener.accept() {
                            Ok((stream, remote_addr)) => Self::process_request(&thread_handler, stream, remote_addr),
                            Err(e) => error!("Failed to establish the connection: {}", e),
                        }
                    }
                }));
            }
        }

        for handle in handles.drain(..) {
            drop(handle.join());
        }
    }

    fn process_request(handler: &impl Handler, mut stream: TcpStream, remote_addr: SocketAddr) {
        /* .... */
    }
}

Still, some questions remain in my head:

  1. Why is 'static lifetime required for the Handler parameter? After all, we give each thread its own "owned" clone, so I don't get why it is required. Also, how is it even possible that I can still pass the Handler instance from my main() function, which most definitely is not static ???

  2. Why does the Handler need to be Send (or Sync)? We now give each thread its own clone, so it's never necessary to "send" an instance from one thread to another!

  3. Is there a solution that doesn't require cloning? I think provided that Handler is Sync, there should be no need to clone the Handler object at all and just pass a reference to each thread. But how?

You can implement your trait on references like this:

impl<'a> Handler for &'a SomeStruct

If you were to pass &'a SomeStruct to the run method of your server, the reference is moved into the closure, which means &'a may be outlived by the closure. So in case you pass a reference to run, it must have a 'static lifetime.

We send the clone from the main thread to the spawned thread.
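To illustrate that point, here is a minimal sketch. The helper name round_trip is hypothetical and not part of the code above. It moves a value into a new thread and gets it back, which only compiles because of the Send bound that thread::spawn itself requires:

```rust
use std::thread;

// Hypothetical helper: moves `value` into a new thread and hands it back.
// `thread::spawn` requires the closure, and therefore everything it
// captures, to be `Send + 'static`, which is why the bound appears here.
fn round_trip<T: Send + 'static>(value: T) -> T {
    thread::spawn(move || value).join().unwrap()
}

fn main() {
    // The clone is created on the main thread and then sent to the worker.
    let clone_for_thread = String::from("handler clone");
    println!("{}", round_trip(clone_for_thread));

    // Does not compile, because Rc is not Send:
    // let rc = std::rc::Rc::new(1);
    // round_trip(rc);
}
```

So even when every thread owns its own clone, each clone still crosses a thread boundary once, at spawn time.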

Not really. Either we use the move keyword, in which case the first closure takes ownership of handler, or we don't move handler and have the lifetime problem again. If you are worried about the cost of cloning, use an Arc, a special pointer that is cheaply cloneable and implements Send and Sync whenever the wrapped type does.
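A rough sketch of the Arc approach, assuming a simplified Handler that works on strings instead of Request/Response. UpperHandler and run_workers are made-up names for illustration only:

```rust
use std::sync::Arc;
use std::thread;

// Simplified stand-in for the trait in this thread. Both bounds are needed
// so that Arc<H> can be moved into another thread.
pub trait Handler: Send + Sync {
    fn handle_request(&self, request: &str) -> String;
}

// Hypothetical handler, used only for this example.
struct UpperHandler;

impl Handler for UpperHandler {
    fn handle_request(&self, request: &str) -> String {
        request.to_uppercase()
    }
}

// The handler itself is never cloned; only the Arc pointer is.
fn run_workers<H: Handler + 'static>(handler: H, workers: usize) -> Vec<String> {
    let shared = Arc::new(handler);
    let handles: Vec<_> = (0..workers)
        .map(|n| {
            // Cloning the Arc only bumps a reference count.
            let thread_handler = Arc::clone(&shared);
            thread::spawn(move || thread_handler.handle_request(&format!("worker {}", n)))
        })
        .collect();
    // Join in spawn order so the results are in a predictable order.
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    println!("{:?}", run_workers(UpperHandler, 4));
}
```

With this shape the trait no longer needs Clone, but it does need Sync, since all threads now look at the same instance.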

Thanks for the explanation! Still, I don't get why this works:

pub fn run(self, handler: impl Handler + 'static) { ... }
fn main() {
    /* ... */
    let server = Server::new(addr, port);
    server.run(WebsiteHandler::new(&public_path));
}

We are definitely not passing a reference of 'static lifetime :sweat_smile:

Yes, sorry, my bad, my explanation was wrong. As long as the WebsiteHandler you build from public_path contains only owned fields or references with a 'static lifetime, your code will compile. Answer found in this reddit post.

There is also a description of what the 'static lifetime in trait bounds does in Rust by Example.


It means it doesn't/can't contain any non-static references. Meaning it can only have owned types or 'static references.

:bulb:

The way I prefer to think about Type: 'lifetime bounds is that the type is valid for the whole lifetime. So a 'static bound means there's no lifetime where the type would be invalid. More here.
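A small sketch of what that means in practice. OwnedHandler, BorrowingHandler and requires_static are hypothetical names, and I'm assuming WebsiteHandler copies the path into an owned field, which the thread doesn't actually show:

```rust
use std::path::PathBuf;

// Accepts any value whose type contains no non-'static borrows.
fn requires_static<T: 'static>(value: T) -> T {
    value
}

// Owns its data, so it satisfies 'static even when created inside main().
struct OwnedHandler {
    public_path: PathBuf,
}

// Holds a reference, so it is only 'static if that reference is 'static.
struct BorrowingHandler<'a> {
    public_path: &'a str,
}

fn main() {
    let public_path = String::from("/var/www");

    // Fine: the handler copies the path into an owned PathBuf.
    let owned = requires_static(OwnedHandler {
        public_path: PathBuf::from(&public_path),
    });
    println!("{}", owned.public_path.display());

    // Fine: a string literal is a &'static str.
    let literal = requires_static(BorrowingHandler { public_path: "/var/www" });
    println!("{}", literal.public_path);

    // Does not compile: this handler would borrow the local `public_path`.
    // requires_static(BorrowingHandler { public_path: &public_path });
}
```

So the bound is about what the type may borrow, not about how long the value itself lives.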

If you're joining everything within a function, an alternative is scoped threads. Those don't have to be 'static. Take note of the difference with regard to panics if you don't want them to propagate.
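For reference, a minimal sketch of the scoped-thread variant using std::thread::scope (stable since Rust 1.63). The Handler here is a simplified stand-in that works on strings, and EchoHandler and run_workers are made-up names, so treat it as an outline rather than a drop-in replacement for the server above:

```rust
use std::thread;

// Simplified stand-in for the trait in this thread. Only Sync is required,
// because the workers share a reference instead of owning a clone.
pub trait Handler: Sync {
    fn handle_request(&self, request: &str) -> String;
}

// Hypothetical handler, used only for this example.
struct EchoHandler {
    prefix: String,
}

impl Handler for EchoHandler {
    fn handle_request(&self, request: &str) -> String {
        format!("{}{}", self.prefix, request)
    }
}

// Every worker borrows `handler`; no Clone, Send or 'static bound is needed.
fn run_workers(handler: &impl Handler, workers: usize) -> Vec<String> {
    thread::scope(|s| {
        let handles: Vec<_> = (0..workers)
            .map(|n| s.spawn(move || handler.handle_request(&n.to_string())))
            .collect();
        // Joining by hand turns a worker panic into an Err we can inspect.
        // Threads that are not joined manually are joined when the scope
        // ends, and the scope panics if any of them panicked.
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    })
}

fn main() {
    let handler = EchoHandler { prefix: String::from("req-") };
    println!("{:?}", run_workers(&handler, 4));
}
```

The compiler accepts the borrow here because thread::scope guarantees that all spawned threads have finished before it returns, which is exactly the guarantee the original join() loop could not express.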