pub struct PrioRetry<S>
where
S: Stream,
S::Item: Ord + Clone + Eq,
{
delay_duration: Duration,
delayed_item: Option<DelayedItem<S::Item>>,
stream: Fuse<S>,
}
impl<S> PrioRetry<S>
where
S: Stream,
S::Item: Ord + Clone + Eq,
{
pub fn new(stream: S, delay_duration: Duration) -> Self {
Self {
delay_duration,
delayed_item: None,
stream: stream.fuse(),
}
}
}
impl<S> Stream for PrioRetry<S>
where
S: Stream,
S::Item: Ord + Clone + Eq,
{
type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
match self.stream.poll_next(cx) {
Poll::Pending => {
break;
}
Poll::Ready(Some(new_item)) => {
if let Some(ref mut delayed_item) = self.delayed_item {
if delayed_item.value < new_item {
self.delayed_item = Some(DelayedItem::new(new_item.clone()));
return Poll::Ready(Some(new_item));
} else if delayed_item.value == new_item {
delayed_item.exp_backoff(self.delay_duration);
}
} else {
self.delayed_item = Some(DelayedItem::new(new_item.clone()));
return Poll::Ready(Some(new_item));
}
}
Poll::Ready(None) => {
return Poll::Ready(None);
} // Err(e) => {
// return Err(Error(Kind::Inner(e)));
// }
}
}
if let Some(ref mut delayed_item) = self.delayed_item {
if let Some(ref mut delay) = delayed_item.delay {
match Pin::new(delay).poll(cx) {
Poll::Pending => {}
Poll::Ready(()) => {
// we yield a clone, since we need the old copy to check if an item was requeued
delayed_item.pause();
return Poll::Ready(Some(delayed_item.value.clone()));
} // Err(e) => {
// return Err(Error(Kind::Timer(e)));
// }
}
}
};
Poll::Pending
}
}
some thing worng with match self.stream.poll_next(cx). and how to return a Err(Error(Kind::Timer(e))); type with current futures... can you help??? very appreciated
no method named `poll_next` found for struct `futures_util::stream::stream::fuse::Fuse<S>` in the current scope