Close async channel when all elements got processed

In an async project, I have a channel for inputs and a stream for outputs.
While the following code compiles, it never terminates because the input channel _tx never gets closed. I want to close the channel once it's empty. I need to push new elements into _tx inside of for_each_concurrent (for recursion), so I probably have to do it inside there.

use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

const MAX_CONCURRENCY: usize = 10;

#[tokio::main]
async fn main() {
    let (_tx, rx): (mpsc::Sender<()>, mpsc::Receiver<()>) = mpsc::channel(MAX_CONCURRENCY);
    let rx_stream = ReceiverStream::new(rx);
    tokio::spawn(async move {
        futures::StreamExt::for_each_concurrent(rx_stream, MAX_CONCURRENCY, |_i| async {
            // Occasionally push something to `_tx` in here; stop if empty
        })
        .await;
    })
    .await
    .unwrap();
}

Any way I can make this terminate?
Alternatively, is there a more idiomatic way to do this?

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.