Cannot re-use a tokio channel receiver

Hello,
I am trying to use a tokio mpsc channel multiple times. I pass the tx and rx endpoints into a function. When calling the function multiple times, I can pass in a clone of the tx endpoint, but the rx endpoint is not cloneable, so the function takes ownership of it and I cannot re-use it on subsequent calls. I cannot pass the rx endpoint in by reference either, because it gets moved into a newly spawned task, which causes a lifetime problem. How can I re-use the rx receiver across multiple function calls?

#[cfg(test)]
mod tests {
	use tokio::sync::mpsc::{
		channel as MpscChannel, Receiver as MpscReceiver, Sender as MpscSender,
	};
	use tokio::task;

	async fn try_send(tx: MpscSender<Vec<u8>>, mut rx: MpscReceiver<Vec<u8>>) {
		//create a receiving task
		let jh = task::spawn(async move {
			let rxd_vec = rx.recv().await;
			println!("Rxd: {:?}", rxd_vec);
		});

		//send something to the receiving task
		_ = tx.send(vec![1_u8, 2, 3]).await;

		jh.await.unwrap();
	}

	#[tokio::test]
	async fn try_multiple_sends() {
		let (tx, rx) = MpscChannel::<Vec<u8>>(10);
		try_send(tx.clone(), rx).await; //***Calling the function once is fine

		//try_send(tx, rx).await;           ***Enabling this line causes a build error
	}
}

Running the test produces the following:

running 1 test
Rxd: Some([1, 2, 3])
test tests::try_multiple_sends ... ok

Note, the problem occurs when calling try_send() a second time.

This is a somewhat unusual setup. Why are you spawning multiple receiving tasks? It is more common to have one receiving task that keeps receiving for as long as the channel is open. I don't know how close your example is to the real-world problem you are trying to solve, but this is how I'd receive multiple messages in one dedicated task, instead of spawning a new task each time I expect a message:

#[cfg(test)]
mod tests {
    use tokio::sync::mpsc::{
        channel as MpscChannel, Sender as MpscSender,
    };
    use tokio::task;

    async fn try_send(tx: MpscSender<Vec<u8>>) {
        _ = tx.send(vec![1_u8, 2, 3]).await;
    }

    #[tokio::test]
    async fn try_multiple_sends() {
        let (tx, mut rx) = MpscChannel::<Vec<u8>>(10);

        try_send(tx.clone()).await;
        try_send(tx).await;

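        // Both senders have been dropped by now (each one was moved into
        // try_send), so recv() returns None once the two buffered messages
        // have been read, and the loop ends.
        task::spawn(async move {
            while let Some(rxd_vec) = rx.recv().await {
                println!("Rxd: {:?}", rxd_vec);
            }
        })
        .await
        .unwrap();

        assert!(false); // fail test so we can inspect stdout on playground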
    }
}

Playground.

Thanks Jofas. I re-wrote as you suggested and it works OK.
