I have an axum server with a state struct. In it I want to use delay_map::HashMapDelay to keep a map of sessions that are removed after a timeout. Since the state is shared, the map is under a tokio::sync::RwLock.
But from what I understand, in order to actually remove the expired entries, I need to call next(), since it's a Stream. So I'm trying to spawn a task that does that in a loop. However, since the map is behind a RwLock, I can't make that work: it gets stuck. I think it deadlocks since it needs to hold the lock while it waits in the next() call.
Any suggestions on how to make this work, or maybe a totally different approach to do the same thing?
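The suspected cause can be reproduced in miniature. The sketch below is only an analogy: it uses std::sync::RwLock and a placeholder Vec<u32> instead of the real session map, and the function name is made up for illustration. It shows that while a write guard is alive (as it would be for the whole duration of a pending next() call), no other writer can get in. With tokio's RwLock the second writer would wait asynchronously instead of getting an error, but the effect is the same: nothing can insert a session while the cleanup task is waiting.

```rust
use std::sync::{RwLock, TryLockError};

/// Returns true if a second writer is refused while a write guard is alive.
/// `Vec<u32>` is only a placeholder for the session map.
fn writer_blocked_while_guard_held(lock: &RwLock<Vec<u32>>) -> bool {
    // Stands in for a guard kept alive across `.next().await`.
    let _guard = lock.write().unwrap();
    // Any other writer (e.g. a request handler inserting a session) is refused.
    matches!(lock.try_write(), Err(TryLockError::WouldBlock))
}

fn main() {
    let sessions = RwLock::new(vec![1, 2, 3]);
    println!(
        "blocked while guard held: {}",
        writer_blocked_while_guard_held(&sessions)
    );
    // Once the guard has been dropped, the lock can be taken again.
    println!("free afterwards: {}", sessions.try_write().is_ok());
}
```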
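You could try using some wrapper type like:

    use std::pin::Pin;
    use std::sync::RwLock; // std's RwLock, not tokio's: its write() is synchronous, so it can be called inside poll_next
    use std::task::{Context, Poll};
    use futures::{Stream, StreamExt};

    struct RwLockStream<'a, T>(pub &'a RwLock<T>);

    impl<'a, T: Stream + Unpin> Stream for RwLockStream<'a, T> {
        type Item = T::Item;

        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            // The write guard is a temporary, so it is dropped before this poll returns.
            self.0.write().unwrap().poll_next_unpin(cx)
        }
    }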
then replace your current call of

    state_struct.session_map.write().unwrap().next().await

with something like

    RwLockStream(&state_struct.session_map).next().await
In the same way, you still have access to all the other convenience functionality of StreamExt/TryStreamExt.
This way, the lock isn't held between polls. You should probably still make sure not to have multiple such next() calls active at the same time, since I believe wakeups might otherwise be lost. However, I believe it should be fine to insert items into or remove items from the map while another task or thread is awaiting the expired-items stream.
Alternatively (e.g. if this is only a single-use thing anyway), you could use std::future::poll_fn to call HashMapDelay::poll_expired directly, so in place of the former .next().await call:

    std::future::poll_fn(|cx| state_struct.session_map.write().unwrap().poll_expired(cx)).await
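To see why taking the lock inside each poll avoids the deadlock, here is a minimal std-only sketch. Everything in it is a stand-in invented for illustration: ToyExpiry is a hypothetical substitute for HashMapDelay whose entries are "expired" by hand rather than by a timer, block_on is a tiny thread-parking executor used in place of a real runtime, and wait_for_expired is just a helper name. The point it tries to show is that another thread can take the write lock while the poll_fn future is pending, because the guard only lives for the duration of a single poll.

```rust
use std::collections::VecDeque;
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for `HashMapDelay`: keys are expired manually.
struct ToyExpiry {
    expired: VecDeque<u32>,
    waker: Option<Waker>,
}

impl ToyExpiry {
    /// Marks a key as expired and wakes the task waiting for it, if any.
    fn expire(&mut self, key: u32) {
        self.expired.push_back(key);
        if let Some(waker) = self.waker.take() {
            waker.wake();
        }
    }

    /// Returns the next expired key, or stores the waker and stays pending.
    fn poll_expired(&mut self, cx: &mut Context<'_>) -> Poll<u32> {
        match self.expired.pop_front() {
            Some(key) => Poll::Ready(key),
            None => {
                self.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

/// Waker that unparks the thread running `block_on`.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Tiny executor (assumption: replaces a real async runtime for this demo).
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        thread::park();
    }
}

/// Waits for an expired key while a second thread writes to the same map.
fn wait_for_expired(map: Arc<RwLock<ToyExpiry>>) -> u32 {
    let producer_map = Arc::clone(&map);
    let producer = thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        // Succeeds even though the consumer is pending: no guard is held.
        producer_map.write().unwrap().expire(7);
    });
    // The write guard is created and dropped inside each individual poll.
    let key = block_on(poll_fn(|cx| map.write().unwrap().poll_expired(cx)));
    producer.join().unwrap();
    key
}

fn main() {
    let map = Arc::new(RwLock::new(ToyExpiry {
        expired: VecDeque::new(),
        waker: None,
    }));
    println!("expired key: {}", wait_for_expired(map));
}
```

If the guard were instead taken once and kept across the wait, the producer thread would block on write() forever, which matches the behaviour described in the question.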
Thank you very much! I'm glad I asked, because it would have taken me ages to come up with that solution.