In the following code I have implemented `AsyncRead` and `AsyncWrite` for `UdpStream`, but the compiler
still reports that the traits `tokio_io::async_read::AsyncRead` and `tokio_io::async_write::AsyncWrite` are not implemented for `UdpStream`:
error[E0277]: the trait bound `UdpStream: tokio_io::async_read::AsyncRead` is not satisfied
--> src/main.rs:123:27
|
123 | acceptor.accept_async(stream);
| ^^^^^^ the trait `tokio_io::async_read::AsyncRead` is not implemented for `UdpStream`
error[E0277]: the trait bound `UdpStream: tokio_io::async_write::AsyncWrite` is not satisfied
--> src/main.rs:123:27
|
123 | acceptor.accept_async(stream);
| ^^^^^^ the trait `tokio_io::async_write::AsyncWrite` is not implemented for `UdpStream`
// use async_std::io::{Read, Write};
// use async_std::net::SocketAddr;
// use async_std::net::UdpSocket;
// use async_std::pin::Pin;
// use async_std::prelude::*;
// use async_std::task;
// use async_std::task::{Context, Poll};
use pin_utils::pin_mut;
use std::cell::Cell;
use std::io::Result;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::Mutex;
use std::task::Context;
use std::task::Poll;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tokio::net::UdpSocket;
use tokio::prelude::*;
impl tokio_io::AsyncRead for UdpStream {
    /// Receives one datagram into `buf` and reports how many bytes were read.
    ///
    /// Each call locks the inner socket, builds a new `recv_from` future and
    /// polls it once. When a datagram arrives, the sender's address is stored
    /// in `peer_addr` before the byte count is returned.
    ///
    /// NOTE(review): a fresh future is created and dropped on every poll;
    /// confirm the socket keeps the task registered for wake-up in that case.
    fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<Result<usize>> {
        // Panics if the mutex is poisoned.
        let mut guard = (&self.inner).lock().unwrap();
        let recv = guard.recv_from(buf);
        pin_mut!(recv);
        match recv.poll(cx) {
            Poll::Ready(Ok((count, address))) => {
                // Remember the sender of this datagram.
                self.peer_addr.set(Some(address));
                Poll::Ready(Ok(count))
            }
            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
            Poll::Pending => Poll::Pending,
        }
    }
}
impl tokio_io::AsyncWrite for UdpStream {
    /// Sends `buf` as one datagram to the address stored in `peer_addr`.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::NotConnected` when no peer address has been
    /// recorded yet, instead of panicking. Any error from `send_to` is passed
    /// through unchanged.
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<Result<usize>> {
        // Resolve the destination before taking the lock, so a missing peer
        // is reported as an error and can never poison the mutex via a panic.
        let peer = match self.peer_addr.get() {
            Some(addr) => addr,
            None => {
                return Poll::Ready(Err(std::io::Error::new(
                    std::io::ErrorKind::NotConnected,
                    "peer address unknown: no datagram has been received yet",
                )));
            }
        };
        // Panics if the mutex is poisoned.
        let mut socket = (&self.inner).lock().unwrap();
        let future = socket.send_to(buf, peer);
        pin_mut!(future);
        future.poll(cx)
    }

    /// Always reports success immediately; nothing is flushed here.
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<()>> {
        Poll::Ready(Ok(()))
    }

    /// Always reports success immediately; the socket is left untouched.
    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<()>> {
        Poll::Ready(Ok(()))
    }
}
/// Stream-like wrapper around a UDP socket, used as the transport for the
/// `AsyncRead`/`AsyncWrite` impls in this file.
struct UdpStream {
/// Address the socket is bound to; filled in by `connect`.
local_addr: Option<SocketAddr>,
/// Sender of the most recently received datagram. Set by `poll_read` and
/// used by `poll_write` as the destination; `None` until something arrives.
peer_addr: Cell<Option<SocketAddr>>,
/// The underlying socket, locked for each read or write poll.
inner: Arc<Mutex<UdpSocket>>,
}
impl UdpStream {
    /// Binds a UDP socket to `addr` and wraps it in a `UdpStream` with no
    /// peer address recorded yet.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from binding the socket or from querying its
    /// local address, rather than panicking.
    pub async fn connect(addr: &'static str) -> Result<Self> {
        let socket = UdpSocket::bind(addr).await?;
        let local_addr = socket.local_addr()?;
        Ok(Self {
            local_addr: Some(local_addr),
            peer_addr: Cell::new(None),
            inner: Arc::new(Mutex::new(socket)),
        })
    }

    /// Returns the address the socket is bound to.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::AddrNotAvailable` if no local address was recorded.
    pub fn local_addr(&self) -> Result<SocketAddr> {
        self.local_addr.ok_or_else(|| {
            std::io::Error::new(
                std::io::ErrorKind::AddrNotAvailable,
                "local address is not set",
            )
        })
    }

    /// Returns the address currently stored in `peer_addr`.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::NotConnected` if no peer address has been recorded.
    pub fn peer_addr(&self) -> Result<SocketAddr> {
        self.peer_addr.get().ok_or_else(|| {
            std::io::Error::new(
                std::io::ErrorKind::NotConnected,
                "peer address unknown: no datagram has been received yet",
            )
        })
    }
}
use std::time::Duration;
use tokio::timer::Timeout;
use std::{thread, time};
use openssl::pkey::PKey;
use openssl::ssl::{SslAcceptor, SslConnector, SslMethod, SslVerifyMode};
use openssl::x509::X509;
use tokio_openssl::{SslAcceptorExt, SslConnectorExt};
/// Server certificate bytes, embedded at compile time from `../server-cert.pem`.
static SERVER_CERT: &[u8] = include_bytes!("../server-cert.pem");
/// Server private key bytes, embedded at compile time from `../server-key.pem`.
static SERVER_KEY: &[u8] = include_bytes!("../server-key.pem");
/// Domain name string for the server.
static SERVER_DOMAIN: &str = "server";
/// Builds a DTLS `SslAcceptor` from a PEM certificate and a PEM private key.
///
/// Starts from the `mozilla_intermediate` configuration, installs the
/// certificate and then the key, and checks that the key matches before
/// building.
///
/// # Errors
///
/// Returns an error if either PEM input fails to parse, if installing the
/// certificate or key fails, or if the private-key check fails.
fn ssl_acceptor(certificate: &[u8], private_key: &[u8]) -> Result<SslAcceptor> {
    let mut builder = SslAcceptor::mozilla_intermediate(SslMethod::dtls())?;

    let cert = X509::from_pem(certificate)?;
    builder.set_certificate(&cert)?;

    let key = PKey::private_key_from_pem(private_key)?;
    builder.set_private_key(&key)?;

    builder.check_private_key()?;
    Ok(builder.build())
}
/// Entry point: builds the DTLS acceptor from the embedded certificate and
/// key, binds a `UdpStream` on 127.0.0.1:8080, and passes it to the acceptor.
#[tokio::main]
async fn main() -> Result<()> {
let acceptor = ssl_acceptor(SERVER_CERT, SERVER_KEY)?;
dbg!(1);
let mut stream = UdpStream::connect("127.0.0.1:8080").await?;
dbg!(4);
// Only used by the commented-out echo loop below.
let mut buf = vec![0u8; 1024];
// This is the call rustc rejects with E0277 (see the error output above).
// NOTE(review): the value returned by `accept_async` is discarded; if it is
// a future, the handshake never runs. Confirm how it must be driven for the
// tokio-openssl version actually in use.
acceptor.accept_async(stream);
// loop {
// // let x = Timeout::new(stream.read(&mut buf), Duration::from_millis(10000)).await??;
// let x = stream.read(&mut buf).await?;
// stream.write(&buf[..x]).await?;
// }
Ok(())
}
Dependencies (from `Cargo.toml`):
tokio = "0.2.0-alpha"
tokio-openssl = "0.2.0-alpha"
tokio-io = "0.2.0-alpha"
pin-utils = "0.1.0-alpha"
openssl = "0.10"