SyncSender block on full buffer


#1

So I have an app built around quite a few SyncSenders and Receivers.

I have an issue when shutting down my app. Each thread checks the same flag to decide when it should shut down, and
some Receivers end up closing while other threads are still sending.

I can use try_send, but that function has two failure cases instead of one (one for disconnection, one for a full buffer).
I would like a single failure case, for disconnection only.

Is there an elegant way to send and block, failing only on disconnection?

I guess some kind of recursion like

match sender1.as_ref() {
    Some(lock) => {
        if let Err(e) = lock.try_send(val.clone()) {
            match e {
                std::sync::mpsc::TrySendError::Full(v) => {
                    // call try_send again and do another match ?
                    ....
                }
                std::sync::mpsc::TrySendError::Disconnected(v) => break,
            }
        }
    }
    None => {}
}

However, that is a little messy with the multiple matches.

Thanks


#2

This is a problem I want to solve with a new channel crate, which will provide a blocking send(_) method.

But for now, I’d just suggest using the following solution:

let mut val = ...;
let res = loop {
    let res = lock.try_send(val);
    if let Err(std::sync::mpsc::TrySendError::Full(v)) = res {
        val = v;
        std::thread::yield_now();
    } else {
        break res;
    }
};
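
For reference, here is the same loop wrapped up as a small helper, as a minimal sketch. The names send_or_disconnect and demo are made up for illustration and are not part of std. The helper retries try_send while the buffer is full and returns an error only on disconnection, handing the unsent value back.

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};
use std::thread;
use std::time::Duration;

/// Hypothetical helper (not part of std): keeps retrying `try_send` while
/// the buffer is full and fails only when the receiver has disconnected,
/// handing the unsent value back to the caller.
fn send_or_disconnect<T>(tx: &SyncSender<T>, mut val: T) -> Result<(), T> {
    loop {
        match tx.try_send(val) {
            Ok(()) => return Ok(()),
            Err(TrySendError::Full(v)) => {
                // Buffer is full: take the value back and let other threads run.
                val = v;
                thread::yield_now();
            }
            Err(TrySendError::Disconnected(v)) => return Err(v),
        }
    }
}

/// Illustrative driver: a slow consumer takes three values and then drops
/// its receiver, after which a further send reports the disconnection.
fn demo() -> (Vec<i32>, Result<(), i32>) {
    let (tx, rx) = sync_channel(1);
    let consumer = thread::spawn(move || {
        // Start late so the producer hits the "full buffer" path at least once.
        thread::sleep(Duration::from_millis(50));
        (0..3).map(|_| rx.recv().unwrap()).collect::<Vec<i32>>()
        // `rx` is dropped here, when the closure returns.
    });
    for v in 1..=3 {
        send_or_disconnect(&tx, v).expect("receiver is still alive");
    }
    let received = consumer.join().unwrap();
    // The receiver is gone now, so the value comes back as an error.
    let after = send_or_disconnect(&tx, 4);
    (received, after)
}
```

Note that this spins on yield_now while the buffer is full, so it trades CPU time for the simpler error handling.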

#3

Thanks, it's useful to know about yield_now and about break taking a value.


#4

Doesn’t SyncSender::send already block when the buffer is full? You get an error only if the receiver is dropped.


#5

Yes, which is great. However, it also seems to just block until a receiver is available. I want it to return immediately in that case.


#6

By “receiver is available” do you mean until the buffer has room? In your case, during shutdown, it sounds like the receivers are dropped - I’d imagine that should cause an error on the sender side?


#7

I mean dropped.

Or that is what seems to happen.
The first receiver gets dropped, my console print statements end just before the send, and execution does not seem to continue in that thread.
Although obviously I may be fooling myself, it’s a fairly complicated program.

If I replace send with try_send the program shuts down correctly.


#8

Unless I’m misunderstanding the docs for send (or it has a bug), it seems that send should return with an error once the receiver side is dropped.


#9

I will try to make a simplified test case later to see if I can reproduce the problem. It seems logical that it should behave that way.


#10

Here’s a really rudimentary example:

use std::sync::mpsc::sync_channel;
use std::thread;

fn main() {

    let (tx, rx) = sync_channel(10);

    thread::spawn(move || {
        for x in 0 .. 20 {
            match tx.send(x) {
                Ok(()) => println!("sent {}", x),
                Err(e) => println!("error: {}", e)
            }
        }
        println!("tx thread exiting");
    });

    thread::sleep(std::time::Duration::from_millis(2000));
    for _ in 0 .. 5 {
        let r = rx.recv().unwrap();
        println!("received {}", r);
    }
    println!("Dropping receiver");
    drop(rx);
    thread::sleep(std::time::Duration::from_millis(2000));
    
}

https://play.rust-lang.org/?gist=f14ff7f7887cbe89448841eea11cacd8&version=nightly

Here’s a likely execution that you’ll see (likely only because I’m hacking this with sleeps, and not any coordination):

sent 0
sent 1
sent 2
sent 3
sent 4
sent 5
sent 6
sent 7
sent 8
sent 9
received 0
sent 10
sent 11
received 1
received 2
received 3
received 4
Dropping receiver
error: sending on a closed channel
error: sending on a closed channel
error: sending on a closed channel
error: sending on a closed channel
error: sending on a closed channel
error: sending on a closed channel
error: sending on a closed channel
error: sending on a closed channel
tx thread exiting

So the sender fills up the buffer, and blocks. The receiver drains 5 items, which allows the sender to make some progress on sending a few more items. Then we drop the receiver. A blocked send pops out, and all subsequent send attempts error immediately.

So, it would seem that it’s behaving as expected unless I did something stupid/non-representative in this example.


#11

You are faster than I :slight_smile:

I have done my own test and I think you are right. Thanks.

Could it be that the send is waiting because of a full buffer, and then the receiver gets dropped, so the buffer never drains enough for the send to complete?


#12

send is supposed to return with an error if a receiver is dropped, irrespective of whether there’s room in the buffer or not. In other words, buffering should not prevent a sender from observing the disconnect. You can modify my example code above by dropping the receiver without it consuming any elements - the sender should still pop out of send with an error.
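
As a rough sketch of that modification (the function name blocked_send_sees_disconnect and the 200 ms sleep are arbitrary choices for illustration, and the sleep is a hack rather than real coordination): the receiver never consumes anything, the producer fills the buffer and blocks in send, and dropping the receiver wakes it with an error.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;
use std::time::Duration;

/// Returns (successful sends, failed sends) for a producer whose receiver
/// is dropped without ever calling recv.
fn blocked_send_sees_disconnect() -> (usize, usize) {
    let (tx, rx) = sync_channel(2);

    let producer = thread::spawn(move || {
        let mut ok = 0;
        let mut failed = 0;
        for x in 0..5 {
            // Blocks once the two-slot buffer is full, until the receiver
            // is dropped; from then on every send fails.
            match tx.send(x) {
                Ok(()) => ok += 1,
                Err(_) => failed += 1,
            }
        }
        (ok, failed)
    });

    // Give the producer time to fill the buffer and block in `send`.
    thread::sleep(Duration::from_millis(200));
    // Drop the receiver without consuming a single element.
    drop(rx);

    // If the blocked `send` did not observe the disconnect, this join would hang.
    producer.join().unwrap()
}
```

At most two sends can succeed here because nothing ever drains the buffer, so at least three of the five must come back as errors.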

Is it possible you’re not handling the error part of send appropriately?


#13

I have changed the error checking around send to the same as yours. My thread just stops at send :frowning:


#14

Hmm. And you’re sure the receiver is actually dropped? It’s important that its drop executes.


#15

I have a link to a playground example

https://play.rust-lang.org/?gist=603c544cb66395775154a6680a18f787&version=nightly

Sorry, it's a bit convoluted, but it reflects my main program more closely.
My receiver thread exits early after getting a certain value (you can edit this value to see the send fail at different times).
The thread then goes away.

My other send thread then hangs a short time later.


#16

The problem is the receiver isn’t actually dropped because you have it wrapped in an Arc and you “leave” one behind on the main thread, which won’t drop until main() exits. So, try the following:

<...>
sender = Some(Arc::new(Mutex::new(s)));
receiver = Some(Arc::new(Mutex::new(r)));
receiver2 = receiver.clone();
drop(receiver); // <-- ADD THIS LINE
<...>
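
To see the effect in isolation, here is a minimal sketch. It is not the playground code: the function name arc_keeps_receiver_alive is made up, and it uses try_send rather than send so that nothing can block. The channel only counts as disconnected once the last Arc holding the Receiver is gone.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::sync::{Arc, Mutex};

/// Returns (send succeeded while a stray Arc was alive,
///          send reported Disconnected after the last Arc was dropped).
fn arc_keeps_receiver_alive() -> (bool, bool) {
    let (tx, rx) = sync_channel::<i32>(2);
    let receiver = Arc::new(Mutex::new(rx));
    let receiver2 = Arc::clone(&receiver);

    // The "worker" handle goes away, as when the receiver thread exits...
    drop(receiver2);
    // ...but the leftover handle still keeps the Receiver alive,
    // so the sender sees a perfectly healthy channel.
    let sent_while_alive = tx.try_send(1).is_ok();

    // Dropping the last Arc finally drops the Receiver itself.
    drop(receiver);
    let disconnected = matches!(tx.try_send(2), Err(TrySendError::Disconnected(_)));

    (sent_while_alive, disconnected)
}
```

With a blocking send and a full buffer, the first situation is exactly the hang: the channel never reports a disconnect because a Receiver technically still exists.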

#17

That makes complete sense having it spelled out :slight_smile: Thank you