Implementing polymorphic `Incoming`

I've implemented universal `Listener`, `Stream` and `SocketAddr` types so that a user can work with either `async_std::net` or `async_std::os::unix::net` objects through one interface. The last piece I need to implement is a universal `Incoming` object.

Here is the source code for Listener, Stream and Incoming:

use async_std::io;
use async_std::net;
#[cfg(unix)]
use async_std::os::unix::net as unix;
use crate::Incoming;
use crate::SocketAddr;
use crate::Stream;

/// A listening socket that is either a TCP listener or, on Unix targets, a
/// Unix-domain listener.
#[derive(Debug)]
pub enum Listener {
    /// Wraps an `async_std::net::TcpListener`.
    Inet(net::TcpListener),
    /// Wraps an `async_std::os::unix::net::UnixListener`; only compiled on Unix targets.
    #[cfg(unix)]
    Unix(unix::UnixListener)
}

/// Wraps a TCP listener in the `Inet` variant.
impl From<net::TcpListener> for Listener {
    fn from(listener: net::TcpListener) -> Self {
        Self::Inet(listener)
    }
}

/// Wraps a Unix-domain listener in the `Unix` variant (Unix targets only).
#[cfg(unix)]
impl From<unix::UnixListener> for Listener {
    fn from(listener: unix::UnixListener) -> Self {
        Self::Unix(listener)
    }
}

impl Listener {
    /// Binds a new listener, picking the transport from the variant of `s`.
    ///
    /// # Errors
    ///
    /// Returns the `io::Error` produced by the underlying `bind` call.
    pub async fn bind(s: &SocketAddr) -> io::Result<Listener> {
        let listener = match s {
            SocketAddr::Inet(addr) => Listener::Inet(net::TcpListener::bind(addr).await?),
            #[cfg(unix)]
            SocketAddr::Unix(addr) => Listener::Unix(unix::UnixListener::bind(addr).await?),
        };
        Ok(listener)
    }

    /// Accepts one connection and returns it together with the peer address,
    /// both converted into the universal `Stream` / `SocketAddr` types.
    ///
    /// # Errors
    ///
    /// Returns the `io::Error` produced by the underlying `accept` call.
    pub async fn accept(&self) -> io::Result<(Stream, SocketAddr)> {
        match self {
            Listener::Inet(inner) => {
                let (stream, peer) = inner.accept().await?;
                Ok((stream.into(), peer.into()))
            }
            #[cfg(unix)]
            Listener::Unix(inner) => {
                let (stream, peer) = inner.accept().await?;
                Ok((stream.into(), peer.into()))
            }
        }
    }

    /// Returns the universal `Incoming` wrapper around the underlying
    /// listener's own `incoming()` value.
    pub fn incoming(&self) -> Incoming<'_> {
        match self {
            Listener::Inet(inner) => inner.incoming().into(),
            #[cfg(unix)]
            Listener::Unix(inner) => inner.incoming().into(),
        }
    }
}
use std::pin::Pin;
use async_std::io;
use async_std::net;
use async_std::prelude::*;
use async_std::task::{Context, Poll};
#[cfg(unix)]
use async_std::os::unix::net as unix;
use crate::SocketAddr;

/// A connected socket that is either a TCP stream or, on Unix targets, a
/// Unix-domain stream.
#[derive(Debug)]
pub enum Stream {
    /// Wraps an `async_std::net::TcpStream`.
    Inet(net::TcpStream),
    /// Wraps an `async_std::os::unix::net::UnixStream`; only compiled on Unix targets.
    #[cfg(unix)]
    Unix(unix::UnixStream)
}

/// Wraps a TCP stream in the `Inet` variant.
impl From<net::TcpStream> for Stream {
    fn from(stream: net::TcpStream) -> Self {
        Self::Inet(stream)
    }
}

/// Wraps a Unix-domain stream in the `Unix` variant (Unix targets only).
#[cfg(unix)]
impl From<unix::UnixStream> for Stream {
    fn from(stream: unix::UnixStream) -> Self {
        Self::Unix(stream)
    }
}

impl Stream {
    /// Connects to `s`, picking the transport from the address variant.
    ///
    /// # Errors
    ///
    /// Returns the `io::Error` produced by the underlying `connect` call.
    pub async fn connect(s: &SocketAddr) -> io::Result<Self> {
        let stream = match s {
            SocketAddr::Inet(addr) => Stream::Inet(net::TcpStream::connect(addr).await?),
            #[cfg(unix)]
            SocketAddr::Unix(addr) => Stream::Unix(unix::UnixStream::connect(addr).await?),
        };
        Ok(stream)
    }

    /// Returns the local address of the wrapped socket as a universal `SocketAddr`.
    ///
    /// # Errors
    ///
    /// Returns the `io::Error` produced by the underlying `local_addr` call.
    pub fn local_addr(&self) -> io::Result<SocketAddr> {
        match self {
            Stream::Inet(inner) => Ok(SocketAddr::Inet(inner.local_addr()?)),
            #[cfg(unix)]
            Stream::Unix(inner) => Ok(inner.local_addr()?.into()),
        }
    }

    /// Returns the peer address of the wrapped socket as a universal `SocketAddr`.
    ///
    /// # Errors
    ///
    /// Returns the `io::Error` produced by the underlying `peer_addr` call.
    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
        match self {
            Stream::Inet(inner) => Ok(SocketAddr::Inet(inner.peer_addr()?)),
            #[cfg(unix)]
            Stream::Unix(inner) => Ok(inner.peer_addr()?.into()),
        }
    }

    /// Forwards the shutdown request `t` to the wrapped socket.
    ///
    /// # Errors
    ///
    /// Returns the `io::Error` produced by the underlying `shutdown` call.
    pub fn shutdown(&self, t: net::Shutdown) -> io::Result<()> {
        match self {
            Stream::Inet(inner) => inner.shutdown(t),
            #[cfg(unix)]
            Stream::Unix(inner) => inner.shutdown(t),
        }
    }
}

/// Reading through a shared reference: delegates to the `io::Read`
/// implementation of a shared reference to the wrapped stream.
impl io::Read for &Stream {
    /// Polls the wrapped stream for data, writing into `buf`.
    ///
    /// Returns whatever the wrapped stream's `poll_read` returns.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        // `Self` is `&Stream`, which is `Copy`, so the shared reference can be
        // copied out through `Pin`'s `Deref` without the unsafe
        // `Pin::get_unchecked_mut`.
        let this: &Stream = *self;
        // No `.read(buf)` future is created here: such a future does nothing
        // until polled, so building and dropping one was dead code.
        match this {
            Stream::Inet(s) => Pin::new(&mut &*s).poll_read(cx, buf),
            #[cfg(unix)]
            Stream::Unix(s) => Pin::new(&mut &*s).poll_read(cx, buf),
        }
    }
}

/// Writing through a shared reference: delegates to the `io::Write`
/// implementation of a shared reference to the wrapped stream.
///
/// Each method copies the `&Stream` out through `Pin`'s `Deref` (shared
/// references are `Copy`), so the unsafe `Pin::get_unchecked_mut` is not
/// needed. No `.write()` / `.flush()` futures are created: such futures do
/// nothing until polled, so building and dropping them was dead code.
impl io::Write for &Stream {
    /// Polls the wrapped stream to write bytes from `buf`.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let this: &Stream = *self;
        match this {
            Stream::Inet(s) => Pin::new(&mut &*s).poll_write(cx, buf),
            #[cfg(unix)]
            Stream::Unix(s) => Pin::new(&mut &*s).poll_write(cx, buf),
        }
    }

    /// Polls the wrapped stream to flush buffered output.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let this: &Stream = *self;
        match this {
            Stream::Inet(s) => Pin::new(&mut &*s).poll_flush(cx),
            #[cfg(unix)]
            Stream::Unix(s) => Pin::new(&mut &*s).poll_flush(cx),
        }
    }

    /// Polls the wrapped stream to close it.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let this: &Stream = *self;
        match this {
            Stream::Inet(s) => Pin::new(&mut &*s).poll_close(cx),
            #[cfg(unix)]
            Stream::Unix(s) => Pin::new(&mut &*s).poll_close(cx),
        }
    }
}

/// Reading from an owned `Stream` reuses the `io::Read` implementation for `&Stream`.
impl io::Read for Stream {
    /// Forwards to `<&Stream as io::Read>::poll_read` on a shared reborrow of `self`.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let mut shared: &Stream = &*self;
        <&Stream as io::Read>::poll_read(Pin::new(&mut shared), cx, buf)
    }
}

/// Writing to an owned `Stream` reuses the `io::Write` implementation for `&Stream`.
impl io::Write for Stream {
    /// Forwards to `<&Stream as io::Write>::poll_write` on a shared reborrow of `self`.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let mut shared: &Stream = &*self;
        <&Stream as io::Write>::poll_write(Pin::new(&mut shared), cx, buf)
    }

    /// Forwards to `<&Stream as io::Write>::poll_flush` on a shared reborrow of `self`.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let mut shared: &Stream = &*self;
        <&Stream as io::Write>::poll_flush(Pin::new(&mut shared), cx)
    }

    /// Forwards to `<&Stream as io::Write>::poll_close` on a shared reborrow of `self`.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let mut shared: &Stream = &*self;
        <&Stream as io::Write>::poll_close(Pin::new(&mut shared), cx)
    }
}
use std::pin::Pin;
use async_std::task::{Context, Poll};
use async_std::stream;
use async_std::io;
use async_std::net;
#[cfg(unix)]
use async_std::os::unix::net as unix;
use crate::Stream;

/// The `incoming()` value of either a TCP listener or, on Unix targets, a
/// Unix-domain listener. The lifetime `'a` is passed through to the wrapped type.
#[derive(Debug)]
pub enum Incoming<'a> {
    /// Wraps an `async_std::net::Incoming`.
    Inet(net::Incoming<'a>),
    /// Wraps an `async_std::os::unix::net::Incoming`; only compiled on Unix targets.
    #[cfg(unix)]
    Unix(unix::Incoming<'a>)
}

/// Wraps a TCP `Incoming` in the `Inet` variant, keeping its lifetime.
impl<'a> From<net::Incoming<'a>> for Incoming<'a> {
    fn from(inner: net::Incoming<'a>) -> Self {
        Self::Inet(inner)
    }
}

/// Wraps a Unix-domain `Incoming` in the `Unix` variant, keeping its lifetime
/// (Unix targets only).
#[cfg(unix)]
impl<'a> From<unix::Incoming<'a>> for Incoming<'a> {
    fn from(inner: unix::Incoming<'a>) -> Self {
        Self::Unix(inner)
    }
}

// First attempt at the universal `Incoming` stream. It does NOT compile as
// written: each arm returns the inner stream's poll result unchanged, so the
// item type is the transport-specific stream instead of the universal `Stream`.
impl<'a> stream::Stream for Incoming<'a> {
    type Item = io::Result<Stream>; // items are the universal `Stream` enum

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // NOTE(review): `Pin::get_unchecked_mut` is an `unsafe fn`, and it is
        // called here without an `unsafe` block.
        match Pin::get_unchecked_mut(self) {
            Incoming::Inet(s) => {
                Pin::new(&mut s).poll_next(cx) // yields io::Result<TcpStream>, but io::Result<Stream> is expected
            },
            #[cfg(unix)]
            Incoming::Unix(s) => {
                Pin::new(&mut s).poll_next(cx) // yields io::Result<UnixStream>, but io::Result<Stream> is expected
            },
        }
    }
}

I'm not sure how to properly implement the poll_next method for Incoming. I need to somehow map the result into the correct type. The question is: how?

The error says:

mismatched types

expected enum `stream::Stream`, found struct `async_std::net::tcp::stream::TcpStream`

note: expected enum `std::task::Poll<std::option::Option<std::result::Result<stream::Stream, _>>>`
         found enum `std::task::Poll<std::option::Option<std::result::Result<async_std::net::tcp::stream::TcpStream, _>>>`rustc(E0308)

Some help please, thanks.

You can use map_ok for this.

Pin::new(&mut s).poll_next(cx).map_ok(|tcp| Stream::Inet(tcp))

Note that you should not need the unsafe pin method as your struct is Unpin. You can just do

match Pin::into_inner(self) {
    ...
}
1 Like

@alice you rock! Well, it says that map_ok is an unstable feature. Is there an alternative to this?

Ah, sorry, I didn't notice that. You can use a nested map instead.

Pin::new(&mut s).poll_next(cx).map(|res| res.map(Stream::Inet))

This first uses Poll::map and then Result::map. I also realized the closure was not necessary, as we can use the constructor directly.

Close. It shows an error on res.map(Stream::Inet).

Well I have not tested it locally. What is the error?

The error is on the last map call. I guess I have to provide a closure and an argument:

type mismatch in function arguments

expected signature of `fn(std::result::Result<async_std::net::tcp::stream::TcpStream, std::io::Error>) -> _`rustc(E0631)
stream.rs(12, 5): found signature of `fn(async_std::net::tcp::stream::TcpStream) -> _`
incoming.rs(37, 66): expected signature of `fn(std::result::Result<async_std::net::tcp::stream::TcpStream, std::io::Error>) -> _`

Let me check :).

What is the full error?

That's the full error I see. It's "red" at res.map(Stream::Inet).

Ah, is that some dialog provided by your IDE? Please ask the compiler directly. The full error has more info. You can do it with cargo build in your terminal.

Here is what I see in the console:

error[E0631]: type mismatch in function arguments
  --> src/incoming.rs:37:66
   |
37 |                 Pin::new(&mut s).poll_next(cx).map(|res| res.map(Stream::Inet))
   |                                                                  ^^^^^^^^^^^^ expected signature of `fn(std::result::Result<async_std::net::tcp::stream::TcpStream, std::io::Error>) -> _`
   | 
  ::: src/stream.rs:12:5
   |
12 |     Inet(net::TcpStream),
   |     -------------------- found signature of `fn(async_std::net::tcp::stream::TcpStream) -> _`

error[E0658]: use of unstable library feature 'poll_map'
  --> src/incoming.rs:42:48
   |
42 |                 Pin::new(&mut s).poll_next(cx).map_ok(|unix| Stream::Unix(unix))
   |                                                ^^^^^^
   |
   = note: see issue #63514 <https://github.com/rust-lang/rust/issues/63514> for more information

For this source:

// Intermediate attempt that produced the two compiler errors quoted above:
// the `Inet` arm triggers E0631 and the `Unix` arm triggers E0658.
impl<'a> stream::Stream for Incoming<'a> {
    type Item = io::Result<Stream>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match Pin::into_inner(self) {
            Incoming::Inet(s) => {
                // Previous version: Pin::new(&mut s).poll_next(cx)
                // (yields io::Result<TcpStream>, but io::Result<Stream> is needed).
                // Here `res` is the `Option` inside the `Poll`, not the `Result`,
                // which is why `Stream::Inet` does not fit (E0631).
                Pin::new(&mut s).poll_next(cx).map(|res| res.map(Stream::Inet))
            },
            #[cfg(unix)]
            Incoming::Unix(s) => {
                // Previous version: Pin::new(&mut s).poll_next(cx)
                // (yields io::Result<UnixStream>, but io::Result<Stream> is needed).
                // `map_ok` is rejected as the unstable library feature `poll_map` (E0658).
                Pin::new(&mut s).poll_next(cx).map_ok(|unix| Stream::Unix(unix))
            },
        }
    }
}

Thank you. That's how the full error looks, so use that when posting on the forums. I now realize what is wrong — there's an Option in there in that type, which is part of the Stream trait.

To fix this, you need a call to Option::map in there.

Pin::new(&mut s).poll_next(cx).map(|opt| opt.map(|res| res.map(Stream::Inet)))

A match might be better at this point.

match Pin::new(&mut s).poll_next(cx) {
    Poll::Ready(Some(Ok(stream))) => Poll::Ready(Some(Ok(Stream::Inet(stream)))),
    Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
    Poll::Ready(None) => Poll::Ready(None),
    Poll::Pending => Poll::Pending,
}
1 Like

Maybe there is some incantation using the ready! macro and the question mark operator that helps here, but oh well.

This works but I now get:

error[E0507]: cannot move out of a mutable reference
  --> src/incoming.rs:34:15
   |
34 |         match Pin::into_inner(self) {
   |               ^^^^^^^^^^^^^^^^^^^^^
35 |             Incoming::Inet(mut s) => {
   |                            ----- data moved here
...
45 |             Incoming::Unix(mut s) => {
   |                            ----- ...and here
   |
   = note: move occurs because these variables have types that don't implement the `Copy` trait

Those mut were not there before, right? I'm pretty sure either of these will work:

match Pin::into_inner(self) {
    Incoming::Inet(s) => { ... }
    #[cfg(unix)]
    Incoming::Unix(s) => { ... }
}
match Pin::into_inner(self) {
    Incoming::Inet(ref mut s) => { ... }
    #[cfg(unix)]
    Incoming::Unix(ref mut s) => { ... }
}

Here we go. This works:

/// Yields accepted connections as the universal `Stream`, whichever transport
/// the wrapped `Incoming` belongs to.
impl<'a> stream::Stream for Incoming<'a> {
    type Item = io::Result<Stream>;

    /// Polls the wrapped `Incoming` and rewraps a successfully accepted
    /// connection in the matching `Stream` variant. Pending, end-of-stream and
    /// error results pass through unchanged.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // The three nested `map` calls peel `Poll`, then `Option`, then `Result`.
        match self.get_mut() {
            Incoming::Inet(inner) => stream::Stream::poll_next(Pin::new(inner), cx)
                .map(|next| next.map(|accepted| accepted.map(Stream::Inet))),
            #[cfg(unix)]
            Incoming::Unix(inner) => stream::Stream::poll_next(Pin::new(inner), cx)
                .map(|next| next.map(|accepted| accepted.map(Stream::Unix))),
        }
    }
}
1 Like

Thank you @alice. I really appreciate your help.

1 Like

I published a crate that provides the whole logic from this thread:
https://crates.io/crates/async-uninet

Thanks @alice.

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.