Dropping a futures_channel sender while awaiting a select causes a panic

I have two receivers whose outputs I merge into a single loop, like this:

while let Some(transfer) = futures_util::stream::select(recv.as_mut(), task_receiver.as_mut()).next().await {
    if !Self::handle_transfer(co_runner.as_mut(), transfer).await? {
        break;
    }
}

Once the sender for recv is dropped on another thread, the loop panics:

thread 'tokio-runtime-worker' panicked at 'Receiver::next_message called after `None`'

The panic originates in this function of futures_channel (latest: 0.3.4):

fn next_message(&mut self) -> Poll<Option<T>> {
    let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`");
    // Pop off a message
    match unsafe { inner.message_queue.pop_spin() } {
        Some(msg) => {
            // Decrement number of messages
            self.dec_num_messages();

            Poll::Ready(Some(msg))
        }
        None => {
            let state = decode_state(inner.state.load(SeqCst));
            if state.is_open || state.num_messages != 0 {
                // If queue is open, we need to return Pending
                // to be woken up when new messages arrive.
                // If queue is closed but num_messages is non-zero,
                // it means that senders updated the state,
                // but didn't put message to queue yet,
                // so we need to park until sender unparks the task
                // after queueing the message.
                Poll::Pending
            } else {
                // If closed flag is set AND there are no pending messages
                // it means end of stream
                self.inner = None;
                Poll::Ready(None)
            }
        }
    }
}

This looks to me like a lack of interoperability between select and dropping a sender. Is there a way to fix this without writing many extra lines of code? I can think of a few workarounds, but I'd prefer an idiomatic one. Ideally, when the sender drops, the select would keep polling the stream that is still producing results.

You can fuse the stream.


Rather than reconstructing the select every time you need an item, construct it once around both streams. It then tracks internally which streams have completed and stops polling each one as it finishes.

