Reading a file as a Stream for Tokio


#1

I am trying to integrate a Linux device file into the Tokio event loop. (Yeah, I know that Tokio does not support files out of the box.) The device file is similar to /proc/kmsg: effectively an infinite file from which messages can be read. The file supports O_NONBLOCK mode, so it should be possible to do asynchronous IO with read() and epoll().

I have implemented a wrapper over RawFd using PollEvented as described in this post, so now I am able to use tokio_io::io::read() to read from my device file. However, this function returns a future that performs exactly one read. I would like to repeatedly read the file. What is the recommended way of doing it?
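For context, the wrapper boils down to delegating mio’s Evented methods to EventedFd, roughly like this (a sketch; the Read impl and the PollEvented plumbing are omitted):

extern crate mio;

use std::io;
use std::os::unix::io::RawFd;
use mio::{Evented, Poll, PollOpt, Ready, Token};
use mio::unix::EventedFd;

/// Minimal wrapper that lets mio (and hence tokio's PollEvented) poll a raw fd.
struct DeviceFile(RawFd);

impl Evented for DeviceFile {
    fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
        EventedFd(&self.0).register(poll, token, interest, opts)
    }

    fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
        EventedFd(&self.0).reregister(poll, token, interest, opts)
    }

    fn deregister(&self, poll: &Poll) -> io::Result<()> {
        EventedFd(&self.0).deregister(poll)
    }
}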

I have managed to loop the read using futures::future::loop_fn, but this looks kinda ugly. I have also thought about enqueuing another Read future when the previous one completes, but I am currently struggling with expressing that. I have also tried writing a futures::stream::Stream implementation similar to tokio_io::io::Read, but I’m having trouble with the lifetime of the buffer used for reading.
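For reference, the loop_fn version ends up looking roughly like this (a sketch, with a trivial handle_message standing in for the real processing):

extern crate futures;
extern crate tokio_io;

use std::io;
use futures::Future;
use futures::future::{loop_fn, Loop};
use tokio_io::AsyncRead;

/// Stand-in for whatever actually happens to the data that was read.
fn handle_message(bytes: &[u8]) {
    println!("read {} bytes", bytes.len());
}

/// Keep issuing single reads, handing each chunk to handle_message,
/// until the reader reports EOF.
fn read_forever<R>(reader: R) -> Box<Future<Item = (), Error = io::Error>>
    where R: AsyncRead + 'static
{
    Box::new(loop_fn((reader, vec![0u8; 4096]), |(reader, buf)| {
        tokio_io::io::read(reader, buf).map(|(reader, buf, nread)| {
            if nread == 0 {
                // EOF: stop looping.
                Loop::Break(())
            } else {
                handle_message(&buf[..nread]);
                Loop::Continue((reader, buf))
            }
        })
    }))
}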


#2

Okay… I have reread the linked thread and discovered the Io::Framed trait, which should allow getting a message stream.

One thing that bothers me is that it requires a Write implementation for the IO source, but the device file driver does not support writing to it. Well, it should never be called, so returning an error should be okay.

However, what I have issues with is the buffering inside Io::Framed. It starts reading with a 32-byte buffer, which should be okay for regular files, but the device file driver can have a peculiar interface. For example, /dev/kmsg requires the buffer passed to the read() system call to be long enough for a single message to fit into it. 32 bytes is not long enough, so the driver returns an EINVAL error. Is there a way to control the buffering strategy for Io::Framed then?


#3

Is this because you’re trying to reuse the buffer across calls? Maybe you can elaborate a bit on what you tried here.

Also, do you want the stream to return raw bytes or a message? Your last post mentioned turning it into a message so just curious where you want to go with this.


#4

It appears that kmsg actually does satisfy O_NONBLOCK properly: http://elixir.free-electrons.com/linux/latest/source/fs/proc/kmsg.c#L36. I was concerned it might just sleep the thread.

tokio-file-unix implements mio’s Evented trait as well as std::io’s Read. I think it’d be possible to add the AsyncRead/AsyncWrite markers to it, but even without them I think that crate provides what you’re asking for?


#5

Yeah, I initially tried to do something similar to what io::read() does: take an external buffer (T: AsMut<[u8]> + AsRef<[u8]>), always read the actual data into that buffer, but provide a stream of slices into the buffer with the ready data. However, I could not convince the borrow checker that the slices will stay alive as long as the original buffer does, which outlives the stream itself. I guess io::Read does not have that issue because it relinquishes ownership of the whole buffer to the returned future, without any references involved.

In the end I gave up and implemented a stream of Vec<u8>, copying the slices from the internal buffer owned by the stream. I will post the actual code tomorrow when I get to the office, along with more comments on where I tried putting the references.

Well, ideally messages (i.e., raw bytes chunked and parsed). I guess it is not idiomatic to first get a stream of raw bytes and then convert it into a stream of parsed messages (at least I could not find a suitable combinator for that); the stream implementation should instead perform any necessary framing and parsing of the raw bytes itself. Is that so?


#6

As I recall, on Linux O_NONBLOCK does nothing for regular on-disk files: reads and writes will still sleep. The flag is only a hint as per the POSIX specs, so it’s up to the filesystem driver whether to implement the nonblocking semantics or not. But usually the various in-memory filesystems (e.g., procfs) do it right.
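For completeness, opening the device with the flag set looks something like this (a sketch):

extern crate libc;

use std::fs::{File, OpenOptions};
use std::io;
use std::os::unix::fs::OpenOptionsExt;

/// Open the device with O_NONBLOCK so reads return EAGAIN instead of
/// sleeping the thread when no message is available yet.
fn open_kmsg() -> io::Result<File> {
    OpenOptions::new()
        .read(true)
        .custom_flags(libc::O_NONBLOCK)
        .open("/dev/kmsg")
}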

Yeah, it seems to do exactly what I need. This certainly saves me from writing my own wrapper over RawFd via libc. Thanks for the pointer.


#7

Yeah, references and future/stream don’t go together all that well.

Perhaps you can look at the bytes crate and return Buf instances. Those are basically ref counted views over some slice of memory. I’ve not thought this through all the way but it might be doable and better than allocating a Vec each time.
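Something like this sketch, where split_to() plus freeze() yields a cheaply cloneable, ref-counted handle into the read buffer instead of a freshly allocated Vec (the message length would come from your framing logic):

extern crate bytes;

use bytes::{Bytes, BytesMut};

/// Carve the next `len` bytes out of the read buffer without copying them.
fn take_message(buffer: &mut BytesMut, len: usize) -> Bytes {
    // split_to() moves the first `len` bytes into a new BytesMut that shares
    // the same allocation; freeze() turns it into an immutable Bytes handle.
    buffer.split_to(len).freeze()
}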

So https://tokio-rs.github.io/tokio-core/src/tokio_core/net/udp/frame.rs.html#71 sounds similar. This one reads data into a private buffer and then calls the codec to decode the value from that buffer; the decoded value is then returned. So assuming the decoded value doesn’t need to hold any references to the buffer, it would be more efficient.


#8

Thanks for the pointer to the bytes crate. I’ve ended up using BytesMut for the buffer and Buf (via Cursor) for message parsing. I guess this is as close to ‘zero-copy’ as we can get with futures (well, we really do not copy data with bytes if they are just refcounted slices).

The reader looks like this:

/// Message parser state machine.
pub struct MessageStream<R> {
    /// Internal `AsyncRead`.
    reader: R,
    /// Buffer for data to be read and chunked.
    buffer: BytesMut,
    /// Maximum length of a single message; the read buffer is kept at least this large.
    max_len: usize,
    /// Current state of the reading state machine.
    state: State,
}

enum State {
    /// The buffer is empty, we need to read something into it first.
    NotBuffered,
    /// The buffer is filled, we need to extract messages from it.
    Buffered,
    /// The stream has ended.
    Done,
}

pub fn read_messages<R>(reader: R, max_len: usize) -> MessageStream<R>
    where R: AsyncRead
{
    MessageStream {
        reader: reader,
        buffer: BytesMut::with_capacity(max_len),
        max_len: max_len,
        state: State::NotBuffered,
    }
}

impl<R> Stream for MessageStream<R>
    where R: AsyncRead
{
    type Item = Message;
    type Error = io::Error;

    fn poll(&mut self) -> Poll<Option<Message>, io::Error> {
        loop {
            match self.state {
                State::Done => {
                    return Ok(Async::Ready(None));
                }
                State::NotBuffered => {
                    // BytesMut gradually exhausts its reserved space when it is filled by
                    // reader.read_buf(). Reclaim free space before reading so that the kernel
                    // won't throw EINVAL at us for using a too small buffer.
                    self.buffer.clear();
                    self.buffer.reserve(self.max_len);

                    let mut retries_left = 5;
                    let nread = loop {
                        match self.reader.read_buf(&mut self.buffer) {
                            Ok(Async::Ready(nread)) => { break nread; }
                            Ok(Async::NotReady) => { return Ok(Async::NotReady); }
                            Err(e) => {
                                // We can get EPIPE from the kernel if we do not read fast enough
                                // and some messages were purged while we were gone. The kernel
                                // resynchronizes our stream after returning an error so we may try
                                // reading again. However, if the error persists then don't bother.
                                if (e.kind() == io::ErrorKind::BrokenPipe) && (retries_left > 0) {
                                    retries_left -= 1;
                                    continue;
                                }
                                return Err(e);
                            }
                        }
                    };

                    if nread == 0 {
                        self.state = State::Done;
                    } else {
                        self.state = State::Buffered;
                    }
                }
                State::Buffered => {
                    match self.next_message()? {
                        Some(message) => {
                            return Ok(Async::Ready(Some(message)));
                        }
                        None => {
                            self.state = State::NotBuffered;
                        }
                    }
                }
            }
        }
    }
}

impl<R> MessageStream<R> {
    /// Parses messages out of the buffer, gradually exhausting it.
    /// Returns Ok(Some(message)) for every message parsed out, then Ok(None)
    /// when there are no more messages. Returns Err(something) if a message
    /// appears to be garbled and the buffer is now in an inconsistent state.
    fn next_message(&mut self) -> Result<Option<Message>, io::Error> {
        if self.buffer.is_empty() {
            return Ok(None);
        }

        unimplemented!()
    }
}
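In case it is useful, the next_message() body (inside the impl above) could be shaped roughly like this, assuming newline-terminated messages and a hypothetical Message::parse constructor; the real parsing is device-specific:

    fn next_message(&mut self) -> Result<Option<Message>, io::Error> {
        if self.buffer.is_empty() {
            return Ok(None);
        }
        // Look for the terminator of the next message in the buffered data.
        match self.buffer.iter().position(|&b| b == b'\n') {
            Some(pos) => {
                // Split the message (including the terminator) off the front of
                // the buffer; the remainder stays buffered for the next call.
                let raw = self.buffer.split_to(pos + 1);
                // Message::parse is hypothetical: whatever turns the raw bytes
                // into a Message goes here.
                Message::parse(&raw[..pos])
                    .map(Some)
                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
            }
            // No complete message left in the buffer.
            None => Ok(None),
        }
    }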


#9

Also worth checking out futures-fs; based on your last comment, it looks like you’re going that route manually.


#10

I would recommend tokio_io::codec::Framed: it handles reading and buffering data from an AsyncRead source for you; you then provide it with a tokio_io::codec::Decoder implementation that parses a buffer of data into a message. At a quick look through your code, I think Framed is basically a generic version of your MessageStream (it also uses a BytesMut internally).
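To make that concrete, a Decoder could be sketched like so (KmsgCodec is a made-up name, and real /dev/kmsg parsing would also want to split out the priority/sequence prefix):

extern crate bytes;
extern crate tokio_io;

use std::io;
use bytes::BytesMut;
use tokio_io::codec::Decoder;

/// Hypothetical codec that frames newline-terminated kernel messages.
struct KmsgCodec;

impl Decoder for KmsgCodec {
    type Item = BytesMut;
    type Error = io::Error;

    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<BytesMut>, io::Error> {
        // Find the terminator of the next message in the buffered data.
        match src.iter().position(|&b| b == b'\n') {
            // Split the message off the front of the buffer; the rest stays
            // buffered for the next decode() call.
            Some(pos) => Ok(Some(src.split_to(pos + 1))),
            // Not enough data yet; Framed will read more and try again.
            None => Ok(None),
        }
    }
}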


#11

@ilammy mentioned the following upthread:

I didn’t check that bit myself though.


#12

Ah, whoops, I skimmed the thread to see if it had been mentioned but missed that.

Looking into the source, it appears that FramedRead2 starts with an 8 kB buffer, so I’m not sure why it would only read 32 bytes initially. It is possible to change this by constructing a FramedParts with pre-allocated buffers, then using Framed::from_parts.

I also just noticed that there’s a tokio_io::codec::FramedRead for a read-only stream like this (although it seems like you can create a read-only Framed instance via Framed::from_parts currently).
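For the read-only case, hooking it up would look roughly like this (a sketch, assuming an AsyncRead wrapper around the device file and the KmsgCodec sketch from above):

extern crate tokio_io;

use tokio_io::AsyncRead;
use tokio_io::codec::FramedRead;

/// Turn any AsyncRead source into a Stream of decoded messages.
fn message_stream<R>(reader: R) -> FramedRead<R, KmsgCodec>
    where R: AsyncRead
{
    FramedRead::new(reader, KmsgCodec)
}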