Using mio to wrap a raw fd and registering it w/tokio

I am interested in making a tokio/async version of udt-rs (https://github.com/eminence/udt-rs).

There is already support for polling, but it isn't in a context that is necessarily directly compatible with tokio. The currently working epoll device looks like this:

        let mut sock = UdtSocket::new(SocketFamily::AFInet, SocketType::Datagram).unwrap();
        let mut epoll = Epoll::create().unwrap();

        epoll.add_usock(&sock, None).unwrap();

        let mut counter = 0;
        loop {
            let (pending_rd, pending_wr) = epoll.wait(1000, true).unwrap(); // (Vec<UdtSocket>, Vec<UdtSocket>)
            debug!("Pending sockets: {:?} {:?}", pending_rd, pending_wr);

            let rd_len = pending_rd.len();
            for s in pending_rd {
                if s == sock {
                    debug!("trying to accept new sock");
                    let (new, peer) = sock.accept().unwrap();
                    debug!("Server recieved connection from {:?}", peer);
                    epoll.add_usock(&new, None).unwrap();
                } else {
                    let msg = &mut [0u8; 100];
                    let len = s.recvmsg(msg).unwrap();
                    let msg_string = str::from_utf8(&msg[..len]).unwrap();
                    debug!("Received message: {:?}", msg_string);
                }
            }

            for s in pending_wr {
                let state = s.getstate();
                if rd_len == 0
                    && (state == UdtStatus::BROKEN
                        || state == UdtStatus::CLOSED
                        || state == UdtStatus::NONEXIST)
                {
                    epoll.remove_usock(&s).unwrap();
                    return;
                }
                debug!("Sock {:?} is in state {:?}", s, state);
            }
            sleep(Duration::from_millis(100));
            counter += 1;
            assert!(counter < 500);
        }
    });

Given that this is how the current async works, how might I be able to make this code compatible with tokio at the mio-level? The end-goal is to have an awaitable UdtSocket that can be interfaced with Framed.

Here's the Epoll struct:

pub struct Epoll {
    eid: c_int,

    // poll requires us to pass in an array to receive a list of sockets.
    // instead of allocating one every time we call into poll, we create
    // two vecs and re-use them.  this means that while the UDT api is
    // thread safe, this impl of epoll is not
    rd_vec: Vec<c_int>,
    wr_vec: Vec<c_int>,
}

And here's the constructor:

    pub fn create() -> Result<Epoll, UdtError> {
        let ret = unsafe { raw::udt_epoll_create() };
        if ret < 0 {
            Err(get_last_err())
        } else {
            Ok(Epoll {
                eid: ret,
                rd_vec: Vec::new(),
                wr_vec: Vec::new(),
            })
        }
    }

Here's the relevant C code:


int CEPoll::create()
{
   CGuard pg(m_EPollLock);

   int localid = 0;

   #ifdef LINUX
   localid = epoll_create(1024);
   if (localid < 0)
      throw CUDTException(-1, 0, errno);
   #else
   // on BSD, use kqueue
   // on Solaris, use /dev/poll
   // on Windows, select
   #endif

   if (++ m_iIDSeed >= 0x7FFFFFFF)
      m_iIDSeed = 0;

   CEPollDesc desc;
   desc.m_iID = m_iIDSeed;
   desc.m_iLocalID = localid;
   m_mPolls[desc.m_iID] = desc;

   return desc.m_iID;
}

int CEPoll::add_usock(const int eid, const UDTSOCKET& u, const int* events)
{
   CGuard pg(m_EPollLock);

   map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
   if (p == m_mPolls.end())
      throw CUDTException(5, 13);

   if (!events || (*events & UDT_EPOLL_IN))
      p->second.m_sUDTSocksIn.insert(u);
   if (!events || (*events & UDT_EPOLL_OUT))
      p->second.m_sUDTSocksOut.insert(u);
   if (!events || (*events & UDT_EPOLL_ERR))
      p->second.m_sUDTSocksEx.insert(u);

   return 0;
}


1 Like

This is the purpose of PollEvented.

Using the example from the link you provided, we see the idea that a seperate thread would have to change the readiness:

use mio::{Ready, Registration, Poll, PollOpt, Token};
use mio::event::Evented;

use std::io;
use std::time::Instant;
use std::thread;

pub struct Deadline {
    when: Instant,
    registration: Registration,
}

impl Deadline {
    pub fn new(when: Instant) -> Deadline {
        let (registration, set_readiness) = Registration::new2();

        thread::spawn(move || {
            let now = Instant::now();

            if now < when {
                thread::sleep(when - now);
            }

            set_readiness.set_readiness(Ready::readable());
        });

        Deadline {
            when: when,
            registration: registration,
        }
    }

    pub fn is_elapsed(&self) -> bool {
        Instant::now() >= self.when
    }
}

impl Evented for Deadline {
    fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
        -> io::Result<()>
    {
        self.registration.register(poll, token, interest, opts)
    }

    fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
        -> io::Result<()>
    {
        self.registration.reregister(poll, token, interest, opts)
    }

    fn deregister(&self, poll: &Poll) -> io::Result<()> {
        self.registration.deregister(poll)
    }
}

Note: we must implement Evented in order to wrap a PollEvented around the UdtSocket.

Now, looking at the current async implementation in the Udt crate:

    // spawn the server
    let server = spawn(move || {
        let mut sock = UdtSocket::new(SocketFamily::AFInet, SocketType::Datagram).unwrap();
        do_platform_specific_init(&mut sock);
        sock.bind(SocketAddr::V4(SocketAddrV4::new(localhost, 0)))
            .unwrap();
        let my_addr = sock.getsockname().unwrap();
        debug!("Server bound to {:?}", my_addr);

        sock.listen(5).unwrap();

        tx.send(my_addr.port()).unwrap();

        let mut epoll = Epoll::create().unwrap();

        epoll.add_usock(&sock, None).unwrap();

        let mut counter = 0;
        loop {
            let (pending_rd, pending_wr) = epoll.wait(1000, true).unwrap();
            debug!("Pending sockets: {:?} {:?}", pending_rd, pending_wr);

            let rd_len = pending_rd.len();
            for s in pending_rd {
                if s == sock {
                    debug!("trying to accept new sock");
                    let (new, peer) = sock.accept().unwrap();
                    debug!("Server recieved connection from {:?}", peer);
                    epoll.add_usock(&new, None).unwrap();
                } else {
                    let msg = &mut [0u8; 100];
                    let len = s.recvmsg(msg).unwrap();
                    let msg_string = str::from_utf8(&msg[..len]).unwrap();
                    debug!("Received message: {:?}", msg_string);
                }
            }

            for s in pending_wr {
                let state = s.getstate();
                if rd_len == 0
                    && (state == UdtStatus::BROKEN
                        || state == UdtStatus::CLOSED
                        || state == UdtStatus::NONEXIST)
                {
                    epoll.remove_usock(&s).unwrap();
                    return;
                }
                debug!("Sock {:?} is in state {:?}", s, state);
            }
            sleep(Duration::from_millis(100));
            counter += 1;
            assert!(counter < 500);
        }
    });

In the case above, considering Readiness .... would I NEED to create a new thread for the protocol entirely, or would I be able to incorporate it onto a provided tokio event loop?

In the case that I need to use a thread, I could just use async channels to send information into a receiver task spawned on the tokio threadpool.

What would make most sense, in terms of efficiency?

Where do the notifications come from? Does the library have its own epoll thing?

I found this in the C code. The first function is called when the Epoll is created, and the second function is called when adding the socket.

Is local_id the raw fd?

It appears so. https://man7.org/linux/man-pages/man2/epoll_create.2.html

Does this mean that I'd need to make a C function that returns this raw FD back to rust? From there, then what? Here's one idea: use this structure:

https://docs.rs/smol/0.3.2/smol/struct.Async.html

The Async structure you linked is more or less equivalent to Tokio's PollEvented that I linked earlier. And generally you should not be mixing executors.

1 Like

This is probably a bad idea, but can’t you just:

  • Put your existing code inside an async block
  • Set the epoll_wait timeout to zero
  • Replace the sleep with tokio’s version
1 Like

This would be a good method. I would like to add to this, and make a note for especially for single-threaded networking protocols: if you use tokio's sleep, make sure the sleep is taking place on its own async task, and not being .awaited on the same task that must handle inbound (or outbound) packets, otherwise the entire execution gets blocked until the sleep is over. I learned this the long way (Lots of debugging :slight_smile: )

1 Like

While the aformentioned method would work, it should really be possible to incorporate UDT with tokio. Knowing that setting the Epoll's wait to 0 serves as a non-blocking poll, how might we incorporate that with PollEvented? (PollEvented requires that the inner object is Evented)

We can use a thread to call Epoll::wait(Duration::from_millis(1000)) in a loop, and then change the readiness as we get events. However, can we do this without spawning an entirely new thread?

If all you need to do is to give a file descriptor to Tokio's epoll setup, you can follow this example:

use mio::{Ready, Poll, PollOpt, Token};
use mio::event::Evented;
use mio::unix::EventedFd;

use std::os::unix::io::RawFd;
use std::io;

pub struct MyIo {
    fd: RawFd,
}

impl Evented for MyIo {
    fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
        -> io::Result<()>
    {
        EventedFd(&self.fd).register(poll, token, interest, opts)
    }

    fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
        -> io::Result<()>
    {
        EventedFd(&self.fd).reregister(poll, token, interest, opts)
    }

    fn deregister(&self, poll: &Poll) -> io::Result<()> {
        EventedFd(&self.fd).deregister(poll)
    }
}

You can now put your custom type inside PollEvented and call poll_read_ready or poll_write_ready to receive notifications from epoll as necessary.

1 Like

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.