Dereferencing issues when implementing Future trait for enum

I'm trying to create an enum on which you can await, which then opens a socket and authenticates, returning the TcpStream for further use. The problem is that the data inside the enum variants prevents the *self dereference. I'm guessing this is due to the TcpStream, but I have no idea how to fix this. Some guidance and options would be greatly appreciated :slight_smile: .

enum SocketState{
    Off(String, String),
    Connecting(String, String),
    WriteHello(TcpStream, Vec<u8>),
    ReadHelloResponse(TcpStream),
    Ready(Result<TcpStream,anyhow::Error>),
}

impl Future for SocketState{
    type Output = Result<TcpStream,anyhow::Error>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

        use SocketState::*;
        loop{
            match *self{
                Off(imei_path, addr_string) => {
                    match read_to_string(imei_path){
                        Ok(imei_str) =>{
                            *self = Connecting(imei_str, addr_string);
                            return Poll::Pending;
                        }
                        Err(e) => {
                            *self = Ready(Err(Error::new(e)));
                            return Poll::Pending;
                        },
                    };
                },
                Connecting(imei, addr) => {
                    let connection_future = TcpStream::connect(addr);
                    tokio::pin!(connection_future);
                    match connection_future.poll(cx){
                        Poll::Ready(Ok(stream)) => {
                            let mut imei_hello:Vec<u8> = vec![0x00, 0x0F];
                            for imei_char in imei.chars(){
                                imei_hello.push(imei_char as u8);
                            }
                            *self = WriteHello(stream, imei_hello);
                            return Poll::Pending;
                        },
                        Poll::Ready(Err(e)) => {
                            *self = Ready(Err(Error::new(e)));
                            return Poll::Pending;
                        },
                        Poll::Pending => return Poll::Pending,
                    }
                },
                WriteHello(mut stream, hello_message) => {
                    let hello_write_future = stream.write(&hello_message);
                    tokio::pin!(hello_write_future);
                    match hello_write_future.poll(cx){
                        Poll::Ready(Ok(_)) => {
                            *self = ReadHelloResponse(stream);
                            return Poll::Pending;
                        }
                        Poll::Ready(Err(e)) => {
                            *self = Ready(Err(Error::new(e)));
                            return Poll::Pending;
                        }
                        Poll::Pending => return Poll::Pending,
                    }
                },
                ReadHelloResponse(mut stream ) => {
                    let mut server_response = Vec::<u8>::with_capacity(1);
                    let response_read_future = stream.read_buf(&mut server_response);
                    tokio::pin!(response_read_future);
                    match response_read_future.poll(cx){
                        Poll::Ready(Ok(_)) => {
                            match server_response[0]{
                                0x01 => {
                                    *self = Ready(Ok(stream));
                                    return Poll::Pending;
                                },
                                unexpected_code => {
                                    *self = Ready(Err(Error::msg(format!("Unexpected server response {unexpected_code}"))));
                                }
                            }
                        },
                        Poll::Ready(Err(e)) => {
                            *self = Ready(Err(Error::new(e)));
                            return Poll::Pending;
                        },
                        Poll::Pending => return Poll::Pending,
                    }
                }
                Ready(connection_result) => {
                    return Poll::Ready(connection_result);
                },
            }
        }
    }
}
error[E0507]: cannot move out of dereference of `Pin<&mut SocketState>`
   --> src/main.rs:42:19
    |
42  |             match *self{
    |                   ^^^^^
43  |                 Off(imei_path, addr_string) => {
    |                     ---------  ----------- ...and here
    |                     |
    |                     data moved here
...
55  |                 Connecting(imei, addr) => {
    |                            ----  ---- ...and here
    |                            |
    |                            ...and here
...
74  |                 WriteHello(mut stream, hello_message) => {
    |                            ----------  ------------- ...and here
    |                            |
    |                            ...and here
...
89  |                 ReadHelloResponse(mut stream ) => {
    |                                   ---------- ...and here
...
112 |                 Ready(connection_result) => {
    |                       ----------------- ...and here
    |
    = note: move occurs because these variables have types that don't implement the `Copy` trait
help: consider removing the dereference here
    |
42  -             match *self{
42  +             match self{
    |

Add a dummy state called Panicked and do this:

match mem::replace(self, SocketState::Panicked) { 
    ...
}

this way you take ownership of the current state and can do what you want with the contents. If the code panics, then the state is left in Panicked for the next poll.

2 Likes

That said there's a much bigger issue. This is never gonna work:

When you return Poll::Pending, the destructor of connection_future runs. That destructor is going to deregister the waker. The correct way to do it would be to store the connection future inside of SocketState, but that's extremely hard to implement when you're using an enum like that and probably impossible without copious amounts of unsafe.

1 Like

Another issue is that you return Poll::Pending in situations where you could make further progress; you’d have to either continue execution right away, or at least make sure – e.g. via cx.waker.wake_by_ref() – that the executor knows that further progress is possible right away.


I feel like the main obstacle would be that the futures here have anonymous types. Which means boxing would be necessary as a workaround; with Pin<Box<… Future>> being used, I don’t see how any unsafe could come into play though.

To be fair, the other two instances involve references, e.g. stream.write(&hello_message) borrows a local variable. However, this can be worked around by re-wrapping the call with another async move {} future capturing only owned arguments. Naturally, this still leaves the question of β€œwhy not use async for the whole thing instead of the state-machine enum”.

@kriksis this would look roughly as follows:

async fn open_and_authenticate(imei_path: String, addr: String) -> anyhow::Result<TcpStream> {
    // Off(imei_path, addr)
    let imei = read_to_string(imei_path)?; // TODO: should this fs-operation switch to async?
    // Connecting(imei, addr)
    let mut stream = TcpStream::connect(addr).await?;
    let mut hello_message: Vec<u8> = vec![0x00, 0x0F];
    for imei_char in imei.chars() {
        hello_message.push(imei_char as u8);
    }
    // WriteHello(stream, hello_message)
    stream.write(&hello_message).await?;
    // ReadHelloResponse(stream)
    let mut server_response = Vec::<u8>::with_capacity(1);
    stream.read_buf(&mut server_response).await?;
    match server_response[0] {
        0x01 => Ok(stream), // <- Ready(stream)
        unexpected_code => anyhow::bail!("Unexpected server response {unexpected_code}"),
    }
}

I have the whole thing running on a

loop{
  tokio::select!{}
}

My intention was to use the enum inside a "read from server" or "send to server" future in the select. I could then await on the enum, which would open the socket and do the read. If I use a reference to the enum then, in theory, the await on enum would return immediately if a socket is open and I could await on socket read or write inside the "read from socket" or "send to server" async functions.

That's how I had it planned out in my head, but it is possible that this is absolutely not how tokio works or is intended to be used :sweat_smile:.