Handle stream's "poll_next" by external structure

I have two structures Codec and Incoming. The Codec implements futures::stream::Stream but for its poll_next logic I'd like to be handled by the Incoming instance. The reason I'd like this is the fact that the Codec holds the AsyncRead + AsyncWrite + Unpin instance (e.g. TcpSocket) which is used by many objects (e.g. also by Outgoing which is currently commented out).

Code example:

use std::task::{Context, Poll};
use std::pin::Pin;
use futures::prelude::{AsyncRead, AsyncWrite};

/// CODEC

pub struct Codec<S>
    where
    S: AsyncRead + AsyncWrite + Unpin,
{
    socket: S,
    incoming: Incoming,
    // outgoing: Outgoing,
}

impl<S> Codec<S>
    where
    S: AsyncRead + AsyncWrite + Unpin,
{
    pub fn new(socket: S) -> Self {
        Self {
            socket,
            incoming: Incoming::default(),
            // outgoing: Outgoing::default(),
        }
    }
}

impl<S> futures::stream::Stream for Codec<S>
    where
    S: AsyncRead + AsyncWrite + Unpin,
{
    type Item = Result<Vec<u8>, Error>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        Pin::new(&mut self.incoming).poll_next(cx, &mut self.socket) // I get error for "&mut self.socket": cannot borrow `self` as mutable more than once at a time
    }
}

/// INCOMING

pub struct Incoming {}

impl Incoming {
    pub fn poll_next<R>(&mut self, cx: &mut Context, reader: &mut R) -> Poll<Option<Result<Vec<u8>, Error>>>
        where
        R: AsyncRead + Unpin,    
    {
        let reader = Pin::new(&mut reader);
        let size = match reader.poll_read(cx, &mut buf)? { // I need to poll the reader
            Poll::Ready(size) => size,
            Poll::Pending => 0,
        };
        ...
    }
}

As you can see my code example shows an error because I borrow self twice. I could require the socket to implement Clone and pass duplicates but this is not always possible. I wonder how such cases should be handled. Thanks.

UPDATE:
I could maybe wrap the socket into a structure (Incoming, Outgoing) multiple times and implement AsyncRead/AsyncWrite. Thus I would get a single socket with all these new features. I that case I would not need to move the logic into a separate structure.

Since self is wrapped in Pin, your Stream impl becomes desugared to the following:

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
    Pin::new(&mut DerefMut::deref_mut(self).incoming).poll_next(cx, &mut DerefMut::deref_mut(self).socket)
}

However, the DerefMut::deref_mut call will borrow all of self, while you only want to borrow one field of self. To fix this, deref once, and then access each field through the resulting &mut Self.

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
    let me = Pin::into_inner(self);
    Pin::new(&mut me.incoming).poll_next(cx, &mut me.socket)
}
3 Likes

A yes ... "Pin::into_inner" works. Thank you!

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.