Impl Future for struct with reference to TCPStream

Hi,

I am learning rust and I am experimenting with writing a Future impl for a struct which has a reference to a stream. I am getting the following error, was wondering what I need to do here to work through this? I'd rather just borrow the stream than own it, and I haven't tried owning it yet, seems like from the message that is irrelevant anyways.

I implemented a simple handshake exchange with the server already in a more procedural way with async / await and some loops to read the response when its ready. This did work fine. But I got curious about making the whole startup process a single future so that's why I started experimenting with it this way.

Many thanks!

error[E0277]: the trait bound `&'a tokio::net::TcpStream: tokio::io::AsyncWrite` is not satisfied
  --> src/lib.rs:74:34
   |
74 |         ClientMessage::HandShake.write(Pin::new(&mut l.stream), cx);
   |                                  ^^^^^ the trait `tokio::io::AsyncWrite` is not implemented for `&'a tokio::net::TcpStream`
   |
   = help: the following implementations were found:
             <tokio::net::TcpStream as tokio::io::AsyncWrite>
   = note: `tokio::io::AsyncWrite` is implemented for `&'a mut tokio::net::TcpStream`, but not for `&'a tokio::net::TcpStream`
   = note: required because of the requirements on the impl of `tokio::io::AsyncWriteExt` for `&'a tokio::net::TcpStream`
note: required by a bound in `client::ClientMessage::write`
  --> src/connection/client.rs:25:12
   |
23 |     pub fn write<T>(&self, stream: Pin<&mut T>, cx: &mut std::task::Context<'_>) -> Result<()>
   |            ----- required by a bound in this
24 |     where
25 |         T: AsyncWriteExt + std::marker::Unpin,
   |            ^^^^^^^^^^^^^ required by this bound in `client::ClientMessage::write`


struct StartAPI<'a> {
    stream: &'a TcpStream,
    started: bool,
    client_id: u32,
    next_valid_id: Option<u32>,
}

impl<'a> StartAPI<'a> {
    fn new(stream: &'a mut TcpStream, client_id: u32) -> Self {
        StartAPI {
            stream,
            started: false,
            client_id,
            next_valid_id: None,
        }
    }
}

impl<'a> Future for StartAPI<'a> {
    type Output = ();

    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        let mut l = self.as_mut();
        let mut stream = l.stream;
        ClientMessage::HandShake.write(Pin::new(&mut stream), cx);
        std::task::Poll::Pending
    }
}

#[derive(Debug)]
pub enum ClientMessage {
    HandShake,
    StartAPI(u32),
}

impl ClientMessage {
    pub fn write<T>(&self, stream: Pin<&mut T>, cx: &mut std::task::Context<'_>) -> Result<()>
    where
        T: AsyncWriteExt + std::marker::Unpin,
    {
        let mut buf = BytesMut::new();
        match self {
            Self::HandShake => write_handshake(&mut buf),
            Self::StartAPI(client_id) => write_to_ib(&StartAPI::new(*client_id), &mut buf),
        };
        let n = stream.poll_write(cx, &buf);
        Ok(())
    }
}

The issue has a pretty simple fix: use &'a mut TcpStream for stream in StartAPI. This makes it a unique reference to the stream, which can be used just like the original, since &mut T: AsyncWrite whenever T: AsyncWrite.

 struct StartAPI<'a> {
-    stream: &'a TcpStream,
+    stream: &'a mut TcpStream,
     started: bool,
     client_id: u32,
     next_valid_id: Option<u32>,
 }
1 Like

Thanks! I did not realize that you need to flag mutability on struct members, I am used to them being mutable when self is mut and not mutable when self is not mut. That said I haven't written a lot of structs with reference members, so I guess you have to be explicit when borrowing. This makes sense as it would prevent any other borrows of this entity while the struct is alive. Am I understanding that correctly?

Thanks again!

Yes, that is correct. Regarding mut on struct members, keep in mind the difference between mut bindings and &mut references. Ultimately, there are four cases:

let a: &i32 = &42;             // Immutable binding to a shared reference
let b: &mut i32 = &mut 42;     // Immutable binding to a unique reference
let mut c: &i32 = &42;         // Mutable binding to a shared reference
let mut d: &mut i32 = &mut 42; // Mutable binding to a unique reference

We can modify c and d since they are mutable bindings, and we can modify *b and *d since they are unique references. In your case, if we want to change self.started, then we need a mut self binding. But if a method requires &mut *self.stream, then we need self.stream to hold a unique (&mut) reference to a TcpStream, since a unique reference cannot be obtained from a shared reference (except via interior mutability).

2 Likes

Nice, glad I asked. Was not tracking the mut binding vs &mut reference thing...

Thank you for the explanation!

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.