[SOLVED] Chaining two different futures together

Hello everyone!

I'm trying to chain two different futures together so that, if one step fails, nothing further runs and only the error from that particular stage is returned.

The idea is pretty straightforward, and I implemented it with the following code:

// Authorize before doing any action. If we get an error, then send a message to the client
let auth_future = auth_middleware_local.borrow()
    .process_request(&json_message, &handle_inner)
     // propagate an error to the client
    .map_err(|err| {
        handle_error!(&connections_nested, &addr, engine_nested, err);
        err
    });

// Process the request and put a message in the queue
let rabbitmq_future = engine_local.borrow_mut().handle(
    json_message, &addr, &connections_inner, &handle_inner
)
    // propagate an error to the client
    .map_err(|err| {
        handle_error!(&connections_nested, &addr, engine_nested, err);
        err
    });

let processing_request_future = auth_future
    // working with RabbitMQ only when auth_future was completed with success
    .and_then(move |_| rabbitmq_future)
    .then(move |_| ());

handle_inner.spawn(processing_request_future);

First, we check and validate the specified token using Redis; if that yields an error, we send a response to the client in some format. Otherwise, we apply transformations to the request and put a message in a RabbitMQ queue.
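As a synchronous analogy only (this is not the futures API), the intended control flow resembles chaining `Result` values with `and_then`: the second step runs only when the first succeeds, and whichever step fails supplies the error. The names `AppError`, `authorize`, `enqueue` and `process` are hypothetical stand-ins for the Redis and RabbitMQ steps.

```rust
// Synchronous sketch of "run step two only if step one succeeded".
// `AppError`, `authorize`, `enqueue` and `process` are hypothetical stand-ins.
#[derive(Debug, PartialEq)]
enum AppError {
    Auth(String),
    Queue(String),
}

fn authorize(token: &str) -> Result<(), AppError> {
    if token == "valid" {
        Ok(())
    } else {
        Err(AppError::Auth("Token is invalid.".to_string()))
    }
}

fn enqueue(message: &str) -> Result<usize, AppError> {
    if message.is_empty() {
        Err(AppError::Queue("Empty message.".to_string()))
    } else {
        Ok(message.len())
    }
}

fn process(token: &str, message: &str) -> Result<usize, AppError> {
    // `enqueue` is not called at all when `authorize` fails.
    authorize(token).and_then(|_| enqueue(message))
}

fn main() {
    println!("{:?}", process("valid", "hello"));
    println!("{:?}", process("bad", "hello"));
}
```

The future-based version has the same shape, except that each step describes work to be done later rather than doing it immediately.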

Unfortunately, the compiler says I'm doing it wrong:

error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
   --> src/engine/mod.rs:77:9
    |
77  | /         Box::new(client_future
78  | |             // 1. Create a channel
79  | |             .and_then(|client| {
80  | |                 client.create_confirm_channel(ConfirmSelectOptions::default())
...   |
249 | |             })
250 | |         )
    | |_________^
    |
note: first, the lifetime cannot outlive the anonymous lifetime #3 defined on the method body at 68:5...
   --> src/engine/mod.rs:68:5
    |
68  | /     pub fn handle(&self, message: Box<JsonValue>, client: &SocketAddr, connections: &ActiveConnections, handle: &Handle) -> RabbitMQFuture {
69  | |         let url = message["url"].as_str().unwrap();
70  | |         let queue_name = format!("{}", Uuid::new_v4());
71  | |         let event_name = message["event-name"].as_str().unwrap_or("null");
...   |
250 | |         )
251 | |     }
    | |_____^
note: ...so that the type `futures::Then<futures::MapErr<futures::AndThen<futures::MapErr<futures::AndThen<futures::MapErr<futures::AndThen<futures::MapErr<futures::AndThen<futures::MapErr<futures::AndThen<futures::MapErr<futures::AndThen<futures::MapErr<futures::AndThen<futures::MapErr<futures::AndThen<std::boxed::Box<futures::Future<Item=lapin_futures_rustls::lapin::client::Client<lapin_futures_tls_api::AMQPStream>, Error=std::io::Error>>, std::boxed::Box<futures::Future<Item=lapin_futures_rustls::lapin::channel::Channel<lapin_futures_tls_api::AMQPStream>, Error=std::io::Error>>, [closure@src/engine/mod.rs:79:23: 81:14]>, [closure@src/engine/mod.rs:82:22: 86:14]>, futures::Map<std::boxed::Box<futures::Future<Item=(), Error=std::io::Error>>, [closure@src/engine/mod.rs:99:26: 99:37 channel:lapin_futures_rustls::lapin::channel::Channel<lapin_futures_tls_api::AMQPStream>]>, [closure@src/engine/mod.rs:89:23: 100:14 queue_name:&std::string::String]>, [closure@src/engine/mod.rs:101:22: 105:14]>, futures::Map<std::boxed::Box<futures::Future<Item=(), Error=std::io::Error>>, [closure@src/engine/mod.rs:116:26: 116:37 channel:lapin_futures_rustls::lapin::channel::Channel<lapin_futures_tls_api::AMQPStream>]>, [closure@src/engine/mod.rs:108:23: 117:14 queue_name:&std::string::String, endpoint:&std::boxed::Box<engine::router::endpoint::Endpoint>]>, [closure@src/engine/mod.rs:118:22: 122:14]>, futures::Map<std::boxed::Box<futures::Future<Item=std::option::Option<bool>, Error=std::io::Error>>, [closure@src/engine/mod.rs:154:26: 154:49 channel:lapin_futures_rustls::lapin::channel::Channel<lapin_futures_tls_api::AMQPStream>]>, [closure@src/engine/mod.rs:126:23: 155:14 request_headers:&std::boxed::Box<std::vec::Vec<(std::string::String, std::string::String)>>, queue_name:&std::string::String, event_name:&&str, endpoint:&std::boxed::Box<engine::router::endpoint::Endpoint>, message:&std::boxed::Box<json::JsonValue>]>, [closure@src/engine/mod.rs:156:22: 160:14]>, 
futures::AndThen<std::boxed::Box<futures::Future<Item=lapin_futures_rustls::lapin::consumer::Consumer<lapin_futures_tls_api::AMQPStream>, Error=std::io::Error>>, futures::Map<futures::MapErr<futures::stream::StreamFuture<futures::stream::Take<lapin_futures_rustls::lapin::consumer::Consumer<lapin_futures_tls_api::AMQPStream>>>, [closure@src/engine/mod.rs:173:40: 173:54]>, [closure@src/engine/mod.rs:174:36: 174:83 channel:lapin_futures_rustls::lapin::channel::Channel<lapin_futures_tls_api::AMQPStream>]>, [closure@src/engine/mod.rs:170:31: 175:22 channel:lapin_futures_rustls::lapin::channel::Channel<lapin_futures_tls_api::AMQPStream>]>, [closure@src/engine/mod.rs:163:23: 176:14 queue_name:&std::string::String]>, [closure@src/engine/mod.rs:177:22: 181:14]>, futures::Map<std::boxed::Box<futures::Future<Item=(), Error=std::io::Error>>, [closure@src/engine/mod.rs:191:26: 191:42 channel:lapin_futures_rustls::lapin::channel::Channel<lapin_futures_tls_api::AMQPStream>]>, [closure@src/engine/mod.rs:184:23: 192:14 self:&&engine::Engine, connections:&&std::rc::Rc<std::cell::RefCell<std::collections::HashMap<std::net::SocketAddr, futures::sync::mpsc::UnboundedSender<tungstenite::Message>>>>, client:&&std::net::SocketAddr]>, [closure@src/engine/mod.rs:193:22: 197:14]>, futures::Map<std::boxed::Box<futures::Future<Item=(), Error=std::io::Error>>, [closure@src/engine/mod.rs:226:26: 226:37 channel:lapin_futures_rustls::lapin::channel::Channel<lapin_futures_tls_api::AMQPStream>]>, [closure@src/engine/mod.rs:218:23: 227:14 queue_name:&std::string::String]>, [closure@src/engine/mod.rs:228:22: 232:14]>, std::boxed::Box<futures::Future<Item=(), Error=std::io::Error>>, [closure@src/engine/mod.rs:235:23: 237:14]>, [closure@src/engine/mod.rs:238:22: 242:14]>, std::result::Result<(), error::PathfinderError>, [closure@src/engine/mod.rs:244:19: 249:14]>` will meet its required lifetime bounds
   --> src/engine/mod.rs:77:9
    |
77  | /         Box::new(client_future
78  | |             // 1. Create a channel
79  | |             .and_then(|client| {
80  | |                 client.create_confirm_channel(ConfirmSelectOptions::default())
...   |
249 | |             })
250 | |         )
    | |_________^
    = note: but, the lifetime must be valid for the static lifetime...
note: ...so that expression is assignable (expected std::boxed::Box<futures::Future<Item=(), Error=error::PathfinderError> + 'static>, found std::boxed::Box<futures::Future<Item=(), Error=error::PathfinderError>>)
   --> src/engine/mod.rs:77:9
    |
77  | /         Box::new(client_future
78  | |             // 1. Create a channel
79  | |             .and_then(|client| {
80  | |                 client.create_confirm_channel(ConfirmSelectOptions::default())
...   |
249 | |             })
250 | |         )
    | |_________^

error[E0277]: the trait bound `(): futures::Future` is not satisfied
   --> src/proxy.rs:120:30
    |
120 |                             .then(move |_| ());
    |                              ^^^^ the trait `futures::Future` is not implemented for `()`
    |
    = note: required because of the requirements on the impl of `futures::IntoFuture` for `()`

error[E0277]: the trait bound `(): futures::Future` is not satisfied
   --> src/proxy.rs:122:38
    |
122 |                         handle_inner.spawn(processing_request_future);
    |                                      ^^^^^ the trait `futures::Future` is not implemented for `()`
    |
    = note: required because of the requirements on the impl of `futures::IntoFuture` for `()`

error: aborting due to 3 previous errors

For the first part of the output, I don't quite understand what the compiler is trying to say, because I've done the same thing for auth_future, which works with Redis. In general, the implementation that generates auth_future is simple:

pub type MiddlewareFuture = Box<Future<Item=(), Error=PathfinderError> + 'static>;

// ...
fn process_request(&self, message: &JsonMessage, handle: &Handle) -> MiddlewareFuture {
     // ... some preparation before
    let token_inner = token.clone();
    let validation_struct = self.get_validation_struct();
    let jwt_secret_inner = self.jwt_secret.clone();
    Box::new(
        redis_connection
            // Get the User ID from Redis by the token
            .and_then(move |connection| {
                connection.send::<String>(resp_array!["GET", token])
            })
            // Connection issue or token is already deleted
            .map_err(|_| {
                let message = String::from("Token is expired.");
                PathfinderError::AuthenticationError(message)
            })
            // Extracted user_id used here for additional JWT validation
            .and_then(move |user_id| {
                let mut validation_struct_inner = validation_struct.clone();
                validation_struct_inner.set_audience(&user_id);
                // Validate against the copy that has the audience set
                validate(&token_inner, &jwt_secret_inner, &validation_struct_inner)
            })
            // The passed token is expired or has an invalid data
            .map_err(|_| {
                let message = String::from("Token is invalid.");
                PathfinderError::AuthenticationError(message)
            })
            // Drop the result, because everything that we need was done
            .map(|_| ())
    )
}

And similar things were done for rabbitmq_future:

// Alias for the lapin future type
pub type LapinFuture = Box<Future<Item=lapin::client::Client<AMQPStream>, Error=io::Error> + 'static>;
// Alias for generic future for pathfinder app and RabbitMQ
pub type RabbitMQFuture = Box<Future<Item=(), Error=PathfinderError> + 'static>;

pub fn handle(&self, message: Box<JsonValue>, client: &SocketAddr, connections: &ActiveConnections, handle: &Handle) -> RabbitMQFuture {
    let url = message["url"].as_str().unwrap();
    let queue_name = format!("{}", Uuid::new_v4());
    let event_name = message["event-name"].as_str().unwrap_or("null");
    let endpoint = self.router.clone().match_url_or_default(url);

    let request_headers = self.prepare_request_headers(message, endpoint);
    let client_future = self.rabbitmq_client.borrow().get_future(handle);

    Box::new(client_future
        // 1. Create a channel
        .and_then(|client| {
            client.create_confirm_channel(ConfirmSelectOptions::default())
        })
        .map_err(|err| {
            let message = format!("Error during creating a channel: {}", err);
            error!("{}", message);
            err
        })

        // flatten and chained calls with .and_then(|client| ...) and .map_err(|err| ... )
        // ...

        // 9. Close the channel
        .and_then(|channel| {
            channel.close(200, "Close the channel.")
        })
        .map_err(|err| {
            let message = format!("Error during closing the channel: {}", err);
            error!("{}", message);
            err
        })

        // Transforms a LapinFuture to RabbitMQFuture here
        .then(move |result| {
            match result {
                Ok(_) => Ok(()),
                Err(err) => Err(PathfinderError::Io(err))
            }
        })
    )
}

So I have two questions related to the compiler errors, about what I'm probably doing wrong:

  1. Why is the code for auth_future correct, but the code for rabbitmq_future is not? I don't quite understand why both type aliases are declared with the 'static lifetime, yet only the first one works.
  2. What's wrong with the .then(move |_| ()); part of the code, if I'm trying to resolve the final future there and return Ok(())?
  1. Will let someone else take a look (too much code).
  2. The .then closure must return a future (or something convertible into one, such as a Result); use the ok function to make one.
  3. Regarding the "// Authorize before doing any action." comment:
    To me, rabbitmq_future is started at the let statement; I'm not sure whether that is your intent.
    One way to avoid starting it early is to use the future::lazy function.
    More likely, to make the code more readable, change it to something like:
let rabbitmq_future = auth_future.and_then(|_| {
  engine_local. ...
  ...
});
let processing_request_future = rabbitmq_future.then(|_| ok(()));

If you look at this (very long!) part of the error message:

note: ...so that the type `futures::Then<futures::MapErr<futures::AndThen<futures::MapErr<futures::AndThen<futures::MapErr<futures::AndThen<futures::MapErr<futures::AndThen<futures::MapErr<futures::AndThen<futures::MapErr<futures::AndThen<futures::MapErr<futures::AndThen<futures::MapErr<futures::AndThen<std::boxed::Box<futures::Future<Item=lapin_futures_rustls::lapin::client::Client<lapin_futures_tls_api::AMQPStream>, Error=std::io::Error>>, std::boxed::Box<futures::Future<Item=lapin_futures_rustls::lapin::channel::Channel<lapin_futures_tls_api::AMQPStream>, Error=std::io::Error>>, [closure@src/engine/mod.rs:79:23: 81:14]>, [closure@src/engine/mod.rs:82:22: 86:14]>, futures::Map<std::boxed::Box<futures::Future<Item=(), Error=std::io::Error>>, [closure@src/engine/mod.rs:99:26: 99:37 channel:lapin_futures_rustls::lapin::channel::Channel<lapin_futures_tls_api::AMQPStream>]>, [closure@src/engine/mod.rs:89:23: 100:14 queue_name:&std::string::String]>, [closure@src/engine/mod.rs:101:22: 105:14]>, futures::Map<std::boxed::Box<futures::Future<Item=(), Error=std::io::Error>>, [closure@src/engine/mod.rs:116:26: 116:37 channel:lapin_futures_rustls::lapin::channel::Channel<lapin_futures_tls_api::AMQPStream>]>, [closure@src/engine/mod.rs:108:23: 117:14 queue_name:&std::string::String, endpoint:&std::boxed::Box<engine::router::endpoint::Endpoint>]>, [closure@src/engine/mod.rs:118:22: 122:14]>, futures::Map<std::boxed::Box<futures::Future<Item=std::option::Option<bool>, Error=std::io::Error>>, [closure@src/engine/mod.rs:154:26: 154:49 channel:lapin_futures_rustls::lapin::channel::Channel<lapin_futures_tls_api::AMQPStream>]>, [closure@src/engine/mod.rs:126:23: 155:14 request_headers:&std::boxed::Box<std::vec::Vec<(std::string::String, std::string::String)>>, queue_name:&std::string::String, event_name:&&str, endpoint:&std::boxed::Box<engine::router::endpoint::Endpoint>, message:&std::boxed::Box<json::JsonValue>]>, [closure@src/engine/mod.rs:156:22: 160:14]>, 
futures::AndThen<std::boxed::Box<futures::Future<Item=lapin_futures_rustls::lapin::consumer::Consumer<lapin_futures_tls_api::AMQPStream>, Error=std::io::Error>>, futures::Map<futures::MapErr<futures::stream::StreamFuture<futures::stream::Take<lapin_futures_rustls::lapin::consumer::Consumer<lapin_futures_tls_api::AMQPStream>>>, [closure@src/engine/mod.rs:173:40: 173:54]>, [closure@src/engine/mod.rs:174:36: 174:83 channel:lapin_futures_rustls::lapin::channel::Channel<lapin_futures_tls_api::AMQPStream>]>, [closure@src/engine/mod.rs:170:31: 175:22 channel:lapin_futures_rustls::lapin::channel::Channel<lapin_futures_tls_api::AMQPStream>]>, [closure@src/engine/mod.rs:163:23: 176:14 queue_name:&std::string::String]>, [closure@src/engine/mod.rs:177:22: 181:14]>, futures::Map<std::boxed::Box<futures::Future<Item=(), Error=std::io::Error>>, [closure@src/engine/mod.rs:191:26: 191:42 channel:lapin_futures_rustls::lapin::channel::Channel<lapin_futures_tls_api::AMQPStream>]>, [closure@src/engine/mod.rs:184:23: 192:14 self:&&engine::Engine, connections:&&std::rc::Rc<std::cell::RefCell<std::collections::HashMap<std::net::SocketAddr, futures::sync::mpsc::UnboundedSender<tungstenite::Message>>>>, client:&&std::net::SocketAddr]>, [closure@src/engine/mod.rs:193:22: 197:14]>, futures::Map<std::boxed::Box<futures::Future<Item=(), Error=std::io::Error>>, [closure@src/engine/mod.rs:226:26: 226:37 channel:lapin_futures_rustls::lapin::channel::Channel<lapin_futures_tls_api::AMQPStream>]>, [closure@src/engine/mod.rs:218:23: 227:14 queue_name:&std::string::String]>, [closure@src/engine/mod.rs:228:22: 232:14]>, std::boxed::Box<futures::Future<Item=(), Error=std::io::Error>>, [closure@src/engine/mod.rs:235:23: 237:14]>, [closure@src/engine/mod.rs:238:22: 242:14]>, std::result::Result<(), error::PathfinderError>, [closure@src/engine/mod.rs:244:19: 249:14]>` will meet its required lifetime bounds

you will notice that there are closures in there that are capturing references to something from their environment. I can see at least queue_name: &String and client: &&SocketAddr as fields of the closures. As such, these closures are not 'static and will make the overall type not satisfy Box<Future<...> + 'static> requirements.

I'm guessing these things are captured by reference in the code you didn't show, so take a look there and see what's up.
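A minimal sketch of the same effect without any futures crate, assuming a boxed closure with a 'static bound can stand in for Box<Future<...> + 'static>. The names `StaticJob` and `make_greeter` are hypothetical. A closure that borrows a local cannot satisfy 'static; one that owns its data can.

```rust
// A boxed closure with a 'static bound, standing in for a boxed 'static future.
type StaticJob = Box<dyn Fn() -> String + 'static>;

// Hypothetical helper: `name` is only borrowed for the duration of the call.
fn make_greeter(name: &str) -> StaticJob {
    // Capturing `name` (a reference) directly would not compile:
    //     Box::new(|| format!("queue-{}", name))
    // because the closure would hold a borrow that is not 'static.
    // Taking an owned copy first and moving it in avoids the borrow.
    let owned: String = name.to_owned();
    Box::new(move || format!("queue-{}", owned))
}

fn main() {
    let job;
    {
        let local = String::from("abc");
        job = make_greeter(&local);
    } // `local` is dropped here, yet `job` remains valid.
    println!("{}", job());
}
```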

Engine is part of the Proxy struct, as specified in the struct definition:

pub struct Proxy {
    engine: Rc<RefCell<Box<Engine>>>,
    connections: Rc<RefCell<HashMap<SocketAddr, mpsc::UnboundedSender<Message>>>>,
    auth_middleware: Rc<RefCell<Box<Middleware>>>,
}

The auth_future and rabbitmq_future calls are placed inside the request processing, like this:

pub fn run(&self, address: SocketAddr) {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let socket = TcpListener::bind(&address, &handle).unwrap();
    println!("Listening on: {}", address);

    let server = socket.incoming().for_each(|(stream, addr)| {
        let engine_local = self.engine.clone();
        let connections_local = self.connections.clone();
        let auth_middleware_local = self.auth_middleware.clone();
        let handle_local = handle.clone();

        accept_async(stream)
            .and_then(move |ws_stream| {
                let (tx, rx) = mpsc::unbounded();
                connections_local.borrow_mut().insert(addr, tx);
                let (sink, stream) = ws_stream.split();

                let handle_inner = handle_local.clone();
                let connections_inner = connections_local.clone();
                let ws_reader = stream.for_each(move |message: Message| {

                    // ...

                    let auth_future = auth_middleware_local.borrow()
                        .process_request(&json_message, &handle_inner);

                    let rabbitmq_future = engine_local.borrow_mut().handle(
                        json_message, &addr, &connections_inner, &handle_inner
                    );

                    let processing_request_future = auth_future
                        .and_then(move |_| rabbitmq_future)
                        .map_err(|err| {
                            handle_error!(&connections_nested, &addr, engine_nested, err);
                            ()
                        });

                    handle_inner.spawn(processing_request_future);
                    Ok(())
                })

               ...
               Ok(())
            });

    // Run the server
    core.run(server).unwrap();
}

All of the calls that generate futures mentioned in this topic will always be an inner part of the processing and won't be made anywhere else, so they should not outlive the other pieces of code (like the Engine or Proxy instances). Should I add 'static lifetimes to each part of Proxy, or do something else for this case?

It doesn’t look like the issue is with Proxy. As mentioned, there are closures being defined that are capturing bits of their environment (ie stack arguments). Those closures will make the resulting future not 'static. I think the places where this capture is happening aren’t part of the code snippets you’ve shown thus far.

To avoid this issue, you can either move those environment values into the closure or put them inside something like Rc and move clones of them into the closures.

At the moment I have only one issue left, with the environment that is used for working with RabbitMQ:

error[E0495]: cannot infer an appropriate lifetime for lifetime parameter in function call due to conflicting requirements
  --> src/engine/mod.rs:68:40
   |
68 |         let transmitter = &connections.borrow_mut()[&client];
   |                                        ^^^^^^^^^^
   |
note: first, the lifetime cannot outlive the anonymous lifetime #3 defined on the method body at 67:5...
  --> src/engine/mod.rs:67:5
   |
67 | /     pub fn handle(&self, message: JsonMessage, client: &SocketAddr, connections: &ActiveConnections, handle: &Handle) -> RabbitMQFuture {
68 | |         let transmitter = &connections.borrow_mut()[&client];
69 | |
70 | |         let message_nested = message.clone();
...  |
253| |         )
254| |     }
   | |_____^
note: ...so that reference does not outlive borrowed content
  --> src/engine/mod.rs:68:28
   |
68 |         let transmitter = &connections.borrow_mut()[&client];
   |                            ^^^^^^^^^^^
   = note: but, the lifetime must be valid for the static lifetime...
note: ...so that expression is assignable (expected std::boxed::Box<futures::Future<Error=error::PathfinderError, Item=()> + 'static>, found std::boxed::Box<futures::Future<Error=error::PathfinderError, Item=()>>)
  --> src/engine/mod.rs:77:9
   |
77 | /         Box::new(client_future
78 | |             // 1. Create a channel
79 | |             .and_then(|client| {
80 | |                 client.create_confirm_channel(ConfirmSelectOptions::default())
...  |
252| |             })
253| |         )
   | |_________^

What is more interesting here is that all of those parameters come from outside the scope and live longer than anything else (and yes, they aren't static variables, but they are located on the heap).

In the code I'm just doing the following things:

  1. Passing all of those variables into the engine call, where
  • json_message.clone() is Rc<Box<JsonValue>>
  • &addr is &std::net::SocketAddr, the address of the client that connected to the server
  • &connections_inner is &Rc<RefCell<HashMap<SocketAddr, mpsc::UnboundedSender<Message>>>>
  • &handle_inner is &tokio_core::reactor::Handle
let rabbitmq_future = engine_local.borrow().handle(
    json_message.clone(), &addr, &connections_inner, &handle_inner
);
  2. Extracting a transmitter from the hashmap (which is part of Proxy, as shown in the previous post) and trying to consume it inside a future later:
pub fn handle(&self, message: JsonMessage, client: &SocketAddr, connections: &ActiveConnections, handle: &Handle) -> RabbitMQFuture {
    let transmitter = &connections.borrow_mut()[&client];
    ....

Honestly, I'm quite stuck here and don't know how to deal with it. Do you have any ideas how to solve it?
P.S. You can also look at the code on GitHub, here, in the proxy.rs and engine/mod.rs files.

I’m on mobile right now but a few comments based on skimming that code:

  1. RabbitMQFuture is a type alias for Box<Future<...> + 'static>. As mentioned upthread, this future cannot contain any references other than 'static ones.
  2. If you have a move closure inside which you attempt to clone something that came from its environment (to get an owned value), you need to make sure you move the clone into the closure and not the original. Otherwise, it moves the original and calls clone on it, which isn’t what you want. I think the queue_name parameter may have that problem.
  3. The transmitter is an mpsc Sender, which means it’s cloneable. Similar to above, clone it and then move the clone into the closure.

You have long combinator chains - go through them carefully and make sure they’re moving the right thing. If the compiler complains about lifetimes, look at the error message to see what some of the closures are capturing by reference and fix that.
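A minimal sketch of points 2 and 3, using `std::sync::mpsc` as a stand-in for the futures mpsc channel (an assumption made so the example needs no external crate; the cloning pattern is the same). The helper name `make_notifier` is hypothetical.

```rust
use std::sync::mpsc::{channel, Sender};

// Hypothetical helper: builds a 'static closure that owns its own sender.
fn make_notifier(transmitter: &Sender<String>) -> Box<dyn Fn(&str) + 'static> {
    // Clone first, outside the closure...
    let tx = transmitter.clone();
    // ...then move the clone in. The caller's sender is untouched.
    Box::new(move |text: &str| {
        // Sending only fails if the receiver is gone; ignore that here.
        let _ = tx.send(text.to_string());
    })
}

fn main() {
    let (transmitter, receiver) = channel::<String>();
    let notify = make_notifier(&transmitter);

    notify("from the closure");
    // The original sender is still usable because only a clone was moved.
    transmitter.send("from the original".to_string()).unwrap();

    println!("{}", receiver.recv().unwrap());
    println!("{}", receiver.recv().unwrap());
}
```

Had the closure been written as `move |text| { transmitter.clone().send(...) }`, the closure would capture `transmitter` itself (here a reference with a limited lifetime), which is the situation described in point 2.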

I fixed most things (like passing and getting the transmitter), but a couple of issues remain, related to the queue_name, endpoint and event_name variables.

For example, let's take the data that is stored in the queue_name variable and used inside the closures. Ideally, I'd like to do the following:

  1. Store in the queue_name variable a unique UUID4 identifier as a string:
pub fn handle(&self, message: JsonMessage, transmitter: mpsc::UnboundedSender<Message>, handle: &Handle) -> RabbitMQFuture {
    // ...
    let queue_name = Rc::new(format!("{}", Uuid::new_v4()));
  2. Use it inside each closure via clone(), for example:
.and_then(|channel| {
    let queue_name_parameter = queue_name.clone();
    let queue_declare_options = QueueDeclareOptions {
        passive: false,
        durable: true,
        exclusive: true,
        auto_delete: false,
        ..Default::default()
    };

    channel.queue_declare(&queue_name_parameter, &queue_declare_options, &FieldTable::new())
        .map(|_| channel)
})
.map_err(|err| {
    let message = format!("Error during declaring the queue: {}", err);
    error!("{}", message);
    err
})

But unfortunately compilation fails with an error:

error[E0373]: closure may outlive the current function, but it borrows `queue_name`, which is owned by the current function
  --> src/engine/mod.rs:83:23
   |
83 |             .and_then(|channel| {
   |                       ^^^^^^^^^ may outlive borrowed value `queue_name`
84 |                 let queue_name_parameter = queue_name.clone();
   |                                            ---------- `queue_name` is borrowed here
help: to force the closure to take ownership of `queue_name` (and any other referenced variables), use the `move` keyword
   |
83 |             .and_then(move |channel| {
   |                       ^^^^^^^^^^^^^^

And even if we change the signature to move |channel|, it doesn't help either:

error[E0382]: capture of moved value: `queue_name`
   --> src/engine/mod.rs:161:23
    |
83  |             .and_then(move |channel| {
    |                       -------------- value moved (into closure) here
...
161 |             .and_then(|(channel, endpoint)| {
    |                       ^^^^^^^^^^^^^^^^^^^^^ value captured here after move
    |
    = note: move occurs because `queue_name` has type `std::rc::Rc<std::string::String>`, which does not implement the `Copy` trait

Clone queue_name outside the closure and then make the closure a move one. So something like this:

let queue_name = Rc::new(...);
let queue_name2 = Rc::clone(&queue_name);
let queue_name3 = Rc::clone(&queue_name);
let future = ... // start building the future chain
    .and_then(move |...| { /* use queue_name here */ })
    .and_then(move |...| { /* use queue_name2 here */ })
    .and_then(move |...| { /* use queue_name3 here */ })

You create as many clones as you need and each closure requiring one moves in its own clone. In the above snippet, I’m also moving the original into a closure, but only after we’ve already cloned off separate copies.
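A compilable version of that pattern, as a sketch only: boxed 'static closures stand in for the combinator closures, and the names `Step` and `build_steps` are hypothetical.

```rust
use std::rc::Rc;

// Each step is a boxed 'static closure, standing in for one combinator closure.
type Step = Box<dyn Fn() -> String + 'static>;

// Hypothetical helper that builds three steps sharing one queue name.
fn build_steps(name: &str) -> Vec<Step> {
    let queue_name = Rc::new(name.to_string());
    // One clone per additional closure, made before the original is moved.
    let queue_name2 = Rc::clone(&queue_name);
    let queue_name3 = Rc::clone(&queue_name);

    let mut steps: Vec<Step> = Vec::new();
    // Each `move` closure takes ownership of exactly one handle.
    steps.push(Box::new(move || format!("declare {}", queue_name)));
    steps.push(Box::new(move || format!("bind {}", queue_name2)));
    steps.push(Box::new(move || format!("delete {}", queue_name3)));
    steps
}

fn main() {
    for step in build_steps("q1").iter() {
        println!("{}", step());
    }
}
```

Cloning an Rc only bumps a reference count, so the three closures share a single String allocation.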

That makes sense to me. From some points of view it may look slightly too verbose, but it works well. At least the code looks much more readable now. Thank you so much! :slight_smile:

The way closure captures are done in Rust is both good and bad, IMO. The good is that there aren’t many options (e.g. no capture lists and capture types like C++) and the compiler ensures you don’t have lifetime mismatches. The bad is - there aren’t many options :slight_smile:. A good rule of thumb is that the compiler will prefer to capture references, so if you need closures that don’t borrow from their environment you’ll need to move clones into them.

The other (related to above) aspect here is futures and building up a chain of them via combinators. It’s helpful to think of futures as recipes of what should happen once they’re actually scheduled and polled. So when you’re building up a future chain, you’re just making this recipe. Part of building that recipe requires capturing some parts of the environment so that the future can use it later on. So although some code may look like it’s cloning something right there and then, it’s actually calling that clone much later and against a reference that the closure captured. This can get particularly hairy if you have long chains, with nested chains and nested closures. So it’s important to carefully look at what exactly the closures are using from their environment and whether the clones are cloned in the right places. This will come with more tokio/futures experience though.
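The "clone happens later" point can be observed with a plain closure, used here as a stand-in for a future (an assumption; no futures crate is involved). The helper name `make_recipe` is hypothetical, and the Rc reference count makes the moment of cloning visible.

```rust
use std::rc::Rc;

// Hypothetical helper: returns a "recipe" that clones only when it is run.
fn make_recipe(shared: Rc<String>) -> impl Fn() -> Rc<String> {
    // `shared` is moved into the closure now; nothing is cloned yet.
    move || Rc::clone(&shared)
}

fn main() {
    let name = Rc::new(String::from("queue"));
    let recipe = make_recipe(Rc::clone(&name));

    // Two owners so far: `name` and the handle held inside the recipe.
    println!("after building: {}", Rc::strong_count(&name));

    // Only running the recipe executes the clone written in its body.
    let extra = recipe();
    println!("after running: {}", Rc::strong_count(&name));
    println!("value: {}", extra);
}
```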

Also, worth mentioning that once you hand off a value to a closure (future), that closure can yield that value back (if appropriate); this would allow you to pass that value to the next closure in the chain as an argument, rather than each closure grabbing their own clone separately. Of course which is better, or even feasible, depends on the circumstances.
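A synchronous sketch of that idea, using `Result::and_then` as a stand-in for the future combinator (an assumption). The functions `declare`, `bind` and `run` are hypothetical; each step takes ownership of the queue name and hands it back with its result, so no clone is needed.

```rust
// Hypothetical first step: owns `queue_name` and yields it back on success.
fn declare(queue_name: String) -> Result<(String, usize), String> {
    if queue_name.is_empty() {
        return Err("queue name is empty".to_string());
    }
    let len = queue_name.len();
    Ok((queue_name, len))
}

// Hypothetical second step: receives the same String as an argument.
fn bind(queue_name: String, len: usize) -> Result<String, String> {
    Ok(format!("bound {} ({} bytes)", queue_name, len))
}

fn run(queue_name: String) -> Result<String, String> {
    // The tuple pattern hands the owned value on to the next step.
    declare(queue_name).and_then(|(queue_name, len)| bind(queue_name, len))
}

fn main() {
    println!("{:?}", run("q1".to_string()));
    println!("{:?}", run(String::new()));
}
```

The trade-off is that every step's return type has to carry the extra value along, which can get noisy in long chains.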