Implement futures::Stream for paginator

Howdy!

I am using Redis Streams through the redis-rs crate.

I'm expecting to read a lot of data from the streams, so I'd like to use the futures::Stream trait. However, the redis-rs crate does not expose such functionality; instead, it loads all the data into memory at once.

So I'm trying to write a futures::Stream implementation on top of the XRANGE ... COUNT n functionality, which does pagination.

The idea is simple:

  1. Call XRANGE ... COUNT n first, with n being the size of each page, using the redis-rs client (this is a Future).
  2. Take the resulting page, which will be a Vec<StreamId>, and exhaust it, returning Poll::Ready(Some(id)) for each id in the page.
  3. When the page is exhausted, call Redis again as per point 1, changing the lower bound of the request (a Future); if an empty Vec is returned, return Poll::Ready(None) to end the Stream. (A minimal sketch of these calls follows below.)
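For reference, this is roughly what the two XRANGE ... COUNT calls look like with redis-rs; the stream name, page size, and the hard-coded bounds here are made up for illustration:

use redis::streams::StreamRangeReply;
use redis::AsyncCommands;

// Stream name and page size are placeholders; this only illustrates the calls.
async fn demo(conn: &mut redis::aio::Connection) -> redis::RedisResult<()> {
    // Step 1: first page, starting from the beginning of the stream.
    let first: StreamRangeReply = conn.xrange_count("my-stream", 0, "+", 10).await?;
    println!("first page: {} entries", first.ids.len());

    // Step 3: next page, with the lower bound moved past the last id seen
    // (here hard-coded to 5, i.e. "one past" an entry like 4-2).
    let next: StreamRangeReply = conn.xrange_count("my-stream", 5, "+", 10).await?;
    println!("next page: {} entries (empty means the stream is exhausted)", next.ids.len());

    Ok(())
}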

I don't know much about pinning, but I tried to look around and read up on it, and this is what I could come up with:

#[pin_project]
struct RedisStream {
    #[pin]
    conn: redis::aio::Connection,
    stream_name: &'static str,
    page_size: usize,
    #[pin]
    last_sequence_number: usize,
    #[pin]
    chunk: Option<Chunk>,
}

#[derive(Debug)]
struct Chunk {
    position: usize,
    data: Vec<StreamId>,
}

impl Stream for RedisStream {
    type Item = StreamId;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let now = std::time::SystemTime::now();

        let page_size = self.page_size;
        let stream_name = self.stream_name;

        let mut this = self.as_mut().project();

        println!(
            "{:?} - Last: {}, Chunk: {:?}",
            now, this.last_sequence_number, this.chunk
        );

        // Try to read from the cached page, and when completely exhausted,
        // set the chunk to None to go to Redis for the next poll_next call.
        if let Some(mut chunk) = this.chunk.as_mut().as_pin_mut() {
            let id = chunk.data[chunk.position].clone();
            let (v, _) = parse_entry_id(&id.id).unwrap();

            this.last_sequence_number.set((v as usize) + 1);
            chunk.position += 1;

            if chunk.position == chunk.data.len() {
                this.chunk.set(None);
            }

            return Poll::Ready(Some(id));
        }

        println!(
            "{:?} - Last: {}, Calling redis",
            now, this.last_sequence_number
        );

        // Retrieve the next page from Redis using XRANGE .. COUNT n
        let result: StdResult<StreamRangeReply, _> = ready!(this
            .conn
            .xrange_count(
                stream_name,
                *this.last_sequence_number.as_ref(),
                "+",
                page_size
            )
            .as_mut()
            .poll(cx));

        println!(
            "{:?} - Last: {}, Called redis: {:?}",
            now, this.last_sequence_number, result
        );

        let page = result.unwrap();

        // No more data from Redis, close the Stream.
        if page.ids.is_empty() {
            return Poll::Ready(None);
        }

        // Set the chunk returned by Redis to produce data from memory,
        // starting from the next poll_next iteration.
        this.chunk.set(Some(Chunk {
            position: 0,
            data: page.ids,
        }));

        Poll::Pending
    }
}

However, this is the output I get:

SystemTime { tv_sec: 1603703455, tv_nsec: 793395391 } - Last: 4, Chunk: None
SystemTime { tv_sec: 1603703455, tv_nsec: 793395391 } - Last: 4, Calling redis
SystemTime { tv_sec: 1603703455, tv_nsec: 793494339 } - Last: 4, Chunk: None
SystemTime { tv_sec: 1603703455, tv_nsec: 793494339 } - Last: 4, Calling redis
SystemTime { tv_sec: 1603703455, tv_nsec: 793749880 } - Last: 4, Chunk: None
SystemTime { tv_sec: 1603703455, tv_nsec: 793749880 } - Last: 4, Calling redis
SystemTime { tv_sec: 1603703455, tv_nsec: 793749880 } - Last: 4, Called redis: Ok(StreamRangeReply { ids: [StreamId { id: "4-2", map: {"event": binary-data([129, 0, 192]), "source_id": string-data('"test-source-2"')} }] })
SystemTime { tv_sec: 1603703455, tv_nsec: 794088292 } - Last: 4, Chunk: Some(Chunk { position: 0, data: [StreamId { id: "4-2", map: {"event": binary-data([129, 0, 192]), "source_id": string-data('"test-source-2"')} }] })
SystemTime { tv_sec: 1603703455, tv_nsec: 794152241 } - Last: 5, Chunk: None
SystemTime { tv_sec: 1603703455, tv_nsec: 794152241 } - Last: 5, Calling redis
SystemTime { tv_sec: 1603703455, tv_nsec: 794152241 } - Last: 5, Called redis: Ok(StreamRangeReply { ids: [StreamId { id: "4-2", map: {"event": binary-data([129, 0, 192]), "source_id": string-data('"test-source-2"')} }] })
SystemTime { tv_sec: 1603703455, tv_nsec: 794513314 } - Last: 5, Chunk: Some(Chunk { position: 0, data: [StreamId { id: "4-2", map: {"event": binary-data([129, 0, 192]), "source_id": string-data('"test-source-2"')} }] })
SystemTime { tv_sec: 1603703455, tv_nsec: 794574181 } - Last: 5, Chunk: None
SystemTime { tv_sec: 1603703455, tv_nsec: 794574181 } - Last: 5, Calling redis
SystemTime { tv_sec: 1603703455, tv_nsec: 794574181 } - Last: 5, Called redis: Ok(StreamRangeReply { ids: [StreamId { id: "4-2", map: {"source_id": string-data('"test-source-2"'), "event": binary-data([129, 0, 192])} }] })
SystemTime { tv_sec: 1603703455, tv_nsec: 794843492 } - Last: 5, Chunk: Some(Chunk { position: 0, data: [StreamId { id: "4-2", map: {"source_id": string-data('"test-source-2"'), "event": binary-data([129, 0, 192])} }] })
SystemTime { tv_sec: 1603703455, tv_nsec: 794898904 } - Last: 5, Chunk: None
SystemTime { tv_sec: 1603703455, tv_nsec: 794898904 } - Last: 5, Calling redis
SystemTime { tv_sec: 1603703455, tv_nsec: 794898904 } - Last: 5, Called redis: Ok(StreamRangeReply { ids: [] })

As you can see, the XRANGE call is issued over and over and keeps returning the same entry, even after last_sequence_number has changed to a value that should produce no entries.

This means the futures::Stream will end up yielding the same entry 3 times.

I'm assuming I'm either messing up the call to the Future or the memory updates using pinning.

Any ideas?
Thanks :smiley:

A few things I notice:

  1. xrange_count returns a future, and you need to store that future in your struct. As it is now, every poll creates a new future, polls it once, and then destroys it; you must keep it around across polls (see the sketch after this list).
  2. You should only #[pin] futures and streams and such. None of the fields currently in the struct need a #[pin].
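
For point 1, a rough, untested sketch of one way to do it: keep the in-flight xrange_count call in the struct as a BoxFuture that owns the connection and hands it back when it finishes. Field names here are made up, and this assumes redis-rs with the streams feature and a Send connection:

use futures::future::BoxFuture;
use futures::{ready, FutureExt, Stream};
use pin_project::pin_project;
use redis::streams::{StreamId, StreamRangeReply};
use redis::AsyncCommands;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

#[pin_project]
struct RedisStream {
    stream_name: String,
    page_size: usize,
    from: usize,
    // Entries of the current page that have not been yielded yet.
    buffered: VecDeque<StreamId>,
    // Exactly one of these two is Some at any time: start with conn: Some(..)
    // and in_flight: None; the connection moves into the in-flight future.
    conn: Option<redis::aio::Connection>,
    in_flight: Option<
        BoxFuture<'static, (redis::aio::Connection, redis::RedisResult<StreamRangeReply>)>,
    >,
}

impl Stream for RedisStream {
    type Item = redis::RedisResult<StreamId>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // No field needs #[pin]: the BoxFuture is already pinned on the heap.
        let this = self.project();

        loop {
            // 1. Drain the buffered page first.
            if let Some(id) = this.buffered.pop_front() {
                return Poll::Ready(Some(Ok(id)));
            }

            // 2. Start an XRANGE ... COUNT call only if none is in flight.
            if this.in_flight.is_none() {
                let mut conn = this.conn.take().expect("connection present while idle");
                let (name, from, count) = (this.stream_name.clone(), *this.from, *this.page_size);
                *this.in_flight = Some(
                    async move {
                        let result: redis::RedisResult<StreamRangeReply> =
                            conn.xrange_count(name, from, "+", count).await;
                        (conn, result)
                    }
                    .boxed(),
                );
            }

            // 3. Poll the *stored* future; on Pending it stays in the struct and is
            //    resumed by the next call to poll_next instead of being recreated.
            let (conn, result) = ready!(this.in_flight.as_mut().unwrap().as_mut().poll(cx));
            *this.in_flight = None;
            *this.conn = Some(conn);

            match result {
                Err(e) => return Poll::Ready(Some(Err(e))),
                Ok(page) if page.ids.is_empty() => return Poll::Ready(None),
                Ok(page) => {
                    // Advance the lower bound past the last entry of this page
                    // (same idea as parse_entry_id in the original post).
                    if let Some(ms) = page.ids.last().and_then(|id| id.id.split('-').next()) {
                        *this.from = ms.parse::<usize>().map(|v| v + 1).unwrap_or(*this.from);
                    }
                    this.buffered.extend(page.ids);
                }
            }
        }
    }
}

The important part is that on Poll::Pending the boxed future stays in in_flight, so the next poll_next resumes the same request instead of issuing a new one.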

Generally I recommend implementing this using the async-stream crate.

Thank you! With async-stream it worked flawlessly:

use futures::Stream;
use redis::streams::{StreamId, StreamRangeReply};
use redis::AsyncCommands;

struct RedisPaginatedStream {
    conn: redis::aio::Connection,
    stream_name: String,
    page_size: usize,
    from: usize,
}

impl RedisPaginatedStream {
    fn into_stream(mut self) -> impl Stream<Item = redis::RedisResult<StreamId>> + 'static {
        async_stream::try_stream! {
            let mut from = self.from;

            loop {
                // Fetch the next page; the generated state machine keeps this
                // future alive across polls.
                let result: StreamRangeReply = self.conn.xrange_count(&self.stream_name, from, "+", self.page_size).await?;
                let ids = result.ids;

                if ids.is_empty() {
                    break;
                }

                for id in ids {
                    // Advance the lower bound past the entry we just read.
                    let (v, _) = parse_entry_id(&id.id).unwrap();
                    from = (v + 1) as usize;

                    yield id;
                }
            }
        }
    }
}
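
For completeness, this is roughly how the stream can be consumed; the connection setup, stream name, and page size below are just placeholders:

use futures::{pin_mut, StreamExt};

async fn consume(client: &redis::Client) -> redis::RedisResult<()> {
    let paginated = RedisPaginatedStream {
        conn: client.get_async_connection().await?,
        stream_name: "my-stream".to_string(),
        page_size: 10,
        from: 0,
    };

    let stream = paginated.into_stream();
    pin_mut!(stream); // pin the stream on the stack so it can be polled

    while let Some(id) = stream.next().await {
        let id = id?;
        println!("got entry {}", id.id);
    }

    Ok(())
}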
