Hello everyone!
I'm trying to handle a case where I need to establish a connection to a remote RabbitMQ node only once, so that users can then open separate channels within that same connection. However, I don't know how to do this properly in asynchronous code. I have a couple of questions:
- How can I guarantee that init() is called only once, so that it always returns the same connection while the event loop is running? Potentially this could be done in the constructor, but I'm not sure that is possible right now, especially since I need to establish the single connection to the RabbitMQ node before creating new channels for clients.
- Does tokio provide a way to register setup hooks that initialize an object (or maybe just certain fields) before the event loop starts? Or can this be done some other way?
In general, I imagine something like this, though this code won't compile:
use std::io;

use amq_protocol::uri::AMQPUri;
use futures::future::Future;
use lapin_futures_rustls::AMQPConnectionRustlsExt;
use lapin_futures_rustls::lapin::channel::{Channel, ConfirmSelectOptions};
use lapin_futures_rustls::lapin::client::{Client, ConnectionOptions};
use lapin_futures_tls_internal::AMQPStream;
use tokio::reactor::Handle;
use tokio_current_thread::spawn;
use tokio_io::AsyncRead;
use tokio_tcp::TcpStream;
use tokio_tls_api::TlsStream;

/// Alias for the Lapin client with TLS.
pub type LapinClient = Client<AMQPStream<TlsStream<TcpStream>>>;

/// Alias for the lapin future type.
pub type LapinFuture = Box<Future<Item=LapinClient, Error=io::Error> + 'static>;

/// Alias for the lapin's channel.
pub type LapinChannelFuture = Box<Future<
    Item=Channel<AsyncRead + Send + Sync + 'static>,
    Error=io::Error
> + Send + 'static>;

/// Alias for generic future for pathfinder and RabbitMQ.
pub type RabbitMQFuture = Box<Future<Item=(), Error=PathfinderError> + 'static>;

/// A future-based asynchronous RabbitMQ client.
pub struct RabbitMQClient {
    uri: AMQPUri,
    client: Option<LapinFuture>
}

impl RabbitMQClient {
    pub fn new(cli: &CliOptions) -> RabbitMQClient {
        let schema = match cli.rabbitmq_secured {
            true => "amqps",
            false => "amqp",
        };

        let uri = format!(
            "{}://{}:{}@{}:{}/{}",
            schema.to_string(),
            cli.rabbitmq_username.clone(),
            cli.rabbitmq_password.clone(),
            cli.rabbitmq_host.clone(),
            cli.rabbitmq_port,
            cli.rabbitmq_virtual_host.clone()
        );

        RabbitMQClient {
            uri: uri.parse().unwrap(),
            client: None
        }
    }

    // Can it be used in the `new()` instead?
    pub fn init(&mut self) {
        self.client = self.get_client();
    }

    pub fn get_channel(&self) -> LapinChannelFuture {
        Box::new(
            self.client.unwrap().and_then(move |client| {
                client.create_confirm_channel(ConfirmSelectOptions::default())
            })
        )
    }

    fn get_client(&mut self) -> LapinFuture {
        let address = self.get_address_to_rabbitmq().parse().unwrap();

        Some(
            TcpStream::connect(&address).and_then(|stream| {
                Client::connect(stream, ConnectionOptions::from_uri(self.uri))
            })
            .and_then(|(client, heartbeat)| {
                spawn(heartbeat.map_err(|err| eprintln!("Heartbeat error: {:?}", err)))
                    .into_future()
                    .map(|_| client)
                    .map_err(|_| io::Error::new(io::ErrorKind::Other, "Spawn error."))
            })
        )
    }

    fn get_address_to_rabbitmq(&self) -> String {
        format!("{}:{}", self.uri.authority.host, self.uri.authority.port)
    }
}
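
To make the "connect once, then hand out channels" idea more concrete, here is a minimal std-only sketch of the behaviour I'm after. It is synchronous and does not use lapin or tokio at all: `Connection`, `Channel` and `connect_calls` are hypothetical stand-ins I made up for illustration, and `std::sync::OnceLock` requires Rust 1.70 or newer. As far as I understand, in futures 0.1 code the analogous tool might be calling `shared()` on the connection future so that every caller gets the same resolved client, but I have not verified that it fits lapin's types.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock};

/// Hypothetical stand-in for a real AMQP connection (not a lapin type).
#[derive(Debug)]
pub struct Connection {
    uri: String,
    next_channel_id: AtomicUsize,
}

impl Connection {
    /// URI this connection was (pretend-)established with.
    pub fn uri(&self) -> &str {
        &self.uri
    }
}

/// Hypothetical stand-in for a channel opened on the shared connection.
#[derive(Debug)]
pub struct Channel {
    pub id: usize,
    pub connection: Arc<Connection>,
}

/// Connects lazily on first use and reuses that connection afterwards.
pub struct RabbitMQClient {
    uri: String,
    connection: OnceLock<Arc<Connection>>,
    // Counts how often the "connect" closure actually ran (for illustration).
    connect_calls: AtomicUsize,
}

impl RabbitMQClient {
    pub fn new(uri: &str) -> Self {
        RabbitMQClient {
            uri: uri.to_string(),
            connection: OnceLock::new(),
            connect_calls: AtomicUsize::new(0),
        }
    }

    /// Returns the shared connection, creating it only on the first call.
    fn connection(&self) -> Arc<Connection> {
        let shared = self.connection.get_or_init(|| {
            self.connect_calls.fetch_add(1, Ordering::SeqCst);
            Arc::new(Connection {
                uri: self.uri.clone(),
                next_channel_id: AtomicUsize::new(1),
            })
        });
        Arc::clone(shared)
    }

    /// Opens a new channel; every channel points at the same connection.
    pub fn get_channel(&self) -> Channel {
        let connection = self.connection();
        let id = connection.next_channel_id.fetch_add(1, Ordering::SeqCst);
        Channel { id, connection }
    }

    /// Number of times a connection was actually created.
    pub fn connect_calls(&self) -> usize {
        self.connect_calls.load(Ordering::SeqCst)
    }
}
```

With this shape there is no separate init() step: the first `get_channel()` call triggers the connection, and later calls only clone the `Arc`. What I don't know is how to express the same thing when establishing the connection is itself a future.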