I have two receivers whose outputs I merge and consume in a single loop, like so:
    while let Some(transfer) = futures_util::stream::select(recv.as_mut(), task_receiver.as_mut()).next().await {
        if !Self::handle_transfer(co_runner.as_mut(), transfer).await? {
            break;
        }
    }
Once the sender for recv is dropped on another thread, the loop panics:
thread 'tokio-runtime-worker' panicked at 'Receiver::next_message called after `None`
The panic originates in this function of futures_channel (latest: 0.3.4):
    fn next_message(&mut self) -> Poll<Option<T>> {
        let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`");
        // Pop off a message
        match unsafe { inner.message_queue.pop_spin() } {
            Some(msg) => {
                // Decrement number of messages
                self.dec_num_messages();
                Poll::Ready(Some(msg))
            }
            None => {
                let state = decode_state(inner.state.load(SeqCst));
                if state.is_open || state.num_messages != 0 {
                    // If queue is open, we need to return Pending
                    // to be woken up when new messages arrive.
                    // If queue is closed but num_messages is non-zero,
                    // it means that senders updated the state,
                    // but didn't put message to queue yet,
                    // so we need to park until sender unparks the task
                    // after queueing the message.
                    Poll::Pending
                } else {
                    // If closed flag is set AND there are no pending messages
                    // it means end of stream
                    self.inner = None;
                    Poll::Ready(None)
                }
            }
        }
    }
This looks to me like select and a dropped sender do not interoperate well. Is there a way to fix this without a lot of extra code? I can think of a few workarounds, but I'd prefer an idiomatic solution. Ideally, once the sender is dropped, the select would keep polling the receiver that is still producing results.