I am currently working on a Rust library/CLI/UI for managing Focusrite Scarlett USB audio devices. This is possible with a recent Linux kernel and with the ALSA APIs. The Rust `alsa` crate so far has been extremely helpful for this: I can list devices and explore their properties with relative ease. The general architecture within ALSA is that a system can have many devices, and each device has many elements, which are things like volume controls, mute, solo, routing configuration, etc. A device is usually a `snd_ctl_t` (`alsa::ctl::Ctl`) or `snd_hctl_t` (`alsa::hctl::HCtl`), and both implement `alsa::poll::Descriptors`, making it possible to get a list of `libc::pollfd`s for each of the elements of each device.
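For concreteness, collecting those descriptors looks roughly like this (an untested sketch; `Descriptors::count`/`fill` and `Ctl::subscribe_events` are from my reading of the `alsa` crate docs, so double-check the exact names against the version you use):

```rust
use alsa::ctl::Ctl;
use alsa::poll::Descriptors;

fn ctl_poll_fds(card: &str) -> Result<Vec<libc::pollfd>, alsa::Error> {
    // Open the control handle in non-blocking mode and ask ALSA to deliver
    // element change events on it (wraps snd_ctl_subscribe_events).
    let ctl = Ctl::new(card, true)?;
    ctl.subscribe_events(true)?;

    // Descriptors::count/fill wrap snd_ctl_poll_descriptors_count/_descriptors.
    let mut fds = vec![
        libc::pollfd { fd: -1, events: 0, revents: 0 };
        Descriptors::count(&ctl)
    ];
    let filled = Descriptors::fill(&ctl, &mut fds)?;
    fds.truncate(filled);
    Ok(fds)
}
```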
Here's the part I still need to figure out. It's possible to check each element for events while blocking the current thread (with a timeout if need be), but my device has probably close to a hundred different elements within it, and checking them that way would be extremely inefficient at best. This is why I'm looking into handling it via some sort of asynchronous API rather than just slamming the kernel all the time.
I could just implement a `futures::Stream` which owns a list of the `libc::pollfd`s, and perhaps associated references for event callbacks to make it easier to determine which device/element was affected by an event. However, I imagine that this would also result in a busy-loop of (non-blocking) syscalls. It has been recommended to me that I perhaps use a Tokio interval or deadline system and only poll at a reasonable interval.
However, since these are simply file descriptors, like socket read/write fds, I have to imagine that there is a more efficient way of doing this. How do `mio`/`tokio` poll for readiness on socket I/O operations without simply doing `loop { syscall(); }`, blocking the current thread, or running a background thread?
Ideally I'd probably like to have a `Stream` API; how can I implement this event streaming system without overwhelming my kernel?
Since the crate is already using `libc::poll`, you can probably use `epoll` to watch for events on the set of file descriptors you care about?

Fair warning: I haven't personally used `epoll` before, so it's possible there are gotchas I'm not aware of.
From what I can see, `mio::Poll` might be the right way to do it without diving directly into `epoll`, but unfortunately this doesn't completely answer my question or fill the gap in my understanding. Essentially, to register interest in these `libc::pollfd`s with `mio`, I could probably do something like this:
```rust
use mio::{Poll, Events, Token, Interest};
use mio::unix::SourceFd;
use std::os::fd::RawFd;

let mut poll = Poll::new()?;
let mut events = Events::with_capacity(1024);

// register poll fds, using each fd's index as its token
for (index, poll_fd) in poll_fds.iter().enumerate() {
    let mut source_fd = SourceFd(&(poll_fd.fd as RawFd));
    poll.registry().register(&mut source_fd, Token(index), Interest::READABLE)?;
}

// block the thread and assault the kernel
loop {
    poll.poll(&mut events, None)?;
    for event in events.iter() {
        // event.token() tells us which fd this was; react somehow
    }
}
```
I haven't tested this code, but this is what seems to emerge from the documentation. If this code does work, I still need a way to convert it into a `futures::Stream` which polls conservatively and does not block the thread that the current task is running on.
I will keep digging into how Tokio/Mio poll the OS respectfully and only when it is absolutely needed. I find it hard to believe that, underneath it all, Tokio is running a busy loop syscalling each I/O operation until it is ready. A server with 10,000 connections simply waiting on I/O would otherwise be thrashing the CPU with interrupts/syscalls, but I could be wrong.
No, tokio handles async I/O by pushing blocking operations onto a thread pool where blocking is safe.
You would use epoll in a similar way: block a single thread (NOT one of the async runtime threads) and indicate that a new value is available from the stream whenever the OS wakes the blocking thread (assuming it wasn't a wakeup due to the timeout expiring).
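Sketched out, that could look something like the following (untested; it uses plain `poll(2)` since the `pollfd` list already exists, and `DeviceEvent` is just a made-up placeholder type):

```rust
use tokio::sync::mpsc;

/// Placeholder for whatever gets decoded from the ALSA handle.
struct DeviceEvent {
    fd: libc::c_int,
}

fn spawn_poll_thread(mut poll_fds: Vec<libc::pollfd>) -> mpsc::UnboundedReceiver<DeviceEvent> {
    let (tx, rx) = mpsc::unbounded_channel();
    std::thread::spawn(move || loop {
        // Block in the kernel until at least one descriptor is ready (timeout = -1).
        let n = unsafe { libc::poll(poll_fds.as_mut_ptr(), poll_fds.len() as libc::nfds_t, -1) };
        if n < 0 {
            break; // real code would inspect errno here
        }
        for pfd in &poll_fds {
            // Real code would also drain the pending ALSA events before polling
            // again, otherwise a level-triggered fd stays ready forever.
            if pfd.revents != 0 && tx.send(DeviceEvent { fd: pfd.fd }).is_err() {
                return; // receiver dropped, stop the thread
            }
        }
    });
    rx
}
```

The async side can then await `rx.recv()` (or wrap the receiver in a stream) without ever blocking a runtime thread.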
Tokio uses epoll for the vast majority of our async IO. Only things which don't work with epoll, like the operations in `tokio::fs`, use the `spawn_blocking` thread pool.
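As a small illustration of that escape hatch (reading a file here purely as an example):

```rust
// The blocking call runs on Tokio's dedicated blocking pool, not on the
// epoll-driven worker threads.
async fn read_cards() -> std::io::Result<String> {
    tokio::task::spawn_blocking(|| std::fs::read_to_string("/proc/asound/cards"))
        .await
        .expect("blocking task panicked")
}
```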
Thanks for clarifying!
Is there a more sophisticated strategy for responding to epoll events than blocking with `epoll_wait`?
Spawning tasks actually!
This is actually what async-await is intended to abstract over. The idea is that tasks try to make progress until they can't (generally because some socket generated EWOULDBLOCK) and then they stow their waker somewhere (in our case, in Tokio's epoll instance on Linux) and get woken when an epoll event arrives.
Think of async/await as being an abstraction that makes life less miserable than it would be if you were doing everything by manually managing your own epoll instance and maintaining a table mapping events to callbacks.
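In miniature, the waker dance looks something like this (a toy illustration with hand-rolled types, not Tokio's actual internals):

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};

#[derive(Default)]
struct Shared {
    ready: bool,
    waker: Option<Waker>,
}

/// A future that resolves once some external party flips `ready`.
struct Readiness(Arc<Mutex<Shared>>);

impl Future for Readiness {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut s = self.0.lock().unwrap();
        if s.ready {
            Poll::Ready(())
        } else {
            // Not ready: stow the waker, then yield back to the executor.
            s.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

/// Called by whoever notices the fd became ready (e.g. the epoll loop):
/// flip the flag and wake the task so the executor polls it again.
fn mark_ready(shared: &Arc<Mutex<Shared>>) {
    let mut s = shared.lock().unwrap();
    s.ready = true;
    if let Some(w) = s.waker.take() {
        w.wake();
    }
}
```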
AFAIK the idea is that you do a single syscall for all I/O operations combined, so only a single thread is busy in that loop. Then when you receive an event (in the "react somehow" part) you send it to the `Future` that requested it, letting that `Future` handle the event when the executor schedules it again.
Additionally, using `epoll` is not "busy polling". It adds back the blocking to non-blocking IO, i.e. the kernel waits until something becomes ready and then wakes up userspace with a list of the things that are ready. Userspace then does its work on the ready items, submitting IO until it gets an EWOULDBLOCK on the sockets, and requeues those for the next poll along with all the other ones.
There are ways to further avoid unnecessary wakeups such as setting a minimum recv buffer/send buffer size when it's known that a reader needs more than 1 byte to make progress, but I don't know if tokio makes use of that.
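To make the readiness model concrete, here's a minimal sketch of such a loop using the raw epoll calls from the `libc` crate (untested; error handling reduced to asserts):

```rust
use libc::{epoll_create1, epoll_ctl, epoll_event, epoll_wait, EPOLLIN, EPOLL_CTL_ADD};

fn watch(fds: &[libc::c_int]) {
    // One epoll instance watches the whole set of descriptors.
    let epfd = unsafe { epoll_create1(0) };
    assert!(epfd >= 0);

    for &fd in fds {
        let mut ev = epoll_event {
            events: EPOLLIN as u32,
            u64: fd as u64, // user data: remember which fd this entry belongs to
        };
        assert_eq!(unsafe { epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &mut ev) }, 0);
    }

    let mut ready = vec![epoll_event { events: 0, u64: 0 }; 64];
    loop {
        // Sleeps in the kernel until at least one fd becomes ready (timeout = -1).
        let n = unsafe { epoll_wait(epfd, ready.as_mut_ptr(), ready.len() as i32, -1) };
        assert!(n >= 0);
        for ev in &ready[..n as usize] {
            let fd = ev.u64 as libc::c_int;
            // "React somehow": drain the pending events on `fd` until it would
            // block, otherwise a level-triggered fd is reported again immediately.
            let _ = fd;
        }
    }
}
```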
Thank you so much, this is actually the fundamental question I was asking about that underlies everything else. So, based on this, you're essentially doing something like this?
```rust
// pseudocode, not a real epoll API
let mut event_handles = Vec::new();

// add some file descriptors to the list
event_handles.push(EventHandle::new(1, Interest::READABLE));
event_handles.push(EventHandle::new(2, Interest::READABLE | Interest::WRITABLE));

// epoll.wait_for_readiness blocks the current thread until one or more events
// are ready to be acted on
while let Some(event_handle) = epoll.wait_for_readiness(event_handles.as_slice()) {
    // do stuff
}
```
I assume there's also a way, as it were, to wake up the thread on an interval even if no events are ready in order to perhaps handle internal things?
In any case, thank you for your answer, it all makes sense now.
> So, based on this, you're essentially doing something like this? [...]
Yes, though there are more bells and whistles. For example, you can register edge-triggered, oneshot and level-triggered notifications.

You might want to read the epoll manpages to get an overview. But as others have said, this API and its equivalents on other platforms are what underpin most async runtimes, except for file IO, since that doesn't work with the readiness model.
> I assume there's also a way, as it were, to wake up the thread on an interval even if no events are ready in order to perhaps handle internal things?
Multiple ways, even. The simplest one is invoking epoll with a timeout parameter. There are also eventfd and timerfd objects that can be used to wake it up based on things that aren't network IO.
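For example, a timerfd is itself just a file descriptor that becomes readable on every expiration, so it can sit in the same epoll set as the ALSA fds. A rough, untested sketch via the `libc` crate:

```rust
use libc::{itimerspec, timerfd_create, timerfd_settime, timespec, CLOCK_MONOTONIC, TFD_NONBLOCK};

fn periodic_timer_fd(period: libc::time_t) -> libc::c_int {
    let tfd = unsafe { timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK) };
    assert!(tfd >= 0);

    let spec = itimerspec {
        // First expiration after one period, then repeating every period.
        it_value: timespec { tv_sec: period, tv_nsec: 0 },
        it_interval: timespec { tv_sec: period, tv_nsec: 0 },
    };
    assert_eq!(unsafe { timerfd_settime(tfd, 0, &spec, std::ptr::null_mut()) }, 0);

    // Register `tfd` with EPOLLIN like any other fd; each expiration makes it
    // readable, and reading 8 bytes from it clears the expiration counter.
    tfd
}
```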
@Noah-Kennedy on Discord pointed me in the right direction for what I need to do: `tokio::io::unix::AsyncFd`, which allows adding an arbitrary file descriptor to the event loop, making it possible to receive notifications when data is ready.
For my case, I have a `libc::pollfd`, which has a field `fd` that is a `c_int`, so I can register a "read" interest like so:
```rust
use tokio::io::Interest;
use tokio::io::unix::AsyncFd;
use std::os::fd::RawFd;

let fd = get_poll_fd().fd as RawFd;
let monitor = AsyncFd::with_interest(fd, Interest::READABLE).unwrap();

loop {
    let mut guard = monitor.ready(Interest::READABLE).await?;
    // more safety work needs to be done here (reading the pending events and
    // calling guard.clear_ready() so the next await actually suspends), but I
    // am omitting that for brevity
}
```
Inside that loop, the received event/data could be exposed as a `Stream` or used in some other way to share events with code that is interested in them.
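For example, one way to do that (an untested sketch using `futures::stream::unfold` from the `futures` crate; `CtlEvent` is a placeholder for whatever I end up decoding from the ALSA handle):

```rust
use futures::stream::{self, Stream};
use std::os::fd::RawFd;
use tokio::io::{unix::AsyncFd, Interest};

/// Placeholder for a decoded ALSA ctl/element event.
struct CtlEvent;

fn ctl_events(fd: RawFd) -> impl Stream<Item = std::io::Result<CtlEvent>> {
    // Must be called from within a Tokio runtime, since AsyncFd registers the
    // fd with the runtime's reactor.
    let monitor = AsyncFd::with_interest(fd, Interest::READABLE).expect("failed to register fd");
    stream::unfold(monitor, |monitor| async move {
        let item: std::io::Result<CtlEvent> = async {
            let mut guard = monitor.ready(Interest::READABLE).await?;
            // Read/decode the pending ALSA events here, then clear readiness so
            // the next ready() call actually suspends instead of spinning.
            guard.clear_ready();
            Ok(CtlEvent)
        }
        .await;
        // Yield the item and hand the AsyncFd back for the next iteration.
        Some((item, monitor))
    })
}
```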