I am attempting to forward events from a Lapin RabbitMQ consumer over to a struct I built to process inbound events. I have the following code, which does not compile with a "cannot infer appropriate lifetime" error. I understand why this error is happening, I'm just not sure exactly how to go about fixing it, any suggestions would be much appreciated
error[E0382]: use of moved value: `msg_router`
--> src/request_executor.rs:102:12
|
96 | let mut msg_router = MessageRouter::new();
| -------------- move occurs because `msg_router` has type `message_router::MessageRouter<std::result::Result<(lapin::channel::Channel, lapin::message::Delivery), lapin::error::Error>>`, which does not implement the `Copy` trait
97 |
98 | tokio::spawn(async move {
| _________________________________-
99 | | consumer.map(Ok).forward(&mut msg_router).await
| | ---------- variable moved due to use in generator
100 | | });
| |_________- value moved here
101 |
102 | Ok(msg_router)
| ^^^^^^^^^^ value used here after move
So the overall problem I'm trying to solve, is that I need this consumer to forward the messages received into msg_router , but I also need to return the router, because other parts of the app need to subscribe to a stream that the router is producing
Rust enforces that values are owned in exactly one place. It looks like the spawned task is writing messages into the MessageRouter, whereas you say that the returned value is used to subscribe to it, i.e. read messages. You want to read messages you write into it in the task, or something else?
You probably want some message passing channels here.
The router itself consumes the messages, and then is producing multiple streams based on a tag within the data being passed in, I've been trying to figure out how to properly implement this pattern
async fn start_consumer(&self) -> Result<MessageRouter<Result<(Channel, Delivery), lapin::Error>>, RequestExecutorError> {
let mut consume_options = BasicConsumeOptions::default();
consume_options.no_ack = true;
let mut consumer = self.channel
.basic_consume(
self.config.receive_queue.as_str(),
"rabbit consumer",
consume_options,
FieldTable::default(),
)
.await?;
let msg_router = MessageRouter::new();
let mut router_clone = msg_router.clone();
tokio::spawn(async move {
while let Some(msg) = consumer.next().await {
router_clone.publish(msg).await
}
});
Ok(msg_router)
}
I haven't done extensive testing yet, but making the set of senders Arc<Mutex<Hashmap>>> should make sure that all the clones share the same internal state, and thus I can capture one copy inside the async task, and return the other