Implementing thread-safe structs for Tokio

Hello everyone

I have a couple of questions about implementing thread-safe structs for use with Tokio:

  1. How do I implement a custom struct that is thread-safe when working with Tokio?
  2. Does it make sense to wrap structs that are already Send + Sync in Arc<RwLock<...>>, or is Arc alone enough for read-only access across threads? And what about the case where RabbitMQ has to be initialized once (in the init future)?

As an example of why I’m curious about this and how to do it properly, here is the code I’m writing right now:

pub struct Proxy {
    engine: Arc<RwLock<Box<Engine>>>,
    rabbitmq_client: Arc<RwLock<Box<RabbitMQClient>>>,
    connections: Arc<Mutex<HashMap<SocketAddr, mpsc::UnboundedSender<Message>>>>,
    auth_middleware: Arc<RwLock<Box<Middleware>>>
}

impl Proxy {
    /// Returns a new instance of a reverse proxy application.
    pub fn new(cli: &CliOptions, engine: Box<Engine>) -> Proxy {

        // Instantiating middlewares, based on CLI options
        ...

        Proxy {
            engine: Arc::new(RwLock::new(engine)),
            rabbitmq_client: Arc::new(RwLock::new(rabbitmq_client)),
            connections: Arc::new(Mutex::new(HashMap::new())),
            auth_middleware: Arc::new(RwLock::new(auth_middleware)),
        }
    }

    pub fn run(&self, address: SocketAddr) {
        let handle = Handle::default();
        let listener = TcpListener::bind(&address).unwrap();
        println!("Listening on: {}", address);

        let init = future::lazy(|| -> FutureResult<(), PathfinderError> {
            self.rabbitmq_client.clone().write().unwrap().init();
            future::ok::<(), PathfinderError>(())
        });

        let server = listener.incoming().for_each(move |stream| {
            let addr = stream.peer_addr().expect("Connected stream should have a peer address.");

            let engine_local = self.engine.clone();
            let rabbimq_local = self.rabbitmq_client.clone();
            let connections_local = self.connections.clone();
            let auth_middleware_local = self.auth_middleware.clone();
            let handle_local = handle.clone();

            accept_async(stream)
                .and_then(move |ws_stream| {
                    // Do something else
                    ...
                })
                .or_else(|err| {
                    debug!("{}", err.description());
                    Ok(())
                })
        });

        // Run the server
        let server_future = init
            .map_err(|_err| ())
            .and_then(|_| server.map_err(|_err| ()));
        run(server_future);
    }
}

At first glance this looks fine, because all of those fields can be shared across threads safely, until we try to compile the code:

error[E0621]: explicit lifetime required in the type of `cli`
  --> src/proxy.rs:58:51
   |
47 |     pub fn new(cli: &CliOptions, engine: Box<Engine>) -> Proxy {
   |                --- consider changing the type of `cli` to `&'static cli::CliOptions`
...
58 |             auth_middleware: Arc::new(RwLock::new(auth_middleware)),
   |                                                   ^^^^^^^^^^^^^^^ lifetime `'static` required

error[E0277]: `(dyn auth::middleware::Middleware + 'static)` cannot be shared between threads safely
   --> src/proxy.rs:169:9
    |
169 |         run(server_future);
    |         ^^^ `(dyn auth::middleware::Middleware + 'static)` cannot be shared between threads safely
    |
    = help: the trait `std::marker::Sync` is not implemented for `(dyn auth::middleware::Middleware + 'static)`
    = note: required because of the requirements on the impl of `std::marker::Sync` for `std::ptr::Unique<(dyn auth::middleware::Middleware + 'static)>`
    = note: required because it appears within the type `std::boxed::Box<(dyn auth::middleware::Middleware + 'static)>`
    = note: required because of the requirements on the impl of `std::marker::Sync` for `std::sync::RwLock<std::boxed::Box<(dyn auth::middleware::Middleware + 'static)>>`
    = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<std::sync::RwLock<std::boxed::Box<(dyn auth::middleware::Middleware + 'static)>>>`

Yet the existing middlewares are quite simple. For example, here is one of them (the others were written the same way):

/// Type alias for future result type.
pub type MiddlewareFuture = Box<Future<Item=(), Error=PathfinderError> + Send + Sync + 'static>;


/// A trait for types that can be used as middleware while processing a request from a client.
pub trait Middleware {
    /// Applies transforms and checks to an incoming request. If a check fails,
    /// it should return a `PathfinderError` instance.
    fn process_request(&self, message: JsonMessage) -> MiddlewareFuture;
}


/// Default middleware, used by the reverse proxy when there is no
/// authentication layer.
pub struct EmptyMiddleware<'a, T: 'a> {
    phantom: PhantomData<&'a T>
}


impl<'a, T: Send + Sync> EmptyMiddleware<'a, T> {
    pub fn new(_cli: &CliOptions) -> EmptyMiddleware<T> {
        EmptyMiddleware {
            phantom: PhantomData
        }
    }
}


impl<'a, T: Send + Sync> Middleware for EmptyMiddleware<'a, T> {
    /// Returns a future that does nothing and resolves immediately with `Ok(())`.
    fn process_request(&self, _message: JsonMessage) -> MiddlewareFuture {
        Box::new(lazy(move || Ok(())))
    }
}

If you don’t need to mutate it, you don’t need RwLock/Mutex.

Arc is heap-allocated just like Box, so Arc<Box<T>> is a double indirection, which seems redundant.
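A minimal sketch of both points, assuming a simplified, hypothetical `Middleware` trait (synchronous, returning `Result<(), String>` instead of the thread's `MiddlewareFuture`) and plain std threads in place of Tokio: the trait object sits directly in the `Arc` with no inner `Box`, and there is no lock because nothing is ever mutated.

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical, simplified stand-in for the thread's Middleware trait:
// synchronous, returning a plain Result instead of a boxed future.
trait Middleware {
    fn process_request(&self, message: &str) -> Result<(), String>;
}

struct EmptyMiddleware;

impl Middleware for EmptyMiddleware {
    fn process_request(&self, _message: &str) -> Result<(), String> {
        Ok(())
    }
}

fn main() {
    // The trait object lives directly in the Arc (no inner Box),
    // and there is no RwLock because the middleware is only read.
    let middleware: Arc<dyn Middleware + Send + Sync> = Arc::new(EmptyMiddleware);

    let handles: Vec<_> = (0..4)
        .map(|i| {
            // Each thread gets its own Arc handle to the same middleware.
            let mw = Arc::clone(&middleware);
            thread::spawn(move || mw.process_request(&format!("message {}", i)))
        })
        .collect();

    for handle in handles {
        assert!(handle.join().unwrap().is_ok());
    }
}
```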

I don’t see the type of auth_middleware in your code, so I can’t help with the lifetime problem. But it sounds like you’re trying to store a reference in an owned struct. That won’t work; you’ll need to copy the data instead.
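To illustrate "copy it", here is a sketch with a hypothetical `CliOptions` holding a `secret: String` and a hypothetical `TokenMiddleware` (the real struct fields aren't shown in the thread). The middleware clones what it needs out of the borrowed options, so it owns its data, has no lifetime parameter, and satisfies the implicit `'static` bound of `Box<dyn Middleware>`.

```rust
// Hypothetical CLI options; the real CliOptions fields are not shown in the thread.
struct CliOptions {
    secret: String,
}

trait Middleware {
    fn check(&self, token: &str) -> bool;
}

// Hypothetical middleware that owns its data: no lifetime parameter, so it is 'static.
struct TokenMiddleware {
    secret: String,
}

impl TokenMiddleware {
    fn new(cli: &CliOptions) -> TokenMiddleware {
        // Copy what is needed out of the borrowed options instead of storing `cli` itself.
        TokenMiddleware { secret: cli.secret.clone() }
    }
}

impl Middleware for TokenMiddleware {
    fn check(&self, token: &str) -> bool {
        // Placeholder comparison, not real token validation.
        token == self.secret
    }
}

// `Box<dyn Middleware>` here means `Box<dyn Middleware + 'static>`,
// which TokenMiddleware satisfies because it borrows nothing.
fn build_middleware(cli: &CliOptions) -> Box<dyn Middleware> {
    Box::new(TokenMiddleware::new(cli))
}

fn main() {
    let middleware = {
        let cli = CliOptions { secret: "s3cret".to_string() };
        build_middleware(&cli)
    }; // `cli` is dropped here; `middleware` stays valid.
    assert!(middleware.check("s3cret"));
    assert!(!middleware.check("wrong"));
}
```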

Sounds interesting. Regarding not needing RwLock/Mutex: does it still make sense to use them if I do some extra initialization (e.g. setting up contexts or creating base futures) before the data is actually used by futures/threads, and afterwards the data is only read?

As for the type of auth_middleware: the new(...) method doesn’t do anything special; it just constructs the objects:

    pub fn new(cli: &CliOptions, engine: Box<Engine>) -> Proxy {
        let auth_middleware: Box<Middleware> = match cli.validate {
            true => Box::new(JwtTokenMiddleware::new(cli)),
            _ => Box::new(EmptyMiddleware::new(cli))
        };
        let rabbitmq_client = Box::new(RabbitMQClient::new(cli));

        Proxy {
            engine: Arc::new(RwLock::new(engine)),
            rabbitmq_client: Arc::new(RwLock::new(rabbitmq_client)),
            connections: Arc::new(Mutex::new(HashMap::new())),
            auth_middleware: Arc::new(RwLock::new(auth_middleware)),
        }
    }

No, locks are only needed when mutation is possible across threads. If you prepare the object on a single thread, you can mutate it freely and then put it in an Arc.

In Rust, mutability is not a property of an object or of memory; it merely controls how an object is accessed, enforcing at all times either shared read-only access or exclusive mutable access.
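A small sketch of that pattern with std threads, using a hypothetical `Routes` struct to stand in for data that needs one-time setup (for the RabbitMQ client this only applies if its initialization can finish before the server starts, which is an assumption here): mutate through a plain `mut` binding, then move the finished value into an `Arc` and share it read-only.

```rust
use std::collections::HashMap;
use std::sync::Arc;
use std::thread;

// Hypothetical data that needs one-time setup before it is shared.
struct Routes {
    table: HashMap<String, String>,
}

fn main() {
    // Setup on a single thread: a plain `mut` binding, no lock required.
    let mut routes = Routes { table: HashMap::new() };
    routes.table.insert("/api".to_string(), "api-service".to_string());
    routes.table.insert("/auth".to_string(), "auth-service".to_string());

    // Setup is finished; move the value into an Arc for shared, read-only access.
    let routes = Arc::new(routes);

    let handles: Vec<_> = ["/api", "/auth"]
        .iter()
        .map(|path| {
            let routes = Arc::clone(&routes);
            let path = path.to_string();
            // Arc only hands out shared references, so reading needs no lock.
            thread::spawn(move || routes.table.get(&path).cloned())
        })
        .collect();

    let results: Vec<Option<String>> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    assert_eq!(
        results,
        vec![Some("api-service".to_string()), Some("auth-service".to_string())]
    );
}
```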


Your lifetime issue is because EmptyMiddleware (and maybe JwtTokenMiddleware as well, although it’s not shown) ends up associating its lifetime parameter with the lifetime of the &CliOptions arg you give it. Why does EmptyMiddleware even have a lifetime parameter? And why does it need the T generic type parameter?

Also, I think everyone’s experience will be better if you show more code when asking for help - if you can’t tell what’s relevant or not, then show more rather than less :slight_smile:.

As for the lifetime and T parameters on EmptyMiddleware: that was the first idea that came to mind for fixing the error (admittedly a weird decision). :sweat_smile:

So, I’m currently dealing with two issues:

  1. Structs that must implement the Middleware trait and be Send + Sync in order to be thread-safe. At the moment they are practically identical (except for the struct name):
pub type MiddlewareFuture = Box<Future<Item=(), Error=PathfinderError> + Sync + Send + 'static>;

pub trait Middleware {
    fn process_request(&self, message: JsonMessage) -> MiddlewareFuture;
}

pub struct EmptyMiddleware;

impl EmptyMiddleware {
    pub fn new(_cli: &CliOptions) -> EmptyMiddleware {
        EmptyMiddleware {}
    }
}

impl Middleware for EmptyMiddleware {
    /// Returns a future that does nothing and resolves immediately with `Ok(())`.
    fn process_request(&self, _message: JsonMessage) -> MiddlewareFuture {
        Box::new(lazy(move || Ok(())))
    }
}

But right now I don’t actually know how to mark them all as Send/Sync. Should I implement those unsafe traits manually for structs without any fields? Either way, the compiler reports mismatched trait bounds in the new(cli: &CliOptions, engine: Box<Engine>) constructor of the Proxy struct:

error[E0308]: mismatched types
  --> src/proxy.rs:58:51
   |
58 |             auth_middleware: Arc::new(RwLock::new(auth_middleware)),
   |                                                   ^^^^^^^^^^^^^^^ expected trait `auth::middleware::Middleware + std::marker::Sync + std::marker::Send`, found trait `auth::middleware::Middleware`
   |
   = note: expected type `std::boxed::Box<(dyn auth::middleware::Middleware + std::marker::Sync + std::marker::Send + 'static)>`
              found type `std::boxed::Box<dyn auth::middleware::Middleware>`
  2. A lifetime issue in the run(&self) method of the Proxy struct, which runs a bunch of futures:
pub fn run(&self, address: SocketAddr) {
    let handle = Handle::default();
    let listener = TcpListener::bind(&address).unwrap();
    println!("Listening on: {}", address);

    let rabbitmq_client = self.rabbitmq_client.clone();
    let init = future::lazy(|| -> FutureResult<(), PathfinderError> {
        rabbitmq_client.write().unwrap().init();
        future::ok::<(), PathfinderError>(())
    });

    let server = listener.incoming().for_each(move |stream| {
        let addr = stream.peer_addr().expect("Connected stream should have a peer address.");

        let engine_local = self.engine.clone();
        let rabbimq_local = self.rabbitmq_client.clone();
        let connections_local = self.connections.clone();
        let auth_middleware_local = self.auth_middleware.clone();
        let handle_local = handle.clone();

        // Processing logic
    });

    // Run the server
    let server_future = init
        .map_err(|_err| ())
        .and_then(|_| server.map_err(|_err| ()));
    run(server_future);
}

This issue is probably related to the previous one (the middlewares, I mean):

error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
   --> src/proxy.rs:74:51
    |
74  |           let server = listener.incoming().for_each(move |stream| {
    |  ___________________________________________________^
75  | |             let addr = stream.peer_addr().expect("Connected stream should have a peer address.");
76  | |
77  | |             let engine_local = self.engine.clone();
...   |
163 | |                 })
164 | |         });
    | |_________^
    |
note: first, the lifetime cannot outlive the anonymous lifetime #1 defined on the method body at 63:5...
   --> src/proxy.rs:63:5
    |
63  | /     pub fn run(&self, address: SocketAddr) {
64  | |         let handle = Handle::default();
65  | |         let listener = TcpListener::bind(&address).unwrap();
66  | |         println!("Listening on: {}", address);
...   |
170 | |         run(server_future);
171 | |     }
    | |_____^
    = note: ...so that the types are compatible:
            expected &proxy::Proxy
               found &proxy::Proxy
    = note: but, the lifetime must be valid for the static lifetime...
note: ...so that the type `futures::AndThen<futures::MapErr<futures::Lazy<[closure@src/proxy.rs:69:33: 72:10 rabbitmq_client:&std::sync::Arc<std::sync::RwLock<std::boxed::Box<rabbitmq::client::RabbitMQClient>>>], futures::FutureResult<(), error::PathfinderError>>, [closure@src/proxy.rs:168:22: 168:31]>, futures::MapErr<futures::stream::ForEach<tokio_tcp::Incoming, [closure@src/proxy.rs:74:51: 164:10 self:&proxy::Proxy, handle:tokio::reactor::Handle], futures::OrElse<futures::AndThen<tokio_tungstenite::AcceptAsync<tokio_tcp::TcpStream, tungstenite::handshake::server::NoCallback>, std::result::Result<(), tungstenite::Error>, [closure@src/proxy.rs:85:27: 158:18 connections_local:std::sync::Arc<std::sync::Mutex<std::collections::HashMap<std::net::SocketAddr, futures::sync::mpsc::UnboundedSender<tungstenite::Message>>>>, addr:std::net::SocketAddr, engine_local:std::sync::Arc<std::sync::RwLock<std::boxed::Box<engine::Engine>>>, auth_middleware_local:std::sync::Arc<std::sync::RwLock<std::boxed::Box<(dyn auth::middleware::Middleware + std::marker::Sync + std::marker::Send + 'static)>>>, rabbimq_local:std::sync::Arc<std::sync::RwLock<std::boxed::Box<rabbitmq::client::RabbitMQClient>>>]>, std::result::Result<(), std::io::Error>, [closure@src/proxy.rs:160:26: 163:18]>>, [closure@src/proxy.rs:169:42: 169:51]>, [closure@src/proxy.rs:169:23: 169:52 server:futures::stream::ForEach<tokio_tcp::Incoming, [closure@src/proxy.rs:74:51: 164:10 self:&proxy::Proxy, handle:tokio::reactor::Handle], futures::OrElse<futures::AndThen<tokio_tungstenite::AcceptAsync<tokio_tcp::TcpStream, tungstenite::handshake::server::NoCallback>, std::result::Result<(), tungstenite::Error>, [closure@src/proxy.rs:85:27: 158:18 connections_local:std::sync::Arc<std::sync::Mutex<std::collections::HashMap<std::net::SocketAddr, futures::sync::mpsc::UnboundedSender<tungstenite::Message>>>>, addr:std::net::SocketAddr, engine_local:std::sync::Arc<std::sync::RwLock<std::boxed::Box<engine::Engine>>>, 
auth_middleware_local:std::sync::Arc<std::sync::RwLock<std::boxed::Box<(dyn auth::middleware::Middleware + std::marker::Sync + std::marker::Send + 'static)>>>, rabbimq_local:std::sync::Arc<std::sync::RwLock<std::boxed::Box<rabbitmq::client::RabbitMQClient>>>]>, std::result::Result<(), std::io::Error>, [closure@src/proxy.rs:160:26: 163:18]>>]>` will meet its required lifetime bounds
   --> src/proxy.rs:170:9
    |
170 |         run(server_future);
    |         ^^^

Regarding #1, the compiler will automatically infer Send when it knows the concrete type. If you’re using trait objects, you’ll need to put the Send requirement on the trait object, just as in your other thread. So:

pub struct Proxy {
    ...
    auth_middleware: Arc<RwLock<Middleware + Send>>
}

You can also require Send for all Middleware impls:

trait Middleware: Send { ... }

Then you don’t need to add + Send because that’s implied.
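A sketch of both points, with a hypothetical one-method `Middleware` trait and std threads standing in for Tokio: a field-less struct is `Send + Sync` automatically (no `unsafe impl` needed), which the helper `assert_send_sync` (a common compile-time idiom, not a std function) confirms. With `Send` as a supertrait, a `Box<dyn Middleware>` can be moved into another thread without writing `+ Send`.

```rust
use std::thread;

// With `Send` as a supertrait, every implementor must be Send,
// so `dyn Middleware` is Send without writing `+ Send` on each use.
trait Middleware: Send {
    fn name(&self) -> String;
}

// No fields, so the compiler implements Send and Sync automatically;
// no `unsafe impl` is needed.
struct EmptyMiddleware;

impl Middleware for EmptyMiddleware {
    fn name(&self) -> String {
        "empty".to_string()
    }
}

// Compile-time check: this call only compiles if T is Send + Sync.
fn assert_send_sync<T: Send + Sync>() {}

fn main() {
    assert_send_sync::<EmptyMiddleware>();

    // The boxed trait object can be moved into another thread,
    // because `Middleware: Send` implies `dyn Middleware: Send`.
    let middleware: Box<dyn Middleware> = Box::new(EmptyMiddleware);
    let name = thread::spawn(move || middleware.name()).join().unwrap();
    assert_eq!(name, "empty");
}
```

Note that this only covers moving a `Box`; sharing through an `Arc` additionally needs `Sync`.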

For #2, you’re accessing self inside the closure, which means the closure captures a reference to self in this case. That’s going to make your closure non-'static. Instead, you should clone() the relevant values outside the closure, and then use those clones inside the closure. So you’re looking for something like:

let engine = self.engine.clone();
let rabbitmq = self.rabbitmq_client.clone();
let connections = self.connections.clone();
let auth_middleware = self.auth_middleware.clone();
let server = listener.incoming().for_each(move |stream| {
    let addr = stream.peer_addr().expect("Connected stream should have a peer address.");
    // use the cloned values from above

    // Processing logic
});

You need to fully decouple that closure from self, or else the closure won’t be 'static.
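The same idea in a self-contained form, using `std::thread::spawn` (which, like `tokio::run`, requires a `'static` closure) and a hypothetical, trimmed-down `Proxy`: the `Arc` is cloned before the closure, and only the clone is moved in.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical, trimmed-down Proxy with a single shared field.
struct Proxy {
    connections: Arc<Mutex<Vec<u32>>>,
}

impl Proxy {
    fn run(&self) -> thread::JoinHandle<()> {
        // Clone the Arc *outside* the closure...
        let connections = self.connections.clone();
        // ...and move only the clone in. Using `self.connections` inside the
        // closure would capture a reference to `self`, making the closure
        // non-'static, and thread::spawn would reject it.
        thread::spawn(move || {
            connections.lock().unwrap().push(42);
        })
    }
}

fn main() {
    let proxy = Proxy { connections: Arc::new(Mutex::new(Vec::new())) };
    proxy.run().join().unwrap();
    assert_eq!(*proxy.connections.lock().unwrap(), vec![42u32]);
}
```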

When I tried adding the Send requirement to the Middleware trait, so that I wouldn’t need to append + Send everywhere, the compiler produced this error:

error[E0277]: `(dyn auth::middleware::Middleware + 'static)` cannot be shared between threads safely
   --> src/proxy.rs:170:9
    |
170 |         run(server_future);
    |         ^^^ `(dyn auth::middleware::Middleware + 'static)` cannot be shared between threads safely
    |
    = help: the trait `std::marker::Sync` is not implemented for `(dyn auth::middleware::Middleware + 'static)`
    = note: required because of the requirements on the impl of `std::marker::Sync` for `std::ptr::Unique<(dyn auth::middleware::Middleware + 'static)>`
    = note: required because it appears within the type `std::boxed::Box<(dyn auth::middleware::Middleware + 'static)>`
    = note: required because of the requirements on the impl of `std::marker::Sync` for `std::sync::RwLock<std::boxed::Box<(dyn auth::middleware::Middleware + 'static)>>`
    = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<std::sync::RwLock<std::boxed::Box<(dyn auth::middleware::Middleware + 'static)>>>`
    = note: required because it appears within the type `[closure@src/proxy.rs:85:27: 158:18 connections_local:std::sync::Arc<std::sync::Mutex<std::collections::HashMap<std::net::SocketAddr, futures::sync::mpsc::UnboundedSender<tungstenite::Message>>>>, addr:std::net::SocketAddr, engine_local:std::sync::Arc<std::sync::RwLock<std::boxed::Box<engine::Engine>>>, auth_middleware_local:std::sync::Arc<std::sync::RwLock<std::boxed::Box<(dyn auth::middleware::Middleware + 'static)>>>, rabbimq_local:std::sync::Arc<std::sync::RwLock<std::boxed::Box<rabbitmq::client::RabbitMQClient>>>]`

However, if I append + Send as an extra requirement in some places, the compiler produces almost the same error, except that the trait object now carries the Send bound:

error[E0277]: `(dyn auth::middleware::Middleware + std::marker::Send + 'static)` cannot be shared between threads safely
   --> src/proxy.rs:170:9
    |
170 |         run(server_future);
    |         ^^^ `(dyn auth::middleware::Middleware + std::marker::Send + 'static)` cannot be shared between threads safely
    |
    = help: the trait `std::marker::Sync` is not implemented for `(dyn auth::middleware::Middleware + std::marker::Send + 'static)`
    = note: required because of the requirements on the impl of `std::marker::Sync` for `std::ptr::Unique<(dyn auth::middleware::Middleware + std::marker::Send + 'static)>`
    = note: required because it appears within the type `std::boxed::Box<(dyn auth::middleware::Middleware + std::marker::Send + 'static)>`
    = note: required because of the requirements on the impl of `std::marker::Sync` for `std::sync::RwLock<std::boxed::Box<(dyn auth::middleware::Middleware + std::marker::Send + 'static)>>`
    = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<std::sync::RwLock<std::boxed::Box<(dyn auth::middleware::Middleware + std::marker::Send + 'static)>>>`
    = note: required because it appears within the type `[closure@src/proxy.rs:85:27: 158:18 connections_local:std::sync::Arc<std::sync::Mutex<std::collections::HashMap<std::net::SocketAddr, futures::sync::mpsc::UnboundedSender<tungstenite::Message>>>>, addr:std::net::SocketAddr, engine_local:std::sync::Arc<std::sync::RwLock<std::boxed::Box<engine::Engine>>>, auth_middleware_local:std::sync::Arc<std::sync::RwLock<std::boxed::Box<(dyn auth::middleware::Middleware + std::marker::Send + 'static)>>>, rabbimq_local:std::sync::Arc<std::sync::RwLock<std::boxed::Box<rabbitmq::client::RabbitMQClient>>>]`

This is because Arc<T> is Send only if T is Send + Sync (since Arc allows accessing T from several threads at once, T needs to be Sync as well). So in your specific case (an Arc-based trait object), you’ll need to require both Send and Sync on the trait, if you want to go that route:

trait Middleware: Send + Sync { ... }
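A sketch of that rule in action, again with a hypothetical synchronous trait and std threads instead of Tokio: once `Send + Sync` are supertraits, the field type from the original `Proxy`, `Arc<RwLock<Box<dyn Middleware>>>`, is `Send`, which the hypothetical helper `assert_send` checks at compile time.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

// Both auto traits as supertraits: every implementor must be Send + Sync,
// and so is `dyn Middleware`.
trait Middleware: Send + Sync {
    fn process_request(&self, message: &str) -> bool;
}

struct EmptyMiddleware;

impl Middleware for EmptyMiddleware {
    fn process_request(&self, _message: &str) -> bool {
        true
    }
}

// Compile-time check: this call only compiles if T is Send.
fn assert_send<T: Send>() {}

fn main() {
    // Arc<X>: Send needs X: Send + Sync; RwLock<Box<dyn Middleware>>: Sync
    // needs Box<dyn Middleware>: Send + Sync, which the supertraits provide.
    assert_send::<Arc<RwLock<Box<dyn Middleware>>>>();

    let boxed: Box<dyn Middleware> = Box::new(EmptyMiddleware);
    let shared = Arc::new(RwLock::new(boxed));

    let handles: Vec<_> = (0..3)
        .map(|_| {
            let mw = Arc::clone(&shared);
            thread::spawn(move || {
                // The read guard is dropped at the end of this statement.
                let ok = mw.read().unwrap().process_request("ping");
                ok
            })
        })
        .collect();

    for handle in handles {
        assert!(handle.join().unwrap());
    }
}
```

If the middleware is never mutated after construction, `Arc<dyn Middleware>` without the `RwLock` would work just as well.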