I'm trying to implement an async broadcast middleware using tokio. I have a Sender task that reads client messages from a tokio mpsc channel and writes them to a stream. To avoid performing a system call (a write to the stream) for every message, I added two parameters that let me buffer messages before writing:
- N: the number of messages to buffer before writing
- T: the timeout for reading from the mpsc channel
At the moment the task simply reads from the mpsc channel. Whenever a message arrives, I buffer it and increment a counter. I then check whether the counter has reached N; if it has, I write the buffered messages to the stream and reset the counter to 0.
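To make the current behaviour concrete, here is a minimal sketch of the count-based buffering, written against `std::io::Write` rather than my real stream type. The names `Batcher`, `push` and `flush` are made up for illustration, and I am assuming messages are plain byte slices and that "reached N" means `count >= n`.

```rust
use std::io::{self, Write};

/// Hypothetical helper: collects messages and writes them out in one go
/// once `n` of them have been buffered.
struct Batcher<W: Write> {
    out: W,       // the stream being written to
    buf: Vec<u8>, // bytes of all messages buffered so far
    count: usize, // number of messages currently in `buf`
    n: usize,     // flush threshold (the N parameter)
}

impl<W: Write> Batcher<W> {
    fn new(out: W, n: usize) -> Self {
        Batcher { out, buf: Vec::new(), count: 0, n }
    }

    /// Buffer one message and flush if the counter has reached `n`.
    fn push(&mut self, msg: &[u8]) -> io::Result<()> {
        self.buf.extend_from_slice(msg);
        self.count += 1;
        if self.count >= self.n {
            self.flush()?;
        }
        Ok(())
    }

    /// Write everything buffered so far with one `write_all` call,
    /// then reset the buffer and the counter. Does nothing when empty.
    fn flush(&mut self) -> io::Result<()> {
        if self.count > 0 {
            self.out.write_all(&self.buf)?;
            self.buf.clear();
            self.count = 0;
        }
        Ok(())
    }
}
```

With `n = 3`, two calls to `push` leave the stream untouched and the third one writes all three messages at once.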
The problem is that if the counter never reaches N, the buffer is never flushed. So I want to add the timeout T to the read. If no message is received from the channel within T, the buffer should be flushed (if it contains any messages) and T should be doubled. As long as no messages arrive, the timeout should keep doubling until it reaches a ceiling value that I also have. But as soon as a message arrives, I buffer it and T "resets" to its starting value.
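The timeout policy on its own, separated from the channel and the stream, could be sketched like this. `Backoff`, `on_timeout` and `on_message` are hypothetical names, and I am assuming the ceiling is a hard cap (the timeout never exceeds it).

```rust
use std::time::Duration;

/// Hypothetical helper holding the read timeout T and its bounds.
struct Backoff {
    start: Duration,   // starting value of T
    ceiling: Duration, // maximum value T may grow to
    current: Duration, // value of T to use for the next read
}

impl Backoff {
    fn new(start: Duration, ceiling: Duration) -> Self {
        Backoff { start, ceiling, current: start }
    }

    /// Timeout to use for the next read from the channel.
    fn timeout(&self) -> Duration {
        self.current
    }

    /// No message arrived within T: double T, capped at the ceiling.
    fn on_timeout(&mut self) {
        self.current = self.current.saturating_mul(2).min(self.ceiling);
    }

    /// A message arrived: T goes back to its starting value.
    fn on_message(&mut self) {
        self.current = self.start;
    }
}
```

Starting at 100 ms with a 350 ms ceiling, consecutive timeouts give 200 ms, then 350 ms, then stay at 350 ms until a message resets the value to 100 ms.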
So far, I've been able to create a future that reads from the mpsc channel with a timeout, using the Timeout struct. But I couldn't find a way to "restart" the future once the timeout has elapsed. In other words, the future that reads from the mpsc channel completes when its timeout fires, and after that nothing is reading from the channel any more.
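For comparison, this is roughly the behaviour I want, sketched in blocking code with `std::sync::mpsc` and `recv_timeout` as a stand-in for the tokio channel (so this is not the async version). `run_sender` and its parameters are hypothetical, and messages are assumed to be `Vec<u8>`. The point is that a new timed read starts on every loop iteration, so a timeout does not end the task.

```rust
use std::io::{self, Write};
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Blocking stand-in for the Sender task. Returns the stream once
/// every sending half of the channel has been dropped.
fn run_sender<W: Write>(
    rx: Receiver<Vec<u8>>,
    mut out: W,
    n: usize,          // flush after this many buffered messages
    start: Duration,   // starting value of the read timeout T
    ceiling: Duration, // maximum value of T
) -> io::Result<W> {
    let mut buf: Vec<u8> = Vec::new();
    let mut count = 0usize;
    let mut timeout = start;

    loop {
        // A fresh timed read is issued on each iteration.
        match rx.recv_timeout(timeout) {
            Ok(msg) => {
                // Message arrived: buffer it and reset T.
                buf.extend_from_slice(&msg);
                count += 1;
                timeout = start;
                if count >= n {
                    out.write_all(&buf)?;
                    buf.clear();
                    count = 0;
                }
            }
            Err(RecvTimeoutError::Timeout) => {
                // Nothing arrived within T: flush what we have, double T.
                if count > 0 {
                    out.write_all(&buf)?;
                    buf.clear();
                    count = 0;
                }
                timeout = timeout.saturating_mul(2).min(ceiling);
            }
            Err(RecvTimeoutError::Disconnected) => {
                // All senders are gone: flush the remainder and stop.
                if count > 0 {
                    out.write_all(&buf)?;
                }
                return Ok(out);
            }
        }
    }
}
```

What I am missing is the equivalent of this loop with futures: something that issues a new timed read after the previous one has timed out.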
Any suggestions on how I could implement this Sender task, or at least restart a future after its timeout has elapsed?