In Tokio::sync::mpsc may occur memory leak!

hi.

for a scenario need to safe shutdown a process/task
for example if we recv a signal shutdown from channel,
then we want close channel and also want receive all msg exist in buffer channel

now see bellow code :

let (sx, mut rx)  = channel(10);
    
    let our = sx.clone();
    let jh = tokio::spawn(async move {
        while let Some(i) = rx.recv().await {
            if i == 20 {
                rx.close();
            }
            
            // Processing here ...
            
        }
    });

    for elem in 0..100 {
        let _ = sx.send(elem).await;
    }

    let _ = jh.await;


in above code when channel is closed,
we never get None from channel to know its finished !!!
and while let awaiting for recv next msg !!!
if we not thinking for that situation process/task never finish

for above case if exist a recv.len() to we know about how
many msg exist, it was to much simpler than checking like
bellow :

        loop {
            match rx.recv().await {
                Some(i) => {
                    if i == 20 {
                        rx.close();
                    }
                    println!("==> {}", i);

                    if our.is_closed() {
                        loop {
                            select! {
                                Some(i) = rx.recv() => {
                                    println!("after closed >> {}", i)
                                }
        
                                _ = tokio::time::sleep(Duration::from_secs(1)) => {
                                    println!("channel is closed, and buffer is empty now");
                                    return;
                                }
                            }           
                        }
                    }         
                }
                None => {
                    return;   
                }
            }
        }

maybe i wrong,
or exist better way,
what's you think ??

I'm not quite sure what exactly you are suggesting, but you need to drop(sx) before you await the join handle to get a None from the recv call.

1 Like

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.