Turning a file into futures Stream

I want to turn a file (tokio::fs::File or std::fs::File) into a futures::stream::Stream of bytes so that I can construct this ByteStream from it and let rusoto upload while reading from the file. Is this possible at all?

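You can use tokio::codec::FramedRead with BytesCodec for this: it wraps an AsyncRead such as tokio::fs::File and yields its contents as a Stream of BytesMut chunks. (playground)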

use bytes::BytesMut;
use futures::{Future, Stream};
use tokio::codec::{BytesCodec, FramedRead};
use tokio::fs::File;
use tokio::runtime::Runtime;

fn main() {
    let mut rt = Runtime::new().unwrap();
    let task = File::open("Cargo.toml").and_then(|file| {
        FramedRead::new(file, BytesCodec::new())
            .map(BytesMut::freeze) // Map stream of `BytesMut` to stream of `Bytes`
            .concat2()
            .and_then(|bytes| {
                assert_eq!(bytes, &include_bytes!("../Cargo.toml")[..]);
                Ok(())
            })
    });

    if let Err(e) = rt.block_on(task) {
        dbg!(e);
    }
}
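
As a rough mental model of what FramedRead with BytesCodec does, here is a synchronous sketch that uses only std and no tokio. It is an analogue, not the tokio API: `ChunkIter` and `read_all_chunks` are hypothetical names made up for this illustration, and the chunk size is arbitrary. The iterator yields the file as a sequence of byte chunks, and concatenating them plays the role of `concat2()` above.

```rust
use std::fs::File;
use std::io::{self, Read};

/// Hypothetical helper: yields a reader's contents as a sequence of byte chunks.
struct ChunkIter<R: Read> {
    reader: R,
    chunk_size: usize,
    done: bool,
}

impl<R: Read> ChunkIter<R> {
    fn new(reader: R, chunk_size: usize) -> Self {
        assert!(chunk_size > 0, "chunk_size must be non-zero");
        ChunkIter { reader, chunk_size, done: false }
    }
}

impl<R: Read> Iterator for ChunkIter<R> {
    type Item = io::Result<Vec<u8>>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.done {
            return None;
        }
        let mut buf = vec![0u8; self.chunk_size];
        loop {
            match self.reader.read(&mut buf) {
                // A read of zero bytes means end of file: the sequence ends.
                Ok(0) => {
                    self.done = true;
                    return None;
                }
                // Yield only the bytes that were actually read.
                Ok(n) => {
                    buf.truncate(n);
                    return Some(Ok(buf));
                }
                // Retry reads that were merely interrupted.
                Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
                // Surface any other error once, then stop.
                Err(e) => {
                    self.done = true;
                    return Some(Err(e));
                }
            }
        }
    }
}

/// Concatenates all chunks into one buffer (the analogue of `concat2()`).
fn read_all_chunks<R: Read>(reader: R, chunk_size: usize) -> io::Result<Vec<u8>> {
    let mut out = Vec::new();
    for chunk in ChunkIter::new(reader, chunk_size) {
        out.extend_from_slice(&chunk?);
    }
    Ok(out)
}

fn main() -> io::Result<()> {
    // Write a throwaway file so the example does not depend on the current directory.
    let path = std::env::temp_dir().join("chunk_iter_demo.txt");
    std::fs::write(&path, b"hello, stream of bytes")?;

    let bytes = read_all_chunks(File::open(&path)?, 8)?;
    println!("read {} bytes in chunks of up to 8", bytes.len());

    std::fs::remove_file(&path)?;
    Ok(())
}
```

The async version differs in that each chunk is produced without blocking a thread, which is what lets an uploader consume the file while it is still being read.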

Thanks!

This doesn't seem to work anymore, since tokio::codec appears to be deprecated. How is it done now?


The codec module has moved to the tokio-util crate, so the import becomes tokio_util::codec::{BytesCodec, FramedRead}.


Thanks, it still took me some time to figure out. Searching was not much help because the futures API keeps changing.

For posterity, here is a full example that shows how to stream a file using Hyper and futures streams:

(The example is also available as a gist.)

use std::convert::Infallible;
use std::net::SocketAddr;
use tokio::fs::File;
use futures_util::TryStreamExt;
use futures_util::TryFutureExt;
use hyper::{Body, Request, Response, Server};
use hyper::service::{make_service_fn, service_fn};
use hyper::{Method, StatusCode};
use bytes::BytesMut;
use tokio_util::codec::{BytesCodec, FramedRead};

async fn handle_request(req: Request<Body>) -> Result<Response<Body>, Infallible> {
    match (req.method(), req.uri().path()) {
        // Stream a file from disk
        (&Method::GET, "/file") => {
            // `File::open` is a future of a file. `map_ok` turns the opened file
            // into a stream of `Bytes` chunks, and `try_flatten_stream` flattens
            // "future of a stream" into a single stream. If opening fails, the
            // error surfaces as the first item of that stream.
            let stream = File::open("C:\\Source\\Backup_Ignore.txt")
                .map_ok(|file| FramedRead::new(file, BytesCodec::new()).map_ok(BytesMut::freeze))
                .try_flatten_stream();
            Ok(Response::new(Body::wrap_stream(stream)))
        }

        // 404 Not Found
        _ => {
            let mut response = Response::new(Body::empty());
            *response.status_mut() = StatusCode::NOT_FOUND;
            Ok(response)
        }
    }
}

#[tokio::main]
async fn main() {
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
    let make_svc = make_service_fn(|_conn| async {
        Ok::<_, Infallible>(service_fn(handle_request))
    });
    let server = Server::bind(&addr).serve(make_svc);
    if let Err(e) = server.await {
        eprintln!("server error: {}", e);
    }
}

And my dependencies are:

[dependencies]
hyper = "0.13"
tokio = { version = "0.2", features = ["full"] }
futures-util = "0.3.1"
tokio-util = { version = "0.2.0", features = ["codec"] } # tokio_util::codec is behind the `codec` feature
bytes = "0.5.3"
futures = "0.3.1"

There are some efforts underway to cross-link the documentation better; see here.
