Impl Sink that fans out, are my assumptions correct?

I have a public object that holds a collection of futures::channel::mpsc::Sender. It implements Sink, and when the user sends an item we forward it to every channel.

I'm wondering about the Sink impl. For now I have naive but hopefully correct logic:

fn poll_ready( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Result<(), Self::Error>>
{
   // As soon as any is not ready, we are not ready
   //
   for obs in self.get_mut().observers.iter_mut()
   {
      if let Some( ref mut o ) = obs
      {
         let res = ready!( Pin::new(o).poll_ready(cx) );

         // Errors mean disconnected, so drop.
         //
         if res.is_err()
         {
            *obs = None;
         }
      }
   }

   Ok(()).into()
}

So basically on every call to poll_ready I start looping from the beginning until any channel returns Pending. As soon as that happens I return Pending and start over next time.

In principle, if any channel has returned Pending, the next call to our poll_ready should be caused by that channel waking the task. So I'm tempted to poll all of them once, store a count of how many returned Pending, and just count down on subsequent calls, avoiding the loop.

Is this sensible? It makes assumptions about how the user uses the object, namely that they don't call poll_ready outside the standard situation of a task that is parked until its waker fires. Otherwise things come crumbling down.

It obviously takes Pin<&mut Self>, so normally only one such request should exist until it completes, but it can be cheated...

It's a hypothetical performance improvement, so it's not very high on my priority list, but I was wondering what others think of this?

You would still have to check whether all senders are ready after counting down, in case one of those wakes was spurious.
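
A minimal sketch of that re-check, using std only. MockSender, FanOut, NoopWake and demo_countdown are hypothetical names invented for illustration, and MockSender is a stand-in for a real channel sender, not the futures API. The fast path skips the loop while more wakes are expected, and the full loop runs again once the count runs out:

```rust
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; just enough to build a `Context` by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical stand-in for a channel sender (assumption: not the real API).
struct MockSender {
    ready: bool,
}

impl MockSender {
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<()> {
        if self.ready { Poll::Ready(()) } else { Poll::Pending }
    }
}

/// Fan-out that counts down expected wakes instead of looping on every poll.
struct FanOut {
    senders: Vec<MockSender>,
    /// Senders that were Pending at the last full check.
    outstanding: usize,
}

impl FanOut {
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        // Fast path: more wakes are still expected, so skip the loop.
        if self.outstanding > 1 {
            self.outstanding -= 1;
            return Poll::Pending;
        }

        // The count ran out (or this is the first poll): check every sender,
        // because any of the earlier wakes may have been spurious.
        let mut pending = 0;
        for sender in self.senders.iter_mut() {
            if sender.poll_ready(cx).is_pending() {
                pending += 1;
            }
        }
        self.outstanding = pending;

        if pending == 0 { Poll::Ready(()) } else { Poll::Pending }
    }
}

/// Drives the fan-out through one spurious wake and two real ones.
fn demo_countdown() -> Vec<bool> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fan = FanOut {
        senders: vec![MockSender { ready: false }, MockSender { ready: false }],
        outstanding: 0,
    };
    let mut log = Vec::new();

    log.push(fan.poll_ready(&mut cx).is_ready()); // first poll: both pending
    log.push(fan.poll_ready(&mut cx).is_ready()); // spurious wake: nothing changed
    fan.senders[0].ready = true;
    log.push(fan.poll_ready(&mut cx).is_ready()); // count ran out, re-check finds one pending
    fan.senders[1].ready = true;
    log.push(fan.poll_ready(&mut cx).is_ready()); // re-check finds all ready
    log
}
```

Without the re-check, the third poll in demo_countdown would have reported ready while the second sender was still pending.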

I think that would be OK with futures::channel::mpsc::Sender specifically, but in general you could have multiple sinks multiplexed over a single wake event, and that source could itself be optimized to wake the task only once, even if the task was registered via multiple wakers (this is why Waker::will_wake exists).
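
As a rough illustration of that kind of deduplication, here is a std-only sketch. Task, wake_deduplicated and demo_will_wake are hypothetical names, and Task is only a stand-in for an executor's task handle. Note that will_wake is documented as best-effort: a true result guarantees the same task, a false result guarantees nothing.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

/// Hypothetical task handle that counts how often it is woken.
struct Task {
    wakes: AtomicUsize,
}

impl Wake for Task {
    fn wake(self: Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
}

/// Wakes `first`, and wakes `second` only if it may target another task.
fn wake_deduplicated(first: &Waker, second: &Waker) {
    first.wake_by_ref();
    if !first.will_wake(second) {
        second.wake_by_ref();
    }
}

/// Returns the wake counts of task A and task B.
fn demo_will_wake() -> (usize, usize) {
    let task_a = Arc::new(Task { wakes: AtomicUsize::new(0) });
    let task_b = Arc::new(Task { wakes: AtomicUsize::new(0) });

    let a1 = Waker::from(task_a.clone());
    let a2 = a1.clone(); // second registration of the same task
    let b = Waker::from(task_b.clone());

    wake_deduplicated(&a1, &a2); // same task: a2 is skipped
    wake_deduplicated(&a1, &b); // different tasks: both are woken

    (
        task_a.wakes.load(Ordering::SeqCst),
        task_b.wakes.load(Ordering::SeqCst),
    )
}
```

A source that wakes this way delivers fewer wakes than there are registered wakers, which is exactly what would throw off a count of expected wakes.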

Though, thinking about it a little more, I’m not sure that that would work if the task migrates to a different executor. In that case all the outstanding wakers are abandoned, and the sink would need to call into all the underlying senders again to reregister them with the new waker. You could maybe get around that by storing the old waker and checking that the new waker is equivalent before skipping the loop, but at that point the loop itself might just be cheaper.
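
The stored-waker check could look roughly like the following std-only sketch. RegisteredWaker, needs_reregister, TaskId and demo_waker_cache are hypothetical names; in a real poll_ready the current waker would come from cx.waker().

```rust
use std::sync::Arc;
use std::task::{Wake, Waker};

/// Hypothetical task handle; the id only keeps the two tasks distinct.
struct TaskId(u32);

impl Wake for TaskId {
    fn wake(self: Arc<Self>) {
        let _ = self.0;
    }
}

/// Remembers the waker the underlying senders were last registered with.
struct RegisteredWaker {
    last: Option<Waker>,
}

impl RegisteredWaker {
    /// Returns true if the senders must be polled again to register `current`.
    fn needs_reregister(&mut self, current: &Waker) -> bool {
        // `will_wake` is best-effort, so a false negative only costs an extra loop.
        let same = self
            .last
            .as_ref()
            .map_or(false, |old| old.will_wake(current));
        if !same {
            self.last = Some(current.clone());
        }
        !same
    }
}

/// Simulates two polls on one task, then a poll after migrating to another.
fn demo_waker_cache() -> [bool; 3] {
    let first = Waker::from(Arc::new(TaskId(1)));
    let second = Waker::from(Arc::new(TaskId(2)));
    let mut cache = RegisteredWaker { last: None };

    [
        cache.needs_reregister(&first),  // nothing stored yet
        cache.needs_reregister(&first),  // same task as before
        cache.needs_reregister(&second), // task moved, old wakers are stale
    ]
}
```

This adds a clone and a comparison to every poll, which supports the point that for a handful of senders the plain loop may well be cheaper.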

Thanks for clarifying. I thought about having to check it was the same waker, but I thought it could only happen if used in unconventional ways. I didn't think about spurious wakeups and changing executors.

For a small number of channels, just looping is surely the sanest option. I'm just not sure how it's going to scale, and since it's library code, it's only a matter of time before someone wants thousands of observers on something... but it seems changing it would definitely explode the complexity and error-proneness of the code.
