Wrap 2 streams in another stream, or create a stream from async function

Let's say I am trying to implement tail -F in async Rust:

  • Each 50ms, I need to check if the file was rotated (by comparing inode), and in the case it was rotated I need to re-open it.
  • Otherwise, I need to read existing file, line by line.

Every individual item from the list above is trivial. There is stream in tokio that corresponds to File (tokio_util::io::ReaderStream and tokio::io::BufReader), and there is stream in tokio that corresponds to timer events (tokio::time::interval(Duration::from_millis(200))).

It would be trivial, if I needed an async function that returns a single line (I would just use select!).

But I need a stream of lines (String's).

What is the correct way to implement this?

Is there a way to convert an async function into a Stream?

Do I need to create my own stream, implementing fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)?

If yes, then how I ensure timer it scheduled? Code like this will not work:

   fn poll_next(self: core::pin::Pin<&mut Self>,
                cx: &mut std::task::Context<'_>)
                -> std::task::Poll<Option<Self::Item>> {
      match self.get_mut().wake_up.poll_next_unpin(cx) {
         std::task::Poll::Ready(t) => {
            println!("Timer fired!");
            // return std::task::Poll::Ready(Some(Ok(Default::default())));
            return std::task::Poll::Pending;
         }
         std::task::Poll::Pending => return std::task::Poll::Pending,
      }
      std::task::Poll::Pending
   }

because when timer is ready, I return Pending (because there is no line that I could possibly return). Timer does not schedule itself "proactively" in case it is ready. So, it essentially gets stuck forever.

But if I return Ready, allowing caller's while() to call poll_next, it fixes the issue with timer, but then what should I return? Empty string? But there were not file read? This looks strange...

Is there a uniform / generic way to convert async function into Stream?..

Can you make your code work by using the stream! macro from the async-stream crate? I.e. yield a line when it comes available?

1 Like

Yeah... This looks like what I need... Thanks!

P.S. Did not know that rust has yield

In case anyone stumbles on this too.

Here is the code that works as intended:

pub fn tail(p: std::path::PathBuf,
            pill: crate::poison_pill::Pill)
            -> impl futures::Stream<Item = Result<String>> {
   let mut interval = tokio::time::interval(std::time::Duration::from_millis(200));
   async_stream::stream! {
      loop {
         tokio::select! {
            _ = pill.received() => {
               log::info!("tail: Exiting...");
               return;
            }
            _ = interval.tick() => {
               println!("Timer fired!");
               yield Ok("1234".into());
            }
         }
      }
   }
}

#[cfg(test)]
mod tests {
   use futures::StreamExt;
   use super::*;

   #[tokio::test]
   async fn my_async_test() {
      let mut cs = crate::poison_pill::ChildrenStopper::new();
      let stream = tail(std::path::PathBuf::new(), cs.register_child("tail"));
      futures::pin_mut!(stream);

      while let Some(item) = stream.next().await {
         println!("{:?}", item);
      }
   }
}
1 Like

Rust doesn't actually have yield yet (the feature you linked isn't stable) so I assume that is just syntax sugar defined by the macro and expands to something else.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.