use std::thread;
use std::sync::mpsc;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();
    let tx2 = mpsc::Sender::clone(&tx); // Why does it keep blocking?
    thread::spawn(move || {
        let val = vec![
            String::from("thread-01: hello"),
            String::from("thread-01: rust"),
        ];
        for item in val {
            tx.send(item).unwrap();
            thread::sleep(Duration::from_millis(1000));
        }
    });
    for recv in rx {
        println!("recv:{}", recv);
    }
}
From the documentation of Sender:
Note: all senders (the original and the clones) need to be dropped for the receiver to stop blocking to receive messages with Receiver::recv.
You never move your second sender to another thread that would drop it when it completes. You need to manually drop it:
use std::thread;
use std::sync::mpsc;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();
    let tx2 = tx.clone();
    thread::spawn(move || {
        let val = vec![
            String::from("thread-01: hello"),
            String::from("thread-01: rust"),
        ];
        for item in val {
            tx.send(item).unwrap();
            thread::sleep(Duration::from_millis(1000));
        }
    });
    // drop `tx2` so that it won't cause your receiver to block forever
    drop(tx2);
    for recv in rx {
        println!("recv:{}", recv);
    }
}
A loop over an mpsc::channel’s receiver (which uses its IntoIter implementation), such as the loop for recv in rx in your code, will block and wait for more items as long as any senders still exist.
This can be found, slightly indirectly, in the documentation for IntoIter in std::sync::mpsc:
pub struct IntoIter<T> { /* private fields */ }
An owning iterator over messages on a Receiver, created by into_iter.
This iterator will block whenever next is called, waiting for a new message, and None will be returned if the corresponding channel has hung up.
Regarding the term “hung up”, the docs for Receiver in std::sync::mpsc say (with some emphasis added by me):
pub fn recv(&self) -> Result<T, RecvError>
Attempts to wait for a value on this receiver, returning an error if the corresponding channel has hung up.
This function will always block the current thread if there is no data available and it’s possible for more data to be sent (at least one sender still exists). Once a message is sent to the corresponding Sender (or SyncSender), this receiver will wake up and return that message.
If the corresponding Sender has disconnected, or it disconnects while this call is blocking, this call will wake up and return Err to indicate that no more messages can ever be received on this channel. However, since channels are buffered, messages sent before the disconnect will still be properly received.
Since your code keeps the sender tx2 around, unused and not dropped, on the stack of the main function until after the for recv in rx loop would have finished, there is always a Sender that has not been dropped, so the loop can never terminate, resulting in a sort of deadlock.
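To see the difference between "a sender still exists" and "hung up" without blocking forever, here is a minimal sketch that swaps the blocking loop for recv_timeout. The function name observe_hangup and the 50 ms timeout are just choices for this illustration, not something from your code.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::time::Duration;

/// Returns what the receiver observes (a) while an unused sender clone is
/// still alive and (b) after that clone has been dropped.
/// `observe_hangup` is a hypothetical helper for this illustration.
fn observe_hangup() -> (RecvTimeoutError, RecvTimeoutError) {
    let (tx, rx) = mpsc::channel::<String>();
    let tx2 = tx.clone();

    // Drop the original sender; `tx2` alone keeps the channel open.
    drop(tx);

    // A sender still exists, so the receiver can only time out.
    let while_alive = rx.recv_timeout(Duration::from_millis(50)).unwrap_err();

    // Once the last sender is gone, the channel counts as hung up.
    drop(tx2);
    let after_drop = rx.recv_timeout(Duration::from_millis(50)).unwrap_err();

    (while_alive, after_drop)
}

fn main() {
    let (while_alive, after_drop) = observe_hangup();
    println!("sender alive:   {:?}", while_alive);
    println!("sender dropped: {:?}", after_drop);
}
```

With a plain recv or a for loop there is no timeout, so the first case simply blocks, which is the behavior you are seeing.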
The same problem does not arise with the original tx because, even though it also initially lives in a local variable in main, the sender is moved into the new thread via the move-capturing closure passed to thread::spawn and thus gets dropped once the spawned thread finishes, which happens after that thread has completed its for item in val loop.
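As a rough sketch of that point: when the only sender is moved into the spawned thread, the receiving loop ends by itself once the thread is done. The helper name collect_from_worker is made up for this example, and the sleeps are left out to keep it short.

```rust
use std::sync::mpsc;
use std::thread;

/// Moves the only sender into a worker thread and collects everything it sends.
/// `collect_from_worker` is a hypothetical helper for this illustration.
fn collect_from_worker() -> Vec<String> {
    let (tx, rx) = mpsc::channel();

    // `move` transfers ownership of `tx` into the closure,
    // so this function no longer holds a sender afterwards.
    let worker = thread::spawn(move || {
        for item in ["hello", "rust"] {
            tx.send(format!("thread-01: {}", item)).unwrap();
        }
        // `tx` is dropped here, together with the closure.
    });

    // Iteration ends once the worker has finished, because no sender remains.
    let received: Vec<String> = rx.into_iter().collect();
    worker.join().unwrap();
    received
}

fn main() {
    for recv in collect_from_worker() {
        println!("recv:{}", recv);
    }
}
```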
use std::rc::Rc;
use std::thread;
use std::sync::mpsc;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();
    let tx2 = mpsc::Sender::clone(&tx);
    thread::spawn(move || {
        let val = vec![String::from("thread-01: AAA...ok"), String::from("thread-01: BBB...ok")];
        for item in val {
            tx.send(item).unwrap();
            thread::sleep(Duration::from_millis(1000));
        }
    });
    for recv in rx {
        println!("recv:{}", recv);
    }
    thread::spawn(move || {
        tx2.send("hello".to_string()).expect("send failed");
        thread::sleep(Duration::from_millis(1000));
    });
    //drop(tx2); //tx2 has been moved to the thread. How do I close tx2 ?
}
It will automatically be dropped once the thread that takes ownership of tx2 finishes, thanks to move semantics in Rust.
That is, the closure you spawn on a new thread takes ownership of tx2 thanks to the move keyword. After spawning the thread, you can't use tx2 in your main thread anymore. tx2 is implicitly dropped when the closure finishes (so after your call to thread::sleep).
Note that your program still deadlocks, because you only use tx2 after entering your "keep on receiving until all senders are dropped" loop here:
// blocks till `tx2` is dropped
for recv in rx {
println!("recv:{}", recv);
}
You must spawn the thread where you use tx2 before entering that loop.
You have to move the second thread::spawn before the for recv in rx loop; otherwise it will never run, because the loop waits for tx2 to be dropped, but tx2 can only be dropped once the thread spawned after the loop has finished.
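The same ordering rule applies with more than two producers. Here is a sketch, assuming a hypothetical fan_in helper and worker count that are not part of your program: spawn every sending thread first, drop the sender that was only used for cloning, and only then start receiving.

```rust
use std::sync::mpsc;
use std::thread;

/// Spawns `workers` producer threads and returns all messages, sorted.
/// `fan_in` and the message text are hypothetical, for illustration only.
fn fan_in(workers: usize) -> Vec<String> {
    let (tx, rx) = mpsc::channel();

    // Spawn every producer before receiving; each one owns its own clone.
    for id in 0..workers {
        let tx = tx.clone();
        thread::spawn(move || {
            tx.send(format!("worker-{}: done", id)).unwrap();
        });
    }

    // The original sender was only used for cloning, so release it explicitly.
    drop(tx);

    // Ends once every clone has been dropped by its finished thread.
    let mut received: Vec<String> = rx.into_iter().collect();
    received.sort(); // arrival order across threads is not deterministic
    received
}

fn main() {
    for recv in fan_in(3) {
        println!("recv:{}", recv);
    }
}
```

Without the drop(tx) line, the collect call would block forever for the same reason as in your first program.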
But the result of this run is blocking.
I don't understand what you mean, sorry.
This will work:
use std::{thread, sync::mpsc, time::Duration};

fn main() {
    let (tx, rx) = mpsc::channel();
    let tx2 = mpsc::Sender::clone(&tx);
    thread::spawn(move || {
        let val = vec![String::from("thread-01: AAA...ok"), String::from("thread-01: BBB...ok")];
        for item in val {
            tx.send(item).unwrap();
            thread::sleep(Duration::from_millis(1000));
        }
    });
    thread::spawn(move || {
        tx2.send("hello".to_string()).expect("send failed");
        thread::sleep(Duration::from_millis(1000));
    });
    for recv in rx {
        println!("recv:{}", recv);
    }
}