Initializing a struct in an existing event loop

I’ve rewritten the existing RabbitMQ client so that it has only two methods for now:

  • connect(uri: &AMQPUri) returns a future that resolves to a new RabbitMQClient holding an initialized LapinClient, or to an error
  • get_channel(&self) creates a new channel that reuses the existing LapinClient instance.

/// Alias for the lapin client over a plain TCP stream.
pub type LapinClient = Client<TcpStream>;
/// Alias for the lapin channel.
pub type LapinChannel = Channel<TcpStream>;
/// Alias for generic future for pathfinder and RabbitMQ.
pub type RabbitMQFuture = Box<Future<Item=(), Error=PathfinderError> + Send + Sync + 'static>;

/// A future-based asynchronous RabbitMQ client.
pub struct RabbitMQClient {
    uri: AMQPUri,
    client: Arc<LapinClient>
}

impl RabbitMQClient {
    /// Initializes the inner fields of RabbitMQ client for future usage.
    pub fn connect(uri: &AMQPUri)
        -> impl Future<Item=Self, Error=io::Error> + Sync + Send + 'static
    {
        let address = get_address_to_rabbitmq(uri);
        let uri_local = uri.clone();

        TcpStream::connect(&address).and_then(|stream| {
            Client::connect(stream, ConnectionOptions::from_uri(uri_local))
        })
        .and_then(|(client, heartbeat)| {
            spawn(heartbeat.map_err(|err| error!("Heartbeat error: {:?}", err)))
                .into_future()
                .map(|_| RabbitMQClient { uri: uri_local, client: Arc::new(client) })
                .map_err(|error| {
                    error!("Occurred an error during spawning heartbeat future: {:?}", error);
                    io::Error::new(io::ErrorKind::Other, "Spawn error.")
                })
        })
    }

    /// Returns a lapin channel as future, based on the lapin client instance.
    pub fn get_channel(&self)
        -> impl Future<Item=LapinChannel, Error=io::Error> + Sync + Send + 'static
    {
        let client = self.client.clone();
        client.create_confirm_channel(ConfirmSelectOptions::default())
    }
}

The last thing left for me to figure out is how to fix a lifetime error in the futures chain:

    /// Run the server on the specified address and port.
    pub fn run(&self, address: SocketAddr) {
        let listener = TcpListener::bind(&address).unwrap();
        info!("Listening on: {}", address);

        // Run the server
        let server_future = self.get_rabbitmq_client()
            .map_err(|error| {
                error!("Lapin error: {:?}", error);
                ()
            })
            .and_then(|rabbitmq: Arc<RabbitMQClient>|
                self.request_handler(&listener, rabbitmq)
                    .map_err(|_error| ())
            );
        run(server_future);
    }

    fn get_rabbitmq_client(&self)
        -> impl Future<Item=Arc<RabbitMQClient>, Error=PathfinderError> + Sync + Send + 'static
    {
        let amqp_uri = self.amqp_uri.clone();
        RabbitMQClient::connect(amqp_uri.as_ref())
            .map(|client| Arc::new(client))
            .map_err(|error| {
                error!("Error in RabbitMQ Client. Reason: {:?}", error);
                PathfinderError::Io(error)
            })
    }

    fn request_handler(&self, listener: &TcpListener, rabbitmq: Arc<RabbitMQClient>)
        -> impl Future<Item=(), Error=std::io::Error> + Sync + Send + 'static
    {
      // ...
    }

Do you have any ideas how to prolong the lifetime of rabbitmq, so that it can be used later?

error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements                                                                           
  --> src/proxy.rs:75:23                                                                                                                                     
   |                                                                                                                                                         
75 |               .and_then(|rabbitmq| {                                                                                                                    
   |  _______________________^                                                                                                                               
76 | |                 self.request_handler(&listener, rabbitmq)                                                                                             
77 | |                     .map_err(|_error| ())                                                                                                             
78 | |             });                                                                                                                                       
   | |_____________^                                                                                                                                         
   |                                                                                                                                                         
note: first, the lifetime cannot outlive the anonymous lifetime #1 defined on the method body at 65:5...                                                     
  --> src/proxy.rs:65:5                                                                                                                                      
   |                                                                                                                                                         
65 | /     pub fn run(&self, address: SocketAddr) {                                                                                                          
66 | |         let listener = TcpListener::bind(&address).unwrap();                                                                                          
67 | |         info!("Listening on: {}", address);                                                                                                           
68 | |                                                                                                                                                       
...  |                                                                                                                                                       
79 | |         run(server_future);                                                                                                                           
80 | |     }                                                                                                                                                 
   | |_____^                                                                                                                                                 
   = note: ...so that the types are compatible:                                                                                                              
           expected &&proxy::Proxy                                                                                                                           
              found &&proxy::Proxy                                                                                                                           
   = note: but, the lifetime must be valid for the static lifetime...                                                                                        
note: ...so that the type `futures::AndThen<futures::MapErr<impl std::marker::Send+std::marker::Sync+futures::Future, [closure@src/proxy.rs:71:22: 74:14]>, futures::MapErr<impl std::marker::Send+std::marker::Sync+futures::Future, [closure@src/proxy.rs:77:30: 77:41]>, [closure@src/proxy.rs:75:23: 78:14 self:&&proxy::Proxy, listener:&tokio_tcp::TcpListener]>` will meet its required lifetime bounds
  --> src/proxy.rs:79:9                                                                                                                                      
   |                                                                                                                                                         
79 |         run(server_future);                                                                                                                             
   |         ^^^                                                                                                                                                                          

I’ve finally figured out what the real issues were in my previous post:

  1. The first issue is with borrowing inside the RabbitMQClient::connect method: uri_local was used in two separate closures. To make it safe, we need a separate clone of AMQPUri for each nested future that uses it. Another approach is to remove the uri field from the struct, because it is needed only when instantiating RabbitMQClient, inside the Client::connect(...) call, and nowhere else. So the corrected code looks like this:
/// Initializes the inner fields of RabbitMQ client for future usage.
pub fn connect(uri: &AMQPUri)
    -> impl Future<Item=Self, Error=io::Error> + Sync + Send + 'static
{
    let address = get_address_to_rabbitmq(uri);
    let uri_local = uri.clone();
    let uri_inner = uri.clone();

    TcpStream::connect(&address).and_then(|stream| {
        Client::connect(stream, ConnectionOptions::from_uri(uri_local))
    })
    .and_then(move |(client, heartbeat)| {
        spawn(heartbeat.map_err(|err| error!("Heartbeat error: {:?}", err)))
            .into_future()
            .map(|_| RabbitMQClient { uri: uri_inner, client: Arc::new(client) })
            .map_err(|error| {
                error!("Occurred an error during spawning heartbeat future: {:?}", error);
                io::Error::new(io::ErrorKind::Other, "Spawn error.")
            })
    })
}
  2. A similar issue, but with the TcpListener instance. The closure only borrows the listener (and self), while the future passed to run must be 'static, so there is no guarantee that the listener is still alive when the future uses it; it can be dropped at any moment. To solve this, wrap the TcpListener instance in something like Rc/Arc (depending on what you expect from the code) or move the code back into the Proxy::run function.
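
The Arc approach from the second point can be sketched with the standard library alone. This is only an illustration under stated assumptions: std::thread::spawn stands in for tokio's run, since both require a 'static argument, and Listener, Proxy::new, and the port value are hypothetical stand-ins rather than the real types from the code above.

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for the real TcpListener; it only records a port.
#[derive(Debug)]
pub struct Listener {
    pub port: u16,
}

/// Hypothetical stand-in for the Proxy struct from this thread.
pub struct Proxy {
    listener: Arc<Listener>,
}

impl Proxy {
    /// Assumed constructor, not part of the original code.
    pub fn new(port: u16) -> Self {
        Proxy {
            listener: Arc::new(Listener { port }),
        }
    }

    /// Starts work that needs the listener for as long as it runs.
    /// `thread::spawn` demands a `'static` closure, so the closure has to
    /// own everything it touches instead of borrowing from `self`.
    pub fn run(&self) -> thread::JoinHandle<u16> {
        // Cloning the Arc copies a pointer and bumps a reference count;
        // the listener itself is not duplicated.
        let listener = Arc::clone(&self.listener);
        // `move` transfers the clone into the closure, so nothing in the
        // closure borrows `self` or a local variable of `run`.
        thread::spawn(move || listener.port)
    }
}
```

Because the closure owns its own Arc handle, the spawned work keeps the listener alive even if the Proxy is dropped first. The same shape should carry over to the futures chain: clone the Arc before the chain and use a move closure in and_then.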