How to use Fuse stream

pub struct PrioRetry<S>
where
    S: Stream,
    S::Item: Ord + Clone + Eq,
{
    delay_duration: Duration,
    delayed_item: Option<DelayedItem<S::Item>>,
    stream: Fuse<S>,
}
impl<S> PrioRetry<S>
where
    S: Stream,
    S::Item: Ord + Clone + Eq,
{
    pub fn new(stream: S, delay_duration: Duration) -> Self {
        Self {
            delay_duration,
            delayed_item: None,
            stream: stream.fuse(),
        }
    }
}
impl<S> Stream for PrioRetry<S>
where
    S: Stream,
    S::Item: Ord + Clone + Eq,
{
    type Item = S::Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            match self.stream.poll_next(cx) {
                Poll::Pending => {
                    break;
                }
                Poll::Ready(Some(new_item)) => {
                    if let Some(ref mut delayed_item) = self.delayed_item {
                        if delayed_item.value < new_item {
                            self.delayed_item = Some(DelayedItem::new(new_item.clone()));
                            return Poll::Ready(Some(new_item));
                        } else if delayed_item.value == new_item {
                            delayed_item.exp_backoff(self.delay_duration);
                        }
                    } else {
                        self.delayed_item = Some(DelayedItem::new(new_item.clone()));
                        return Poll::Ready(Some(new_item));
                    }
                }
                Poll::Ready(None) => {
                    return Poll::Ready(None);
                } // Err(e) => {
                  //     return Err(Error(Kind::Inner(e)));
                  // }
            }
        }

        if let Some(ref mut delayed_item) = self.delayed_item {
            if let Some(ref mut delay) = delayed_item.delay {
                match Pin::new(delay).poll(cx) {
                    Poll::Pending => {}
                    Poll::Ready(()) => {
                        // we yield a clone, since we need the old copy to check if an item was requeued
                        delayed_item.pause();
                        return Poll::Ready(Some(delayed_item.value.clone()));
                    } // Err(e) => {
                      //     return Err(Error(Kind::Timer(e)));
                      // }
                }
            }
        };
        Poll::Pending
    }
}

some thing worng with match self.stream.poll_next(cx). and how to return a Err(Error(Kind::Timer(e))); type with current futures... can you help??? very appreciated

no method named `poll_next` found for struct `futures_util::stream::stream::fuse::Fuse<S>` in the current scope

Please do not post the same question multiple times.