I've rewritten the existing RabbitMQ client, so that it contains only two methods for now:
-
connect(uri: &AMQPUri)
that returns a new instance of RabbitMQ client with initialized LapinClient instance, otherwise an error -
get_channel(&self)
creates a new channel that re-using an existing LapinClient instance.
/// Alias for the lapin client with TLS.
pub type LapinClient = Client<TcpStream>;
/// Alias for the lapin channel.
pub type LapinChannel = Channel<TcpStream>;
/// Alias for generic future for pathfinder and RabbitMQ.
pub type RabbitMQFuture = Box<Future<Item=(), Error=PathfinderError> + Send + Sync + 'static>;
/// A future-based asynchronous RabbitMQ client.
pub struct RabbitMQClient {
uri: AMQPUri,
client: Arc<LapinClient>
}
impl RabbitMQClient {
/// Initializes the inner fields of RabbitMQ client for future usage.
pub fn connect(uri: &AMQPUri)
-> impl Future<Item=Self, Error=io::Error> + Sync + Send + 'static
{
let address = get_address_to_rabbitmq(uri);
let uri_local = uri.clone();
TcpStream::connect(&address).and_then(|stream| {
Client::connect(stream, ConnectionOptions::from_uri(uri_local))
})
.and_then(|(client, heartbeat)| {
spawn(heartbeat.map_err(|err| error!("Heartbeat error: {:?}", err)))
.into_future()
.map(|_| RabbitMQClient { uri: uri_local, client: Arc::new(client) })
.map_err(|error| {
error!("Occurred an error during spawning heartbeat future: {:?}", error);
io::Error::new(io::ErrorKind::Other, "Spawn error.")
})
})
}
/// Returns a lapin channel as future, based on the lapin client instance.
pub fn get_channel(&self)
-> impl Future<Item=LapinChannel, Error=io::Error> + Sync + Send + 'static
{
let client = self.client.clone();
client.create_confirm_channel(ConfirmSelectOptions::default())
}
}
The last thing to me that was left is how to fix a lifetime error in futures chain:
/// Run the server on the specified address and port.
pub fn run(&self, address: SocketAddr) {
let listener = TcpListener::bind(&address).unwrap();
info!("Listening on: {}", address);
// Run the server
let server_future = self.get_rabbitmq_client()
.map_err(|error| {
error!("Lapin error: {:?}", error);
()
})
.and_then(|rabbitmq: Arc<RabbitMQClient>|
self.request_handler(&listener, rabbitmq)
.map_err(|_error| ())
);
run(server_future);
}
fn get_rabbitmq_client(&self)
-> impl Future<Item=Arc<RabbitMQClient>, Error=PathfinderError> + Sync + Send + 'static
{
let amqp_uri = self.amqp_uri.clone();
RabbitMQClient::connect(amqp_uri.as_ref())
.map(|client| Arc::new(client))
.map_err(|error| {
error!("Error in RabbitMQ Client. Reason: {:?}", error);
PathfinderError::Io(error)
})
}
fn request_handler(&self, listener: &TcpListener, rabbitmq: Arc<RabbitMQClient>)
-> impl Future<Item=(), Error=std::io::Error> + Sync + Send + 'static
{
// ...
}
Do you have any ideas how to prolong the lifetime of rabbitmq
, so that it can be used later?
error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
--> src/proxy.rs:75:23
|
75 | .and_then(|rabbitmq| {
| _______________________^
76 | | self.request_handler(&listener, rabbitmq)
77 | | .map_err(|_error| ())
78 | | });
| |_____________^
|
note: first, the lifetime cannot outlive the anonymous lifetime #1 defined on the method body at 65:5...
--> src/proxy.rs:65:5
|
65 | / pub fn run(&self, address: SocketAddr) {
66 | | let listener = TcpListener::bind(&address).unwrap();
67 | | info!("Listening on: {}", address);
68 | |
... |
79 | | run(server_future);
80 | | }
| |_____^
= note: ...so that the types are compatible:
expected &&proxy::Proxy
found &&proxy::Proxy
= note: but, the lifetime must be valid for the static lifetime...
note: ...so that the type `futures::AndThen<futures::MapErr<impl std::marker::Send+std::marker::Sync+futures::Future, [closure@src/proxy.rs:71:22: 74:14]>, futures::MapErr<impl std::marker::Send+std::marker::Sync+futures::Future, [closure@src/proxy.rs:77:30: 77:41]>, [closure@src/proxy.rs:75:23: 78:14 self:&&proxy::Proxy, listener:&tokio_tcp::TcpListener]>` will meet its required lifetime bounds
--> src/proxy.rs:79:9
|
79 | run(server_future);
| ^^^