Hello everyone
I have a couple of questions about implementing thread-safe structs for usage with Tokio:
- How to implement custom structure that will be thread-safe when working with Tokio?
- Does it have a sense to wrap those struct that marked with
Send
+Sync
intoArc<RwLock>
or will enough to use only theArc
for read-only access across the threads? But in the case when necessary to init rabbitmq once (in theinit
future)?
Just as example, why I'm curious about it and how to make it properly I will show the following code, on what I'm writing right now:
pub struct Proxy {
engine: Arc<RwLock<Box<Engine>>>,
rabbitmq_client: Arc<RwLock<Box<RabbitMQClient>>>,
connections: Arc<Mutex<HashMap<SocketAddr, mpsc::UnboundedSender<Message>>>>,
auth_middleware: Arc<RwLock<Box<Middleware>>>
}
impl Proxy {
/// Returns a new instance of a reverse proxy application.
pub fn new(cli: &CliOptions, engine: Box<Engine>) -> Proxy {
// Instantiating middlewares, based on CLI options
...
Proxy {
engine: Arc::new(RwLock::new(engine)),
rabbitmq_client: Arc::new(RwLock::new(rabbitmq_client)),
connections: Arc::new(Mutex::new(HashMap::new())),
auth_middleware: Arc::new(RwLock::new(auth_middleware)),
}
}
pub fn run(&self, address: SocketAddr) {
let handle = Handle::default();
let listener = TcpListener::bind(&address).unwrap();
println!("Listening on: {}", address);
let init = future::lazy(|| -> FutureResult<(), PathfinderError> {
self.rabbitmq_client.clone().write().unwrap().init();
future::ok::<(), PathfinderError>(())
});
let server = listener.incoming().for_each(move |stream| {
let addr = stream.peer_addr().expect("Connected stream should have a peer address.");
let engine_local = self.engine.clone();
let rabbimq_local = self.rabbitmq_client.clone();
let connections_local = self.connections.clone();
let auth_middleware_local = self.auth_middleware.clone();
let handle_local = handle.clone();
accept_async(stream)
.and_then(move |ws_stream| {
}
// Do something else
.or_else(|err| {
debug!("{}", err.description());
Ok(())
})
});
// Run the server
let server_future = init
.map_err(|_err| ())
.and_then(|_| server.map_err(|_err| ()));
run(server_future);
}
At the first glance it looks great, because all of those fields can be shared across the threads safely, until we will try to make sure that the code compiles:
error[E0621]: explicit lifetime required in the type of `cli`
--> src/proxy.rs:58:51
|
47 | pub fn new(cli: &CliOptions, engine: Box<Engine>) -> Proxy {
| --- consider changing the type of `cli` to `&'static cli::CliOptions`
...
58 | auth_middleware: Arc::new(RwLock::new(auth_middleware)),
| ^^^^^^^^^^^^^^^ lifetime `'static` required
error[E0277]: `(dyn auth::middleware::Middleware + 'static)` cannot be shared between threads safely
--> src/proxy.rs:169:9
|
169 | run(server_future);
| ^^^ `(dyn auth::middleware::Middleware + 'static)` cannot be shared between threads safely
|
= help: the trait `std::marker::Sync` is not implemented for `(dyn auth::middleware::Middleware + 'static)`
= note: required because of the requirements on the impl of `std::marker::Sync` for `std::ptr::Unique<(dyn auth::middleware::Middleware + 'static)>`
= note: required because it appears within the type `std::boxed::Box<(dyn auth::middleware::Middleware + 'static)>`
= note: required because of the requirements on the impl of `std::marker::Sync` for `std::sync::RwLock<std::boxed::Box<(dyn auth::middleware::Middleware + 'static)>>`
= note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<std::sync::RwLock<std::boxed::Box<(dyn auth::middleware::Middleware + 'static)>>>`
Altough, as for example, one of the existing middlewares has pretty a simple implementation (other were written by analogy):
/// Type alias for future result type.
pub type MiddlewareFuture = Box<Future<Item=(), Error=PathfinderError> + Send + Sync + 'static>;
/// A trait for types which can be used as middleware during processing a request from a client.
pub trait Middleware {
/// Applied transforms and checks to an incoming request. If it failed,
/// then should return a `PathfinderError` instance.
fn process_request(&self, message: JsonMessage) -> MiddlewareFuture;
}
/// Default struct which is used for reverse proxy without an authentication
/// layer.
pub struct EmptyMiddleware<'a, T: 'a> {
phantom: PhantomData<&'a T>
}
impl<'a, T: Send + Sync> EmptyMiddleware<'a, T> {
pub fn new(_cli: &CliOptions) -> EmptyMiddleware<T> {
EmptyMiddleware {
phantom: PhantomData
}
}
}
impl<'a, T: Send + Sync> Middleware for EmptyMiddleware<'a, T> {
/// Returns an empty future which is doesn't do anything.
fn process_request(&self, _message: JsonMessage) -> MiddlewareFuture {
Box::new(lazy(move || Ok(())))
}
}