Futures-rs: how to generate a stream from futures?

Hello.

Got another question about futures-rs.

I have a function which reads a frame, e.g.:

fn read_frame<R : Read>(read: R) -> IoFuture<(R, Frame)> { ... }

I'd like to create a stream from futures. Like this:

fn read_frame_stream<R : Read>(read: R) -> IoStream<Frame> {
    loop {
        await (read, frame) = read_frame(read);
        yield frame;
    }
}

Is it possible to implement read_frame_stream with futures-rs? If so, how hard is it, and how would I do it?

Currently, I do it like this:

let stream = stream::iter(repeat(()).map(|x| Ok(x)));

let future = stream.fold(read, |read, _| {
    recv_raw_frame(read) ...
});

I.e., without an intermediate stream. A stream of frames would be a convenient intermediate abstraction.

cc @alexcrichton

What you'll probably want to do here is a manual implementation of either Future or Stream. For example, something like:

struct Frames<R> {
    inner: R
}

impl<R: Read> Stream for Frames<R> {
    type Item = MyFrame;
    type Error = io::Error;

    fn poll(&mut self) -> Poll<Option<MyFrame>, io::Error> {
        // read and parse the frame from `self.inner`, 
        // propagating "would block" as Poll::NotReady
    }
}

That way you could then start applying the adaptors to that stream.
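
To make the outline above concrete, here is a minimal sketch that compiles with only the standard library. Everything specific in it is an assumption for illustration: `Async`, `Poll`, and `Stream` are local stand-ins that mirror the shape of the futures 0.1 types (where "not ready" is spelled `Async::NotReady`), and the frame format (one length byte followed by that many payload bytes) is hypothetical, since the thread never says what a frame looks like.

```rust
use std::io::{self, Read};

// Local stand-ins mirroring the shape of futures 0.1's `Async`, `Poll`
// and `Stream`, so the sketch builds without the futures crate.
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

type Poll<T, E> = Result<Async<T>, E>;

trait Stream {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
}

// Hypothetical frame: just the payload bytes.
type MyFrame = Vec<u8>;

struct Frames<R> {
    inner: R,
    buf: Vec<u8>, // bytes read from `inner` but not yet parsed
    eof: bool,    // set once `inner` has reported end of input
}

impl<R: Read> Frames<R> {
    fn new(inner: R) -> Self {
        Frames { inner, buf: Vec::new(), eof: false }
    }

    // Try to split one complete frame off the front of the buffer.
    // Assumed wire format: one length byte, then `len` payload bytes.
    fn parse(&mut self) -> Option<MyFrame> {
        let len = *self.buf.first()? as usize;
        if self.buf.len() < 1 + len {
            return None; // frame is not complete yet
        }
        let frame = self.buf[1..1 + len].to_vec();
        self.buf.drain(..1 + len);
        Some(frame)
    }
}

impl<R: Read> Stream for Frames<R> {
    type Item = MyFrame;
    type Error = io::Error;

    fn poll(&mut self) -> Poll<Option<MyFrame>, io::Error> {
        loop {
            // Hand out a buffered frame first, if one is complete.
            if let Some(frame) = self.parse() {
                return Ok(Async::Ready(Some(frame)));
            }
            if self.eof {
                return if self.buf.is_empty() {
                    Ok(Async::Ready(None)) // clean end of stream
                } else {
                    Err(io::Error::new(io::ErrorKind::UnexpectedEof, "truncated frame"))
                };
            }
            let mut chunk = [0u8; 64];
            match self.inner.read(&mut chunk) {
                Ok(0) => self.eof = true,
                Ok(n) => self.buf.extend_from_slice(&chunk[..n]),
                // "Would block" is not an error: report not ready.
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                    return Ok(Async::NotReady)
                }
                Err(e) => return Err(e),
            }
        }
    }
}
```

With the real crate, the stand-in definitions would presumably be dropped in favour of the ones exported by futures, and `inner` would need to be a non-blocking source that arranges for the task to be woken when more data arrives; the sketch only covers the read-and-parse step.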

We intend to have a pretty generic framework for this sort of operation, however, so stay tuned for more info soon!

@alexcrichton - I'm curious to see how the Read and Write trait analogues in futures/stream are going to look. Or perhaps there's something that exists already?

Yeah, in the tokio-io crate you'll find AsyncRead and AsyncWrite.