Unable to find implemented trait

In the following code, I have implemented AsyncWrite/AsyncRead, But rust errors the trait tokio_io::async_read::AsyncRead` is not implemented for

error[E0277]: the trait bound `UdpStream: tokio_io::async_read::AsyncRead` is not satisfied
   --> src/main.rs:123:27
    |
123 |     acceptor.accept_async(stream);
    |                           ^^^^^^ the trait `tokio_io::async_read::AsyncRead` is not implemented for `UdpStream`

error[E0277]: the trait bound `UdpStream: tokio_io::async_write::AsyncWrite` is not satisfied
   --> src/main.rs:123:27
    |
123 |     acceptor.accept_async(stream);
    |                           ^^^^^^ the trait `tokio_io::async_write::AsyncWrite` is not implemented for `UdpStream`
// use async_std::io::{Read, Write};
// use async_std::net::SocketAddr;
// use async_std::net::UdpSocket;
// use async_std::pin::Pin;
// use async_std::prelude::*;
// use async_std::task;
// use async_std::task::{Context, Poll};

use pin_utils::pin_mut;
use std::cell::Cell;
use std::io::Result;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::Mutex;
use std::task::Context;
use std::task::Poll;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tokio::net::UdpSocket;
use tokio::prelude::*;

impl tokio_io::AsyncRead for UdpStream {
    fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<Result<usize>> {
        let mut socket = (&self.inner).lock().unwrap();
        let future = socket.recv_from(buf);
        //x.peer_addr.as_mut(); // = Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080));
        pin_mut!(future);
        future
            .poll(cx) // Poll<Result<(usize, SocketAddr)>>
            .map(|res| {
                res.map(|(count, address)| {
                    self.peer_addr.set(Some(address));
                    count
                })
            })
    }
}

impl tokio_io::AsyncWrite for UdpStream {
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<Result<usize>> {
        //let addr = "127.0.0.1:7878";
        let mut socket = (&self.inner).lock().unwrap();
        let future = socket.send_to(buf, self.peer_addr().unwrap());
        pin_mut!(future);
        future.poll(cx)
    }
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<()>> {
        Poll::Ready(Ok(()))
    }
    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<()>> {
        Poll::Ready(Ok(()))
    }
}

struct UdpStream {
    local_addr: Option<SocketAddr>,
    peer_addr: Cell<Option<SocketAddr>>,
    inner: Arc<Mutex<UdpSocket>>,
}

impl UdpStream {
    pub async fn connect(addr: &'static str) -> Result<Self> {
        dbg!(2);
        match UdpSocket::bind(addr).await {
            Ok(socket) => Ok({
                dbg!(3);
                {
                    Self {
                        local_addr: Some(socket.local_addr().unwrap()),
                        peer_addr: Cell::new(None),
                        inner: Arc::new(Mutex::new(socket)),
                    }
                }
            }),
            Err(_e) => unimplemented!(),
        }
    }
    pub fn local_addr(&self) -> Result<SocketAddr> {
        match self.local_addr {
            Some(addr) => Ok(addr),
            None => unimplemented!(),
        }
    }
    pub fn peer_addr(&self) -> Result<SocketAddr> {
        match self.peer_addr.get() {
            Some(addr) => Ok(addr),
            None => unimplemented!(),
        }
    }
}

use std::time::Duration;
use tokio::timer::Timeout;

use std::{thread, time};

use openssl::pkey::PKey;
use openssl::ssl::{SslAcceptor, SslConnector, SslMethod, SslVerifyMode};
use openssl::x509::X509;
use tokio_openssl::{SslAcceptorExt, SslConnectorExt};

static SERVER_CERT: &'static [u8] = include_bytes!("../server-cert.pem");
static SERVER_KEY: &'static [u8] = include_bytes!("../server-key.pem");
static SERVER_DOMAIN: &'static str = "server";

fn ssl_acceptor(certificate: &[u8], private_key: &[u8]) -> Result<SslAcceptor> {
    let mut acceptor_builder = SslAcceptor::mozilla_intermediate(SslMethod::dtls())?;
    acceptor_builder.set_certificate(&&X509::from_pem(certificate)?)?;
    acceptor_builder.set_private_key(&&PKey::private_key_from_pem(private_key)?)?;
    acceptor_builder.check_private_key()?;
    let acceptor = acceptor_builder.build();
    Ok(acceptor)
}

#[tokio::main]
async fn main() -> Result<()> {
    let acceptor = ssl_acceptor(SERVER_CERT, SERVER_KEY)?;
    dbg!(1);
    let mut stream = UdpStream::connect("127.0.0.1:8080").await?;
    dbg!(4);
    let mut buf = vec![0u8; 1024];
    acceptor.accept_async(stream);
    // loop {
    //     // let x = Timeout::new(stream.read(&mut buf), Duration::from_millis(10000)).await??;
    //     let x = stream.read(&mut buf).await?;
    //     stream.write(&buf[..x]).await?;
    // }
    Ok(())
}

deps

tokio = "0.2.0-alpha"
tokio-openssl = "0.2.0-alpha"
tokio-io = "0.2.0-alpha"
pin-utils = "0.1.0-alpha"
openssl = "0.10"

Expect need tokio-openssl = "0.4.0-alpha"

1 Like