Implementing Read trait through AsyncRead

I would like to implement DTLS by using tokio-OpenSSL, the trait requires Read/Write and AsyncRead/AsyncWrite traits. Fortunately, I have implemented AsyncRead/AsyncWrite but, I don't have any idea for implementing Read/Write traits, because Read/Write traits are sync and either of tokio's methods or AsyncRead/AsyncWrite trait is async.

use pin_utils::pin_mut;
use std::cell::Cell;
use std::io::Result;
use std::io::{Read, Write};
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::Mutex;
use std::task::Context;
use std::task::Poll;
use tokio::net::UdpSocket;
use tokio::prelude::*;
use tokio_io::{AsyncRead, AsyncWrite};

impl Read for UdpStream {
    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
        let x = tokio_io::AsyncReadExt::read(self, buf);
        // match tokio_io::AsyncReadExt::read(self, buf) { // Read should called from AsyncRead
        //     Poll::Pending => {},
        //     Poll::Ready(_) => {},
        // };
        unimplemented!()
    }
}
impl Write for UdpStream {
    fn write(&mut self, buf: &[u8]) -> Result<usize> {
        unimplemented!()
    }
    fn flush(&mut self) -> Result<()> {
        unimplemented!()
    }
}

impl AsyncRead for UdpStream {
    fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<Result<usize>> {
        let mut socket = (&self.inner).lock().unwrap();
        let future = socket.recv_from(buf);
        //x.peer_addr.as_mut(); // = Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080));
        pin_mut!(future);
        future
            .poll(cx) // Poll<Result<(usize, SocketAddr)>>
            .map(|res| {
                res.map(|(count, address)| {
                    self.peer_addr.set(Some(address));
                    count
                })
            })
    }
}

impl AsyncWrite for UdpStream {
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<Result<usize>> {
        //let addr = "127.0.0.1:7878";
        let mut socket = (&self.inner).lock().unwrap();
        let future = socket.send_to(buf, self.peer_addr().unwrap());
        pin_mut!(future);
        future.poll(cx)
    }
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<()>> {
        Poll::Ready(Ok(()))
    }
    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<()>> {
        Poll::Ready(Ok(()))
    }
}

struct UdpStream {
    local_addr: Option<SocketAddr>,
    peer_addr: Cell<Option<SocketAddr>>,
    inner: Arc<Mutex<UdpSocket>>,
}

impl UdpStream {
    pub async fn connect(addr: &'static str) -> Result<Self> {
        dbg!(2);
        match UdpSocket::bind(addr).await {
            Ok(socket) => Ok({
                dbg!(3);
                {
                    Self {
                        local_addr: Some(socket.local_addr().unwrap()),
                        peer_addr: Cell::new(None),
                        inner: Arc::new(Mutex::new(socket)),
                    }
                }
            }),
            Err(_e) => unimplemented!(),
        }
    }
    pub fn local_addr(&self) -> Result<SocketAddr> {
        match self.local_addr {
            Some(addr) => Ok(addr),
            None => unimplemented!(),
        }
    }
    pub fn peer_addr(&self) -> Result<SocketAddr> {
        match self.peer_addr.get() {
            Some(addr) => Ok(addr),
            None => unimplemented!(),
        }
    }
}

use std::time::Duration;
use tokio::timer::Timeout;

use std::{thread, time};

use openssl::pkey::PKey;
use openssl::ssl::{SslAcceptor, SslConnector, SslMethod, SslVerifyMode};
use openssl::x509::X509;

static SERVER_CERT: &'static [u8] = include_bytes!("../server-cert.pem");
static SERVER_KEY: &'static [u8] = include_bytes!("../server-key.pem");
static SERVER_DOMAIN: &'static str = "server";

fn ssl_acceptor(certificate: &[u8], private_key: &[u8]) -> Result<SslAcceptor> {
    let mut acceptor_builder = SslAcceptor::mozilla_intermediate(SslMethod::dtls())?;
    acceptor_builder.set_certificate(&&X509::from_pem(certificate)?)?;
    acceptor_builder.set_private_key(&&PKey::private_key_from_pem(private_key)?)?;
    acceptor_builder.check_private_key()?;
    let acceptor = acceptor_builder.build();
    Ok(acceptor)
}

#[tokio::main]
async fn main() -> Result<()> {
    let acceptor = ssl_acceptor(SERVER_CERT, SERVER_KEY)?;
    dbg!(1);
    let mut stream = UdpStream::connect("127.0.0.1:8080").await?;
    dbg!(4);
    let mut buf = vec![0u8; 1024];
    acceptor.accept(stream);
    // loop {
    //     // let x = Timeout::new(stream.read(&mut buf), Duration::from_millis(10000)).await??;
    //     let x = stream.read(&mut buf).await?;
    //     stream.write(&buf[..x]).await?;
    // }
    Ok(())
}

Dependencies:

tokio = "0.2.0-alpha"
tokio-openssl = "0.4.0-alpha"
tokio-io = "0.2.0-alpha"
pin-utils = "0.1.0-alpha"
openssl = "0.10"

I would be really grateful for your helps/suggestions.

You shouldn't need to implement Read and Write if you're using tokio-openssl.

1 Like

I had the same thought as you, but unfortunately, by commenting Write/Read implementer the following error occurs:

error[E0277]: the trait bound `UdpStream: std::io::Read` is not satisfied
   --> src/main.rs:141:21
    |
141 |     acceptor.accept(stream);
    |                     ^^^^^^ the trait `std::io::Read` is not implemented for `UdpStream`

error[E0277]: the trait bound `UdpStream: std::io::Write` is not satisfied
   --> src/main.rs:141:21
    |
141 |     acceptor.accept(stream);
    |                     ^^^^^^ the trait `std::io::Write` is not implemented for `UdpStream`

You need to actually use tokio-openssl: https://docs.rs/tokio-openssl/0.4.0-alpha.6/tokio_openssl/fn.accept.html

1 Like

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.