I have a public object that holds a collection of futures::channel::mpsc::Sender. It exposes a Sink impl, and when the user sends something we forward it to all channels.
I'm wondering about the Sink impl. For now I have naive but hopefully correct logic that does:
fn poll_ready( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Result<(), Self::Error>>
{
    // As soon as any is not ready, we are not ready
    //
    for obs in self.get_mut().observers.iter_mut()
    {
        if let Some( ref mut o ) = obs
        {
            let res = ready!( Pin::new(o).poll_ready(cx) );

            // Errors mean disconnected, so drop.
            //
            if res.is_err()
            {
                *obs = None;
            }
        }
    }

    Ok(()).into()
}
So basically, on every call to poll_ready I start looping from the beginning until a channel returns Pending. As soon as that happens I return Pending and start over on the next call.
In principle, if any channel has returned Pending, the next call to our poll_ready function should be caused by that channel waking up the task. So I'm tempted to poll all of them once, store a counter of how many returned Pending, and just count down on each subsequent call, avoiding the loop.
Is this sensible? It makes assumptions about how the user uses the object, namely that they don't call poll_ready outside of the standard situation of a task that is going to block on the waker. Otherwise things come crumbling down.
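To make the concern concrete, here is a rough std-only model of the countdown idea. MockChannel, Countdown and spurious_polls are hypothetical names made up for illustration; they are not futures types, and the model leaves out waker registration entirely. It only shows what happens to the counter when poll_ready is called without a channel actually having woken the task.

```rust
use std::task::Poll;

/// Hypothetical stand-in for one channel. `ready` would be flipped by hand
/// to simulate the receiver making room. This is not a `futures` type.
struct MockChannel {
    ready: bool,
}

impl MockChannel {
    fn poll_ready(&mut self) -> Poll<()> {
        if self.ready { Poll::Ready(()) } else { Poll::Pending }
    }
}

/// Model of the countdown idea: scan once, then count wake-ups.
struct Countdown {
    channels: Vec<MockChannel>,
    /// `None`: no scan in progress. `Some(n)`: `n` channels were pending at
    /// the last scan and each is assumed to wake the task exactly once.
    outstanding: Option<usize>,
}

impl Countdown {
    fn new(n: usize) -> Self {
        Countdown {
            channels: (0..n).map(|_| MockChannel { ready: false }).collect(),
            outstanding: None,
        }
    }

    fn poll_ready(&mut self) -> Poll<()> {
        let remaining = match self.outstanding {
            // First call: poll every channel and count the pending ones.
            None => {
                let mut pending = 0;
                for c in self.channels.iter_mut() {
                    if c.poll_ready().is_pending() {
                        pending += 1;
                    }
                }
                pending
            }
            // Later calls: assume exactly one pending channel woke us.
            Some(n) => n.saturating_sub(1),
        };

        if remaining == 0 {
            self.outstanding = None;
            Poll::Ready(())
        } else {
            self.outstanding = Some(remaining);
            Poll::Pending
        }
    }

    /// Ground truth, bypassing the counter.
    fn all_ready(&self) -> bool {
        self.channels.iter().all(|c| c.ready)
    }
}

/// Polls three times while no channel ever becomes ready.
fn spurious_polls() -> (Poll<()>, bool) {
    let mut sink = Countdown::new(2);
    let _ = sink.poll_ready(); // scan: 2 pending
    let _ = sink.poll_ready(); // treated as a wake-up: 1 left
    let last = sink.poll_ready(); // treated as a wake-up: 0 left
    (last, sink.all_ready())
}
```

In this model, spurious_polls ends with the counter reporting Ready while neither mock channel is ready, which is the "crumbling down" case: the counter is only as reliable as the assumption that every call corresponds to exactly one wake-up.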
It obviously takes Pin<&mut Self>, so normally only one such request should exist until it completes, but it can be cheated...
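As a small illustration of the "cheating", the sketch below polls a hypothetical CountingReady type three times in a row with a waker that does nothing. NoopWake, CountingReady and poll_three_times are names invented here; only Pin, Context, Poll, Wake and Waker come from std. The exclusive borrow is satisfied on every call, yet none of the calls follows a wake-up.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that ignores wake-ups (hypothetical helper for this sketch).
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical type that is never ready and counts how often it is polled.
struct CountingReady {
    polls: usize,
}

impl CountingReady {
    fn poll_ready(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        // CountingReady is Unpin, so mutating through the Pin is allowed.
        self.polls += 1;
        Poll::Pending
    }
}

/// Polls in a plain loop; nothing ever wakes the "task" between calls.
fn poll_three_times() -> usize {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut sink = CountingReady { polls: 0 };

    for _ in 0..3 {
        // Each call re-borrows `sink` exclusively; the borrow checker is happy.
        let _ = Pin::new(&mut sink).poll_ready(&mut cx);
    }

    sink.polls
}
```

So Pin<&mut Self> rules out overlapping calls, but it says nothing about why a call happens; any logic that counts calls as wake-ups has to rely on the caller's discipline.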
It's a hypothetical performance improvement, so it's not very high on my priority list, but I was wondering what others think of this?