Implementing AsyncWrite, struggling with Lifetimes and Pin

Hello, I'm fairly new to Rust and even more so to async Rust. I'm currently battling with Future lifetimes and pinning whilst trying to implement AsyncWrite.

Here is an example of what I would like to achieve:

use std::{io, pin::Pin, task::{Context, Poll}};
use futures::{AsyncWrite, Future, AsyncWriteExt, FutureExt, ready};

struct Writer<'a> {
    conn: &'a mut Connection,
    fut: Option<Pin<Box<dyn Future<Output = io::Result<usize>>>>>
}

impl<'a> AsyncWrite for Writer<'a> {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let this = self.get_mut();
        let fut = match this.fut.as_mut() {
            Some(fut) => fut,
            None => {
                this.fut.get_or_insert(Box::pin(this.conn.write_some(buf)))
            }
        };
        let result = ready!(fut.poll_unpin(cx));
        this.fut = None;
        Poll::Ready(result)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        todo!()
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        todo!()
    }
}

struct Connection {
    buf: Vec<u8>
}

impl Connection {
    pub fn new() -> Self {
        Self {
            buf: Vec::new()
        }
    }

    pub fn writer(&mut self) -> Writer {
        Writer { conn: self, fut: None }
    }

    pub async fn write_some(&mut self, buf: &[u8]) -> io::Result<usize> {
        let result = self.do_something_else(buf).await?;
        self.buf.extend(&buf[result..]);
        Ok(buf.len())
    }

    pub async fn do_something(&mut self) -> io::Result<usize> {
        // Do something async with self.buf
        todo!()
    }

    async fn do_something_else(&mut self, buf: &[u8]) -> io::Result<usize> {
        todo!()
    }
}

#[tokio::main]
async fn main() {
    let mut conn = Connection::new();
    let _ = conn.writer().write(b"hello world").await.unwrap();
    let _ = conn.do_something().await.unwrap();
}

I have a struct Connection which I would like to implement AsyncWrite on either directly, or as in the example above with a Writer struct. Connection will accept a byte buffer via write_some and immediately start performing some other async task with it, and then perhaps store some of it in an internal buffer which will be operated on later via do_something.

As I understand it, the way to do this would be to create/store the future returned by write_some in poll_write, and return the Poll from it. When attempting to do this I immediately run into lifetime issues, where the compiler complains like this:

error[E0495]: cannot infer an appropriate lifetime for lifetime parameter `'a` due to conflicting requirements
  --> src/main.rs:15:25
   |
15 |         let this = self.get_mut();
   |                         ^^^^^^^
   |
note: first, the lifetime cannot outlive the lifetime `'a` as defined here...
  --> src/main.rs:9:6
   |
9  | impl<'a> AsyncWrite for Writer<'a> {
   |      ^^
note: ...so that the reference type `&mut Writer<'a>` does not outlive the data it points at
  --> src/main.rs:15:25
   |
15 |         let this = self.get_mut();
   |                         ^^^^^^^
   = note: but, the lifetime must be valid for the static lifetime...
note: ...so that the expression is assignable
  --> src/main.rs:19:40
   |
19 |                 this.fut.get_or_insert(Box::pin(this.conn.write_some(buf)))
   |                                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   = note: expected `Pin<Box<(dyn futures::Future<Output = Result<usize, std::io::Error>> + 'static)>>`
              found `Pin<Box<dyn futures::Future<Output = Result<usize, std::io::Error>>>>`

I'm not sure how to resolve the compiler telling me that the future requires a 'static lifetime, or whether I'm generally approaching this correctly. I am fine with giving up the indirection via Writer if that is required, but I didn't have any more luck implementing AsyncWrite on Connection directly. Any direction would be greatly appreciated.

In general, the easiest way to do stuff like this is to move conn inside the future. For example, consider this implementation:
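
A minimal sketch of that idea follows. It is an illustration under stated assumptions, not necessarily the code the reply had in mind. The compiler error above comes from `Box<dyn Future<...>>` defaulting to `+ 'static`, while the future returned by `write_some` borrows `conn` for `'a`. If the future owns the connection and returns it along with the result, nothing is borrowed and the bound is met. `OwnedWriter`, `WriteFuture`, `NoopWake` and `demo_owned` are hypothetical names, `Connection` is a stand-in that only buffers bytes, and `poll_write` is written as an inherent method with the same signature as `AsyncWrite::poll_write` so that the sketch needs only the standard library.

```rust
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for the thread's Connection: write_some only buffers.
struct Connection {
    buf: Vec<u8>,
}

impl Connection {
    async fn write_some(&mut self, data: &[u8]) -> io::Result<usize> {
        self.buf.extend_from_slice(data);
        Ok(data.len())
    }
}

// The future owns the Connection and returns it together with the result,
// so it borrows nothing and meets the implicit `'static` bound.
type WriteFuture = Pin<Box<dyn Future<Output = (Connection, io::Result<usize>)>>>;

struct OwnedWriter {
    conn: Option<Connection>, // Some while no write is in flight
    fut: Option<WriteFuture>, // Some while a write is in flight
}

impl OwnedWriter {
    fn new(conn: Connection) -> Self {
        Self { conn: Some(conn), fut: None }
    }

    // Hands the Connection back; None if a write is still in flight.
    fn into_inner(self) -> Option<Connection> {
        self.conn
    }

    // Same signature as `AsyncWrite::poll_write`; in real code this body
    // would sit inside `impl AsyncWrite for OwnedWriter`.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let this = self.get_mut();
        if this.fut.is_none() {
            let mut conn = this.conn.take().expect("connection present while idle");
            let data = buf.to_vec(); // copy: the future cannot borrow `buf`
            let fut: WriteFuture = Box::pin(async move {
                let result = conn.write_some(&data).await;
                (conn, result)
            });
            this.fut = Some(fut);
        }
        let fut = this.fut.as_mut().expect("future stored above");
        match fut.as_mut().poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready((conn, result)) => {
                this.conn = Some(conn); // the connection is usable again
                this.fut = None;
                Poll::Ready(result)
            }
        }
    }
}

// No-op waker so the sketch can be polled without an async runtime.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Drives one write to completion and returns (bytes written, buffer contents).
fn demo_owned() -> (usize, Vec<u8>) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut writer = OwnedWriter::new(Connection { buf: Vec::new() });
    let written = loop {
        if let Poll::Ready(res) = Pin::new(&mut writer).poll_write(&mut cx, b"hello") {
            break res.expect("write failed");
        }
    };
    let conn = writer.into_inner().expect("no write in flight");
    (written, conn.buf)
}
```

Two simplifications are worth noting: the sketch copies `buf` into a `Vec` on every new write, and it assumes the caller passes the same bytes again when re-polling after `Poll::Pending`. The hypothetical `into_inner` shows one way to get the `Connection` back once the writer is idle.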

Thank you for the reply. Interesting, I hadn't thought of that. Though this renders the rest of the Connection interface inaccessible, correct?

How about the case where Connection needs to implement both AsyncWrite and AsyncRead? I simplified my example, perhaps short-sightedly, assuming that the correct pattern would generalize to implementing both traits. But this pattern seems to assume that the inner object only needs to provide one interface.

How could I go about implementing these traits by reference, where Connection can be concurrently written to and read from? Or perhaps there is a way to do this where the wrapper owns Connection as in your example.

Well, both futures need mutable access to it, but mutable access always implies exclusive access. You can't have a read future and a write future borrowing the same Connection mutably at the same time, so concurrent reads and writes won't work this way.

Ah, right, makes sense. In the case where concurrent read/write is not a requirement, is implementing both traits by reference possible? Perhaps using an Arc<Mutex<Connection>>?

I will further attempt this over the weekend.
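
For the Arc<Mutex<Connection>> idea raised above, one possible shape is sketched below, with several assumptions. `SharedWriter`, `SharedWrite`, `NoopWake` and `demo_shared` are hypothetical names, and `Connection` is again a stand-in that only buffers bytes. `std::sync::Mutex` is used only so that the sketch compiles without external crates. Because the guard is held across an `.await`, an async-aware mutex such as `tokio::sync::Mutex` or `futures::lock::Mutex` would likely be the better fit in real code. As before, `poll_write` is an inherent method mirroring the signature of `AsyncWrite::poll_write`.

```rust
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for the thread's Connection: write_some only buffers.
struct Connection {
    buf: Vec<u8>,
}

impl Connection {
    async fn write_some(&mut self, data: &[u8]) -> io::Result<usize> {
        self.buf.extend_from_slice(data);
        Ok(data.len())
    }
}

type SharedWrite = Pin<Box<dyn Future<Output = io::Result<usize>>>>;

struct SharedWriter {
    conn: Arc<Mutex<Connection>>, // shared handle; a reader could hold another clone
    fut: Option<SharedWrite>,     // Some while a write is in flight
}

impl SharedWriter {
    // Same signature as `AsyncWrite::poll_write`; in real code this body
    // would sit inside `impl AsyncWrite for SharedWriter`.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let this = self.get_mut();
        if this.fut.is_none() {
            let conn = Arc::clone(&this.conn); // the future owns its own handle
            let data = buf.to_vec(); // copy: the future cannot borrow `buf`
            let fut: SharedWrite = Box::pin(async move {
                // Assumption: a std Mutex stands in for an async-aware one.
                let mut guard = conn.lock().unwrap();
                let result = guard.write_some(&data).await;
                result
            });
            this.fut = Some(fut);
        }
        let fut = this.fut.as_mut().expect("future stored above");
        match fut.as_mut().poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(result) => {
                this.fut = None;
                Poll::Ready(result)
            }
        }
    }
}

// No-op waker so the sketch can be polled without an async runtime.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Writes through the wrapper, then reads the buffer through the original handle.
fn demo_shared() -> (usize, Vec<u8>) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let conn = Arc::new(Mutex::new(Connection { buf: Vec::new() }));
    let mut writer = SharedWriter { conn: Arc::clone(&conn), fut: None };
    let written = loop {
        if let Poll::Ready(res) = Pin::new(&mut writer).poll_write(&mut cx, b"hello world") {
            break res.expect("write failed");
        }
    };
    let bytes = conn.lock().unwrap().buf.clone();
    (written, bytes)
}
```

With this shape the rest of the `Connection` interface stays reachable through the original `Arc`, at the cost of locking on every operation. Reads and writes still take turns on the lock and do not run concurrently.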
