Help implementing message buffering and flushing in async Rust


Currently I'm trying to implement an async broadcast middleware using tokio. I created a Sender task that reads client messages from a tokio mpsc channel and writes them into a stream. So that I don't perform a system call (by writing to the stream) for each message, I created 2 parameters that allow me to buffer messages before writing:

  • N number of buffered messages
  • T reading timeout from the mpsc channel

Currently the task is simply reading from the mpsc channel. Whenever a message comes, I buffer it and I increment a counter. Then I check if the value in this counter is greater than N and if it is, I write the buffered messages into the stream and the counter goes to 0.

If the previous counter never reaches N, the buffered will never be flushed. So what I want to do is add the T timeout for reading. If after T time no messages were received from the channel, then the buffer should be flushed (if there are messages) and the T timeout multiplied by 2. While there are no messages, the timeout should increase until it reaches a ceiling value that I also have. But, if a message arrives, then I buffer the message and the T timeout "resets" to the starting value.

For now, I've been able to create a future where I added a timeout for reading from the mpsc channel using the Timeout struct. But I couldn't find a way to "restart" the future after the timeout has ended. What I mean is that I have a future with a timeout for reading messages from the mpsc channel and after the timeout ends this future also ends.

Any suggestion on how I could implement this Sender task or at least restart a future after a timeout has ended?

On a stream of bytes, you can do something like this with a BufReader/BufWriter to read bigger chunks before sending them on. I bet the easiest design for your scenario is to make a similar type which will implement stream, so you can poll it continuously and which internally manages the buffer, the counter and the timer.

In your poll_next fn you can then just poll the timer to see if it returns pending or if it runs out. If it's still pending, look at your counter and pull more data from your channel. When it's time to send, use send_all on your sink to avoid flushing between each item. Maybe this stream can just return the number of messages processed on each poll_next.

The other way to do stuff like this is using select from the futures library, which allows you to select on a future or a timer whichever finishes first, but given that you are working with streams rather than futures, designing this might warrant you make a custom future. You will probably end up with a bunch of Arc<Mutex<counter>> and similar sync for the buffer and the timer as different tasks would have to access and update them.

I see what you mean. That might work. What do you recommend to use as a timer? I've been trying with the Delay future, but I can't seem able to reset it after it has return Async::Read(())

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.