Adapter for transforming io::Write into Stream?

Has anyone developed magic required to have an unbuffered (or very-tiny-buffered) futures::Stream that can be written to using io::Write?

I have a complex function that I can't change, which writes its output to io::Write, and I'd like to send this output, with as little buffering as possible, to a framework that expects a futures::Stream.

io::Write is blocking push-based, and the stream is async poll-based, so I presume the adapter would be non-trivial and would have to do something clever. Has anyone worked it out yet?

You can build this pretty easily with a channel:

use futures::sync::mpsc::Sender;
use futures::sink::{Sink, Wait};

struct FuturesWriter {
    sink: Wait<Sender<Vec<u8>>,
}

impl Write for FuturesWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.sink.send(buf.to_vec()).map_err(...)?;
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

The Stream is then just the Receiver half of the mpsc channel.

1 Like

That's very neat! Thanks.

This turns out to be tricky, because Stream.poll() can't block on recv(), and can't ask try_recv() for a callback when it becomes ready, so there's nothing to call task.notify().

OTOH if I use futures::mpsc, then polling is easy, but pushing is hard, because sending is a Future, so it needs a reactor for polling.

io::Write is blocking

Then you have to use tokio_threadpool::blocking - Rust , don't you?

Yes, I run it on another thread already. But I still need to deliver the results in a streaming fashion, so waiting for the end of the blocking task is not an option.

Here's the solution:

https://stackoverflow.com/a/55764246/27009

1 Like

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.