Hello everyone!
I'm trying to chain two different futures together so that, if a certain step fails, nothing else is executed and only the error from that particular stage is returned.
The idea is pretty straightforward, and I implemented it with the following code:
// Authorize before doing any action. If an error occurs, send a message to the client.
let auth_future = auth_middleware_local.borrow()
.process_request(&json_message, &handle_inner)
// Report the error to the client via `handle_error!`, then pass it on unchanged
.map_err(|err| {
handle_error!(&connections_nested, &addr, engine_nested, err);
err
});
// Process the request and put a message in the queue.
// Note: `handle(...)` is called right here, so this future is built before
// `auth_future` has run; it is only chained after it further below.
let rabbitmq_future = engine_local.borrow_mut().handle(
json_message, &addr, &connections_inner, &handle_inner
)
// Report the error to the client via `handle_error!`, then pass it on unchanged
.map_err(|err| {
handle_error!(&connections_nested, &addr, engine_nested, err);
err
});
let processing_request_future = auth_future
// Continue with the RabbitMQ future only when auth_future completed successfully
.and_then(move |_| rabbitmq_future)
// NOTE(review): this closure returns `()`; this is the expression the E0277
// error in the compiler output points at (`()` does not implement `Future`,
// which is required for the `IntoFuture` impl).
.then(move |_| ());
// Hand the combined future over to `handle_inner` for execution.
handle_inner.spawn(processing_request_future);
First of all, we check and validate the specified token using Redis; if we get an error, we send a response to the client in some format. Otherwise, we apply transformations to the request and put a message in the RabbitMQ queue.
Unfortunately, the compiler says that I'm doing it wrong:
error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
--> src/engine/mod.rs:77:9
|
77 | / Box::new(client_future
78 | | // 1. Create a channel
79 | | .and_then(|client| {
80 | | client.create_confirm_channel(ConfirmSelectOptions::default())
... |
249 | | })
250 | | )
| |_________^
|
note: first, the lifetime cannot outlive the anonymous lifetime #3 defined on the method body at 68:5...
--> src/engine/mod.rs:68:5
|
68 | / pub fn handle(&self, message: Box<JsonValue>, client: &SocketAddr, connections: &ActiveConnections, handle: &Handle) -> RabbitMQFuture {
69 | | let url = message["url"].as_str().unwrap();
70 | | let queue_name = format!("{}", Uuid::new_v4());
71 | | let event_name = message["event-name"].as_str().unwrap_or("null");
... |
250 | | )
251 | | }
| |_____^
note: ...so that the type `futures::Then<futures::MapErr<futures::AndThen<futures::MapErr<futures::AndThen<futures::MapErr<futures::AndThen<futures::MapErr<futures::AndThen<futures::MapErr<futures::AndThen<futures::MapErr<futures::AndThen<futures::MapErr<futures::AndThen<futures::MapErr<futures::AndThen<std::boxed::Box<futures::Future<Item=lapin_futures_rustls::lapin::client::Client<lapin_futures_tls_api::AMQPStream>, Error=std::io::Error>>, std::boxed::Box<futures::Future<Item=lapin_futures_rustls::lapin::channel::Channel<lapin_futures_tls_api::AMQPStream>, Error=std::io::Error>>, [closure@src/engine/mod.rs:79:23: 81:14]>, [closure@src/engine/mod.rs:82:22: 86:14]>, futures::Map<std::boxed::Box<futures::Future<Item=(), Error=std::io::Error>>, [closure@src/engine/mod.rs:99:26: 99:37 channel:lapin_futures_rustls::lapin::channel::Channel<lapin_futures_tls_api::AMQPStream>]>, [closure@src/engine/mod.rs:89:23: 100:14 queue_name:&std::string::String]>, [closure@src/engine/mod.rs:101:22: 105:14]>, futures::Map<std::boxed::Box<futures::Future<Item=(), Error=std::io::Error>>, [closure@src/engine/mod.rs:116:26: 116:37 channel:lapin_futures_rustls::lapin::channel::Channel<lapin_futures_tls_api::AMQPStream>]>, [closure@src/engine/mod.rs:108:23: 117:14 queue_name:&std::string::String, endpoint:&std::boxed::Box<engine::router::endpoint::Endpoint>]>, [closure@src/engine/mod.rs:118:22: 122:14]>, futures::Map<std::boxed::Box<futures::Future<Item=std::option::Option<bool>, Error=std::io::Error>>, [closure@src/engine/mod.rs:154:26: 154:49 channel:lapin_futures_rustls::lapin::channel::Channel<lapin_futures_tls_api::AMQPStream>]>, [closure@src/engine/mod.rs:126:23: 155:14 request_headers:&std::boxed::Box<std::vec::Vec<(std::string::String, std::string::String)>>, queue_name:&std::string::String, event_name:&&str, endpoint:&std::boxed::Box<engine::router::endpoint::Endpoint>, message:&std::boxed::Box<json::JsonValue>]>, [closure@src/engine/mod.rs:156:22: 160:14]>, 
futures::AndThen<std::boxed::Box<futures::Future<Item=lapin_futures_rustls::lapin::consumer::Consumer<lapin_futures_tls_api::AMQPStream>, Error=std::io::Error>>, futures::Map<futures::MapErr<futures::stream::StreamFuture<futures::stream::Take<lapin_futures_rustls::lapin::consumer::Consumer<lapin_futures_tls_api::AMQPStream>>>, [closure@src/engine/mod.rs:173:40: 173:54]>, [closure@src/engine/mod.rs:174:36: 174:83 channel:lapin_futures_rustls::lapin::channel::Channel<lapin_futures_tls_api::AMQPStream>]>, [closure@src/engine/mod.rs:170:31: 175:22 channel:lapin_futures_rustls::lapin::channel::Channel<lapin_futures_tls_api::AMQPStream>]>, [closure@src/engine/mod.rs:163:23: 176:14 queue_name:&std::string::String]>, [closure@src/engine/mod.rs:177:22: 181:14]>, futures::Map<std::boxed::Box<futures::Future<Item=(), Error=std::io::Error>>, [closure@src/engine/mod.rs:191:26: 191:42 channel:lapin_futures_rustls::lapin::channel::Channel<lapin_futures_tls_api::AMQPStream>]>, [closure@src/engine/mod.rs:184:23: 192:14 self:&&engine::Engine, connections:&&std::rc::Rc<std::cell::RefCell<std::collections::HashMap<std::net::SocketAddr, futures::sync::mpsc::UnboundedSender<tungstenite::Message>>>>, client:&&std::net::SocketAddr]>, [closure@src/engine/mod.rs:193:22: 197:14]>, futures::Map<std::boxed::Box<futures::Future<Item=(), Error=std::io::Error>>, [closure@src/engine/mod.rs:226:26: 226:37 channel:lapin_futures_rustls::lapin::channel::Channel<lapin_futures_tls_api::AMQPStream>]>, [closure@src/engine/mod.rs:218:23: 227:14 queue_name:&std::string::String]>, [closure@src/engine/mod.rs:228:22: 232:14]>, std::boxed::Box<futures::Future<Item=(), Error=std::io::Error>>, [closure@src/engine/mod.rs:235:23: 237:14]>, [closure@src/engine/mod.rs:238:22: 242:14]>, std::result::Result<(), error::PathfinderError>, [closure@src/engine/mod.rs:244:19: 249:14]>` will meet its required lifetime bounds
--> src/engine/mod.rs:77:9
|
77 | / Box::new(client_future
78 | | // 1. Create a channel
79 | | .and_then(|client| {
80 | | client.create_confirm_channel(ConfirmSelectOptions::default())
... |
249 | | })
250 | | )
| |_________^
= note: but, the lifetime must be valid for the static lifetime...
note: ...so that expression is assignable (expected std::boxed::Box<futures::Future<Item=(), Error=error::PathfinderError> + 'static>, found std::boxed::Box<futures::Future<Item=(), Error=error::PathfinderError>>)
--> src/engine/mod.rs:77:9
|
77 | / Box::new(client_future
78 | | // 1. Create a channel
79 | | .and_then(|client| {
80 | | client.create_confirm_channel(ConfirmSelectOptions::default())
... |
249 | | })
250 | | )
| |_________^
error[E0277]: the trait bound `(): futures::Future` is not satisfied
--> src/proxy.rs:120:30
|
120 | .then(move |_| ());
| ^^^^ the trait `futures::Future` is not implemented for `()`
|
= note: required because of the requirements on the impl of `futures::IntoFuture` for `()`
error[E0277]: the trait bound `(): futures::Future` is not satisfied
--> src/proxy.rs:122:38
|
122 | handle_inner.spawn(processing_request_future);
| ^^^^^ the trait `futures::Future` is not implemented for `()`
|
= note: required because of the requirements on the impl of `futures::IntoFuture` for `()`
error: aborting due to 3 previous errors
For the first part of the output log, I don't quite understand what the compiler is trying to say, because I've done the same thing for `auth_future`, which works with Redis. In general, the implementation that generates `auth_future` is simple:
// Boxed future returned by `process_request`: yields `()` on success or a
// `PathfinderError` on failure. The explicit `+ 'static` bound means the boxed
// future must not contain any non-'static borrows.
pub type MiddlewareFuture = Box<Future<Item=(), Error=PathfinderError> + 'static>;
// ...
/// Checks the token of an incoming request before any further processing.
///
/// Returns a boxed future that looks the token up in Redis to obtain the
/// user ID and then validates the JWT with that user ID set as the audience.
/// The future resolves to `()`; every failure along the way is mapped to
/// `PathfinderError::AuthenticationError`.
fn process_request(&self, message: &JsonMessage, handle: &Handle) -> MiddlewareFuture {
    // ... some preparation before
    // Clones are taken so that the `move` closures below own their data.
    let token_inner = token.clone();
    let validation_struct = self.get_validation_struct();
    let jwt_secret_inner = self.jwt_secret.clone();
    Box::new(
        redis_connection
            // Get the User ID from Redis by the token
            .and_then(move |connection| {
                connection.send::<String>(resp_array!["GET", token])
            })
            // Connection issue or token is already deleted
            .map_err(|_| {
                let message = String::from("Token is expired.");
                PathfinderError::AuthenticationError(message)
            })
            // Extracted user_id used here for additional JWT validation
            .and_then(move |user_id| {
                let mut validation_struct_inner = validation_struct.clone();
                validation_struct_inner.set_audience(&user_id);
                // Validate against the copy that carries the audience; passing
                // the untouched `validation_struct` would skip the audience check.
                validate(&token_inner, &jwt_secret_inner, &validation_struct_inner)
            })
            // The passed token is expired or has an invalid data
            .map_err(|_| {
                let message = String::from("Token is invalid.");
                PathfinderError::AuthenticationError(message)
            })
            // Drop the result, because everything that we need was done
            .map(|_| ())
    )
}
Similar things were done for `rabbitmq_future`:
// Alias for the lapin future type: a boxed future that resolves to a
// `lapin::client::Client<AMQPStream>` or fails with an `io::Error`.
pub type LapinFuture = Box<Future<Item=lapin::client::Client<AMQPStream>, Error=io::Error> + 'static>;
// Alias for the generic future used by the pathfinder app for RabbitMQ work:
// resolves to `()` or fails with a `PathfinderError`. The `+ 'static` bound
// means the boxed future must not contain any non-'static borrows.
pub type RabbitMQFuture = Box<Future<Item=(), Error=PathfinderError> + 'static>;
/// Builds the future that processes one client request through RabbitMQ.
///
/// The returned `RabbitMQFuture` is a chain of steps on a lapin client
/// (create a channel, ..., close the channel; the middle steps are elided in
/// this excerpt). It resolves to `()` on success; an `io::Error` from any step
/// is logged and finally wrapped in `PathfinderError::Io`.
///
/// NOTE(review): `RabbitMQFuture` is declared `+ 'static`, while `self`,
/// `client`, `connections` and `handle` are borrowed only for the duration of
/// this call. The E0495 output in this post lists closures capturing
/// `queue_name: &String`, `self: &&Engine`, `connections: &&Rc<..>` and
/// `client: &&SocketAddr`, so the elided closures apparently capture these by
/// reference instead of owning them. Confirm against the full source.
pub fn handle(&self, message: Box<JsonValue>, client: &SocketAddr, connections: &ActiveConnections, handle: &Handle) -> RabbitMQFuture {
// `unwrap()` panics when the "url" field cannot be read as a string.
let url = message["url"].as_str().unwrap();
// Queue name is a freshly generated UUID v4 rendered as a string.
let queue_name = format!("{}", Uuid::new_v4());
// Falls back to the literal "null" when "event-name" is not a string.
let event_name = message["event-name"].as_str().unwrap_or("null");
let endpoint = self.router.clone().match_url_or_default(url);
let request_headers = self.prepare_request_headers(message, endpoint);
let client_future = self.rabbitmq_client.borrow().get_future(handle);
Box::new(client_future
// 1. Create a channel
.and_then(|client| {
client.create_confirm_channel(ConfirmSelectOptions::default())
})
// Log the failure and pass the original error on unchanged
.map_err(|err| {
let message = format!("Error during creating a channel: {}", err);
error!("{}", message);
err
})
// flatten and chained calls with .and_then(|client| ...) and .map_err(|err| ... )
// ...
// 9. Close the channel
.and_then(|channel| {
channel.close(200, "Close the channel.")
})
// Log the failure and pass the original error on unchanged
.map_err(|err| {
let message = format!("Error during closing the channel: {}", err);
error!("{}", message);
err
})
// Transforms a LapinFuture to RabbitMQFuture here: the success value is
// discarded and an `io::Error` is wrapped in `PathfinderError::Io`
.then(move |result| {
match result {
Ok(_) => Ok(()),
Err(err) => Err(PathfinderError::Io(err))
}
})
)
}
So, I have two different questions related to the compiler errors, about what I'm probably doing wrong:
- Why is the approach used for `auth_future` correct, but the one for `rabbitmq_future` is not? I don't quite understand why both type aliases are declared with the `'static` lifetime, yet it only works for the first one.
- What's wrong with the `.then(move |_| ());` part of the code, if I'm trying to resolve the final future there and return an `Ok(())`?