Why does it keep blocking?

use std::thread;
use std::sync::mpsc;
use std::time::Duration;

fn main() {
    let (tx,rx) = mpsc::channel();
    let tx2 = mpsc::Sender::clone(&tx); // Why does it keep blocking?

    thread::spawn(move || {
        let val = vec![
            String::from("thread-01: hello"),
            String::from("thread-01: rust"),
        ];
        for item in val {
            tx.send(item).unwrap();
            thread::sleep(Duration::from_millis(1000));
        }
    });
    for recv in rx {
        println!("recv:{}", recv);
    }
}

From the documentation of Sender:

Note: all senders (the original and the clones) need to be dropped for the receiver to stop blocking to receive messages with Receiver::recv.

You never move your second sender to another thread that would drop it when it completes. You need to manually drop it:

use std::thread;
use std::sync::mpsc;
use std::time::Duration;

fn main() {
    let (tx,rx) = mpsc::channel();
    let tx2 = tx.clone();
    
    thread::spawn(move || {
        let val = vec![
            String::from("thread-01: hello"),
            String::from("thread-01: rust"),
        ];
        for item in val {
            tx.send(item).unwrap();
            thread::sleep(Duration::from_millis(1000));
        }
    });

    // drop `tx2` so that it won't cause your receiver to block forever
    drop(tx2);
    
    for recv in rx {
        println!("recv:{}", recv);
    }
}

Playground.

1 Like

A loop over a mpsc::channel’s receiver (which uses its IntoIter implementation) such as the loop for recv in rx in your code will block and wait for more items as long as any senders still exist.

This can be found, slightly indirectly, in the documentation

  • for IntoIter in std::sync::mpsc - Rust

    pub struct IntoIter<T> { /* private fields */ }

    An owning iterator over messages on a Receiver, created by into_iter.

    This iterator will block whenever next is called, waiting for a new message, and None will be returned if the corresponding channel has hung up.

  • and regarding the term “hung up”, on the docs for Receiver in std::sync::mpsc - Rust (with some emphasis added by me)

    pub fn recv(&self) -> Result<T, RecvError>

    Attempts to wait for a value on this receiver, returning an error if the corresponding channel has hung up.

    This function will always block the current thread if there is no data available and it’s possible for more data to be sent (at least one sender still exists). Once a message is sent to the corresponding Sender (or SyncSender), this receiver will wake up and return that message.

    If the corresponding Sender has disconnected, or it disconnects while this call is blocking, this call will wake up and return Err to indicate that no more messages can ever be received on this channel. However, since channels are buffered, messages sent before the disconnect will still be properly received.

Since your code keeps the sender tx2 around unused and not dropped, on the stack of the main function until after the for recv in rx loop would have finished, there’s always an not-dropped Sender that still exists, so the loop can never terminate, resulting in a sort of deadlock.

The same problem does not appear from the original tx because, even though it also lives in a local variable in main initially, the sender is sent into the new thread via the move-capturing closure passed to thread::spawn and thus gets dropped once the spawned thread finishes, which happens after that thread completed its for item in val loop.

2 Likes
use std::rc::Rc;
use std::thread;
use std::sync::mpsc;
use std::time::Duration;

fn main() {
    let (tx,rx) = mpsc::channel();
    let tx2 = mpsc::Sender::clone(&tx);

    thread::spawn(move || {
        let val = vec![String::from("thread-01: AAA...ok"), String::from("thread-01: BBB...ok")];
        for item in val {
            tx.send(item).unwrap();
            thread::sleep(Duration::from_millis(1000));
        }
    });

    for recv in rx {
        println!("recv:{}", recv);
    }

    thread::spawn(move || {
        tx2.send("hello".to_string()).expect("发送异常");
        thread::sleep(Duration::from_millis(1000));
    });

    //drop(tx2); //tx2 has been moved to the thread. How do I close tx2 ?
}

It will automatically be dropped once the thread that takes ownership of tx2 finishes, thanks to move semantics in Rust.

I.e. here:

The closure you spawn on a new thread takes ownership of tx2 thanks to the move keyword. After spawning the thread, you can't use tx2 in your main thread anymore. tx2 is implicitly dropped when the closure finishes (so after your call to thread::sleep).


Note that your program still deadlocks, because you use tx2 after entering your "keep on receiving till all senders are dropped loop" here:

    // blocks till `tx2` is dropped
    for recv in rx {
        println!("recv:{}", recv);
    }

You must spawn the thread where you use tx2 before entering that loop.

2 Likes

You have to move the second thread::spawn before the for recv in rx, otherwise it will never run because the for recv in rx will wait for tx2 to be dropped, but tx2 can be dropped only after the thread after the for recv in rx stops.

2 Likes

But the result of this run is blocking.

I don't understand what you mean, sorry.

This will work:

use std::{thread, sync::mpsc, time::Duration};

fn main() {
    let (tx,rx) = mpsc::channel();
    let tx2 = mpsc::Sender::clone(&tx);

    thread::spawn(move || {
        let val = vec![String::from("thread-01: AAA...ok"), String::from("thread-01: BBB...ok")];
        for item in val {
            tx.send(item).unwrap();
            thread::sleep(Duration::from_millis(1000));
        }
    });

    thread::spawn(move || {
        tx2.send("hello".to_string()).expect("发送异常");
        thread::sleep(Duration::from_millis(1000));
    });

    for recv in rx {
        println!("recv:{}", recv);
    }
}

Playground.

3 Likes

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.