Greetings,
I am having some trouble getting the code below to compile.
use amiquip::{Connection, ConsumerMessage, ConsumerOptions};
use crossbeam_channel::Receiver;
/// Length of the collected-message vector at which `Rabbit::consume` stops
/// reading from the consumer.
const BATCH_LIMIT: usize = 100;
/// Result alias whose error type is any boxed `std::error::Error`.
type RabbitResult<T> = Result<T, Box<dyn std::error::Error>>;
/// A broker connection that can hand out channels.
///
/// `R` is the message type accepted by the channel's `queue_ack`; the
/// lifetime `'c` is forwarded unchanged to the `AmqpChannel` bound on
/// [`Self::Channel`].
pub trait AmqpConnection<'c, R> {
    /// Channel type produced by [`AmqpConnection::channel`].
    type Channel: AmqpChannel<'c, R>;

    /// Builds a connection from the `connection` string.
    ///
    /// The signature returns `Self` rather than a `Result`, so an
    /// implementation has no way to report a failure to the caller.
    fn open(connection: &str) -> Self;

    /// Returns a channel on this connection. `channel` is an optional `u16`
    /// whose interpretation is left to the implementation.
    fn channel(&self, channel: Option<u16>) -> Self::Channel;
}
impl<'Consumer> AmqpConnection<'Consumer, amiquip::ConsumerMessage> for amiquip::Connection{
type Channel = amiquip::Channel;
fn open(connection: &str) -> Self {
Self::insecure_open(connection).unwrap()
}
fn channel(&self, channel: Option<u16>) -> Self::Channel {
self.open_channel(channel).unwrap()
}
}
/// A channel that can start consuming from a queue and process the messages
/// it receives.
///
/// `R` is the message type handed to [`AmqpChannel::queue_ack`]; the lifetime
/// `'c` is forwarded unchanged to the `AmqpConsumer` bound on
/// [`Self::Consumer`].
pub trait AmqpChannel<'c, R> {
    /// Consumer type produced by [`AmqpChannel::consume`].
    type Consumer: AmqpConsumer<'c>;

    /// Starts consuming from the queue named `queue`.
    fn consume(&self, queue: &str) -> Self::Consumer;

    /// Processes one received message `msg`, with access to the batch
    /// collected so far in `queue`.
    // NOTE(review): `queue` is a shared reference, so an implementation cannot
    // append to it. If implementations are meant to store message bodies here
    // the parameter needs to be `&mut Vec<Vec<u8>>`, which also means updating
    // every implementation and caller.
    fn queue_ack(&self, msg: R, queue: &Vec<Vec<u8>>);
}
impl<'Consumer> AmqpChannel<'Consumer, amiquip::ConsumerMessage> for amiquip::Channel{
type Consumer = amiquip::Consumer<'Consumer>;
fn consume(&self, queue: &str) -> Self::Consumer {
self.basic_consume(queue, ConsumerOptions::default()).unwrap()
}
fn queue_ack(&self, msg: amiquip::ConsumerMessage, queue: &Vec<Vec<u8>>){
if let ConsumerMessage::Delivery(msg) = msg {
queue.push(msg.body.to_owned());
msg.to_owned().ack(self).expect("could not ack previous message");
}
}
}
/// A started consumer from which received messages can be iterated.
///
/// The lifetime parameter `'m` is not used by any item of the trait itself;
/// it is available for implementors to use in their own types.
pub trait AmqpConsumer<'m> {
    /// Value returned by [`AmqpConsumer::recv`]; anything that can be turned
    /// into an iterator.
    type Reciever: IntoIterator;

    /// Returns the source of received messages.
    fn recv(&self) -> Self::Reciever;
}
impl<'Message> AmqpConsumer<'Message> for amiquip::Consumer<'Message>{
type Reciever = &'Message Receiver<amiquip::ConsumerMessage>;
fn recv(&self) -> Self::Reciever {
self.receiver()
}
}
/// Settings used by `Rabbit::consume` to read one batch of messages.
pub struct Rabbit {
    /// Connection string handed to `AmqpConnection::open`.
    connection: String,
    /// Name of the queue to consume from.
    queue: String,
    /// Optional channel number handed to `AmqpConnection::channel`.
    channel: Option<u16>,
}
impl Rabbit{
pub fn new(connection: String, queue: String) -> Self {
Self {
connection,
queue,
channel: None,
}
}
}
impl<'Consumer> Rabbit{
pub fn consume<T: AmqpConnection<'Consumer, R>, R>(&self) -> RabbitResult<Vec<Vec<u8>>>
{
let connection = T::open(&self.connection);
let channel = connection.channel(self.channel);
let consumer = channel.consume(&self.queue);
let mut msg_queue = Vec::new();
for msg in consumer.recv() {
if msg_queue.len() >= BATCH_LIMIT {
break;
}
channel.queue_ack(msg, &msg_queue);
}
Ok(msg_queue)
}
}
I am getting an error. The `msg` variable produced by iterating over
`consumer.recv()` does not match the type `R` that `queue_ack` expects:
mismatched types
expected type parameter `R`
found associated type `<<<<T as AmqpConnection<'Consumer, R>>::Channel as AmqpChannel<'_, R>>::Consumer as AmqpConsumer<'_>>::Reciever as IntoIterator>::Item`
you might be missing a type parameter or trait bound (rustc E0308)
I am trying to:
A) Make this code easier to mock and test. I don't really want to rely on an instance of RabbitMQ for every test
B) Allow this to be somewhat interchangeable, so that if I wanted to use, say, Kafka instead, some of these changes would be less cumbersome. Although one could argue this does the opposite at this point...
EDIT: FYI -- I am relatively new to Rust