Hello there!
I'm trying to write a simple async application that works with RabbitMQ using the request/response pattern: after sending a request and waiting while the server processes it, the application should read the response from the appropriate queue and do something further with it.
Can anybody help with Rust futures and give me a hint about what I'm doing wrong?
Code:
// Consume a response message (the first incoming message) from the queue
.and_then(|channel| {
    channel.basic_consume(
        &queue_name,
        "response_consumer",
        &BasicConsumeOptions::default(),
        &FieldTable::new()
    )
    .map(move |stream| {
        stream
            // we're only interested in the first incoming message in the message queue
            .take(1)
            // and return it with a channel for the next processing stage
            .map(move |message| (channel, message))
    })
})
.map_err(|err| {
    let message = format!("Error during consuming the response message: {}", err);
    error!("{:?}", message);
    err
})
// Processing the extracted message here
.and_then(|(channel, message)| { // <-- error here?
    println!("Message: {:?}", std::str::from_utf8(&message.data).unwrap());
    // ... some useful code
    channel.basic_ack(message.delivery_tag);
    Ok(channel)
})
.map_err(|err| {
    error!("{:?}", err);
    err
})
// Other chained futures via `.and_then`, like this
// .and_then(|channel| {
//     ...
// })
The Rust compiler reports the following error:
error[E0308]: mismatched types
   --> src/main.rs:190:24
    |
190 |             .and_then(|(channel, message)| {
    |                        ^^^^^^^^^^^^^^^^^^ expected struct `futures::stream::Map`, found tuple
    |
    = note: expected type `futures::stream::Map<futures::stream::Take<lapin_futures_rustls::lapin::consumer::Consumer<lapin_futures_tls_api::AMQPStream>>, [closure@src/main.rs:179:36: 179:69 channel:_]>`
               found type `(_, _)`

error: aborting due to previous error
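
To make the shape of the mismatch easier to see, here is a minimal std-only sketch of what the error message seems to describe. It is an analogy, not lapin code: `Option` stands in for the future and a plain iterator for the consumer stream, and the names `wrapped_stream` and `first_message` are hypothetical. After the `.map(...)` step the wrapped value is still a whole `Map<Take<...>>` adapter, so a closure that destructures a `(channel, message)` tuple cannot accept it until one element is pulled out explicitly.

```rust
// Std-only analogy: `Option` plays the role of the future and an iterator
// plays the role of the stream. All names are hypothetical, not lapin APIs.

/// Mirrors the shape of the code in the question: after `.map(...)` the wrapped
/// value is still an iterator adapter (`Map<Take<...>>`), not a tuple.
fn wrapped_stream(
    channel: &'static str,
    messages: Vec<String>,
) -> Option<impl Iterator<Item = (&'static str, String)>> {
    Some(messages.into_iter())
        .map(move |stream| stream.take(1).map(move |message| (channel, message)))
}

/// Pulls the first element out explicitly, so the next stage receives a
/// `(channel, message)` tuple instead of the whole adapter.
fn first_message(
    channel: &'static str,
    messages: Vec<String>,
) -> Option<(&'static str, String)> {
    wrapped_stream(channel, messages).and_then(|mut stream| stream.next())
}
```

In this sketch `first_message("ch", ...)` yields the tuple that the processing stage expects, while `wrapped_stream` alone only yields something that can produce such tuples. In futures 0.1 the step that seems to correspond to `next()` is `Stream::into_future`, which resolves to the first item together with the rest of the stream; I have not checked how it fits into the lapin chain above.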