Generic trait implementation for extension and mocking

Greetings,

Having some trouble

use amiquip::{Connection, ConsumerMessage, ConsumerOptions};
use crossbeam_channel::Receiver;

const BATCH_LIMIT: usize = 100;

type RabbitResult<T> = Result<T, Box<dyn std::error::Error>>;

pub trait AmqpConnection<'Consumer, R>{
    type Channel: AmqpChannel<'Consumer, R>;

    fn open(connection: &str) -> Self;
    fn channel(&self, channel: Option<u16>) -> Self::Channel;
}


impl<'Consumer> AmqpConnection<'Consumer, amiquip::ConsumerMessage> for amiquip::Connection{
    type Channel = amiquip::Channel;

    fn open(connection: &str) -> Self {
       Self::insecure_open(connection).unwrap()
    }
    
    fn channel(&self, channel: Option<u16>) -> Self::Channel {
        self.open_channel(channel).unwrap()
    }
    
}

pub trait AmqpChannel<'Consumer, R>{
    type Consumer: AmqpConsumer<'Consumer>;

    fn consume(&self, queue: &str) -> Self::Consumer; 

    fn queue_ack(&self, msg: R, queue: &Vec<Vec<u8>>);
}

impl<'Consumer> AmqpChannel<'Consumer, amiquip::ConsumerMessage> for amiquip::Channel{
    type Consumer = amiquip::Consumer<'Consumer>;
   
    fn consume(&self, queue: &str) -> Self::Consumer {
       self.basic_consume(queue, ConsumerOptions::default()).unwrap() 
    }

    fn queue_ack(&self, msg: amiquip::ConsumerMessage, queue: &Vec<Vec<u8>>){
        if let ConsumerMessage::Delivery(msg) = msg {
            queue.push(msg.body.to_owned());
            msg.to_owned().ack(self).expect("could not ack previous message");
        }
    }
}


pub trait AmqpConsumer<'Message> {
    type Reciever : IntoIterator;

    fn recv(&self) -> Self::Reciever;
} 

impl<'Message> AmqpConsumer<'Message> for amiquip::Consumer<'Message>{
    type Reciever = &'Message Receiver<amiquip::ConsumerMessage>;

    fn recv(&self) -> Self::Reciever {
        self.receiver()
    }
}

pub struct Rabbit{
    connection : String, 
    queue : String, 
    channel : Option<u16>,
}

impl Rabbit{
    pub fn new(connection: String, queue: String) -> Self {
        Self {
            connection, 
            queue,
            channel: None,
        }
    }
}

impl<'Consumer> Rabbit{
    pub fn consume<T: AmqpConnection<'Consumer, R>, R>(&self) -> RabbitResult<Vec<Vec<u8>>> 
    {
        let connection = T::open(&self.connection);

        let channel = connection.channel(self.channel);

        let consumer = channel.consume(&self.queue);

        let mut msg_queue = Vec::new();

        for msg in consumer.recv() {
            if msg_queue.len() >= BATCH_LIMIT {
                break;
            }
            channel.queue_ack(msg, &msg_queue);
        }
        Ok(msg_queue)
    }
}

I am getting an error. The msg variable returned from consumer.recv

mismatched types
expected type parameter `R`
  found associated type `<<<<T as AmqpConnection<'Consumer, R>>::Channel as AmqpChannel<'_, R>>::Consumer as AmqpConsumer<'_>>::Reciever as IntoIterator>::Item`
you might be missing a type parameter or trait bound (rustc E0308) 

I am trying to:
A) Make this code easier to mock and test. I don't really want to rely on an instance of RabbitMQ for every test
B) Allow this to be somewhat interchangeable. So if I wanted to say use, Kafka instead, it would make some of these changes less cumbersome. Although one could argue this does the opposite at this point.....

EDIT: FYI -- I am relatively new to Rust

This says that the caller gets to choose any arbitrary type for R:

pub fn consume<T: AmqpConnection<'Consumer, R>, R>(&self) -> RabbitResult<Vec<Vec<u8>>> {
                                                ^

This says that implementers of AmqpConsumer get to choose any arbitrary type for the items returned by the iterator:

pub trait AmqpConsumer<'Message> {
    type Reciever: IntoIterator;

    fn recv(&self) -> Self::Reciever;
}

This assumes that R and the iterated items have the same type, since queue_ack expects R.

for msg in consumer.recv() {
    channel.queue_ack(msg, &msg_queue);
}
1 Like

Understood but what would be a resolution to this? And look, if there is another way to better handle it; im open to suggestions

I made the following changes:

  • Implementations now control R
  • Fixed lifetime constraints
  • Fixed mut
use amiquip::{ConsumerMessage, ConsumerOptions};
use crossbeam_channel::Receiver;

const BATCH_LIMIT: usize = 100;

type RabbitResult<T> = Result<T, Box<dyn std::error::Error>>;

pub trait AmqpConnection {
    type Channel: AmqpChannel;

    fn open(connection: &str) -> Self;
    fn channel(&mut self, channel: Option<u16>) -> Self::Channel;
}

impl AmqpConnection for amiquip::Connection {
    type Channel = amiquip::Channel;

    fn open(connection: &str) -> Self {
        Self::insecure_open(connection).unwrap()
    }

    fn channel(&mut self, channel: Option<u16>) -> Self::Channel {
        self.open_channel(channel).unwrap()
    }
}

pub trait AmqpChannel {
    type R;
    type Consumer<'message>: AmqpConsumer<Item = Self::R>
    where
        Self: 'message;

    fn consume(&self, queue: &str) -> Self::Consumer<'_>;

    fn queue_ack(&self, msg: Self::R, queue: &mut Vec<Vec<u8>>);
}

impl AmqpChannel for amiquip::Channel {
    type R = amiquip::ConsumerMessage;
    type Consumer<'message> = amiquip::Consumer<'message>;

    fn consume(&self, queue: &str) -> Self::Consumer<'_> {
        self.basic_consume(queue, ConsumerOptions::default())
            .unwrap()
    }

    fn queue_ack(&self, msg: amiquip::ConsumerMessage, queue: &mut Vec<Vec<u8>>) {
        if let ConsumerMessage::Delivery(msg) = msg {
            queue.push(msg.body.to_owned());
            msg.to_owned()
                .ack(self)
                .expect("could not ack previous message");
        }
    }
}

pub trait AmqpConsumer {
    type Item;
    type Reciever<'message>: IntoIterator<Item = Self::Item>
    where
        Self: 'message;

    fn recv(&self) -> Self::Reciever<'_>;
}

impl<'message> AmqpConsumer for amiquip::Consumer<'message> {
    type Item = amiquip::ConsumerMessage;
    type Reciever<'a> = &'a Receiver<amiquip::ConsumerMessage> where Self: 'a;

    fn recv(&self) -> Self::Reciever<'_> {
        self.receiver()
    }
}

pub struct Rabbit {
    connection: String,
    queue: String,
    channel: Option<u16>,
}

impl Rabbit {
    pub fn new(connection: String, queue: String) -> Self {
        Self {
            connection,
            queue,
            channel: None,
        }
    }
}

impl Rabbit {
    pub fn consume<T: AmqpConnection>(&self) -> RabbitResult<Vec<Vec<u8>>> {
        let mut connection = T::open(&self.connection);

        let channel = connection.channel(self.channel);

        let consumer = channel.consume(&self.queue);

        let mut msg_queue = Vec::new();

        for msg in consumer.recv() {
            if msg_queue.len() >= BATCH_LIMIT {
                break;
            }
            channel.queue_ack(msg, &mut msg_queue);
        }
        Ok(msg_queue)
    }
}
2 Likes

Really appreciate your time and help on this. Was not expecting a full write up — truly appreciate it