Help understanding lifetimes within tokio::task

I am attempting to forward events from a Lapin RabbitMQ consumer to a struct I built to process inbound events. I have the following code, which does not compile with a "cannot infer appropriate lifetime" error. I understand why the error is happening; I'm just not sure how to fix it. Any suggestions would be much appreciated.

pub struct RabbitRequestExecutor {
    pub config: RabbitConfig,
    connection: lapin::Connection,
    channel: lapin::Channel,
    message_router: MessageRouter<Result<(Channel, Delivery), lapin::Error>>
}

impl RabbitRequestExecutor {
    async fn start_consumer(&mut self) -> Result<(), RequestExecutorError> {

        let mut consume_options = BasicConsumeOptions::default();
        consume_options.no_ack = true;

        let consumer = self.channel
            .basic_consume(
                self.config.receive_queue.as_str(),
                "rabbit consumer",
                consume_options,
                FieldTable::default(),
            )
            .await?;

        tokio::spawn(async move {
            consumer.map(Ok).forward(&mut self.message_router).await
        });

        Ok(())

    }
}

Please post the full error as reported by cargo build.

@Alice here is a simpler example that may help illustrate the problem

async fn start_consumer(&self) -> Result<MessageRouter<Result<(Channel, Delivery), lapin::Error>>, RequestExecutorError> {
    let mut consume_options = BasicConsumeOptions::default();
    consume_options.no_ack = true;

    let consumer = self.channel
        .basic_consume(
            self.config.receive_queue.as_str(),
            "rabbit consumer",
            consume_options,
            FieldTable::default(),
        )
        .await?;

    let mut msg_router = MessageRouter::new();

    tokio::spawn(async move {
        consumer.map(Ok).forward(&mut msg_router).await
    });

    Ok(msg_router)
}

with the following error

error[E0382]: use of moved value: `msg_router`
   --> src/request_executor.rs:102:12
    |
96  |           let mut msg_router = MessageRouter::new();
    |               -------------- move occurs because `msg_router` has type `message_router::MessageRouter<std::result::Result<(lapin::channel::Channel, lapin::message::Delivery), lapin::error::Error>>`, which does not implement the `Copy` trait
97  | 
98  |           tokio::spawn(async move {
    |  _________________________________-
99  | |             consumer.map(Ok).forward(&mut msg_router).await
    | |                                           ---------- variable moved due to use in generator
100 | |         });
    | |_________- value moved here
101 | 
102 |           Ok(msg_router)
    |              ^^^^^^^^^^ value used here after move

So the overall problem I'm trying to solve: I need this consumer to forward the messages it receives into msg_router, but I also need to return the router, because other parts of the app need to subscribe to streams that the router produces.

Rust enforces that every value is owned in exactly one place. It looks like the spawned task writes messages into the MessageRouter, while you say the returned value is used to subscribe to it, i.e. to read messages. Do you want to read the messages the task writes into it, or something else?

You probably want some message passing channels here.
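Roughly this shape (a sketch using std::sync::mpsc and a thread for brevity; in your async code the equivalents would be tokio::sync::mpsc or futures::channel::mpsc inside tokio::spawn, and the roundtrip function here is purely illustrative):

```rust
use std::sync::mpsc;
use std::thread;

// Move the Sender into the task; keep the Receiver with the caller.
fn roundtrip(payload: &'static str) -> String {
    let (tx, rx) = mpsc::channel::<String>();

    // The sender half is moved into the task (a thread here; in your
    // code, the future passed to tokio::spawn) ...
    let producer = thread::spawn(move || {
        tx.send(payload.to_string()).unwrap();
    });

    // ... while the receiver half stays behind, so no single value
    // has to be owned in two places at once.
    let got = rx.recv().unwrap();
    producer.join().unwrap();
    got
}

fn main() {
    assert_eq!(roundtrip("hello"), "hello");
}
```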

The router itself consumes the messages and then produces multiple streams based on a tag within the data being passed in. I've been trying to figure out how to properly implement this pattern.

Perhaps a channel per tag?
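Something along these lines, as a sketch (std::sync::mpsc and plain String messages standing in for your deliveries; the Router name and API here are hypothetical):

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

// One channel per tag: subscribing stores a Sender keyed by tag and
// hands back the matching Receiver; publishing looks up the tag.
struct Router {
    senders: HashMap<String, Sender<String>>,
}

impl Router {
    fn new() -> Self {
        Router { senders: HashMap::new() }
    }

    fn subscribe(&mut self, tag: &str) -> Receiver<String> {
        let (tx, rx) = channel();
        self.senders.insert(tag.to_string(), tx);
        rx
    }

    fn publish(&mut self, tag: &str, msg: String) {
        if let Some(tx) = self.senders.get(tag) {
            // A send error means the subscriber hung up,
            // so the stale entry can be dropped.
            if tx.send(msg).is_err() {
                self.senders.remove(tag);
            }
        }
        // Messages for tags with no subscriber are silently dropped.
    }
}

fn main() {
    let mut router = Router::new();
    let orders = router.subscribe("orders");
    router.publish("orders", "msg-1".to_string());
    router.publish("unknown", "ignored".to_string());
    assert_eq!(orders.recv().unwrap(), "msg-1");
}
```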

@alice thanks for the help! Here is what I put together, in case anyone else needs something like this. Feedback is much appreciated.

use futures::channel::mpsc::{Sender, Receiver};
use std::collections::HashMap;
use futures::SinkExt;
use std::sync::Arc;
use futures::lock::Mutex;

pub trait Addressable {
    fn get_topic(&self) -> String;
}

#[derive(Clone)]
pub struct MessageRouter<T: Addressable + Clone + Send> {
    subscriptions: Arc<Mutex<HashMap<String, Sender<T>>>>,
}

impl<T: Addressable + Clone + Send> MessageRouter<T> {
    pub fn new() -> Self {
        MessageRouter {
            subscriptions: Arc::new(Mutex::new(HashMap::new()))
        }
    }
}

pub struct Subscription<T>(Receiver<T>);

impl<T: Addressable + Clone + Send> MessageRouter<T> {

    pub async fn publish(&mut self, item: T) {
        let topic = item.get_topic();
        let mut subscriptions = self.subscriptions.lock().await;
        if let Some(subscriber) = subscriptions.get_mut(&topic) {
            // A send error means the receiver was dropped, so the
            // subscription is stale and can be removed.
            if subscriber.send(item).await.is_err() {
                subscriptions.remove(&topic);
            }
        }
    }

    pub async fn subscribe(&mut self, topic: String) -> Subscription<T> {
        let (sender, receiver) = futures::channel::mpsc::channel(128);
        self.subscriptions.lock().await.insert(topic, sender);
        Subscription(receiver)
    }
}

I use it like this

async fn start_consumer(&self) -> Result<MessageRouter<Result<(Channel, Delivery), lapin::Error>>, RequestExecutorError> {
    let mut consume_options = BasicConsumeOptions::default();
    consume_options.no_ack = true;

    let mut consumer = self.channel
        .basic_consume(
            self.config.receive_queue.as_str(),
            "rabbit consumer",
            consume_options,
            FieldTable::default(),
        )
        .await?;

    let msg_router = MessageRouter::new();
    let mut router_clone = msg_router.clone();

    tokio::spawn(async move {
        while let Some(msg) = consumer.next().await {
            router_clone.publish(msg).await
        }
    });

    Ok(msg_router)
}

I haven't done extensive testing yet, but making the set of senders an Arc<Mutex<HashMap>> should ensure that all the clones share the same internal state, so I can capture one copy inside the async task and return the other.
