Async Streams with Async Mutex

I have a use case for mocking tokio::stream::Streams so that I can manually progress them from a test, then make incremental assertions about the behavior of a system. If you want to skip to the code, a full gist is here: mockable_stream.rs · GitHub

I've done this for other async code, and found it's fairly simple to have some internal state struct that's stored as Arc<Mutex<T>> (so the mockable thing can be Clone), and the state object contains either a pair of mpsc Sender/Receivers, a VecDeque, or Option<_> that allows calls like mock_thing.mock_response(...) from a test, and then the code under test is unaware that mock_thing.get_response(...).await is actually waiting on the test mocking some response.

The hitch with mocking async streams is you end up with this state object locked with an async mutex, and have to figure out implementing poll_next() by polling the mutex rather than awaiting like any other async method could. I found this async-wg use case that got me through it with some modifications, but I'm wondering if there's anything I've missed or could do better given the current constraints.

I'm likely moving away from async streams for reasons like this, since I'm fairly sure I'd be far better served by a hand-written pub async fn next(...) -> ... of my own than worrying about streams and poll_next, but I'm looking to learn whatever I can about the paradigm since I'm sure there's more to learn about how/why I got here.

  • Are you sure you really need an async mutex? A blocking one is likely enough.
  • If you do need it, then consider using the async-stream crate to implement your stream using async/await.
  • You may find tokio_test::stream_mock useful.
2 Likes

I hadn't run across stream_mock yet, thanks! It would be helpful for some of my testing, but for other parts I need more granularity than it offers, like being able to release a message mid-test, observe behavior, then release another etc.

In the gist (snippet below), wouldn't using the sender from inside the lock constitute holding a sync mutex across an .await? I'm using deadqueues instead of Sender/Receiver pairs, but the issue is still the same I think.

async fn mock_response(&mut self, i: i64) {
    self.state.lock().await.tx.send(i).await.unwrap()
}

My reason for using async internally was that when a given task under test is running, when it calls .next() from the stream, I just have it await the queue/channel so it blocks asynchronously, but I guess it would be the same effect if I used a std::sync::Mutex, and returned Poll:Pending for any failure to try_lock, or when a VecDeque or whatever internal queue of messages I have is empty. Is that what you had in mind?

Maybe my main issue is I'm still thinking of async as event-loop like (Python background), but in Rust it's sync-polling all the way down, even if the top-level API feels more like async/await in other languages. Is that a fair assessment?

In the case of a channel, you can probably have the stream exclusively own the receiver, and something else exclusively own the sender. This way, they're not in a mutex at all.

Another thing is, if the channel has a poll_recv method, you can call that under an std mutex if you're okay with other operations happening on it between polls.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.