Test for when crossbeam_channel is "disconnected"?


#1

I have a simple app using crossbeam_channel. There’s a bounded channel:

    let (s, r) = bounded::<String>(conf.nthreads * 4);

There’s a reader, in the main thread, reading lines from stdin and sending them down the channel:

    let reader = BufReader::new(io::stdin());
    for line in reader.lines() {
        STAT.read();
        s.send(line?).unwrap();
    }

And there are some workers that take the lines and … well, work on them. Then they POST them in an HTTP request to an Elasticsearch instance. They’re started before the reader takes over the main thread, of course.

    for _tid in 0..conf.nthreads {
        let recv = r.clone();
        thread::spawn(move || {
            for line in recv.iter() {
                // ...
                if http_res.status().as_u16() == 201 {
                    STAT.created();
                } else {
                    STAT.error();
                }
            }
        });
    }

Also started before them is another thread, which is the focus of this question. It prints progress statistics at intervals, using a collection of AtomicUsize counters in a struct that is shared via lazy_static.

    {
        let recv = r.clone();
        thread::spawn(move || {
            let ticker = tick(Duration::from_secs(60));
            for _ in ticker {
                STAT.print();
                if recv.is_empty() {  //  XXX
                    return;
                }
            }
        })
    };

The question is how this thread can know when to terminate. The others just use the blocking iterator until it returns None. This one currently checks whether the channel is_empty(), which is not strictly correct. It’s very unlikely in this case, but there is a small chance that the input reader stalls on I/O (it’s reading a pipe from another process) and the workers drain the channel before the input is actually finished. This isn’t just about stats; I’ve omitted some detail about joining the threads at the end: the program basically waits for the stats thread as its overall termination condition.

The condition I actually want to test is that all the work is done: the channel is empty (the workers have drained it) and “disconnected” (the input reader is dropped). When all senders or all receivers associated with a channel get dropped, the channel becomes disconnected.

I need the stats thread to observe the channel state, but not interfere with the contents. It can’t try reading from the channel because it might end up consuming data. It can’t take a copy of the sender instead of the receiver, because that will prevent the sender getting dropped. There’s no is_disconnected() method, and no other way I can see to check this by implied side-effects. A bigger bounded size is probably a sufficient workaround in this case, but this is Rust and I want to be correct :)
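One possible way to observe "all work is done" without touching the data is a second channel that never carries anything. The sketch below is an assumption, not something from the code above: each worker holds a clone of a side-channel sender (`alive`, a hypothetical name) and the stats thread waits on the matching receiver with a timeout, which doubles as the ticker. To stay self-contained it uses std::sync::mpsc rather than crossbeam_channel, so the workers share the single-consumer receiver behind a Mutex; `run_pipeline` and the `processed` counter are likewise made up for illustration.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{self, RecvTimeoutError};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Feeds `lines` through `nthreads` workers (assumed >= 1).
/// Returns (lines processed, ticks seen by the monitor thread).
fn run_pipeline(lines: Vec<String>, nthreads: usize, interval: Duration) -> (usize, usize) {
    let (data_tx, data_rx) = mpsc::sync_channel::<String>(nthreads * 4);
    // std's receiver is single-consumer, so the workers share it behind a mutex.
    let data_rx = Arc::new(Mutex::new(data_rx));
    // Side channel: nothing is ever sent on it, only its disconnection matters.
    let (alive_tx, alive_rx) = mpsc::channel::<()>();
    let processed = Arc::new(AtomicUsize::new(0));

    for _ in 0..nthreads {
        let data_rx = Arc::clone(&data_rx);
        let alive = alive_tx.clone();
        let processed = Arc::clone(&processed);
        thread::spawn(move || {
            let _alive = alive; // dropped when this worker exits
            loop {
                // The lock guard is released at the end of this statement.
                let next = data_rx.lock().unwrap().recv();
                match next {
                    Ok(_line) => {
                        processed.fetch_add(1, Ordering::SeqCst);
                    }
                    Err(_) => break, // data channel is drained and disconnected
                }
            }
        });
    }
    // From here on, only the workers keep the side channel connected.
    drop(alive_tx);

    let monitor = thread::spawn(move || {
        let mut ticks = 0;
        loop {
            match alive_rx.recv_timeout(interval) {
                // Interval elapsed: this is where the stats would be printed.
                Err(RecvTimeoutError::Timeout) => ticks += 1,
                // Every worker has exited, so all queued work is finished.
                Err(RecvTimeoutError::Disconnected) => return ticks,
                Ok(()) => unreachable!("nothing is sent on the side channel"),
            }
        }
    });

    for line in lines {
        data_tx.send(line).unwrap();
    }
    drop(data_tx); // the reader is done: workers exit once the queue drains

    let ticks = monitor.join().unwrap();
    (processed.load(Ordering::SeqCst), ticks)
}
```

Calling `run_pipeline(lines, 4, Duration::from_secs(60))` would mirror the setup above. A worker only leaves its loop after the data channel is both empty and disconnected, so the side channel disconnecting implies exactly the condition being asked for. crossbeam_channel's Receiver also offers recv_timeout with Timeout and Disconnected error variants, so the same shape should carry over without the Mutex.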

I can also think of some other, separate signalling channels between the threads. Maybe the simplest is to add another field/flag to the stats struct. I’m sure that would work, but it really feels like this is something the channel should be able to tell me directly. Have I missed something? Is this a missing feature?
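For illustration, the flag-in-the-stats-struct idea might look roughly like this. It is only a sketch: the struct layout, the `input_done` field and the `finish_input` and `all_done` methods are hypothetical names, and it assumes every line counted by `read()` is eventually counted exactly once by `created()` or `error()`.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

/// Hypothetical stand-in for the shared stats struct.
#[derive(Default)]
pub struct Stats {
    read: AtomicUsize,
    created: AtomicUsize,
    errors: AtomicUsize,
    input_done: AtomicBool,
}

impl Stats {
    /// Reader: one more line taken from stdin.
    pub fn read(&self) {
        self.read.fetch_add(1, Ordering::SeqCst);
    }

    /// Worker: the POST returned 201.
    pub fn created(&self) {
        self.created.fetch_add(1, Ordering::SeqCst);
    }

    /// Worker: the POST returned anything else.
    pub fn error(&self) {
        self.errors.fetch_add(1, Ordering::SeqCst);
    }

    /// Reader: called once, after the input loop has ended.
    pub fn finish_input(&self) {
        self.input_done.store(true, Ordering::SeqCst);
    }

    /// True once the reader has finished and every line that was read
    /// has been accounted for by a worker.
    pub fn all_done(&self) -> bool {
        // Check the flag first: once it is set, `read` no longer changes.
        if !self.input_done.load(Ordering::SeqCst) {
            return false;
        }
        let read = self.read.load(Ordering::SeqCst);
        let finished =
            self.created.load(Ordering::SeqCst) + self.errors.load(Ordering::SeqCst);
        finished == read
    }
}
```

The stats thread would then test `STAT.all_done()` in place of `recv.is_empty()`. One caveat under these assumptions: if the reader counts a line and then bails out before sending it (for example on an I/O error), `read` stays ahead of the other counters and `all_done()` never becomes true, so the count would need to happen after a successful send.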


#2

Raised as https://github.com/crossbeam-rs/crossbeam/issues/302