Migrate tokio from 0.1 to 0.2

#[cfg(test)]

mod tests {

    use super::*;

    use tokio;

    use tokio::time::interval;

    #[test]

    fn test_prio_retry() {

        let mut items = vec![0, 1, 2, 3, 3, 3, 0, 1, 2, 2, 6, 5, 7].into_iter();

        let len = items.len();

        let items = interval(Duration::from_millis(200))

            .take(len as usize)

            .map(move |_| items.next().unwrap())

            .map_err(|e| error!("can't consume interval: {:?}", e));

        let exp: Vec<i64> = vec![0, 1, 2, 3, 3, 3, 6, 7];

        let stream = PrioRetry::new(items, Duration::from_millis(100));

        let res = stream.collect();

        tokio::run(res.then(move |res| {

            match res {

                Err(_) => assert!(false),

                Ok(items) => assert_eq!(items, exp, "can't get expected items from prio retry"),

            };

            Ok(())

        }));

    }

}
reqwest = { version = "0.10", features = ["blocking","json"] }
tokio = { version = "0.2", features = ["full"] }

it throws out some errors

no method named `map_err` found for struct `futures_util::stream::stream::map::Map<futures_util::stream::stream::take::Take<tokio::time::interval::Interval>, [closure@src/future/prio_retry.rs:151:18: 151:48 items:_]>` in the current scope
cannot find function `run` in crate `tokio`
no method named `then` found for struct `futures_util::stream::stream::collect::Collect<future::prio_retry::PrioRetry<_>, _>` in the current scope

can someone help me ?

The combinators are currently available on the StreamExt extension trait, which you have to import to use them. That said, the combinators are used much less with Tokio v0.2 as we have async/await:

use tokio::stream::StreamExt;

while let Some(item) = stream.next().await {
    ...
}

Note that tokio::run doesn't exist anymore, so you should now use this instead:

#[tokio::test]
async fn test_prio_retry() {
    ...
}

or use block_on

use tokio::runtime::Runtime;

let mut rt = Runtime::new().unwrap();
rt.block_on(your_future);

Also, please don't make every second line empty; it makes it hard to read.

2 Likes

sorry,
why tokio::run be replaced? can you tell me how to change it?

Instead of tokio::run, you should now define your test as:

#[tokio::test]
async fn test_prio_retry() {
    ...
}

Then you can simply use .await, or in the case of a stream, use a while loop.

// before
let res = stream.collect();
tokio::run(res.then(move |res| {
    ...
}));
// now
while let Some(res) = stream.next().await {
    ...
}

what need to do within await block ??
is the block return Futrure struct or Result Struct??

If you want more help, please edit your posts to not have empty lines all over the place.

pub struct PrioRetry<S>
where
    S: Stream,
    S::Item: Ord + Clone + Eq,
{
    delay_duration: Duration,
    delayed_item: Option<DelayedItem<S::Item>>,
    stream: Fuse<S>,
}
impl<S> PrioRetry<S>
where
    S: Stream,
    S::Item: Ord + Clone + Eq,
{
    pub fn new(stream: S, delay_duration: Duration) -> Self {
        Self {
            delay_duration,
            delayed_item: None,
            stream: stream.fuse(),
        }
    }
}
impl<S> Stream for PrioRetry<S>
where
    S: Stream,
    S::Item: Ord + Clone + Eq,
{
    type Item = S::Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            match self.stream.poll_next(cx) {
                Poll::Pending => {
                    break;
                }
                Poll::Ready(Some(new_item)) => {
                    if let Some(ref mut delayed_item) = self.delayed_item {
                        if delayed_item.value < new_item {
                            self.delayed_item = Some(DelayedItem::new(new_item.clone()));
                            return Poll::Ready(Some(new_item));
                        } else if delayed_item.value == new_item {
                            delayed_item.exp_backoff(self.delay_duration);
                        }
                    } else {
                        self.delayed_item = Some(DelayedItem::new(new_item.clone()));
                        return Poll::Ready(Some(new_item));
                    }
                }
                Poll::Ready(None) => {
                    return Poll::Ready(None);
                } // Err(e) => {
                  //     return Err(Error(Kind::Inner(e)));
                  // }
            }
        }

        if let Some(ref mut delayed_item) = self.delayed_item {
            if let Some(ref mut delay) = delayed_item.delay {
                match Pin::new(delay).poll(cx) {
                    Poll::Pending => {}
                    Poll::Ready(()) => {
                        // we yield a clone, since we need the old copy to check if an item was requeued
                        delayed_item.pause();
                        return Poll::Ready(Some(delayed_item.value.clone()));
                    } // Err(e) => {
                      //     return Err(Error(Kind::Timer(e)));
                      // }
                }
            }
        };
        Poll::Pending
    }
}

some thing worng with match self.stream.poll_next(cx). and how to return a Err(Error(Kind::Timer(e))); type with current futures... can you help??? very appreciated

no method named `poll_next` found for struct `futures_util::stream::stream::fuse::Fuse<S>` in the current scope

To call the poll_* methods, you need to pin it with Pin::new(&mut self.stream). You also need to add Unpin to the where bounds. See playground.

To return errors in your stream, you should make your item type a Result.

type Item = Result<S::Item, YourErrorType>;

thank you !!!

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.