How do I poll a Stream behind a RwLock?

I have an axum server with a state struct. In it I want to use delay_map::HashMapDelay to keep a map of sessions that are removed after a timeout. Since the state is shared, the map is under a tokio::sync::RwLock.

But from what I understand, in order to actually remove the expired entries, I need to call next() since it's a Stream. So I'm trying to spawn a task that does that in a loop. However, since the map is behind a RwLock, I can't make that work: it gets stuck. I think it deadlocks because the task has to hold the lock while it waits in the next() call.

Any suggestions on how to make this work, or maybe a totally different approach to do the same thing?

You could try using some wrapper type like

struct RwLockStream<'a, T>(pub &'a RwLock<T>);

impl<'a, T: Stream + Unpin> Stream for RwLockStream<'a, T> {
    type Item = T::Item;
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<<Self as Stream>::Item>> {
        self.0.write().unwrap().poll_next_unpin(cx)
    }
}


then replace your current call of

state_struct.session_map.write().unwrap().next().await

with something like

RwLockStream(&state_struct.session_map).next().await

This way, you still have access to all the other convenience methods of StreamExt/TryStreamExt.


This way, the lock is only held for the duration of each individual poll, not between polls. You should probably still make sure not to have several such next calls active at the same time; I believe wakeups might otherwise be lost. However, I believe it should be fine to insert items into or remove items from the map while another task/thread is awaiting the expired-items stream.


Alternatively (e.g. if this is only a single-use thing anyway), you could use std::future::poll_fn to call HashMapDelay::poll_expired directly, in place of the former .next().await call:

std::future::poll_fn(|cx| state_struct.session_map.write().unwrap().poll_expired(cx)).await
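
To illustrate why this pattern does not block writers, here is a minimal std-only sketch. Everything in it is a hypothetical stand-in rather than delay_map's API: ToyQueue plays the role of the map, its poll_expired only mimics the shape of a poll method, and block_on is a tiny single-future executor written for the example. It uses std::sync::RwLock, matching the .write().unwrap() calls above. The point it tries to show is that the write guard lives only for the duration of one poll, so another thread can take the lock and insert while the consumer is waiting.

```rust
use std::collections::VecDeque;
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for the delay map: hands out queued items and
/// remembers the caller's waker while it is empty.
#[derive(Default)]
struct ToyQueue {
    items: VecDeque<u32>,
    waker: Option<Waker>,
}

impl ToyQueue {
    /// Adds an item and wakes the task that is waiting for one, if any.
    fn push(&mut self, item: u32) {
        self.items.push_back(item);
        if let Some(waker) = self.waker.take() {
            waker.wake();
        }
    }

    /// Poll-style accessor (illustrative only, not delay_map's signature).
    fn poll_expired(&mut self, cx: &mut Context<'_>) -> Poll<u32> {
        match self.items.pop_front() {
            Some(item) => Poll::Ready(item),
            None => {
                // Nothing ready yet: store the waker so `push` can wake us.
                self.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

/// Waker that unparks the thread running `block_on`.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal executor: polls one future, parking the thread between polls.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// Waits for an item while a second thread inserts one under the same lock.
fn demo() -> u32 {
    let queue = Arc::new(RwLock::new(ToyQueue::default()));
    let producer = {
        let queue = Arc::clone(&queue);
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(50));
            // This would block forever if the consumer held the lock while waiting.
            queue.write().unwrap().push(42);
        })
    };
    // The write guard is a temporary inside the closure, dropped after each poll.
    let item = block_on(poll_fn(|cx| queue.write().unwrap().poll_expired(cx)));
    producer.join().unwrap();
    item
}
```

Calling demo() returns 42 once the producer thread has pushed its item. If the consumer instead kept a write guard alive across the wait, the producer's write() call would never get the lock, which is the deadlock described in the question.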

Thank you very much! I'm glad I asked because it would take me ages to come up with that solution.