Read from stream write to file (tokio+await)

Hi all,

I'm playing around with tokio/streams/futures/async/await trying to learn how to work with them all.

Got stuck trying to do what I though would be a simple thing. I have a stream of some vectors and want to write these vectors from stream to a file, something like below:

// [dependencies]
// futures-preview = {version = "0.3.0-alpha.19", features = [ "async-await"] }
// tokio = "0.2.0-alpha.6"
use futures::{future, future::FutureExt, stream, stream::StreamExt};
use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() {
    let mut file = tokio::fs::OpenOptions::new()
        .write(true)
        .create(true)
        .open("some_file")
        .await
        .unwrap();

    let mut stream = stream::repeat::<Vec<u8>>(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
    stream
        .for_each(move |v| file.write_all(&v).then(|_| future::ready(())))
        .await;
}

However compiling this results in:

error[E0495]: cannot infer an appropriate lifetime for autoref due to conflicting requirements...

...and some further description of the error which I don't really understand:

note: first, the lifetime cannot outlive the lifetime '_ as defined on the body ...
note:...so that closure can access `file`
...and more

Maybe something needs pinning, maybe it's due to playing with bleeding edge stuff?
Anyhow! I would be very happy if someone could explain how to solve this or point me in the direction of some good docs :slight_smile:

Thanks in advance!

So what happens is that you move file into the for_each but then write_all borrows file in the future that is returned. This means that the future returned from move |v| ... borrows from that environment of that same closure which rust ends up complaining about.

The first thing we can try is what you seem to be aware of is to pin the file which would let us return the write_all future without lifetime issues

    stream
        .for_each(move |v| async move { file.write_all(&v).then(|_| future::ready(())).await })
        .await;

This fails with another error however since for_each takes a FnMut and therefore we can't move the file out (if we could, it wouldn't exist for the next iteration).

error[E0507]: cannot move out of `file`, a captured variable in an `FnMut` closure
  --> tokio\tests\udp.rs:86:39
   |
77 |     let mut file = tokio::fs::OpenOptions::new()
   |         -------- captured outer variable
...
86 |         .for_each(move |v| async move { file.write_all(&v).then(|_| future::ready(())).await })
   |                                       ^^----^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |                                       | |
   |                                       | move occurs because `file` has type `tokio::fs::file::File`, which does not implement the `Copy` trait
   |                                       | move occurs due to use in generator
   |                                       move out of `file` occurs here

warning: variable does not need to be mutable
  --> tokio\tests\udp.rs:84:9
   |
84 |     let mut stream = stream::repeat::<Vec<u8>>(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
   |         ----^^^^^^
   |         |
   |         help: remove this `mut`
   |
   = note: `#[warn(unused_mut)]` on by default

In general, the stream combinators don't work that well with (mutably) borrowing things (or at least I haven't figured out a good way). So what you need to do is drop the for_each and loop manually with while let + next() instead.

#[tokio::test]
async fn main() {
    use futures::{future, future::FutureExt, stream, stream::StreamExt};
    use tokio::io::AsyncWriteExt;
    let mut file = tokio::fs::OpenOptions::new()
        .write(true)
        .create(true)
        .open("some_file")
        .await
        .unwrap();

    let mut stream = stream::repeat::<Vec<u8>>(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
    while let Some(v) = stream.next().await {
        file.write_all(&v).await.unwrap();
    }
}

Unfortunately the signature of for_each has no guarantee regarding whether the closure will be called again before the future from the previous iteration completed, but the signature

fn write_all<'a>(&'a mut self, src: &'a [u8]) -> WriteAll<'a, Self>;

has the lifetime of self in the return value, so you must guarantee that the WriteAll returned by write_all is completed before you call write_all again in the next iteration.

The while let loop in @Marwes' post does provide a guarantee for this, which is why it works.

1 Like

Thanks @Marwes and @alice for the explanations and solution, made it very understandable! :+1: