Cannot move out of dereference of `Pin<&mut Server>`

I am getting an error on the self.receiver line. It says: move occurs because value has type `Option<tokio::sync::mpsc::Receiver<Vec<u8>>>`, which does not implement the `Copy` trait

I don't understand why self.tcp is fine but self.receiver is not. How can I fix this?

#[derive(Debug)]
enum Event {
    Connection((TcpStream, SocketAddr)),
    Message(Vec<u8>),
}

struct Server {
    tcp: TcpListener,
    receiver: Option<Receiver<Vec<u8>>>,
    sender: Option<Sender<Vec<u8>>>,
}

impl Server {
    async fn run(sender: Sender<Vec<u8>>, receiver: Receiver<Vec<u8>>) -> Self {
        Self {
            tcp: TcpListener::bind("127.0.0.1:9090").await.unwrap(),
            sender: Some(sender),
            receiver: Some(receiver),
        }
    }
}

impl Stream for Server {
    type Item = Vec<u8>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let message_stream = stream::poll_fn(|cx| {
            self.receiver
                .unwrap()
                .poll_recv(cx)
                .map(|message| Some(Event::Message(message.unwrap())))
        });

        let connection_stream = stream::poll_fn(|cx| {
            self.tcp
                .poll_accept(cx)
                .map(|result| Some(Event::Connection(result.unwrap())))
        });

        //Future::poll(connection_stream, cx)
        match stream::select(connection_stream, message_stream)
            .select_next_some()
            .poll_unpin(cx)
        {
            Poll::Ready(event) => match event {
                Event::Connection((tcp, address)) => {
                    Poll::Ready(Some(Vec::from("Hello as Connection")))
                }
                Event::Message(msg) => Poll::Ready(Some(Vec::from("Hello as Message"))),
            },
            Poll::Pending => Poll::Pending,
        }
    }
}

tcp is not behind an Option, but receiver is. Therefore, when you write self.tcp.poll_accept(), automatic borrowing can kick in and take a reference to self.tcp. However, Option::unwrap takes the Option by value, which is not possible for a value that sits behind a reference.

You probably want self.receiver.as_mut().unwrap() in order to convert the by-value optional to a by-mutable-ref optional.
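
To make the difference concrete, here is a minimal sketch on a toy type. Inbox and pop are made-up names standing in for Server and its receiver, and no Tokio is involved. It assumes the type is Unpin, which is what allows mutable field access through Pin<&mut Self> in the first place.

```rust
use std::pin::Pin;

// Hypothetical stand-in for `Server`: `queue` plays the role of `receiver`.
struct Inbox {
    queue: Option<Vec<u8>>,
}

impl Inbox {
    // `self` is only a pinned mutable reference, so the `Option` cannot be moved out.
    // The binding is `mut` because mutable field access goes through `DerefMut`.
    fn pop(mut self: Pin<&mut Self>) -> Option<u8> {
        // self.queue.unwrap().pop()   // rejected: cannot move out of dereference
        // `as_mut` turns `&mut Option<Vec<u8>>` into `Option<&mut Vec<u8>>`,
        // so `unwrap` consumes only that temporary `Option` of a reference.
        self.queue.as_mut().unwrap().pop()
    }
}

// Small driver: pops one element and returns what is left in the struct.
fn demo_pop() -> (Option<u8>, Option<Vec<u8>>) {
    let mut inbox = Inbox { queue: Some(vec![1, 2, 3]) };
    let popped = Pin::new(&mut inbox).pop();
    (popped, inbox.queue)
}
```

The Option is still in place after the call, which is the point of going through as_mut().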

I removed the Option from receiver, and now I get another error:

Can you help me fix it and explain what is going on?

error[E0596]: cannot borrow `self` as mutable, as it is not declared as mutable
  --> src\main.rs:48:13
   |
40 |     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
   |                  ---- help: consider changing this to be mutable: `mut self`
...
48 |             self.receiver
   |             ^^^^ cannot borrow as mutable

Another error:

error[E0500]: closure requires unique access to `self` but it is already borrowed
  --> src\main.rs:47:46
   |
41 |         let connection_stream = stream::poll_fn(|cx| {
   |                                                 ---- borrow occurs here
42 |             self.tcp
   |             ---- first borrow occurs due to use of `self` in closure
...
47 |         let message_stream = stream::poll_fn(|cx| {
   |                                              ^^^^ closure construction occurs here
48 |             self.receiver
   |             ---- second borrow occurs due to use of `self` in closure
...
54 |         match stream::select(connection_stream, message_stream)
   |                              ----------------- first borrow later used here

My code:

#[derive(Debug)]
enum Event {
    Connection((TcpStream, SocketAddr)),
    Message(Vec<u8>),
}

struct Server {
    tcp: TcpListener,
    receiver: Receiver<Vec<u8>>,
    sender: Sender<Vec<u8>>,
}

impl Server {
    async fn run(sender: Sender<Vec<u8>>, receiver: Receiver<Vec<u8>>) -> Self {
        Self {
            tcp: TcpListener::bind("127.0.0.1:9090").await.unwrap(),
            sender,
            receiver,
        }
    }
}

impl Stream for Server {
    type Item = Vec<u8>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let connection_stream = stream::poll_fn(|cx| {
            self.tcp
                .poll_accept(cx)
                .map(|result| Some(Event::Connection(result.unwrap())))
        });

        let message_stream = stream::poll_fn(|cx| {
            self.receiver
                .poll_recv(cx)
                .map(|message| Some(Event::Message(message.unwrap())))
        });

        //Future::poll(connection_stream, cx)
        match stream::select(connection_stream, message_stream)
            .select_next_some()
            .poll_unpin(cx)
        {
            Poll::Ready(event) => match event {
                Event::Connection((tcp, address)) => {
                    Poll::Ready(Some(Vec::from("Hello as Connection")))
                }
                Event::Message(msg) => Poll::Ready(Some(Vec::from("Hello as Message"))),
            },
            Poll::Pending => Poll::Pending,
        }
    }
}

Just do what the compiler suggests in the very same error message: change self to mut self in the signature of poll_next.

Thanks, that solves the issue with receiver. I don't know how I missed that hint from the compiler (I am still learning Rust =) )

But now there is another issue: I am using self inside several closures and the compiler complains about it:

error[E0502]: cannot borrow `self` as mutable because it is also borrowed as immutable
  --> src\main.rs:50:46
   |
44 |         let connection_stream = stream::poll_fn(|cx| {
   |                                                 ---- immutable borrow occurs here
45 |             self.tcp
   |             ---- first borrow occurs due to use of `self` in closure
...
50 |         let message_stream = stream::poll_fn(|cx| {
   |                                              ^^^^ mutable borrow occurs here
51 |             self.receiver
   |             ---- second borrow occurs due to use of `self` in closure
...
57 |         match stream::select(connection_stream, message_stream)
   |                              ----------------- immutable borrow later used here

When I tried to clone the context itself, that didn't work either. Any suggestions on how to fix it?

That can't possibly work; it causes two mutable borrows to exist at the same time. You will need to re-think your design. It's hard to tell exactly how without knowing the specific high-level goals of the code.
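
For background on why the compiler sees one borrow of all of self here: field access through Pin<&mut Self> goes via Deref/DerefMut, so each closure ends up borrowing the whole self rather than a single field. The sketch below shows one way to get independent borrows on a toy type. Pair, label, queue and step are made-up names, and it assumes the type is Unpin. It only illustrates the borrow rules and is not a recommendation to keep the closure-based design.

```rust
use std::pin::Pin;

// Hypothetical stand-in for `Server`: one field that is only read and one
// that is mutated, like `tcp` and `receiver` in the question.
struct Pair {
    label: String,
    queue: Vec<u8>,
}

impl Pair {
    fn step(self: Pin<&mut Self>) -> (usize, Option<u8>) {
        // Convert to a plain `&mut Pair` once (allowed because `Pair` is
        // `Unpin`) and destructure it into one reference per field.
        let Pair { label, queue } = self.get_mut();

        let read_label = || label.len();    // captures only `label`
        let mut pop_queue = || queue.pop(); // captures only `queue`

        // Both closures are alive at the same time without a conflict.
        (read_label(), pop_queue())
    }
}

// Small driver for the sketch.
fn demo_step() -> (usize, Option<u8>) {
    let mut pair = Pair { label: String::from("tcp"), queue: vec![7, 8] };
    Pin::new(&mut pair).step()
}
```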

You should not be manually implementing Stream here. Just make an async fn instead.

impl Server {
    async fn next(&mut self) -> Event {
        tokio::select! {
            conn = self.tcp.accept() => Event::Connection(conn.unwrap()),
            // why is this in an Option?
            msg = self.receiver.as_mut().unwrap().recv() => Event::Message(msg.unwrap()),
        }
    }
}

The code is not tested to verify that it compiles because you did not provide import statements, and I didn't feel like writing them myself.

If you want to see a correct Stream impl, it would look more like this:

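impl Stream for Server {
    type Item = Event;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Turn the pin into a plain `&mut Server`; this requires `Server: Unpin`.
        let me = Pin::into_inner(self);

        // Check for a new connection first.
        match me.tcp.poll_accept(cx) {
            Poll::Ready(result) => return Poll::Ready(Some(Event::Connection(result.unwrap()))),
            Poll::Pending => {},
        }

        // Then check the channel. Use `me` here: `self` was consumed above.
        if let Some(recv) = me.receiver.as_mut() {
            match recv.poll_recv(cx) {
                // Note: `unwrap` panics once the channel is closed.
                Poll::Ready(item) => return Poll::Ready(Some(Event::Message(item.unwrap()))),
                Poll::Pending => {},
            }
        }

        // Nothing is ready; the sources polled above have registered the waker from `cx`.
        Poll::Pending
    }
}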
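
The polling order used in such an impl can be tried out without a runtime. The following is a rough sketch using only std: Queue, Merged, poll_event and NoopWaker are made-up names, and the toy queues never store the waker, which real sources such as poll_accept and poll_recv do before returning Pending.

```rust
use std::collections::VecDeque;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

#[derive(Debug, PartialEq)]
enum Event {
    Connection(u32),
    Message(Vec<u8>),
}

// Toy source: ready whenever its queue is non-empty.
struct Queue<T>(VecDeque<T>);

impl<T> Queue<T> {
    fn poll_pop(&mut self, _cx: &mut Context<'_>) -> Poll<T> {
        match self.0.pop_front() {
            Some(value) => Poll::Ready(value),
            None => Poll::Pending,
        }
    }
}

struct Merged {
    connections: Queue<u32>,
    messages: Queue<Vec<u8>>,
}

impl Merged {
    // Poll the sources in a fixed order and return the first ready event.
    fn poll_event(&mut self, cx: &mut Context<'_>) -> Poll<Event> {
        if let Poll::Ready(id) = self.connections.poll_pop(cx) {
            return Poll::Ready(Event::Connection(id));
        }
        if let Poll::Ready(msg) = self.messages.poll_pop(cx) {
            return Poll::Ready(Event::Message(msg));
        }
        Poll::Pending
    }
}

// Waker that does nothing; enough to build a `Context` for manual polling.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Polls three times: one connection, one message, then nothing left.
fn demo_poll() -> Vec<Poll<Event>> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut merged = Merged {
        connections: Queue(VecDeque::from([1])),
        messages: Queue(VecDeque::from([b"hi".to_vec()])),
    };
    (0..3).map(|_| merged.poll_event(&mut cx)).collect()
}
```

One consequence of the fixed order is that the first source always wins when both are ready, so a steady flow of connections could delay messages.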

Thanks, @alice and @H2CO3

What I am trying to achieve is a stream of data coming from TCP sockets. Let me explain:

  1. Accept any new incoming TCP socket (first poll).
  2. After accepting, create a new Tokio task that reads the data from the accepted socket and sends it back to the stream as Vec<u8> (second poll).

So basically, this is how the user will consume the server struct:

let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 9090);
let tcp = start_sever(socket).await.unwrap();
read_data(tcp).await.for_each(|data| async move {
    println!("{:?}", std::str::from_utf8(&data));
}).await;

P.S.: the start_sever fn only binds the address and creates a new TcpListener.

I am writing down the requirements because I think what I am doing is totally wrong, and maybe you have a better idea. Please suggest a solution.

Sending the data back is reasonable enough.

I generally recommend that you should prefer to use simpler constructs over more complicated ones unless you have a good reason. For example, I generally suggest using loops rather than for_each and just defining an async next method rather than manually implementing Stream.
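
As a rough illustration of that shape (one reader per connection sending chunks back over a channel, and a plain loop on the consuming side), here is a blocking, std-only analog. It uses threads and std::sync::mpsc in place of Tokio tasks and tokio::sync::mpsc, and in-memory cursors in place of sockets. spawn_reader, collect_all and demo_read are made-up names. An async version would have the same structure with tokio::spawn and a while let loop over recv().await.

```rust
use std::io::{Cursor, Read};
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

// Reads `conn` to the end in small chunks and forwards each chunk.
// `R` could be a `std::net::TcpStream`; the demo below uses in-memory cursors.
fn spawn_reader<R: Read + Send + 'static>(mut conn: R, tx: Sender<Vec<u8>>) {
    thread::spawn(move || {
        let mut buf = [0u8; 4];
        loop {
            match conn.read(&mut buf) {
                Ok(0) | Err(_) => break, // end of stream or I/O error
                Ok(n) => {
                    if tx.send(buf[..n].to_vec()).is_err() {
                        break; // the receiving side was dropped
                    }
                }
            }
        }
    });
}

// Plain loop instead of `for_each`; it ends once every sender is dropped.
fn collect_all(rx: Receiver<Vec<u8>>) -> Vec<u8> {
    let mut all = Vec::new();
    for chunk in rx {
        all.extend(chunk);
    }
    all
}

// Two fake "connections" feeding one channel.
fn demo_read() -> Vec<u8> {
    let (tx, rx) = mpsc::channel();
    spawn_reader(Cursor::new(b"abcdef".to_vec()), tx.clone());
    spawn_reader(Cursor::new(b"xyz".to_vec()), tx);
    let mut all = collect_all(rx);
    all.sort(); // chunk arrival order across readers is not deterministic
    all
}
```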
