How to call poll on channel's Recv in Future

struct Object(async_broadcast::Receiver<u32>);

impl Future for Object {
    type Output = ();

    fn poll(...) -> ... {
        let recv = self.0.recv(); // Recv<'_, _>
        recv.poll_unpin(..);
    }
}

Looking through async_broadcast's implementation of <Recv as Future>::poll, it seems that calling recv() and then poll puts you in a queue to listen for events. Holding on to the Recv object you get from recv() and polling it every time, instead of creating a new Recv on each poll, seems like a good idea. poll on Recv also handles overflow errors, whereas <Receiver as Stream>::poll_next doesn't seem to, for some reason.

For these reasons, I want to store the Recv object and poll it in my own poll method. That raises a self-referential issue: Recv holds a reference to the Receiver. The other channel implementations I've looked at all have basically the same design, so this has to be something other people have run into. What's the solution? ouroboros? I ask because self-referential structs are the only thing in Rust that gives me great unease, even when I think I know what I'm doing.

Don't bother. It's a trivial constructor.

No, just don't use self-referential types. Some core futures do use them for performance, but avoid them unless absolutely needed (which, most of the time, they aren't).

I have been using Rust for 8 years, constantly immersing myself in every conversation and piece of documentation I can find on the theory behind lifetimes, and I still don't know what I'm doing.

Don't worry about it. The only way you can gain an intuition for this stuff is through the experience of mistakes.

Yes, but its poll finishes that construction on the first call. Looking at listen(), it doesn't appear to be trivial. I assume the outputs will be the same, but by repeatedly recreating Recv, you lose your spot in its event listener.

I've been using Rust on and off for several years now (it's a nice language, but I can't use it for work), and it's only recently, since I started taking off the training wheels and delving into its lower-level unsafe facilities, that my learning and appreciation of its safety guarantees have accelerated.

I read through it and it's basically an Arc::clone() and an insertion into a linked list. It doesn't seem too bad. I would definitely try to see first whether re-creating the Recv actually incurs a significant (or even measurable) performance impact.
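One rough way to get a feel for that cost before pulling in a real benchmark is a std-only toy model. Everything below is a hypothetical stand-in (ToyEvent, register, and time_registrations are made-up names, not async_broadcast or event-listener code), and it assumes the registration really does boil down to an Arc::clone() plus a list insertion under an uncontended lock. Timing the real recv() with a proper benchmarking harness is the only measurement that counts.

```rust
use std::collections::LinkedList;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

// Hypothetical model of an event with a list of registered listeners.
// This is NOT how async_broadcast is implemented; it only mimics the
// "Arc::clone + linked-list insertion" shape described above.
struct ToyEvent {
    listeners: Mutex<LinkedList<usize>>,
}

// One registration: clone the Arc, take the lock, push a list node.
fn register(event: &Arc<ToyEvent>, id: usize) -> Arc<ToyEvent> {
    let handle = Arc::clone(event);
    handle.listeners.lock().unwrap().push_back(id);
    handle
}

// Registers and immediately deregisters `n` times, which is roughly what
// re-creating the receive future on every poll would do in this model.
// Returns the elapsed time and the number of listeners left in the list.
fn time_registrations(n: usize) -> (Duration, usize) {
    let event = Arc::new(ToyEvent {
        listeners: Mutex::new(LinkedList::new()),
    });
    let start = Instant::now();
    for id in 0..n {
        let handle = register(&event, id);
        handle.listeners.lock().unwrap().pop_back();
    }
    let elapsed = start.elapsed();
    let remaining = event.listeners.lock().unwrap().len();
    (elapsed, remaining)
}
```

Calling time_registrations(1_000_000) in a release build and dividing the elapsed time by the iteration count gives a per-registration estimate for the model. It says nothing about contention from other tasks, so treat it as a lower bound at best.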

Note that both Recv::poll and Event::listen acquire a lock. You probably want to avoid that if you can.

Using an async block is usually my solution (playground):

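use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use async_broadcast::{Receiver, RecvError};

struct Foo<T> {
    rx_state: ReceiverState<T>,
}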

enum ReceiverState<T> {
    Idle(Receiver<T>),
    Receiving(Pin<Box<dyn Future<Output = (Receiver<T>, Result<T, RecvError>)>>>),
    Poisoned,
}

impl<T: Clone + 'static> Future for Foo<T> {
    type Output = Result<T, RecvError>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut future = match std::mem::replace(&mut self.rx_state, ReceiverState::Poisoned) {
            ReceiverState::Idle(mut rx) => Box::pin(async move {
                let result = rx.recv().await;
                (rx, result)
            }),
            ReceiverState::Receiving(future) => future,
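            // Only reachable if an earlier poll panicked before the state was restored.
            ReceiverState::Poisoned => panic!("polled after the inner future panicked"),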
        };
        match future.as_mut().poll(cx) {
            Poll::Pending => {
                self.rx_state = ReceiverState::Receiving(future);
                Poll::Pending
            }
            Poll::Ready((rx, result)) => {
                self.rx_state = ReceiverState::Idle(rx);
                Poll::Ready(result)
            }
        }
    }
}
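To try the same pattern without any dependencies, here is a minimal std-only sketch. ToyReceiver, YieldOnce, NextItem, NoopWake, and drive are all hypothetical names invented for this illustration; ToyReceiver only imitates a receiver whose recv() borrows it across an await point, and drive is a busy-polling loop, not a real executor.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for a channel receiver (not async_broadcast's type).
struct ToyReceiver {
    items: VecDeque<u32>,
}

// Returns Pending on the first poll (after waking itself), then Ready.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

impl ToyReceiver {
    // Borrows `self` across an await point, like a real `recv()` would.
    async fn recv(&mut self) -> Option<u32> {
        YieldOnce(false).await;
        self.items.pop_front()
    }
}

type RecvFuture = Pin<Box<dyn Future<Output = (ToyReceiver, Option<u32>)>>>;

enum State {
    Idle(ToyReceiver),
    Receiving(RecvFuture),
    Poisoned,
}

struct NextItem(State);

impl Future for NextItem {
    type Output = Option<u32>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Take the state out, leaving a marker in case the inner poll panics.
        let mut fut: RecvFuture = match std::mem::replace(&mut self.0, State::Poisoned) {
            // The async block owns the receiver, so the borrow made by
            // `recv()` lives inside the boxed future, not inside `NextItem`.
            State::Idle(mut rx) => Box::pin(async move {
                let item = rx.recv().await;
                (rx, item)
            }),
            State::Receiving(fut) => fut,
            State::Poisoned => panic!("polled after the inner future panicked"),
        };
        match fut.as_mut().poll(cx) {
            Poll::Pending => {
                // Keep the in-flight future so the next poll resumes it.
                self.0 = State::Receiving(fut);
                Poll::Pending
            }
            Poll::Ready((rx, item)) => {
                self.0 = State::Idle(rx);
                Poll::Ready(item)
            }
        }
    }
}

// Waker that does nothing; good enough for a busy-polling test driver.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls `fut` in a loop until it completes; returns the output and poll count.
fn drive<F: Future + Unpin>(fut: &mut F) -> (F::Output, usize) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = Pin::new(&mut *fut).poll(&mut cx) {
            return (out, polls);
        }
    }
}
```

Driving a NextItem built from State::Idle with drive takes two polls per item in this toy: the first poll creates and parks the boxed future, and the second resumes that same future rather than building a new one. The price is one heap allocation per received item, which is the trade-off against a hand-written self-referential struct.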