Initializing a struct in an existing event loop

Hello everyone!

I'm trying to handle a case where a connection to a remote RabbitMQ node has to be established only once, so that users can work with separate channels within that same connection. But I don't know how to achieve that properly in asynchronous code. So, I have a couple of questions:

  1. How can I guarantee that init() is called only once, so that it always returns the same connection while the event loop is running? Potentially it could be done in the constructor, but I'm not sure that's possible right now, especially since I need to establish the single connection to the RabbitMQ node before creating new channels for clients.
  2. Does tokio provide a way to register setup hooks that initialize an object (or maybe even certain fields) before the event loop starts? Or can it be done some other way?

In general, I see it something like this, though this code won't compile:

use amq_protocol::uri::{AMQPUri};
use futures::future::{Future};
use lapin_futures_rustls::{AMQPConnectionRustlsExt};
use lapin_futures_rustls::lapin::channel::{Channel, ConfirmSelectOptions};
use lapin_futures_rustls::lapin::client::{Client, ConnectionOptions};
use lapin_futures_tls_internal::{AMQPStream};
use tokio::reactor::{Handle};
use tokio_current_thread::{spawn};
use tokio_io::{AsyncRead};
use tokio_tcp::{TcpStream};
use tokio_tls_api::{TlsStream};


/// Alias for the Lapin client with TLS.
pub type LapinClient = Client<AMQPStream<TlsStream<TcpStream>>>;
/// Alias for the lapin future type.
pub type LapinFuture = Box<Future<Item=LapinClient, Error=io::Error> + 'static>;
/// Alias for the lapin's channel.
pub type LapinChannelFuture = Box<Future<
    Item=Channel<AsyncRead + Send + Sync + 'static>,
    Error=io::Error
> + Send + 'static>;
/// Alias for generic future for pathfinder and RabbitMQ.
pub type RabbitMQFuture = Box<Future<Item=(), Error=PathfinderError> + 'static>;


/// A future-based asynchronous RabbitMQ client.
pub struct RabbitMQClient
{
    uri: AMQPUri,
    client: Option<LapinFuture>
}


impl RabbitMQClient {
    pub fn new(cli: &CliOptions) -> RabbitMQClient {
        let schema = match cli.rabbitmq_secured {
            true => "amqps",
            false => "amqp",
        };

        let uri = format!(
            "{}://{}:{}@{}:{}/{}",
            schema.to_string(),
            cli.rabbitmq_username.clone(),
            cli.rabbitmq_password.clone(),
            cli.rabbitmq_host.clone(),
            cli.rabbitmq_port,
            cli.rabbitmq_virtual_host.clone()
        );

        RabbitMQClient {
            uri: uri.parse().unwrap(),
            client: None
        }
    }

    // Can it be used in the `new()` instead?
    pub fn init(&mut self) {
        self.client = self.get_client();
    }

    pub fn get_channel(&self) -> LapinChannelFuture {
        Box::new(
            self.client.unwrap().and_then(move |client| {
                client.create_confirm_channel(ConfirmSelectOptions::default())
            })
        )
    }

    fn get_client(&mut self) -> LapinFuture {
        let address = self.get_address_to_rabbitmq().parse().unwrap();
        Some(
            TcpStream::connect(&address).and_then(|stream| {
                Client::connect(stream, ConnectionOptions::from_uri(self.uri))
            })
            .and_then(|(client, heartbeat)| {
                spawn(heartbeat.map_err(|err| eprintln!("Heartbeat error: {:?}", err)))
                    .into_future()
                    .map(|_| client)
                    .map_err(|_| io::Error::new(io::ErrorKind::Other, "Spawn error."))
            })
        )
    }

    fn get_address_to_rabbitmq(&self) -> String {
        format!("{}:{}", self.uri.authority.host, self.uri.authority.port)
    }
}

If I understood your question right, you may want to follow a similar model to TcpStream::connect that you use inside get_client(); that is, RabbitMQClient::new() becomes RabbitMQClient::connect(), and returns a Future<Item = Self, Error = ...>. So there's no "synchronous" constructor anymore, just an associated function that yields a future that resolves when the tcp connection is established (or an error happens). The caller is then in charge of executing this future on an executor, and chaining on continuations.
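
A minimal sketch of that shape, written against today's std::future rather than the futures 0.1 API used in this thread (so Output = io::Result<Connection> stands in for Item / Error). Connection, its address field and the small block_on helper are hypothetical stand-ins, not lapin or tokio APIs, and the handshake is simulated with an already-completed future. A real program would hand the future to its runtime instead of block_on.

```rust
use std::future::{self, Future};
use std::io;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Hypothetical stand-in for a connected client.
#[derive(Debug)]
pub struct Connection {
    pub address: String,
}

impl Connection {
    /// No synchronous constructor: the only way to obtain a `Connection`
    /// is to drive the returned future to completion.
    pub fn connect(address: String) -> impl Future<Output = io::Result<Connection>> + 'static {
        // Assumption: a real implementation would perform the TCP/AMQP
        // handshake here; this sketch only validates the address.
        let result = if address.is_empty() {
            Err(io::Error::new(io::ErrorKind::InvalidInput, "empty address"))
        } else {
            Ok(Connection { address })
        };
        future::ready(result)
    }
}

/// Wakes the blocked thread when the future signals progress.
pub struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Tiny single-future executor, for illustration only.
pub fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}
```

The caller decides where the future runs, for example block_on(Connection::connect(addr)), and only sees a Connection once the connect step has finished or failed.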


TcpStream::connect is pretty close to the behaviour I'd like, except for one thing: a new connection would be created for each new client that calls this method, whereas with RabbitMQ it is preferable to reuse the existing connection and create a new channel for each client / request instead.

I'd like to initialize the RabbitMQ structure only once, with the client field storing the lapin client together with its connection (as a future?), so that I could create new futures that reuse it.

I also recently had an idea of how it could be implemented:

  1. Create a future that initializes the RabbitMQ client by calling the init() method.
  2. Chain that future with the actual server logic that handles user requests over websockets (in my case I'm using tokio-tungstenite for it).

In pseudocode it could look like this:

extern crate futures;
extern crate tokio;
extern crate tokio_tungstenite;
extern crate tungstenite;

use std::collections::HashMap;
use std::env;
use std::io::{Error, ErrorKind};
use std::sync::{Arc,RwLock};

use futures::stream::Stream;
use futures::Future;
use tokio::net::TcpListener;
use tungstenite::protocol::Message;

use tokio_tungstenite::accept_async;

fn main() {
    let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
    let addr = addr.parse().unwrap();
    let socket = TcpListener::bind(&addr).unwrap();
    
    let cli = Cli.parse(); // Getting options from CLI
    let rabbitmq = Arc::new(Box::new(RabbitMQClient::new(&cli)));

    let server = |rabbitmq: RabbitMQ| ->
        socket.incoming().for_each(move |stream| {

        accept_async(stream).and_then(move |ws_stream| {
            let channel = rabbitmq.clone().read().unwrap().get_channel();
            
            /// Some logic ...
            
            Ok(())
        }).map_err(|e| {
            println!("Error during the websocket handshake occurred: {}", e);
            Error::new(ErrorKind::Other, e)
        })
    });

    fn init_rabbitmq() -> impl Future<Item=Box<RabbitMQ>, Error=Box<Error>> {
        rabbitmq.init()
        Ok(rabbitmq)
    }

    tokio::runtime::run(init_rabbitmq.and_then(|rabbitmq| server(rabbitmq)).map_err(|_e| ()));
}

However, this approach raises two questions:

  1. Does this solution look reasonable and idiomatic in Rust?
  2. How could it be implemented better?

lapin_futures::client::Client seems to be Clone if the underlying transport is Send, which I think is true for your case. So going back to my original suggestion, create a future that resolves to the connected client. Then chain on a continuation that accepts tcp connections, and give each connection handler a clone of the Client that it can use to create channels or do whatever it needs. All the cloned client instances should be (I’m unfamiliar with lapin so double check) sharing the same underlying transport mechanism so you get the connection reuse you’re seeking.
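
To make the "clone the handle, share the transport" idea concrete, here is a std-only sketch. Transport, Client, create_channel and serve are hypothetical stand-ins and not the lapin API; the threads merely simulate per-connection handlers. The point is that each clone is cheap and all clones point at one underlying connection.

```rust
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for the single underlying connection.
#[derive(Debug)]
pub struct Transport {
    next_channel_id: AtomicU16,
}

/// Hypothetical cheap-to-clone handle; every clone points at the same `Transport`.
#[derive(Clone, Debug)]
pub struct Client {
    transport: Arc<Transport>,
}

impl Client {
    /// Called once, at startup.
    pub fn connect() -> Client {
        Client {
            transport: Arc::new(Transport {
                next_channel_id: AtomicU16::new(1),
            }),
        }
    }

    /// Each caller gets its own channel id on the shared connection.
    pub fn create_channel(&self) -> u16 {
        self.transport.next_channel_id.fetch_add(1, Ordering::SeqCst)
    }

    /// True when both handles share one underlying transport.
    pub fn shares_transport_with(&self, other: &Client) -> bool {
        Arc::ptr_eq(&self.transport, &other.transport)
    }
}

/// Simulates one handler per accepted socket, each owning a clone of the client.
/// Returns the channel ids that were handed out, sorted.
pub fn serve(client: &Client, handlers: usize) -> Vec<u16> {
    let threads: Vec<_> = (0..handlers)
        .map(|_| {
            let client = client.clone();
            thread::spawn(move || client.create_channel())
        })
        .collect();
    let mut ids: Vec<u16> = threads
        .into_iter()
        .map(|t| t.join().expect("handler panicked"))
        .collect();
    ids.sort();
    ids
}
```

Whether lapin's Client really shares its transport this way is worth double-checking in its documentation, as noted above.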

Currently I'm trying to implement this idea. So, now it contains the following code:

use std::io;
use std::marker::{PhantomData};

use amq_protocol::uri::{AMQPUri};
use futures::future::{Future};
use lapin_futures_rustls::{AMQPConnectionRustlsExt};
use lapin_futures_rustls::lapin::channel::{Channel, ConfirmSelectOptions};
use lapin_futures_rustls::lapin::client::{Client, ConnectionOptions};
use lapin_futures_tls_internal::{AMQPStream as LapinAMQPStream};
use tokio_current_thread::{spawn};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_tcp::{TcpStream};
use tokio_tls_api::{TlsStream};

use super::super::cli::{CliOptions};
use super::super::error::{PathfinderError};


/// Alias for the lapin AMQP stream.
pub type AMQPStream = LapinAMQPStream<TlsStream<TcpStream>>;
/// Alias for the lapin client with TLS.
pub type LapinClient = Client<AMQPStream>;
/// Alias for the lapin future type.
pub type LapinFuture = Box<Future<Item=LapinClient, Error=io::Error> + 'static>;
/// Alias for generic future for pathfinder and RabbitMQ.
pub type RabbitMQFuture = Box<Future<Item=(), Error=PathfinderError> + 'static>;


/// A future-based asynchronous RabbitMQ client.
pub struct RabbitMQClient<'a, T: 'a>
{
    uri: AMQPUri,
    client: Option<LapinClient>,
    phantom: PhantomData<&'a T>
}


impl<'a, T: AsyncRead + AsyncWrite + Send + Sync + 'static> RabbitMQClient<'a, T> {
    pub fn new(cli: &CliOptions) -> RabbitMQClient<'a, T> {
        let schema = match cli.rabbitmq_secured {
            true => "amqps",
            false => "amqp",
        };

        let uri = format!(
            "{}://{}:{}@{}:{}/{}",
            schema.to_string(),
            cli.rabbitmq_username.clone(),
            cli.rabbitmq_password.clone(),
            cli.rabbitmq_host.clone(),
            cli.rabbitmq_port,
            cli.rabbitmq_virtual_host.clone()
        );

        RabbitMQClient {
            uri: uri.parse().unwrap(),
            client: None,
            phantom: PhantomData
        }
    }

    pub fn init(&mut self) {
        self.client = Some(self.create_client());
    }

    pub fn get_channel(&self) -> Box<Future<Item=Channel<AMQPStream>, Error=io::Error> + Send + 'static> {
        Box::new(
            self.client.unwrap().create_confirm_channel(ConfirmSelectOptions::default())
        )
    }

    fn get_client(&mut self) -> Option<LapinClient> {
        match self.client {
            Some(client) => Some(client),
            None => {
                self.client = Some(self.create_client());
                self.client
            }
        }
    }

    fn create_client(&self) -> LapinClient {
        let address = self.get_address_to_rabbitmq().parse().unwrap();

        TcpStream::connect(&address).and_then(|stream| {
            Client::connect(stream, ConnectionOptions::from_uri(self.uri))
        })
        .and_then(|(client, heartbeat)| {
            spawn(heartbeat.map_err(|err| eprintln!("Heartbeat error: {:?}", err)))
                .into_future()
                .map(|_| client)
                .map_err(|_| io::Error::new(io::ErrorKind::Other, "Spawn error."))
        })
    }

    /// Generates a connection URL to RabbitMQ broker.
    fn get_address_to_rabbitmq(&self) -> String {
        format!("{}:{}", self.uri.authority.host, self.uri.authority.port)
    }
}

In other parts of my code base, I'm going to wrap this struct in Arc<RwLock>, so that it is thread-safe, and the struct will be initialized only once, in the "setup" future:

/// A reverse proxy application.
pub struct Proxy<'a, T: 'a> {
    rabbitmq_client: Arc<RwLock<Box<RabbitMQClient<'a, T>>>>
}

impl<'a, T: AsyncRead + AsyncWrite + Send + Sync + 'static> Proxy<'a, T> {

    pub fn new(cli: &CliOptions) -> Proxy<'a, T> {
        let rabbitmq_client = Box::new(RabbitMQClient::new(cli));

        Proxy {
            rabbitmq_client: Arc::new(RwLock::new(rabbitmq_client))
        }
    }

    pub fn run(&self, address: SocketAddr) {
        let handle = Handle::default();
        let listener = TcpListener::bind(&address).unwrap();
        println!("Listening on: {}", address);

        let init = future::lazy(|| -> FutureResult<(), PathfinderError> {
            self.rabbitmq_client.clone().write().unwrap().init();
            future::ok::<(), PathfinderError>(())
        });

        let server = listener.incoming().for_each(move |stream| {
            // server logic here ...
        });

        // Run the server
        let server_future = init
            .map_err(|_err| ())
            .and_then(|_| server.map_err(|_err| ()));
        run(server_future);
    }
}

However, for the RabbitMQ client I'm getting a compile error:

error[E0599]: no method named `into_future` found for type `()` in the current scope
  --> src/rabbitmq/client.rs:96:18
   |
96 |                 .into_future()
   |                  ^^^^^^^^^^^

This happens in RabbitMQClient::create_client. Any ideas how to fix this compile error?

I didn't look super closely, but isn't this because you have one extra parenthesis in the spawn line:

.and_then(|(client, heartbeat)| {
            spawn(heartbeat.map_err(|err| eprintln!("Heartbeat error: {:?}", err))) <== HERE
                .into_future()
                .map(|_| client)
                .map_err(|_| io::Error::new(io::ErrorKind::Other, "Spawn error."))
        })

This ends up calling into_future on the return value of spawn, which is () (i.e. it doesn't return anything); see the tokio_current_thread::spawn documentation.

If we look at one of the lapin-futures examples, we can see that its code is the same as above (including the number of parentheses). If I remove the mentioned parenthesis and wrap the whole chain in the spawn call instead, I get the following errors:

error[E0599]: no method named `into_future` found for type `futures::MapErr<lapin_futures_rustls::lapin::client::Heartbeat<impl std::marker::Send+futures::Future>, [closure@src/rabbitmq/client.rs:93:37: 93:82]>` in the current scope
  --> src/rabbitmq/client.rs:94:18
   |
94 |                 .into_future()
   |                  ^^^^^^^^^^^
   |
   = help: items from traits can only be used if the trait is in scope
   = note: the following trait is implemented but not in scope, perhaps add a `use` for it:
           `use futures::IntoFuture;`

error[E0277]: the trait bound `(): futures::Future` is not satisfied
  --> src/rabbitmq/client.rs:92:10
   |
92 |         .and_then(|(client, heartbeat)| {
   |          ^^^^^^^^ the trait `futures::Future` is not implemented for `()`
   |
   = note: required because of the requirements on the impl of `futures::IntoFuture` for `()`

error[E0308]: mismatched types
  --> src/rabbitmq/client.rs:89:9
   |
86 |       fn create_client(&self) -> LapinClient {
   |                                  ----------- expected `lapin_futures_rustls::lapin::client::Client<lapin_futures_tls_internal::AMQPStream<tokio_tls_api::TlsStream<tokio_tcp::TcpStream>>>` because of return type
...
89 | /         TcpStream::connect(&address).and_then(|stream| {
90 | |             Client::connect(stream, ConnectionOptions::from_uri(self.uri))
91 | |         })
92 | |         .and_then(|(client, heartbeat)| {
...  |
96 | |                 .map_err(|_| io::Error::new(io::ErrorKind::Other, "Spawn error.")))
97 | |         })
   | |__________^ expected struct `lapin_futures_rustls::lapin::client::Client`, found struct `futures::AndThen`
   |
   = note: expected type `lapin_futures_rustls::lapin::client::Client<lapin_futures_tls_internal::AMQPStream<tokio_tls_api::TlsStream<tokio_tcp::TcpStream>>>`
              found type `futures::AndThen<futures::AndThen<tokio_tcp::ConnectFuture, impl std::marker::Send+futures::Future, [closure@src/rabbitmq/client.rs:89:47: 91:10 self:_]>, (), [closure@src/rabbitmq/client.rs:92:19: 97:10]>`

That example is using tokio::spawn(), which returns a Spawn instance which implements IntoFuture - your code is using tokio_current_thread::spawn(), which is a different API.

Shouldn't your create_client() be returning a LapinFuture? Or even if you don't use that particular type alias, it should be returning impl Future<Item = LapinClient, ...>, and not LapinClient directly.


Hm, then it probably makes sense to replace that call with tokio::spawn instead.

Anyway, the create_client method looks like this for now:

use tokio::executor::{spawn};
// ...

/// Alias for the lapin AMQP stream.
pub type AMQPStream = LapinAMQPStream<TlsStream<TcpStream>>;
/// Alias for the lapin client with TLS.
pub type LapinClient = Client<AMQPStream>;
/// Alias for the lapin future type.
pub type LapinFuture = Box<Future<Item=LapinClient, Error=io::Error> + 'static>;
/// Alias for generic future for pathfinder and RabbitMQ.
pub type RabbitMQFuture = Box<Future<Item=(), Error=PathfinderError> + 'static>;

// ....

    fn create_client(&self) -> impl Future<Item=LapinClient, Error=io::Error> + 'static {
        let address = self.get_address_to_rabbitmq().parse().unwrap();

        TcpStream::connect(&address).and_then(|stream| {
            Client::connect(stream, ConnectionOptions::from_uri(self.uri))
        })
        .and_then(|(client, heartbeat)| {
            spawn(heartbeat.map_err(|err| eprintln!("Heartbeat error: {:?}", err)))
                .into_future()
                .map(|_| client)
                .map_err(|_| io::Error::new(io::ErrorKind::Other, "Spawn error."))
        })
    }

Shouldn’t your create_client() be returning a LapinFuture ? Or even if you don’t use that particular type alias, it should be returning impl Future<Item = LapinClient, ...> , and not LapinClient directly.

The Item associated type probably has to be Client<TcpStream>, because the code above produces a compile error:

error[E0271]: type mismatch resolving `<futures::AndThen<futures::AndThen<tokio_tcp::ConnectFuture, impl std::marker::Send+futures::Future, [closure@src/rabbitmq/client.rs:90:47: 92:10 self:_]>, futures::MapErr<futures::Map<futures::FutureResult<(), ()>, [closure@src/rabbitmq/client.rs:96:22: 96:32 client:_]>, [closure@src/rabbitmq/client.rs:97:26: 97:82]>, [closure@src/rabbitmq/client.rs:93:19: 98:10]> as futures::Future>::Item == lapin_futures_rustls::lapin::client::Client<lapin_futures_tls_internal::AMQPStream<tokio_tls_api::TlsStream<tokio_tcp::TcpStream>>>`
  --> src/rabbitmq/client.rs:87:32
   |
87 |     fn create_client(&self) -> impl Future<Item=LapinClient, Error=io::Error> + 'static {
   |                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected struct `tokio_tcp::TcpStream`, found enum `lapin_futures_tls_internal::AMQPStream`
   |
   = note: expected type `lapin_futures_rustls::lapin::client::Client<tokio_tcp::TcpStream>`
              found type `lapin_futures_rustls::lapin::client::Client<lapin_futures_tls_internal::AMQPStream<tokio_tls_api::TlsStream<tokio_tcp::TcpStream>>>`
   = note: the return type of a function must have a statically known size

Ah yes. Basically, it has to be impl Future<Item = X, ...> where X is the actual type you're producing there. I'm not sure whether you actually want to produce a Client<AMQPStream> (in which case the code there is missing some steps) or whether Client<TcpStream> is what's intended. Sorry, I don't know the lapin and amqp libs at all.

Currently I'm dealing with lifetime issues around generated futures with this code:

/// Alias for the lapin AMQP stream.
pub type AMQPStream = LapinAMQPStream<TlsStream<TcpStream>>;
/// Alias for the lapin client with TLS.
pub type LapinClient = Client<TcpStream>;
/// Alias for the lapin future type.
pub type LapinFuture = Box<Future<Item=LapinClient, Error=io::Error> + 'static>;
/// Alias for generic future for pathfinder and RabbitMQ.
pub type RabbitMQFuture = Box<Future<Item=(), Error=PathfinderError> + 'static>;


/// A future-based asynchronous RabbitMQ client.
pub struct RabbitMQClient<'a>
{
    uri: AMQPUri,
    client: Option<Box<Future<Item=LapinClient, Error=io::Error> + 'a>>,
}


impl<'a> RabbitMQClient<'a> {
    /// Returns a new instance of `RabbitMQClient`.
    pub fn new(cli: &CliOptions) -> RabbitMQClient<'a> {
        let schema = match cli.rabbitmq_secured {
            true => "amqps",
            false => "amqp",
        };

        let uri = format!(
            "{}://{}:{}@{}:{}/{}",
            schema.to_string(),
            cli.rabbitmq_username.clone(),
            cli.rabbitmq_password.clone(),
            cli.rabbitmq_host.clone(),
            cli.rabbitmq_port,
            cli.rabbitmq_virtual_host.clone()
        );

        RabbitMQClient {
            uri: uri.parse().unwrap(),
            client: None
        }
    }

    pub fn init(&mut self) {
        self.client = Some(self.create_client());
    }

    pub fn get_channel(&self) -> Box<Future<Item=Channel<AMQPStream>, Error=io::Error> + Send + 'a> {
        Box::new(
            self.client.unwrap().create_confirm_channel(ConfirmSelectOptions::default())
        )
    }

    fn get_client(&mut self) -> Option<Box<Future<Item=LapinClient, Error=io::Error> + 'a>> {
        match self.client {
            Some(client) => Some(client),
            None => {
                self.client = Some(Box::new(self.create_client()));
                self.client
            }
        }
    }

    fn create_client(&self) -> impl Future<Item=LapinClient, Error=io::Error> + 'a {
        let address = self.get_address_to_rabbitmq().parse().unwrap();

        TcpStream::connect(&address).and_then(|stream| {
            Client::connect(stream, ConnectionOptions::from_uri(self.uri))
        })
        .and_then(|(client, heartbeat)| {
            spawn(heartbeat.map_err(|err| eprintln!("Heartbeat error: {:?}", err)))
                .into_future()
                .map(|_| client)
                .map_err(|_| io::Error::new(io::ErrorKind::Other, "Spawn error."))
        })
    }

    /// Generates a connection URL to RabbitMQ broker.
    fn get_address_to_rabbitmq(&self) -> String {
        format!("{}:{}", self.uri.authority.host, self.uri.authority.port)
    }
}

That will lead to errors:

error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
  --> src/rabbitmq/client.rs:86:47
   |
86 |           TcpStream::connect(&address).and_then(|stream| {
   |  _______________________________________________^
87 | |             Client::connect(stream, ConnectionOptions::from_uri(self.uri))
88 | |         })
   | |_________^
   |
note: first, the lifetime cannot outlive the anonymous lifetime #1 defined on the method body at 83:5...
  --> src/rabbitmq/client.rs:83:5
   |
83 | /     fn create_client(&self) -> impl Future<Item=LapinClient, Error=io::Error> + 'a {
84 | |         let address = self.get_address_to_rabbitmq().parse().unwrap();
85 | |
86 | |         TcpStream::connect(&address).and_then(|stream| {
...  |
94 | |         })
95 | |     }
   | |_____^
   = note: ...so that the types are compatible:
           expected &&rabbitmq::client::RabbitMQClient<'_>
              found &&rabbitmq::client::RabbitMQClient<'a>
note: but, the lifetime must be valid for the lifetime 'a as defined on the impl at 39:6...
  --> src/rabbitmq/client.rs:39:6
   |
39 | impl<'a> RabbitMQClient<'a> {
   |      ^^
note: ...so that the type `futures::AndThen<futures::AndThen<tokio_tcp::ConnectFuture, impl std::marker::Send+futures::Future, [closure@src/rabbitmq/client.rs:86:47: 88:10 self:&&rabbitmq::client::RabbitMQClient<'_>]>, futures::MapErr<futures::Map<futures::FutureResult<(), ()>, [closure@src/rabbitmq/client.rs:92:22: 92:32 client:lapin_futures_rustls::lapin::client::Client<tokio_tcp::TcpStream>]>, [closure@src/rabbitmq/client.rs:93:26: 93:82]>, [closure@src/rabbitmq/client.rs:89:19: 94:10]>` will meet its required lifetime bounds
  --> src/rabbitmq/client.rs:83:32
   |
83 |     fn create_client(&self) -> impl Future<Item=LapinClient, Error=io::Error> + 'a {
   |                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

error: aborting due to previous error

As far as I understand, the compiler implies that the futures in create_client() can outlive the struct itself, is that right? In practice that makes no sense, though, because without this struct the future cannot get the address (or other related data) needed to connect to a particular RabbitMQ node. Does it make sense to replace the lifetime with something else (e.g. 'static)?

I don't think RabbitMQClient should have a lifetime parameter - client should be holding some Future impl that's 'static. You should clone() the uri field inside create_client and move it into the closure so that you don't need to capture a reference to self. Something like:

fn create_client(&self) -> impl Future<Item=LapinClient, Error=io::Error> + 'static {
        let address = self.get_address_to_rabbitmq().parse().unwrap();
        let uri = self.uri.clone();
        TcpStream::connect(&address).and_then(move |stream| {
            Client::connect(stream, ConnectionOptions::from_uri(uri))
        })
        .and_then(|(client, heartbeat)| {
            spawn(heartbeat.map_err(|err| eprintln!("Heartbeat error: {:?}", err)))
                .into_future()
                .map(|_| client)
                .map_err(|_| io::Error::new(io::ErrorKind::Other, "Spawn error."))
        })
    }

If uri is expensive to clone and this happens often (unlikely since you're trying to reuse a single client, right?), you can consider putting it into an Rc (or Arc if you need Send ability) and cloning that instead.
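
To illustrate the clone-and-move idea without pulling in futures, here is a minimal std-only sketch in which a boxed closure stands in for the future. Settings and connect_later are made-up names for illustration, not part of lapin or tokio. The deferred work owns its data, so it satisfies 'static and can outlive the struct it came from.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for `RabbitMQClient`; only the `uri` field matters here.
pub struct Settings {
    uri: Arc<String>,
}

impl Settings {
    pub fn new(uri: &str) -> Settings {
        Settings {
            uri: Arc::new(uri.to_owned()),
        }
    }

    /// Returns deferred work that owns everything it needs.
    /// Capturing `&self` in the closure instead would tie it to the borrow
    /// of `self`, and it could not satisfy the `'static` bound.
    pub fn connect_later(&self) -> Box<dyn FnOnce() -> String + Send + 'static> {
        // Cloning the Arc only bumps a reference count; the string is not copied.
        let uri = Arc::clone(&self.uri);
        Box::new(move || format!("connecting to {}", uri))
    }
}
```

Because the closure holds its own Arc, dropping the Settings value before running the closure is fine.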



That's a good point, and it makes sense to wrap it in an Rc or Arc. I'll probably do that later, because the URI is used only once (for the initial connection).

The last thing left for me is probably to figure out how to make the channel future thread-safe:

    pub fn get_channel(&self) -> Box<Future<Item=LapinChannel, Error=io::Error> + Send + 'static> {
        Box::new(
            self.client.unwrap().and_then(|client| {
                client.create_confirm_channel(ConfirmSelectOptions::default())
            })
        )
    }

    fn get_client(&mut self) -> Option<Box<Future<Item=LapinClient, Error=io::Error> + 'static>> {
        match self.client {
            Some(client) => Some(client),
            None => {
                self.client = Some(Box::new(self.create_client()));
                self.client
            }
        }
    }

    fn create_client(&self) -> impl Future<Item=LapinClient, Error=io::Error> + 'static {
        let address = self.get_address_to_rabbitmq().parse().unwrap();
        let uri = self.uri.clone();

        TcpStream::connect(&address).and_then(|stream| {
            Client::connect(stream, ConnectionOptions::from_uri(uri))
        })
        .and_then(|(client, heartbeat)| {
            spawn(heartbeat.map_err(|err| eprintln!("Heartbeat error: {:?}", err)))
                .into_future()
                .map(|_| client)
                .map_err(|_| io::Error::new(io::ErrorKind::Other, "Spawn error."))
        })
    }

Although the future wrapping create_confirm_channel is marked as Send and 'static, it produces an error:

error[E0277]: `dyn futures::Future<Item=lapin_futures_rustls::lapin::client::Client<tokio_tcp::TcpStream>, Error=std::io::Error>` cannot be sent between threads safely
  --> src/rabbitmq/client.rs:64:9
   |
64 | /         Box::new(
65 | |             self.client.unwrap().and_then(|client| {
66 | |                 client.create_confirm_channel(ConfirmSelectOptions::default())
67 | |             })
68 | |         )
   | |_________^ `dyn futures::Future<Item=lapin_futures_rustls::lapin::client::Client<tokio_tcp::TcpStream>, Error=std::io::Error>` cannot be sent between threads safely
   |
   = help: the trait `std::marker::Send` is not implemented for `dyn futures::Future<Item=lapin_futures_rustls::lapin::client::Client<tokio_tcp::TcpStream>, Error=std::io::Error>`
   = note: required because of the requirements on the impl of `std::marker::Send` for `std::ptr::Unique<dyn futures::Future<Item=lapin_futures_rustls::lapin::client::Client<tokio_tcp::TcpStream>, Error=std::io::Error>>`
   = note: required because it appears within the type `std::boxed::Box<dyn futures::Future<Item=lapin_futures_rustls::lapin::client::Client<tokio_tcp::TcpStream>, Error=std::io::Error>>`
   = note: required because it appears within the type `futures::future::chain::Chain<std::boxed::Box<dyn futures::Future<Item=lapin_futures_rustls::lapin::client::Client<tokio_tcp::TcpStream>, Error=std::io::Error>>, impl std::marker::Send+futures::Future, [closure@src/rabbitmq/client.rs:65:43: 67:14]>`
   = note: required because it appears within the type `futures::AndThen<std::boxed::Box<dyn futures::Future<Item=lapin_futures_rustls::lapin::client::Client<tokio_tcp::TcpStream>, Error=std::io::Error>>, impl std::marker::Send+futures::Future, [closure@src/rabbitmq/client.rs:65:43: 67:14]>`
   = note: required for the cast to the object type `dyn futures::Future<Item=lapin_futures_rustls::lapin::channel::Channel<tokio_tcp::TcpStream>, Error=std::io::Error> + std::marker::Send`

Probably it happens because client was captured in and_then, but I'm not sure.

You just need to indicate that the client field in RabbitMQClient is holding a Send future, e.g.:

struct RabbitMQClient {
    uri: AMQPUri,
    client: Option<Box<Future<Item=LapinClient, Error=io::Error> + Send>>,
}
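
The reason is that Send is part of the trait object's type: a Box<dyn Trait> without + Send is treated as not Send even when the value inside happens to be. A small std-only sketch of the same rule, with boxed closures standing in for boxed futures (LocalJob, SendJob and the functions are hypothetical names for illustration):

```rust
use std::thread;

/// A boxed job without `+ Send`: the compiler must assume it may hold
/// thread-bound data (an `Rc`, for example), so it cannot cross threads.
pub type LocalJob = Box<dyn FnOnce() -> u32>;

/// The same job type with `+ Send` spelled out in the trait object.
pub type SendJob = Box<dyn FnOnce() -> u32 + Send>;

pub fn make_job(value: u32) -> SendJob {
    Box::new(move || value * 2)
}

/// Requires `Send`; passing a `LocalJob` here is a compile error (E0277),
/// even if the closure inside only captures `Send` data.
pub fn run_on_worker(job: SendJob) -> u32 {
    thread::spawn(job).join().expect("worker panicked")
}

/// Going the other way is always allowed: dropping the `Send` marker
/// is an ordinary coercion.
pub fn forget_send(job: SendJob) -> LocalJob {
    job
}
```

So the bound has to be written wherever the boxed type is named, including struct fields, which is what the client field above does.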

Almost done with it. The last thing that I'm trying to fix is returning a new future, so that it won't borrow the self.client itself.

I added a couple of extra type aliases for a better code readability:

/// Alias for the lapin client with TLS.
pub type LapinClient = Client<TcpStream>;
/// Alias for the lapin channel.
pub type LapinChannel = Channel<TcpStream>;
/// Alias type for lapin client future.
pub type LapinClientFuture = Arc<Box<Future<Item=LapinClient, Error=io::Error> + Sync + Send + 'static>>;
/// Alias type for lapin channel future.
pub type LapinChannelFuture = Box<Future<Item=LapinChannel, Error=io::Error> + Sync + Send + 'static>;

Currently the RabbitMQ client looks like this:

pub struct RabbitMQClient
{
    uri: AMQPUri,
    client: Option<LapinClientFuture>,
}


impl RabbitMQClient {
    pub fn new(cli: &CliOptions) -> RabbitMQClient {
        // ...
    }

    pub fn init(&mut self) {
        self.client = self.get_client();
    }

    pub fn get_channel(&self) -> LapinChannelFuture {
        let client = self.client.clone().unwrap().clone();

        Box::new(
            client.and_then(|client| {
                client.create_confirm_channel(ConfirmSelectOptions::default())
            })
        )
    }

    fn get_client(&mut self) -> Option<LapinClientFuture> {
        let nullable_client = self.client.clone();
        match nullable_client {
            Some(client) => Some(client),
            None => Some(Arc::new(Box::new(self.create_client())))
        }
    }

    fn create_client(&self) -> impl Future<Item=LapinClient, Error=io::Error> + Sync + Send + 'static {
        let address = self.get_address_to_rabbitmq().parse().unwrap();
        let uri = self.uri.clone();

        TcpStream::connect(&address).and_then(|stream| {
            Client::connect(stream, ConnectionOptions::from_uri(uri))
        })
        .and_then(|(client, heartbeat)| {
            spawn(heartbeat.map_err(|err| eprintln!("Heartbeat error: {:?}", err)))
                .into_future()
                .map(|_| client)
                .map_err(|_| io::Error::new(io::ErrorKind::Other, "Spawn error."))
        })
    }
    
    // ...
}

But I'm getting an error from the compiler side about borrowing:


error[E0507]: cannot move out of borrowed content
  --> src/rabbitmq/client.rs:73:13
   |
73 |             client.and_then(|client| {
   |             ^^^^^^ cannot move out of borrowed content

Honestly, I don't quite understand what this error is about, because the client field (which has the Option<LapinClientFuture> type) was cloned and unwrapped, and the resulting LapinClientFuture was cloned again for later use. What could cause this error?

client is an Arc<Box<Future<...>>>, and_then() consumes self, and so that code is attempting to move the boxed future out of the Arc, but that’s not possible - hence the error.

As mentioned before, I think you’ll have an easier time if your RabbitMQClient stores the actual LapinClient, and not a future representing one.
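
A std-only sketch of both halves of that advice. OneShot, ReadyClient and Holder are hypothetical stand-ins (not lapin types): OneShot models something whose only useful method takes self by value, the way and_then does, and ReadyClient models an already-resolved, Clone-able client.

```rust
use std::sync::Arc;

/// Hypothetical one-shot value whose only useful method consumes `self`,
/// the way futures 0.1 combinators such as `and_then` do.
pub struct OneShot(pub String);

impl OneShot {
    pub fn finish(self) -> String {
        self.0
    }
}

/// `shared.finish()` does not compile (E0507): an `Arc` only hands out `&OneShot`.
/// Moving the value out works only while this is the last handle.
pub fn finish_shared(shared: Arc<OneShot>) -> Option<String> {
    Arc::try_unwrap(shared).ok().map(OneShot::finish)
}

/// Hypothetical resolved client: `Clone`, so every caller gets its own handle
/// and nothing has to be moved out of shared storage.
#[derive(Clone)]
pub struct ReadyClient {
    pub name: Arc<String>,
}

/// Stores the resolved client itself rather than a future of it.
pub struct Holder {
    client: Option<ReadyClient>,
}

impl Holder {
    pub fn new(client: ReadyClient) -> Holder {
        Holder { client: Some(client) }
    }

    /// Hands out a clone; works through `&self` any number of times.
    pub fn client(&self) -> Option<ReadyClient> {
        self.client.clone()
    }
}
```

Storing the resolved value sidesteps the move-out problem entirely, because cloning only needs a shared reference.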

Yeah, it will probably be better to get back to the idea from the first posts of this thread and try to turn it into something real. Although, in a sense, I am concerned about the "appearance" of this solution. But for an async environment it is probably a good choice?

Anyway, this solution can be written like this (at least how I'm seeing it):

// proxy.rs

/// A reverse proxy application.
pub struct Proxy {
    engine: Arc<RwLock<Box<Engine>>>,
    rabbitmq_client: Arc<AMQPUri>,
    connections: Arc<Mutex<HashMap<SocketAddr, mpsc::UnboundedSender<Message>>>>,
    auth_middleware: Arc<RwLock<Box<Middleware>>>,
}


impl Proxy {
    /// Returns a new instance of a reverse proxy application.
    pub fn new(cli: &CliOptions, engine: Box<Engine>) -> Proxy {
        let auth_middleware: Box<Middleware> = match cli.validate {
            true => Box::new(JwtTokenMiddleware::new(cli)),
            _ => Box::new(EmptyMiddleware::new(cli))
        };

        Proxy {
            engine: Arc::new(RwLock::new(engine)),
            rabbitmq_client: Arc::new(get_amqp_options_from_cli(cli)),
            connections: Arc::new(Mutex::new(HashMap::new())),
            auth_middleware: Arc::new(RwLock::new(auth_middleware)),
        }
    }

    /// Run the server on the specified address and port.
    pub fn run(&self, address: SocketAddr) {
        let handle = Handle::default();
        let listener = TcpListener::bind(&address).unwrap();
        println!("Listening on: {}", address);

        let engine = self.engine.clone();
        let rabbitmq_client = self.rabbitmq_client.clone();
        let connections = self.connections.clone();
        let auth_middleware = self.auth_middleware.clone();

        // Establish the single RabbitMQ connection; the future resolves with
        // the connected client, which is passed on when the futures are chained.
        let init = RabbitMQClient::connect(rabbitmq_client.clone());

        // Builds the accept loop once a connected client is available.
        let server = move |rabbitmq: LapinClient| {
            listener.incoming().for_each(move |stream| {
                let addr = stream.peer_addr().expect("Connected stream should have a peer address.");

                let rabbitmq_local = rabbitmq.clone();
                let engine_local = engine.clone();
                let connections_local = connections.clone();
                let auth_middleware_local = auth_middleware.clone();
                let handle_local = handle.clone();

                accept_async(stream)
                    // Process the messages
                    .and_then(move |ws_stream| {
                        ws_stream.for_each(move |message: Message| {
                            // Message processing ...
                            Ok(())
                        })
                    })
                    .or_else(|err| {
                        debug!("{}", err.description());
                        Ok(())
                    })
            })
        };

        // Run the server
        let server_future = init
            .map_err(|err| {
                // Report the failure; the error type is collapsed to ().
                eprintln!("Lapin error: {}", err.description());
            })
            .and_then(move |rabbitmq| server(rabbitmq).map_err(|_err| ()));
        run(server_future);
    }
}


// rabbitmq.rs
pub struct RabbitMQClient;

impl RabbitMQClient {
    pub fn connect(cli: Arc<AMQPUri>) -> impl Future<Item=LapinClient, Error=io::Error> + Sync + Send + 'static {
        // Establishing a connection 
    }
}


fn get_amqp_options_from_cli(cli: &CliOptions) -> AMQPUri {
    let schema = if cli.rabbitmq_secured { "amqps" } else { "amqp" };
    format!(
        "{}://{}:{}@{}:{}/{}",
        schema,
        cli.rabbitmq_username,
        cli.rabbitmq_password,
        cli.rabbitmq_host,
        cli.rabbitmq_port,
        cli.rabbitmq_virtual_host
    ).parse().unwrap()
}

However, this raises three questions about the usage:

  1. How do I dereference Arc<AMQPUri>, so that the Client::connect(...) method of the lapin library receives an AMQPUri instance instead of a clone of the Arc<AMQPUri>?
  2. Does it look good to store the AMQPUri, which can be used for instantiating new lapin clients, as a field of the parent struct (in this case the Proxy struct)?
  3. In general, when writing code with Tokio, is it good (or bad?) practice to chain these kinds of futures and pass the result to the next stage of processing? As I see it, this way we can avoid any potential issues with lifetimes, right?

If you have an Arc<AMQPUri>, you just need to get a &AMQPUri from it and call clone() on that. So for example:

let arc: Arc<AMQPUri> = cli;
// option 1
let uri = &*arc;
let cloned_uri = uri.clone();
// option 2
let cloned_uri = Clone::clone(&*arc);
// option 3
let cloned_uri = arc.as_ref().clone();

It's very likely that RabbitMQClient::connect can take &AMQPUri as a parameter, if you're actually cloning it inside.

Also, are you sure the lapin lib needs an owned AMQPUri and not something like AsRef<AMQPUri>?
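As a rough illustration of the &AMQPUri suggestion, the sketch below uses a hypothetical Uri struct in place of AMQPUri and a hypothetical FakeClient in place of the lapin client, so none of these names come from lapin. It only shows that a function taking a reference can be called with a borrowed Arc, and that the clone happens inside the function where an owned value is needed.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for `AMQPUri`.
#[derive(Clone, Debug, PartialEq)]
struct Uri {
    host: String,
    port: u16,
}

/// Hypothetical stand-in for a client that owns its connection settings.
#[derive(Debug)]
struct FakeClient {
    uri: Uri,
}

/// Borrows the URI and clones it only at the point where ownership is needed.
fn connect(uri: &Uri) -> FakeClient {
    FakeClient { uri: uri.clone() }
}

fn main() {
    let shared: Arc<Uri> = Arc::new(Uri { host: "localhost".to_string(), port: 5672 });

    // `&shared` has type `&Arc<Uri>`; deref coercion converts it to `&Uri`.
    let first = connect(&shared);
    let second = connect(&shared);

    // Both clients own independent copies; the Arc itself was never cloned.
    println!("{:?} {:?} (handles: {})", first, second, Arc::strong_count(&shared));
}
```

With this shape the caller keeps the Arc for later connections, and the function signature does not force every caller to hold an Arc in the first place.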

Hmm, I'm not sure what exactly you mean by "look good". I understand that you're asking whether it's good design/impl, but it's unclear what concern you have (and you must have one, since you're asking the question :) ). At a high level, I see the following phases/components in your setup:

  1. Configuration storing information on how to connect to the broker
  2. A struct that represents a pending connection to the broker - this would impl Future and resolve with a connected client.
  3. All subsequent code that uses the connected client from the previous step, creates channels, handles communication, etc. This code seems highly likely to use chaining/combinators.
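The three phases above can be sketched roughly as follows. Note the swap: this uses std::future::Future from the standard library so that it runs without external crates, whereas the thread itself uses the futures 0.1 trait (Item/Error). BrokerConfig, PendingConnection, ConnectedClient and poll_ready are all hypothetical names, and the future completes immediately instead of doing real I/O.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Phase 1: configuration describing how to reach the broker (hypothetical).
#[derive(Clone, Debug)]
struct BrokerConfig {
    uri: String,
}

/// Stand-in for a connected client; cloneable so each user can hold a copy.
#[derive(Clone, Debug)]
struct ConnectedClient {
    uri: String,
}

impl ConnectedClient {
    /// Phase 3: code after the connection works on the resolved client.
    fn create_channel(&self, id: u16) -> String {
        format!("channel {} on {}", id, self.uri)
    }
}

/// Phase 2: a pending connection that resolves with a connected client.
struct PendingConnection {
    config: BrokerConfig,
}

impl Future for PendingConnection {
    type Output = ConnectedClient;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        // A real implementation would drive socket I/O here; this stand-in
        // is ready on the first poll.
        Poll::Ready(ConnectedClient { uri: self.config.uri.clone() })
    }
}

/// Waker that does nothing; sufficient for a future that is ready at once.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls `future` once and returns its output, panicking if it is pending.
fn poll_ready<F: Future>(future: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut future = Box::pin(future);
    match future.as_mut().poll(&mut cx) {
        Poll::Ready(output) => output,
        Poll::Pending => panic!("the stand-in future should be ready immediately"),
    }
}

fn main() {
    let config = BrokerConfig { uri: "amqp://localhost:5672".to_string() };
    // The connection is resolved once; every user then clones the client.
    let client = poll_ready(PendingConnection { config });
    let for_user_a = client.clone();
    let for_user_b = client.clone();
    println!("{}", for_user_a.create_channel(1));
    println!("{}", for_user_b.create_channel(2));
}
```

In a real program the runtime would poll the pending connection, and the resolved client would be moved into the chained futures instead of being polled by hand.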

I'm just curious about good practices for writing async code with tokio :). Regarding my last post, the questions are:

  • When is it best to keep data local instead of sharing access through Rc / Arc wrappers? (e.g. Proxy with an Rc<RabbitMQ client> as a field vs. creating a single future before running the main future with the processing inside?)
  • When using futures, in which cases is it better to pass data between chained futures directly, rather than calling .clone() and using the copies in closures?