So I have an app built around quite a few SyncSender and Receiver
I have a issue when shutting down my app. Each thread looks at the same flag for when they should shut down and
I get some Receiver's closing while the other threads are still sending.
I can use try_send but that function has two failure cases instead of one (one for disconnection, one for a full buffer)
I would like one for only disconnection.
Is the and elegant way to send and block unless there is only a disconnection ?
I guess some kind of recursion like
match sender1.as_ref() {
Some(lock) => {
if let Err(e) = lock.try_send(val.clone()) {
match e {
std::sync::mpsc::TrySendError::Full(v) => {
// call try_send again and do another match ?
....
},
std::sync::mpsc::TrySendError::Disconnected(v) => break,
}
};
},
None => {},
}
However, that is a little messy with the multiple matches.
By "receiver is available" do you mean until the buffer has room? In your case, during shutdown, it sounds like the receivers are dropped - I'd imagine that should cause an error on the sender side?
Or that is what seems to happen.
the first receiver gets dropped and the my console print statements end just before the send and execution does not seem to continue in that thread.
Although obviously I may be fooling myself, it's a fairly complicated program.
If I replace send with try_send the program shuts down correctly.
use std::sync::mpsc::sync_channel;
use std::thread;
fn main() {
let (tx, rx) = sync_channel(10);
thread::spawn(move || {
for x in 0 .. 20 {
match tx.send(x) {
Ok(()) => println!("sent {}", x),
Err(e) => println!("error: {}", e)
}
}
println!("tx thread exiting");
});
thread::sleep(std::time::Duration::from_millis(2000));
for _ in 0 .. 5 {
let r = rx.recv().unwrap();
println!("received {}", r);
}
println!("Dropping receiver");
drop(rx);
thread::sleep(std::time::Duration::from_millis(2000));
}
Here's a likely execution that you'll see (likely only because I'm hacking this with sleeps, and not any coordination):
sent 0
sent 1
sent 2
sent 3
sent 4
sent 5
sent 6
sent 7
sent 8
sent 9
received 0
sent 10
sent 11
received 1
received 2
received 3
received 4
Dropping receiver
error: sending on a closed channel
error: sending on a closed channel
error: sending on a closed channel
error: sending on a closed channel
error: sending on a closed channel
error: sending on a closed channel
error: sending on a closed channel
error: sending on a closed channel
tx thread exiting
So the sender fills up the buffer, and blocks. The receiver drains 5 items, which allows the sender to make some progress on sending a few more items. Then we drop the receiver. A blocked send pops out, and all subsequent send attempts error immediately.
So, it would seem that it's behaving as expected unless I did something stupid/non-representative in this example.
I have done my own test and I think you are right . Thanks.
Could it be that the send is waiting because of a full buffer and then the receiver gets dropped so the buffer never gets reduced for send to complete ?
send is supposed to return with an error if a receiver is dropped, irrespective of whether there's room in the buffer or not. In other words, buffering should not prevent a sender from observing the disconnect. You can modify my example code above by dropping the receiver without it consuming any elements - the sender should still pop out of send with an error.
Is it possible you're not handling the error part of send appropriately?
Sorry its a bit convoluted but it reflects my main program more.
My receiver thread exits out early after getting a changing value (You can edit this value to see the send fail at different times)
The thread then goes away.
My other send thread then hangs a short time later.
The problem is the receiver isn't actually dropped because you have it wrapped in an Arc and you "leave" one behind on the main thread, which won't drop until main() exits. So, try the following:
<...>
sender = Some(Arc::new(Mutex::new(s)));
receiver = Some(Arc::new(Mutex::new(r)));
receiver2 = receiver.clone();
drop(receiver); <-- ADD THIS LINE
<...>