(async) zero-copy file write() in Rust

In the previous "episode" I figured out that anything that implements

impl AsyncWrite for File {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        src: &[u8],
    ) -> Poll<io::Result<usize>> {
        // ... (body omitted)
    }
    // ... (remaining AsyncWrite methods omitted)
}

has to make at least one unnecessary copy (due to the src being a reference).
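Here is a std-only sketch of why the borrowed `src` forces a copy. `std::io::Write::write` has the same `&[u8]` shape as `poll_write`, so a writer that wants to defer the actual I/O has to copy the slice into storage it owns. An owned-chunk method, in the spirit of `Sink<Bytes>` (with `Vec<u8>` standing in for `Bytes` to avoid extra crates), can keep the caller's allocation instead. `DeferredWriter` and `OwnedChunkSink` are hypothetical names used only for illustration.

```rust
use std::io::{self, Write};

/// A writer that defers the real I/O, like a buffering async writer would.
pub struct DeferredWriter {
    pub pending: Vec<u8>,
}

impl Write for DeferredWriter {
    fn write(&mut self, src: &[u8]) -> io::Result<usize> {
        // The borrow of `src` ends when we return, so the only way to keep
        // the bytes for later is to copy them into storage we own.
        self.pending.extend_from_slice(src);
        Ok(src.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Hypothetical owned-chunk interface in the spirit of `Sink<Bytes>`:
/// the caller gives up ownership, so the chunk can be stored as is.
pub struct OwnedChunkSink {
    pub chunks: Vec<Vec<u8>>,
}

impl OwnedChunkSink {
    pub fn start_send(&mut self, chunk: Vec<u8>) {
        // Moving a Vec moves only its (pointer, length, capacity) header;
        // the heap data stays where it is.
        self.chunks.push(chunk);
    }
}
```

The pointer comparison is the point: after `write_all` the data lives in a new allocation, while after `start_send` the sink holds the very buffer the caller created.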

This was the most useful suggestion (thanks @alice!):

Now you are just reinventing Sink<Item = Bytes>

Indeed, Sink<Bytes> looks good from the point of view of interface / users.

Now, my question is: what is the best way to implement futures::Sink<Bytes> (for writing files on the local fs)?

where "best" means:

  • as few copies as possible (there must be 0 copies in user-land / Rust, and as few copies in the Linux kernel as possible)
  • ideally it should be async-friendly, although task::block_in_place should be fine (?)

Right now I think I can wrap regular, non-async std::fs::File operations in task::block_in_place and implement the futures::Sink<bytes::Bytes> interface on top of them.
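A rough sketch of that plan, assuming a std-only mirror of the four `futures::Sink` methods (the `ChunkSink` trait below is hypothetical, defined here only so the example needs no extra crates) and `Vec<u8>` in place of `bytes::Bytes`. `start_send` just takes ownership of the chunk; the blocking `write_all` happens in `poll_ready`/`poll_flush`, which is where `tokio::task::block_in_place` would go under a multi-threaded tokio runtime.

```rust
use std::fs::File;
use std::io::{self, Write};
use std::pin::Pin;
use std::task::{Context, Poll};

/// Std-only mirror of the `futures::Sink` method set (hypothetical trait).
pub trait ChunkSink<Item> {
    type Error;
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>;
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
}

pub struct BlockingFileSink {
    file: File,
    pending: Option<Vec<u8>>,
}

impl BlockingFileSink {
    pub fn new(file: File) -> Self {
        Self { file, pending: None }
    }

    /// Writes the pending chunk, blocking the current thread. Under tokio's
    /// multi-threaded runtime this is the call to wrap in `block_in_place`.
    fn write_pending(&mut self) -> io::Result<()> {
        if let Some(chunk) = self.pending.take() {
            self.file.write_all(&chunk)?;
        }
        Ok(())
    }
}

impl ChunkSink<Vec<u8>> for BlockingFileSink {
    type Error = io::Error;

    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        // Ready once the previous chunk has been written out.
        Poll::Ready(self.get_mut().write_pending())
    }

    fn start_send(self: Pin<&mut Self>, item: Vec<u8>) -> io::Result<()> {
        // Ownership moves in; the bytes themselves are not copied.
        self.get_mut().pending = Some(item);
        Ok(())
    }

    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let this = self.get_mut();
        Poll::Ready(this.write_pending().and_then(|()| this.file.flush()))
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.poll_flush(cx)
    }
}
```

Accepting owned chunks keeps user space copy-free; the cost is that each flush blocks the calling thread until the kernel has accepted the data.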

Is there anything better?

It depends on what latency guarantees you want to give. You could use std::fs::write even without block_in_place if you assume the disk is fast enough (and/or writes are buffered in the kernel).

If you don't want to disrupt the async runtime, then tokio::fs will write from a thread pool.
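The same idea can be sketched with plain std. Here tokio's blocking thread pool is swapped out for a single dedicated `std::thread` (an assumption made to keep the example dependency-free): owned chunks are moved over a channel to a writer thread, so the producer never waits on the disk and no chunk is copied in user space. `BackgroundWriter` is a hypothetical name.

```rust
use std::fs::File;
use std::io::{self, Write};
use std::path::Path;
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

/// Hands owned chunks to a dedicated thread that performs the blocking writes.
pub struct BackgroundWriter {
    tx: Sender<Vec<u8>>,
    handle: JoinHandle<io::Result<()>>,
}

impl BackgroundWriter {
    pub fn create(path: &Path) -> io::Result<Self> {
        let mut file = File::create(path)?;
        let (tx, rx) = mpsc::channel::<Vec<u8>>();
        let handle = thread::spawn(move || -> io::Result<()> {
            // Loops until every Sender has been dropped.
            for chunk in rx {
                file.write_all(&chunk)?;
            }
            file.sync_all()
        });
        Ok(Self { tx, handle })
    }

    /// Moves the chunk to the writer thread: only the Vec header travels
    /// through the channel, the heap data is not copied.
    pub fn send(&self, chunk: Vec<u8>) -> io::Result<()> {
        self.tx
            .send(chunk)
            .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "writer thread exited"))
    }

    /// Closes the channel and waits until every chunk has reached the file.
    pub fn finish(self) -> io::Result<()> {
        let Self { tx, handle } = self;
        drop(tx); // closing the channel ends the writer loop
        handle.join().expect("writer thread panicked")
    }
}
```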

To be more efficient than that, you'd need to use the kernel's async I/O APIs (which, in the case of io_uring, require special care around Future cancellation).
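To illustrate the cancellation concern: with completion-based I/O the kernel reads the buffer after submission, so the buffer must stay alive until completion even if the waiting future is dropped. A common answer, used for example by tokio-uring, is to pass the buffer by value and get it back together with the result. The sketch below only simulates this with a std thread playing the kernel's role; `submit_write` is hypothetical and does not use io_uring.

```rust
use std::fs::File;
use std::io::{self, Write};
use std::thread::{self, JoinHandle};

/// The buffer comes back together with the outcome of the operation.
pub type BufResult<T, B> = (io::Result<T>, B);

/// Simulated completion-based write (a thread stands in for the kernel).
/// The "kernel" owns `buf` until the write completes, so the caller cannot
/// free or reuse that memory in the meantime, even if it stops waiting
/// (dropping the JoinHandle only detaches the thread).
pub fn submit_write(mut file: File, buf: Vec<u8>) -> JoinHandle<BufResult<usize, Vec<u8>>> {
    thread::spawn(move || {
        let res = file.write_all(&buf).map(|()| buf.len());
        (res, buf) // hand the buffer back for reuse
    })
}
```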


However, whether the interface is 0-copy depends on your perspective. src: &[u8] needs a copy only if you decide to keep the data in a buffer. Instead if your poll_write called POSIX write immediately, then you could pass this slice of bytes to the kernel without copying, and the underlying file system could in theory DMA it straight to the disk.
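A minimal sketch of the unbuffered variant described above, assuming a plain `std::fs::File`. `poll_write` has the same shape as `AsyncWrite::poll_write` but is written as an inherent method of a hypothetical `PassThroughFile` so that it needs only std. The slice goes straight into the `write(2)` call with no intermediate user-space buffer; the trade-off is that the call blocks the executor thread while the kernel accepts the data.

```rust
use std::fs::File;
use std::io::{self, Write};
use std::pin::Pin;
use std::task::{Context, Poll};

/// Unbuffered file writer (hypothetical name).
pub struct PassThroughFile {
    file: File,
}

impl PassThroughFile {
    pub fn new(file: File) -> Self {
        Self { file }
    }

    pub fn poll_write(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        src: &[u8],
    ) -> Poll<io::Result<usize>> {
        // `File::write` passes `src` directly to the write syscall: nothing
        // is copied in user space, but the current thread blocks until the
        // kernel has taken the data (usually into the page cache).
        Poll::Ready(self.get_mut().file.write(src))
    }
}
```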

The disk might be fast (around 1 GiB/s), but I typically write ~2 GiB files (so about 2 seconds per file, which might be quite disruptive for the tokio runtime/scheduler).

However, whether the interface is 0-copy depends on your perspective. src: &[u8] needs a copy only if you decide to keep the data in a buffer. Instead if your poll_write called POSIX write immediately, then you could pass this slice of bytes to the kernel without copying, and the underlying file system could in theory DMA it straight to the disk.

Yep, agree.

But it is probably more accurate to say that if I implement the AsyncWrite interface AND the implementation writes to disk, then I can do what you described above.

The problem is that I will need to have an implementation of AsyncWrite / Sink<Bytes> for other targets (such as S3), and S3 crate provides only async functions (which is good), but it also means I cannot use the trick you described above.

Which s3 crate are you using? If the API is async fn put(whole_object: &[u8]) then it will be incompatible with both unbuffered AsyncWrite and Bytes, because Bytes can be concatenated without copying only when you use the Buf interface (that exposes fragmented chunks), not slices.
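A std-only illustration of the difference, with `Vec<u8>` fragments standing in for chained `Bytes` and `std::io::IoSlice` playing the role of the chunk-based `Buf` view: a contiguous-slice API must concatenate (copy) the fragments first, while a fragment-aware API can hand them on as they are. Both function names are hypothetical.

```rust
use std::io::{self, IoSlice, Write};

/// Contiguous-slice API, like `put(whole_object: &[u8])`: fragmented data
/// must be concatenated first, which copies every byte.
pub fn put_contiguous<W: Write>(out: &mut W, fragments: &[Vec<u8>]) -> io::Result<usize> {
    let whole: Vec<u8> = fragments.concat(); // full copy into one allocation
    out.write_all(&whole)?;
    Ok(whole.len())
}

/// Fragment-aware API: the pieces are described in place with `IoSlice`
/// and passed on without concatenation.
pub fn put_fragments<W: Write>(out: &mut W, fragments: &[Vec<u8>]) -> io::Result<usize> {
    let slices: Vec<IoSlice<'_>> = fragments.iter().map(|f| IoSlice::new(f)).collect();
    let total: usize = fragments.iter().map(Vec::len).sum();
    let mut written = out.write_vectored(&slices)?;
    // A vectored write may be partial; finish the rest fragment by fragment,
    // still without concatenating.
    let mut offset = 0;
    for f in fragments {
        let end = offset + f.len();
        if written < end {
            let start = written.max(offset) - offset;
            out.write_all(&f[start..])?;
            written = end;
        }
        offset = end;
    }
    Ok(total)
}
```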

But if you make an S3 API that can stream the data, then you should be able to feed &[u8] fragments through AsyncWrite sink given by S3 call, or by providing an AsyncRead buffer wrapper to the S3 API.

I am using the official AWS SDK. In particular, this is what they do to create a multi-part upload:

// bytes: Option<bytes::Bytes>,
let blob = this.bytes.as_ref().unwrap();
for (i, start_pos) in (0..num_of_uploads).map(|i| (i, i * chunk_len)) {
    // Every part is chunk_len bytes long except the last, which runs to the end.
    let chunk = if i == num_of_uploads - 1 {
        blob.slice(start_pos..blob_len)
    } else {
        blob.slice(start_pos..start_pos + chunk_len)
    };
    // Bytes::slice shares the underlying buffer, so no data is copied here.
    let stream = aws_smithy_types::byte_stream::ByteStream::from(chunk);
    task_handlers.push(tokio::task::spawn(async move {
        let upload_part_res = client
            .upload_part()
            .body(stream)
            // ... (rest of the upload_part request omitted)
    }));
}

As you can see, byte_stream::ByteStream::from() takes Bytes. Bytes come from this.bytes.
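To show why the slicing above is cheap, here is a tiny std-only stand-in for `Bytes` (a hypothetical `SharedSlice`: an `Arc<[u8]>` plus a range; the real `Bytes` is more elaborate) together with the same part-splitting logic as the loop. Each part points into the original allocation, so nothing is copied.

```rust
use std::ops::Range;
use std::sync::Arc;

/// Minimal stand-in for `bytes::Bytes`: shared buffer plus a range.
#[derive(Clone)]
pub struct SharedSlice {
    data: Arc<[u8]>,
    range: Range<usize>,
}

impl SharedSlice {
    /// The one-time conversion into `Arc<[u8]>` allocates; after that,
    /// slicing never copies.
    pub fn new(data: Vec<u8>) -> Self {
        let len = data.len();
        Self { data: data.into(), range: 0..len }
    }

    /// Only bumps the reference count and narrows the range.
    pub fn slice(&self, r: Range<usize>) -> Self {
        assert!(r.start <= r.end && r.end <= self.range.len());
        let base = self.range.start;
        Self { data: Arc::clone(&self.data), range: base + r.start..base + r.end }
    }

    pub fn as_slice(&self) -> &[u8] {
        &self.data[self.range.clone()]
    }
}

/// Same split as the loop above: `chunk_len`-sized parts, last one to the end.
pub fn split_parts(blob: &SharedSlice, num_of_uploads: usize, chunk_len: usize) -> Vec<SharedSlice> {
    let blob_len = blob.as_slice().len();
    (0..num_of_uploads)
        .map(|i| {
            let start_pos = i * chunk_len;
            if i == num_of_uploads - 1 {
                blob.slice(start_pos..blob_len)
            } else {
                blob.slice(start_pos..start_pos + chunk_len)
            }
        })
        .collect()
}
```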

And this is where I had to make a copy in order to populate bytes:

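fn poll_write(
    self: std::pin::Pin<&mut Self>,
    cx: &mut std::task::Context<'_>,
    buf: &[u8],
) -> std::task::Poll<Result<usize, std::io::Error>> {
    // ...
    // `buf` is only borrowed for the duration of this call, so keeping it
    // requires copying it into an owned Bytes.
    ops_state.bytes = Some(bytes::Bytes::copy_from_slice(buf)); // <--- HERE
    // ...
}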

If poll_write took a Bytes argument, I would not need to copy anything (well, cloning a Bytes is shallow: it does not copy the underlying data).


But if you make an S3 API that can stream the data, then you should be able to feed &[u8] fragments through AsyncWrite sink given by S3 call, or by providing an AsyncRead buffer wrapper to the S3 API.

I still think there is a conceptual issue: you cannot schedule and run an async function (and everything network-related is async) that holds on to a reference borrowed from the stack, which is exactly what poll_write forces everyone to do. Sooner or later something has to deal with this issue, whether it is my code or code hidden inside the S3 SDK.
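The constraint can be reproduced with std threads, which (like `tokio::spawn`) require `'static` data: a borrowed slice must be copied before it can be handed to a spawned worker, owned data can simply be moved, and the only copy-free way to use a borrow is to wait for the worker inside the call (`std::thread::scope`), which is exactly the lifetime `poll_write` gives you. `fake_upload` and the `upload_*` functions are hypothetical placeholders.

```rust
use std::thread;

/// Hypothetical stand-in for an upload: just sums the bytes.
pub fn fake_upload(data: &[u8]) -> u64 {
    data.iter().map(|&b| u64::from(b)).sum()
}

/// `thread::spawn` needs `'static` data, so a borrowed slice must be
/// copied into an owned buffer before it can be handed off.
pub fn upload_borrowed(src: &[u8]) -> u64 {
    let owned = src.to_vec(); // the unavoidable copy
    // Joined immediately only so the example can return a result.
    thread::spawn(move || fake_upload(&owned)).join().unwrap()
}

/// Owned data is moved into the worker: the heap buffer is not copied.
pub fn upload_owned(src: Vec<u8>) -> u64 {
    thread::spawn(move || fake_upload(&src)).join().unwrap()
}

/// A borrow can be used without copying only if the caller waits for the
/// worker to finish before returning (scoped threads).
pub fn upload_scoped(src: &[u8]) -> u64 {
    thread::scope(|s| s.spawn(|| fake_upload(src)).join().unwrap())
}
```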
