Returning futures


#1

Hi

I’m using lapin_futures to connect to a RabbitMQ broker. I’m writing a higher-level wrapper module that will, for example, only let me publish messages of the type my application expects.

Currently the wrapper code looks like this:

pub struct Rabbitmq<'a> {
    channel: &'a Channel<TcpStream>,
}

impl<'a> Rabbitmq<'a> {
    pub fn publish<T: serde::Serialize>(
        &self,
        exchange: &str,
        queue: &str,
        message: &Message<T>
    ) -> Box<Future<Error=Error, Item=Option<bool>> + 'static>
    {
        let ser = serde_json::to_string(message).unwrap();
        self.channel.basic_publish(
            exchange,
            queue,
            ser.as_bytes(),
            &BasicPublishOptions::default(),
            BasicProperties::default()
        )
    }
}

which is used in main() like this:

    let future = TcpStream::connect(&addr, &handle).and_then(|stream| {
        Client::connect(stream, &ConnectionOptions::default())
    }).and_then(|(client, _)| {
        client.create_channel()
    }).and_then(|channel| {
        let broker = Rabbitmq::new(&channel);
        broker.publish("x", "hello", &message)
    });

    core.run(future).unwrap();

This works fine. However, if I try to move the create_channel call into the publish method, I can no longer make it work:

pub struct Rabbitmq<'a> {
    client: &'a Client<TcpStream>,
}

impl<'a> Rabbitmq<'a> {
    pub fn publish<T: serde::Serialize>(
        &self,
        exchange: &str,
        queue: &str,
        message: &Message<T>
    ) -> Box<Future<Error=Error, Item=Option<bool>> + 'static>
    {
        let ser = serde_json::to_string(message).unwrap();
        Box::new(self.client.create_channel().and_then(|channel| {
            channel.basic_publish(
                exchange,
                queue,
                ser.as_bytes(),
                &BasicPublishOptions::default(),
                BasicProperties::default()
            )
        }))
    }
}

This gives me the following error: cannot infer an appropriate lifetime due to conflicting requirements.

I’m just starting with both Rust and Tokio, so I’m basically in trial-and-error mode, trying to clone and/or Box things, but nothing seems to work.

Can anyone give me a hand?

Cheers,
Andre


#2

The difference basically boils down to

  • in your first future, every reference that is still needed after the future is returned has been cloned, so the future owns its data
  • in your second future, the references have to survive past the create_channel call, because the and_then closure only uses them later, after publish has already returned

The references you pass to publish in the second example are not 'static, so you cannot return a boxed future with a 'static lifetime (edit: unless you either a. don’t use the references in the returned future, or b. take ownership of a copy of the referent)
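
To make those two escape hatches concrete, here is a minimal standard-library sketch. A boxed closure stands in for the boxed future, on the assumption that the same 'static rule applies to both since each is a trait object with a 'static bound. The helpers deferred_len and deferred_route are made up for illustration and are not part of lapin_futures.

```rust
// Escape hatch 1: finish with the reference before building the closure.
// Only the computed usize (a Copy value) is captured, so nothing is borrowed.
fn deferred_len(queue: &str) -> Box<dyn FnOnce() -> usize + 'static> {
    let len = queue.len();
    Box::new(move || len)
}

// Escape hatch 2: take ownership of copies and move them into the closure.
fn deferred_route(exchange: &str, queue: &str) -> Box<dyn FnOnce() -> String + 'static> {
    let exchange = exchange.to_string();
    let queue = queue.to_string();
    // `move` transfers the owned Strings into the closure.
    Box::new(move || format!("{}/{}", exchange, queue))
}

fn main() {
    let exchange = String::from("x");
    let queue = String::from("hello");

    let len = deferred_len(&queue);
    let route = deferred_route(&exchange, &queue);

    // The originals can be dropped; the closures never borrowed them.
    drop(exchange);
    drop(queue);

    println!("{} {}", len(), route()); // prints "5 x/hello"
}
```

Capturing the &str parameters directly in either closure would be rejected by the compiler, because those borrows are not guaranteed to live for 'static.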


#3

Makes sense. Changing exchange and queue to String (or calling to_string() inside publish) fixed it.
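
For anyone finding this later, here is roughly what the fix amounts to. This is only a sketch, transposed to std::future so that it runs without extra crates; it is not the futures 0.1 API used above. Broker, the body of publish, and poll_once are hypothetical stand-ins; the real code would call create_channel and basic_publish instead of format!.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for the wrapper around the client.
struct Broker;

impl Broker {
    fn publish(
        &self,
        exchange: &str,
        queue: &str,
        body: &str,
    ) -> Pin<Box<dyn Future<Output = String> + 'static>> {
        // Copy the borrowed arguments into owned Strings first...
        let exchange = exchange.to_string();
        let queue = queue.to_string();
        let body = body.to_string();
        // ...then move them into the future, so it borrows nothing.
        Box::pin(async move { format!("{} -> {}/{}", body, exchange, queue) })
    }
}

// Waker that does nothing; enough to poll a future that is ready at once.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls the future a single time and returns its value if it is ready.
fn poll_once<T>(mut fut: Pin<Box<dyn Future<Output = T>>>) -> Option<T> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(value) => Some(value),
        Poll::Pending => None,
    }
}

fn main() {
    let exchange = String::from("x");
    let fut = Broker.publish(&exchange, "hello", "ping");
    drop(exchange); // the future owns its own copy
    println!("{:?}", poll_once(fut)); // prints Some("ping -> x/hello")
}
```

The `move` matters as much as the to_string() calls: without it, the block (or the and_then closure in the original code) would borrow the local Strings and run into the same lifetime error.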

Thanks!


#4

Glad I was able to help! As an aside, your first example (chaining the future with .and_then) is likely more efficient than cloning inside publish. The lapin_futures API you are using clones some of the &strs you pass it, so if you also clone in publish, you’re effectively cloning twice.