Wrap 2 streams in another stream, or create a stream from async function

Let's say I am trying to implement tail -F in async Rust:

  • Every 50 ms, I need to check whether the file was rotated (by comparing inodes), and if it was, re-open it.
  • Otherwise, I need to read the existing file line by line.
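
For the rotation check in the first bullet, something like the following might do. It is only a sketch: FileId, file_id and was_rotated are names made up here, it is Unix-only, and it uses blocking std::fs to stay self-contained (in async code, tokio::fs::metadata returns the same Metadata type and could be awaited instead). It compares the device id together with the inode, since inode numbers are only unique within one filesystem.

```rust
use std::fs;
use std::io;
use std::os::unix::fs::MetadataExt; // Unix-only: provides dev() and ino()
use std::path::Path;

/// Identity of a file on disk: (device id, inode number).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FileId {
    dev: u64,
    ino: u64,
}

/// Hypothetical helper: identity of whatever currently lives at `path`.
pub fn file_id(path: &Path) -> io::Result<FileId> {
    let meta = fs::metadata(path)?;
    Ok(FileId { dev: meta.dev(), ino: meta.ino() })
}

/// Returns true if `path` now refers to a different file than `opened`.
/// A missing path (for example in the middle of a rotation) is reported as
/// "not rotated yet", so the caller simply retries on the next timer tick.
pub fn was_rotated(path: &Path, opened: FileId) -> io::Result<bool> {
    match file_id(path) {
        Ok(current) => Ok(current != opened),
        Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(false),
        Err(e) => Err(e),
    }
}
```

The caller would remember the FileId taken when the file was opened, call was_rotated on each tick, and re-open the path when it returns true.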

Each item in the list above is trivial on its own. There is a stream that corresponds to a File (tokio_util::io::ReaderStream and tokio::io::BufReader), and there is a stream in tokio that corresponds to timer events (tokio::time::interval(Duration::from_millis(200))).

It would also be trivial if I needed an async function that returns a single line (I would just use select!).

But I need a stream of lines (Strings).

What is the correct way to implement this?

Is there a way to convert an async function into a Stream?

Do I need to create my own stream, implementing fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)?

If yes, then how do I ensure that the timer is scheduled again? Code like this will not work:

   fn poll_next(self: core::pin::Pin<&mut Self>,
                cx: &mut std::task::Context<'_>)
                -> std::task::Poll<Option<Self::Item>> {
      match self.get_mut().wake_up.poll_next_unpin(cx) {
         std::task::Poll::Ready(t) => {
            println!("Timer fired!");
            // return std::task::Poll::Ready(Some(Ok(Default::default())));
            return std::task::Poll::Pending;
         }
         std::task::Poll::Pending => return std::task::Poll::Pending,
      }
      std::task::Poll::Pending
   }

because when the timer is ready, I return Pending (there is no line that I could possibly return). The timer does not reschedule itself "proactively" once it has reported ready, so nothing ever wakes the task again and the stream gets stuck forever.
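
One way out of the stuck-timer problem described above, if the stream is written by hand: never return Pending right after an inner source returned Ready. Instead, keep polling until every inner source has itself returned Pending, because that is the moment it stores the waker. A minimal std-only sketch follows. The Stream trait here is a local stand-in with the same poll_next signature as futures::Stream, and Scripted, Tail and noop_waker are hypothetical names used only to script the poll results.

```rust
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local stand-in shaped like `futures::Stream`, so the sketch needs only std.
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Hypothetical scripted source: replays a fixed sequence of poll results,
/// then stays Pending. A real source (timer, file reader) would store
/// `cx.waker()` before returning Pending.
pub struct Scripted<T>(pub VecDeque<Poll<Option<T>>>);

impl<T: Unpin> Stream for Scripted<T> {
    type Item = T;
    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<T>> {
        self.get_mut().0.pop_front().unwrap_or(Poll::Pending)
    }
}

pub struct Tail {
    pub ticks: Scripted<()>,
    pub lines: Scripted<String>,
    pub rotation_checks: usize,
}

impl Stream for Tail {
    type Item = String;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<String>> {
        let this = self.get_mut();
        // Drain every tick that is already due. Returning Pending right after
        // the first Ready would leave no waker registered with the timer.
        while let Poll::Ready(Some(())) = Pin::new(&mut this.ticks).poll_next(cx) {
            this.rotation_checks += 1; // stand-in for "compare inode, re-open"
        }
        // The timer has now returned Pending (or ended), so it has had the
        // chance to register the waker. The line source's answer can be passed
        // on as is: Ready yields a line, Pending means it saw the waker too.
        Pin::new(&mut this.lines).poll_next(cx)
    }
}

/// Waker that does nothing, for driving the sketch without a runtime.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}
pub fn noop_waker() -> Waker {
    Waker::from(Arc::new(NoopWake))
}
```

With real sources, the same loop means the stream never has to invent an empty item just to get polled again: it returns Pending only after both the timer and the reader have seen the current waker.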

If I return Ready instead, so that the caller's while loop calls poll_next again, that fixes the issue with the timer, but then what should I return? An empty string? No line was actually read from the file, so this looks strange...

Is there a uniform / generic way to convert an async function into a Stream?

Can you make your code work by using the stream! macro from the async-stream crate? I.e. yield a line when it becomes available?

Yeah... This looks like what I need... Thanks!

P.S. I did not know that Rust has yield.

In case anyone else stumbles on this, here is the code that works as intended:

pub fn tail(p: std::path::PathBuf,
            pill: crate::poison_pill::Pill)
            -> impl futures::Stream<Item = Result<String>> {
   let mut interval = tokio::time::interval(std::time::Duration::from_millis(200));
   async_stream::stream! {
      loop {
         tokio::select! {
            _ = pill.received() => {
               log::info!("tail: Exiting...");
               return;
            }
            _ = interval.tick() => {
               println!("Timer fired!");
               // Placeholder item: the rotation check and the actual line reading go here.
               yield Ok("1234".into());
            }
         }
      }
   }
}

#[cfg(test)]
mod tests {
   use futures::StreamExt;
   use super::*;

   #[tokio::test]
   async fn my_async_test() {
      let mut cs = crate::poison_pill::ChildrenStopper::new();
      let stream = tail(std::path::PathBuf::new(), cs.register_child("tail"));
      futures::pin_mut!(stream);

      while let Some(item) = stream.next().await {
         println!("{:?}", item);
      }
   }
}

Rust doesn't actually have yield yet (the feature you linked isn't stable), so I assume it is just syntactic sugar defined by the macro that expands to something else.