Propagate an error in futures

Hello everyone!

I'm trying to cover the corner cases where a required action can't be completed because an error was raised at a certain step. Before returning the original error, I'd like to do some useful work, like here:

.and_then(move |channel| {
    let queue_name_bind_clone = queue_name_bind.clone();
    let channel_clone = channel.clone();

    channel.queue_bind(
        &queue_name_bind,
        &endpoint_link.get_response_exchange(),
        &endpoint_link.get_microservice(),
        &QueueBindOptions::default(),
        &FieldTable::new()
    )
        // Process the errors only when exchange point wasn't created yet
        .map_err(|err| {
            channel_clone.queue_delete(&queue_name_bind_clone, &QueueDeleteOptions::default())
                .and_then(move |_| {
                    channel_clone.close(200, "Connection closed: Invalid response exchange.")
                })
                .then(move |_| Err(err))
        })
        // All is ready, go to the next step and re-use the channel with the next `.and_then` block
        .map(|_| channel)
})
.map_err(|err| {
    let message = format!("Error during linking the response queue with exchange. Reason -> {}", err);
    error!("{}", message);
    err
})

This leads to the following compile error:

error[E0271]: type mismatch resolving `<[closure@src/engine/mod.rs:114:30: 119:22 channel_copy:_, queue_name_bind_copy:_] as std::ops::FnOnce<(std::io::Error,)>>::Output == std::io::Error`
   --> src/engine/mod.rs:102:14
    |
102 |             .and_then(move |channel| {
    |              ^^^^^^^^ expected struct `futures::Then`, found struct `std::io::Error`
    |
    = note: expected type `futures::Then<futures::AndThen<std::boxed::Box<futures::Future<Item=(), Error=std::io::Error>>, std::boxed::Box<futures::Future<Item=(), Error=std::io::Error>>, [closure@src/engine/mod.rs:115:115: 117:26 channel_copy:_]>, 

Any ideas how to solve this so that I can return the caught error after doing some work first?

        // Process the errors only when exchange point wasn't created yet
        .map_err(|err| {
            channel_clone.queue_delete(&queue_name_bind_clone, &QueueDeleteOptions::default())
                .and_then(move |_| {
                    channel_clone.close(200, "Connection closed: Invalid response exchange.")
                })
                .then(move |_| Err(err))
        })

Is queue_delete() returning a future there? It seems like it, but I want to confirm.

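In general, the closure you provide to map_err() does not produce a future that gets run; it only applies a synchronous transformation (mapping) function to the error value. Whatever the closure returns becomes the new error type, which is why the compiler reports a mismatch between futures::Then and std::io::Error here.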

If you want to do something in that block that itself will contain future chains, then you should use combinators that take closures that return an IntoFuture. For example, to handle an error path with its own future chain you can use the or_else() or then() combinators (instead of map_err, which won't work out well).
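A rough way to see the difference, using std's Result as a stand-in. This is only an analogy: it does not use the futures crate, and `cleanup` is a hypothetical helper standing in for the queue_delete + close work. The closure given to map_err returns the new error value, so a Result returned from it ends up nested in the error position, while the closure given to or_else returns a Result that is flattened into the chain.

```rust
// Hypothetical stand-in for the cleanup work (queue_delete + close).
fn cleanup(log: &mut Vec<&'static str>) -> Result<(), String> {
    log.push("cleanup ran");
    Ok(())
}

// map_err: the closure's return value *is* the new error, so returning a
// Result from it nests that Result inside the error position.
// Note: a Result is computed eagerly, so the cleanup still runs here. A
// future returned from a future's map_err closure would only be stored as
// the error value; the chain would not drive it.
fn with_map_err(log: &mut Vec<&'static str>) -> Result<(), Result<(), String>> {
    let bind: Result<(), String> = Err("queue_bind failed".to_string());
    bind.map_err(|err| cleanup(log).and_then(|_| Err(err)))
}

// or_else: the closure returns a Result that is flattened into the chain,
// so the original error comes back out with its original type.
fn with_or_else(log: &mut Vec<&'static str>) -> Result<(), String> {
    let bind: Result<(), String> = Err("queue_bind failed".to_string());
    bind.or_else(|err| cleanup(log).and_then(|_| Err(err)))
}

fn main() {
    let mut log = Vec::new();
    println!("{:?}", with_map_err(&mut log));
    println!("{:?}", with_or_else(&mut log));
    println!("{:?}", log);
}
```

The nested error type in `with_map_err` mirrors the mismatch the compiler complained about; `with_or_else` keeps the plain error type while still running the cleanup first.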

Is queue_delete() returning a future there? It seems like it, but I want to confirm.

Yes, it returns a Box<Future<Item = (), Error = io::Error>>. You can see the implementation here. The same applies to the close() function.

I tried changing the previous piece of code to this:

.and_then(move |channel| {
    let queue_name_bind_clone = queue_name_bind.clone();
    let channel_clone = channel.clone();

    channel.queue_bind(
        &queue_name_bind,
        &endpoint_link.get_response_exchange(),
        &endpoint_link.get_microservice(),
        &QueueBindOptions::default(),
        &FieldTable::new()
    )
        // Process the errors only when exchange point wasn't created yet
        .or_else(|err| { // Replaced `map_err` with `or_else`
            channel_clone.queue_delete(&queue_name_bind_clone, &QueueDeleteOptions::default())
                .and_then(move |_| {
                    channel_clone.close(200, "Connection closed: Invalid response exchange.")
                })
                .then(move |_| Err(err))
        })
        // All is ready, go to the next step and re-use the channel with the next `.and_then` block
        .map(|_| channel)
})
.map_err(|err| {
    let message = format!("Error during linking the response queue with exchange. Reason -> {}", err);
    error!("{}", message);
    err
})

It compiles successfully, but nothing happens when an error occurs: the queue and channel aren't closed. :confused:

Are you sure queue_bind is actually returning an error? Can you sprinkle some logs/printlns and trace through what's actually being called with what values?

Doesn't or_else always return success? I haven't checked your code closely, but I got the impression that or_else will return Ok(Err(err)).

I don’t think it’s returning success: Err(err) in the then() will return a future in an error state (with err as the error). There’s an IntoFuture impl for Result that makes this work.

But at any rate, I think @Relrin is saying the or_else isn’t even executed. There might be something silly there that’s hard to spot by looking at these code snippets so a log/println trace would help to visualize what’s actually happening.

Maybe I am missing something, but it looks to me like or_else returns a future that is not flattened the way it would be with and_then. That future may never even be spawned for execution and, when returned, map_err sees it as a successful result.

Maybe I didn't mock this up properly, but here's a sketch that I think roughly mimics the snippets above:

extern crate futures;
extern crate tokio_core;

use futures::prelude::*;
use futures::future::FutureResult;
use tokio_core::reactor::Core;

fn main() {
    let mut core = Core::new().unwrap();

    let main_fut: FutureResult<_, String> = Ok(()).into_future();
    let main_fut = main_fut.and_then(|_| {
        println!("Entering main_fut success case");
        // Pretend queue_bind future resolves with an error
        let queue_bind_fut: FutureResult<(), String> = Err("queue_bind failed".into()).into_future();
        // Do some async work when that error occurs
        queue_bind_fut
            .or_else(|e| {
                // Pretend queue_delete operation (future) succeeds
                let queue_delete_fut: FutureResult<_, ()> = Ok(()).into_future();
                queue_delete_fut.and_then(|_| {
                    println!("queue_delete success");
                    Ok(())
                })
                // Now complete the or_else portion with the original error,
                // yielding a future in an error state
                .then(move |_| Err(e))
            })
    });
    // When we run the whole chain, it'll fail with the propagated "queue_bind failed" error
    core.run(main_fut).unwrap();
}

@vitalyd @avkonst I've done something similar with my code, just added a few println! calls:

.and_then(move |channel| {
    let queue_name_bind_copy = queue_name_bind.clone();
    let channel_copy = channel.clone();

    channel.queue_bind(
        &queue_name_bind,
        &endpoint_link.get_response_exchange(),
        &endpoint_link.get_microservice(),
        &QueueBindOptions::default(),
        &FieldTable::new()
    )
        // Process the errors only when exchange point wasn't created yet
        .or_else(move |err| {
            println!("inside or_else");
            let queue_delete_options = QueueDeleteOptions {
                if_unused: false,
                if_empty: false,
                ..Default::default()
            };

            channel_copy.queue_delete(&queue_name_bind_copy, &queue_delete_options)
                .and_then(move |_| {
                    println!("inside queue_delete");
                    channel_copy.close(200, "Connection closed: Invalid response exchange.")
                })
                .then(move |res| {
                    match res {
                        Ok(_) => println!("OK."),
                        Err(r) => println!("{:?}", r)
                    };
                    println!("inside then");
                    Err(err)
                })
        })
        .map(move |_| {
            println!("inside map");
            channel
        })
})
.map_err(|err| {
    let message = format!("Error during linking the response queue with exchange. Reason -> {}", err);
    error!("{}", message);
    err
})

This is the output from the reverse proxy when it tries to bind a queue to a non-existent exchange point (the queue itself is created successfully):

Listening on: 0.0.0.0:8080
inside or_else
Error { repr: Custom(Custom { kind: Other, error: StringError("Could not purge queue: NotConnected") }) }
inside then
[2018-02-02][10:29:51][pathfinder::engine][ERROR] Error during linking the response queue with exchange. Reason -> failed to handle frame: NotConnected
[2018-02-02][10:29:52][pathfinder::engine][ERROR] Error during publishing a message. Reason -> failed to handle frame: NotConnected
[2018-02-02][10:29:52][pathfinder::engine][ERROR] Error during consuming the response message. Reason -> failed to handle frame: NotConnected
[2018-02-02][10:29:52][pathfinder::engine][ERROR] Error during sending a message to the client. Reason -> failed to handle frame: NotConnected
[2018-02-02][10:29:52][pathfinder::engine][ERROR] Error during deleting the queue. Reason -> failed to handle frame: NotConnected
[2018-02-02][10:29:52][pathfinder::engine][ERROR] Error during closing the channel. Reason -> failed to handle frame: NotConnected

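So the reason it isn't closed correctly is probably the failed queue_delete() future: close() is chained with and_then, so it never runs once queue_delete() fails ("inside queue_delete" is never printed).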

Yup, that’s what it looks like. So I think the mechanics with the futures are fine (as far as this example is concerned, at least).
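For what it's worth, the trace is consistent with and_then short-circuiting on the first error. Here is a minimal sketch of that behaviour, again using std's Result as an analogy rather than the futures crate. `queue_delete` and `close` below are hypothetical stand-ins (not lapin's API) that only record that they were attempted. It also shows one possible variant that attempts close() regardless of the queue_delete() outcome, which with futures would roughly correspond to chaining close() via then() instead of and_then(). Whether close() would actually succeed on a connection that reports NotConnected is a separate question.

```rust
// Hypothetical stand-ins for the two cleanup steps; names and messages are
// illustrative only.
fn queue_delete(log: &mut Vec<&'static str>) -> Result<(), String> {
    log.push("queue_delete attempted");
    Err("Could not purge queue: NotConnected".to_string())
}

fn close(log: &mut Vec<&'static str>) -> Result<(), String> {
    log.push("close attempted");
    Ok(())
}

// Mirrors `queue_delete().and_then(close).then(|_| Err(err))`:
// close() is skipped once queue_delete() fails.
fn cleanup_short_circuit(log: &mut Vec<&'static str>, err: String) -> Result<(), String> {
    let _ = queue_delete(log).and_then(|_| close(log));
    Err(err)
}

// Variant that ignores the queue_delete() outcome and still attempts close()
// before handing back the original error.
fn cleanup_always_close(log: &mut Vec<&'static str>, err: String) -> Result<(), String> {
    let _ = queue_delete(log);
    let _ = close(log);
    Err(err)
}

fn main() {
    let mut first = Vec::new();
    let _ = cleanup_short_circuit(&mut first, "queue_bind failed".to_string());
    println!("{:?}", first);

    let mut second = Vec::new();
    let _ = cleanup_always_close(&mut second, "queue_bind failed".to_string());
    println!("{:?}", second);
}
```

In both variants the caller still gets the original error back; the only difference is whether the second cleanup step is attempted.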