Let's say I am trying to implement tail -F in async Rust:
- Every 50 ms, I need to check whether the file was rotated (by comparing inodes) and, if it was, re-open it.
- Otherwise, I need to read the existing file, line by line.
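For concreteness, the rotation check from the first item could look roughly like the sketch below. It is a blocking, Unix-only version using only the standard library; the function name was_rotated is hypothetical, and treating a missing path as "not rotated yet" is my assumption about how the gap in the middle of a rotation should be handled. In async code the same comparison would presumably go through tokio::fs::metadata instead.

```rust
use std::fs::{self, File};
use std::io;
use std::os::unix::fs::MetadataExt;
use std::path::Path;

/// Hypothetical helper: returns true if `path` now points at a different
/// inode than the already opened `file`, i.e. the log was rotated.
fn was_rotated(file: &File, path: &Path) -> io::Result<bool> {
    // Inode of the file we currently hold open.
    let open_ino = file.metadata()?.ino();
    match fs::metadata(path) {
        // The path exists: it was rotated if it is a different inode.
        Ok(meta) => Ok(meta.ino() != open_ino),
        // Assumption: the path may briefly not exist while rotation is in
        // progress, so report "not rotated yet" and retry on the next tick.
        Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(false),
        Err(e) => Err(e),
    }
}
```

This compares inode numbers only; a stricter check would presumably also compare the device number (MetadataExt::dev).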
Each item in the list above is trivial on its own. tokio has a stream that corresponds to a file (tokio_util::io::ReaderStream and tokio::io::BufReader), and a stream that corresponds to timer events (tokio::time::interval(Duration::from_millis(200))).
It would be trivial if I needed an async function that returns a single line (I would just use select!).
But I need a stream of lines (Strings).
What is the correct way to implement this?
Is there a way to convert an async function into a Stream?
Do I need to create my own stream, implementing fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)?
If yes, then how do I ensure the timer is scheduled? Code like this will not work:
    fn poll_next(self: core::pin::Pin<&mut Self>,
                 cx: &mut std::task::Context<'_>)
                 -> std::task::Poll<Option<Self::Item>> {
        match self.get_mut().wake_up.poll_next_unpin(cx) {
            std::task::Poll::Ready(t) => {
                println!("Timer fired!");
                // return std::task::Poll::Ready(Some(Ok(Default::default())));
                return std::task::Poll::Pending;
            }
            std::task::Poll::Pending => return std::task::Poll::Pending,
        }
        std::task::Poll::Pending
    }
because when the timer is ready, I return Pending (there is no line that I could possibly return). The timer does not schedule itself "proactively" when it is ready, so nothing wakes the task again and the stream essentially gets stuck forever.
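To make the stall concrete, here is a minimal sketch of what I believe is happening, using only the standard library. FakeTimer, broken_poll, NoopWake and waker_registered_after_tick are all hypothetical names; FakeTimer is my assumption about how the timer behaves (it stores a waker only when it returns Pending) and is not tokio's actual implementation.

```rust
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a timer. Assumption: like the real timer,
/// it stores the caller's waker only when it returns `Pending`.
struct FakeTimer {
    ready: bool,
    registered: Option<Waker>,
}

impl FakeTimer {
    fn poll_tick(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        if self.ready {
            // The tick is consumed here; no waker is stored.
            self.ready = false;
            Poll::Ready(())
        } else {
            self.registered = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

/// Mirrors the poll_next above: swallow the tick, then report `Pending`.
fn broken_poll(timer: &mut FakeTimer, cx: &mut Context<'_>) -> Poll<Option<String>> {
    match timer.poll_tick(cx) {
        Poll::Ready(()) => Poll::Pending, // nothing is left to wake the task
        Poll::Pending => Poll::Pending,
    }
}

/// A waker that does nothing, just to be able to build a `Context`.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Returns whether any waker was registered by the time `broken_poll`
/// returned `Pending` for a timer that had a tick ready.
fn waker_registered_after_tick() -> bool {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut timer = FakeTimer { ready: true, registered: None };
    let result = broken_poll(&mut timer, &mut cx);
    assert!(result.is_pending());
    timer.registered.is_some()
}
```

In this sketch waker_registered_after_tick() returns false: the poll reported Pending, yet no waker was stored anywhere, which matches the stall I am seeing.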
But if I return Ready, allowing the caller's while loop to call poll_next again, that fixes the issue with the timer, but then what should I return? An empty string? But no line was actually read from the file. This looks strange...
Is there a uniform / generic way to convert an async function into a Stream?