[SOLVED] Extracting the exact count of messages from a stream with futures

Hello there!

I'm trying to write a simple async application that works with RabbitMQ using the Request/Response pattern: after sending a request and waiting some time for the server to process it, the application should read the response from the appropriate queue and do something further with it.

Can anybody help with Rust futures and give me a hint about what I'm doing wrong?

Code:

// Consume a response message (the first incoming message) from the queue
.and_then(|channel| {
    channel.basic_consume(
        &queue_name,
        "response_consumer",
        &BasicConsumeOptions::default(),
        &FieldTable::new()
    )
        .map(move |stream| {
            stream
                  // we're only interested in the first incoming message in the message queue
                  .take(1) 
                  // and return it with a channel for the next processing stage
                  .map(move |message| (channel, message))
        })
})
.map_err(|err| {
    let message = format!("Error during consuming the response message: {}", err);
    error!("{:?}", message);
    err
})

// Processing the extracted message here
.and_then(|(channel, message)| {  // <-- error here?
    println!("Message: {:?}", std::str::from_utf8(&message.data).unwrap());
    // ... some useful code
    channel.basic_ack(message.delivery_tag);
    Ok(channel)
})
.map_err(|err| {
    let message = format!("Error during sending a message to the client: {}", err);
    error!("{:?}", message);
    err
})

// Other chained futures via `.and_then`, like this
// .and_then(|channel| { 
//    ...
// })
//

The Rust compiler reports the following error:

error[E0308]: mismatched types
   --> src/main.rs:190:24
    |
190 |             .and_then(|(channel, message)| {
    |                        ^^^^^^^^^^^^^^^^^^ expected struct `futures::stream::Map`, found tuple
    |
    = note: expected type `futures::stream::Map<futures::stream::Take<lapin_futures_rustls::lapin::consumer::Consumer<lapin_futures_tls_api::AMQPStream>>, [closure@src/main.rs:179:36: 179:69 channel:_]>`
               found type `(_, _)`

error: aborting due to previous error

stream.take(1).map(...) gives you back a mapped Stream, not a Future (which is what it appears you want to have). That’s why your and_then is receiving a Map<Take<...>> and not the item from the stream itself, which is what the compiler is telling you.

You can do one of two things:

  1. Turn the Stream into a future via Stream::into_future. This gives you a future that resolves to the next element of the stream and the stream itself. This can be used to “peel off” items one by one out of a Stream.
  2. Stick with the stream abstraction but then use a combinator such as Stream::for_each to process items as they become available. for_each will drive the stream to completion (i.e. until there are no more items from it).
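
If it helps to see the difference between the two options without pulling in lapin, here is a rough analogy using plain std iterators in place of streams. This is only a sketch: `peel_first` and `process_all` are made-up helper names, and iterators are synchronous, so this only mirrors the shape of the values you get back (an item plus the rest, versus running everything to the end), not the async behaviour.

```rust
use std::fmt::Display;

/// Analogue of option 1 (`Stream::into_future`): take the next item off the
/// front and hand it back together with whatever remains of the sequence.
/// The item is an `Option` because the sequence may already be empty.
fn peel_first<I: Iterator>(mut items: I) -> (Option<I::Item>, I) {
    let first = items.next();
    (first, items)
}

/// Analogue of option 2 (`Stream::for_each`): stay with the sequence and
/// handle every item until none are left.
fn process_all<I>(items: I) -> Vec<String>
where
    I: Iterator,
    I::Item: Display,
{
    let mut handled = Vec::new();
    items.for_each(|item| handled.push(format!("handled {}", item)));
    handled
}
```

Calling `peel_first` on an iterator over "a", "b", "c" gives back `Some("a")` plus an iterator that still yields "b" and "c". That is the same shape as the `(message, _)` pair that `into_future` resolves to, which is why the message has to be unwrapped from an `Option` afterwards.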

Got it! I've applied those changes to my piece of code, it successfully compiled and works:

// Get only the first message from the response queue
.and_then(|channel| {
    channel.basic_consume(
        &queue_name,
        "response_consumer",
        &BasicConsumeOptions::default(),
        &FieldTable::new()
    )
        .and_then(move |stream| {
            stream.take(1)
                  .into_future()
                  .map_err(|(err, _)| err)
                  .map(move |(message, _)| (channel, message))
        })

})
.map_err(|err| {
    let message = format!("Error during consuming the response message: {}", err);
    error!("{}", message);
    err
})

// Process the taken message
.and_then(|(channel, message)| {
    let message = message.unwrap();
    // ... some useful code here
    channel.basic_ack(message.delivery_tag);
    Ok(channel)
})
.map_err(|err| {
    let message = format!("Error during sending a message to the client: {}", err);
    error!("{}", message);
    err
})

Thank you for the answer!
