I am interested in making a tokio/async version of udt-rs (https://github.com/eminence/udt-rs).
udt-rs already supports polling through its own Epoll type, but that API is not directly compatible with tokio. The working epoll loop currently looks like this:
    let mut sock = UdtSocket::new(SocketFamily::AFInet, SocketType::Datagram).unwrap();
    let mut epoll = Epoll::create().unwrap();
    epoll.add_usock(&sock, None).unwrap();
    let mut counter = 0;
    loop {
        // Returns the sockets that are ready: (Vec<UdtSocket>, Vec<UdtSocket>)
        let (pending_rd, pending_wr) = epoll.wait(1000, true).unwrap();
        debug!("Pending sockets: {:?} {:?}", pending_rd, pending_wr);
        let rd_len = pending_rd.len();
        for s in pending_rd {
            if s == sock {
                // The listening socket is readable: a new connection is waiting.
                debug!("trying to accept new sock");
                let (new, peer) = sock.accept().unwrap();
                debug!("Server received connection from {:?}", peer);
                epoll.add_usock(&new, None).unwrap();
            } else {
                // A connected socket is readable: read one message from it.
                let msg = &mut [0u8; 100];
                let len = s.recvmsg(msg).unwrap();
                let msg_string = str::from_utf8(&msg[..len]).unwrap();
                debug!("Received message: {:?}", msg_string);
            }
        }
        for s in pending_wr {
            let state = s.getstate();
            // Stop once nothing is left to read and the socket is gone.
            if rd_len == 0
                && (state == UdtStatus::BROKEN
                    || state == UdtStatus::CLOSED
                    || state == UdtStatus::NONEXIST)
            {
                epoll.remove_usock(&s).unwrap();
                return;
            }
            debug!("Sock {:?} is in state {:?}", s, state);
        }
        sleep(Duration::from_millis(100));
        counter += 1;
        assert!(counter < 500);
    }
});
Given that this is how polling currently works, how can I make this code compatible with tokio at the mio level? The end goal is an awaitable UdtSocket that can be used with Framed.
Here's the Epoll struct:
    pub struct Epoll {
        eid: c_int,
        // poll requires us to pass in an array to receive a list of sockets.
        // instead of allocating one every time we call into poll, we create
        // two vecs and re-use them. this means that while the UDT api is
        // thread safe, this impl of epoll is not
        rd_vec: Vec<c_int>,
        wr_vec: Vec<c_int>,
    }
And here's the constructor:
    pub fn create() -> Result<Epoll, UdtError> {
        let ret = unsafe { raw::udt_epoll_create() };
        if ret < 0 {
            Err(get_last_err())
        } else {
            Ok(Epoll {
                eid: ret,
                rd_vec: Vec::new(),
                wr_vec: Vec::new(),
            })
        }
    }
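To make the question more concrete, here is a rough, std-only sketch of the kind of bridge I imagine: a dedicated thread runs the blocking wait loop and wakes futures that have registered interest in a socket. Everything in it is hypothetical and not part of udt-rs, tokio, or mio: `SockId` stands in for a UDT socket handle, `spawn_poller` stands in for the thread that would call `epoll.wait`, and `block_on` is a toy executor so the sketch has no dependencies. I don't know whether this is the right shape for the mio level.

```rust
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for a UDT socket handle.
type SockId = i32;

/// State shared between the futures and the polling thread.
#[derive(Default)]
struct Shared {
    ready: Vec<SockId>,             // sockets reported readable, not yet consumed
    wakers: HashMap<SockId, Waker>, // tasks waiting for a socket to become readable
}

/// Future that resolves once socket `id` has been reported readable.
struct Readable {
    id: SockId,
    shared: Arc<Mutex<Shared>>,
}

impl Future for Readable {
    type Output = SockId;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<SockId> {
        let id = self.id;
        let mut st = self.shared.lock().unwrap();
        if let Some(pos) = st.ready.iter().position(|s| *s == id) {
            // Consume the readiness event.
            st.ready.swap_remove(pos);
            Poll::Ready(id)
        } else {
            // Not ready yet: remember who to wake. Registering under the same
            // lock the poller uses avoids a lost wakeup.
            st.wakers.insert(id, cx.waker().clone());
            Poll::Pending
        }
    }
}

/// Plays the role of the blocking wait loop: reports each socket in `events`
/// as readable and wakes whichever task is waiting on it.
fn spawn_poller(shared: Arc<Mutex<Shared>>, events: Vec<SockId>) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        for id in events {
            // Assumption: the real code would block in epoll.wait(..) here.
            thread::sleep(Duration::from_millis(10));
            let waker = {
                let mut st = shared.lock().unwrap();
                st.ready.push(id);
                st.wakers.remove(&id)
            };
            if let Some(w) = waker {
                w.wake();
            }
        }
    })
}

/// Waker that unparks the thread driving the future.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Toy single-future executor, standing in for the tokio runtime.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(v) => return v,
            Poll::Pending => thread::park(), // sleep until the poller wakes us
        }
    }
}
```

With this, `block_on(Readable { id: 7, shared })` returns once the poller thread has reported socket 7. What I can't work out is how to replace the toy executor and the hand-rolled waker map with tokio's reactor, and then build AsyncRead/AsyncWrite on top so that Framed works.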