I'm currently using Tokio to handle 2 main tasks in my architecture:
A file watcher, which monitors changes to files in a folder (using the notify crate).
An event processor, which consumes the deserialized events from the file watcher and updates its internal state.
I'm having trouble getting the setup right, and constantly run into issues:
Using std::fs::File gives me high read speed on the updated file, but because files are updated at a high frequency, the file watcher tends to starve the event processor (from tokio-console, the event processor is busy 21s, scheduled 1m08s and idle 2m40s).
Using tokio::fs::File lowers my reading speed, I can see that I'm getting less throughput on the event processor, but the scheduling looks a lot healthier (from tokio-console, the event processor is busy 34s, scheduled 27s and idle 6m22s).
I'm wondering if there's a way to keep sufficient throughput, all the while maintaining healthy scheduling for my tasks? Currently I've been considering adding a tokio::task::yield_now in the file watcher to periodically yield to the event processor every x events processed.
I'm heavily relying on select! in the event processor, to perform validation tasks, metrics update,... so went with async for this part, which drove me to async for the file watcher as well. Would you expect better results having the file watcher use sync and the event processor async?
Obviously no, but that requires some level of thinking…
File access is always synchronous on Linux if you are not using io_uring.
Since most people have no idea about that, Tokio silently implements it with worker threads and message passing. That works, but, arguably, not as well as if you had simply used threads.
You mentioned using std::fs::File, apparently from within an async task (blocking! don't do that!), and you mentioned using tokio::fs::File, which has overhead because it is an async wrapper around blocking operations. What you're missing, and what you should probably be doing, is the middle ground: use the blocking operations yourself and wrap them.
tokio::fs’s documentation says:
Be aware that most operating systems do not provide asynchronous file system APIs. Because of that, Tokio will use ordinary blocking file operations behind the scenes. This is done using the spawn_blocking threadpool to run them in the background.
The efficiency problem with tokio::fs::File is that it has to do a spawn_blocking for every individual read. Instead, you should call spawn_blocking() yourself, only once per file change, and within that scope, read the contents of the file and send events to the event processor.
(I'm inferring a lot of your program structure since you didn't specify exactly how it is organized or what it does. If this sounds like it doesn't make sense, please tell us more about your program, such as what the main loops of each of your tasks are.)
select! is full of footguns. Avoid it as much as possible.
Instead of loop { select! { a b } } use join!(async { loop a }, async { loop b }). If you have to interleave different kinds of work in the same loop, use enum Message with for msg in channel { match msg { a b } }.
What you inferred is correct, the file watcher is very simple, I'll attach a snippet below. I've removed most of the code that isn't relevant, but that's mostly it: watch a set of folders for file changes, get fs notifications from the watcher, read, deserialize, revert the read with seek_relative if deserialization fails. The only "gotcha" is the high frequency of writes, meaning the file watcher almost constantly gets fs events to handle.
pub fn spawn(dir: PathBuf, batch: bool) -> Result<UnboundedReceiver<Event>> {
    let (fs_event_tx, fs_event_rx) = unbounded();
    let mut watcher = recommended_watcher(move |res| {
        if let Err(err) = fs_event_tx.send(res) {
            error!(target: "feed::file_watcher", ?err, "Failed to send event from recommended watcher");
        }
    })?;
    // Initialize everything else (file_watcher, event_rx, ...)
    // ...
    std::thread::spawn(move || file_watcher.run());
    Ok(event_rx)
}
fn run(mut self) {
    loop {
        crossbeam_channel::select! {
            recv(self.watcher_rx) -> event => match event {
                Ok(Ok(event)) => self.handle_fs_event(event),
                // ...
            },
            default(WATCHER_TIMEOUT) => {
                log_and_panic("Stream has fallen behind (node failed?)");
            }
        }
    }
}
fn handle_fs_event(&mut self, _event: notify::Event) -> Result<()> {
    self.file.read_to_end(&mut self.read_buf)?;
    let lines = std::str::from_utf8(&self.read_buf)?.lines();
    for line in lines {
        if line.is_empty() {
            continue;
        }
        // use the provided (sonic_rs-based) deserializer for this source.
        let deserialize = self
            .deserializers
            .get(event_source)
            .ok_or_else(|| eyre!("missing deserializer {event_source}"))?;
        let data = match deserialize(line) {
            Ok(data) => data,
            Err(_err) => {
                // on a deserialization error (we hit EOF mid-line), seek back to the start of this line.
                self.file.seek_relative(-(line.len() as i64))?;
                break;
            }
        };
        self.event_tx.send(data)?;
    }
    Ok(())
}
I'm not sure this would change much; I think the main issue I encountered using tokio::fs::File was indeed the spawn_blocking overhead. Since I only read once per file change anyway (unless the deserialization reverts, in which case I guess it was scheduling a second blocking task), I'm not sure I'd win much spawning my own blocking tasks.
I don't see any Tokio or async in this code at all? Is this a version you rewrote to be non-async? I was hoping to see enough of your async code structure that I could look for any blocking-in-async bugs that might be harming your throughput.
I'm not sure I'd win much spawning my own blocking threads.
Tokio's spawn_blocking does not create a thread each time. It creates blocking “tasks” that are executed on a thread pool, which is usually more efficient.
Oh sorry yes this is the version without any async. The async version is exactly the same but uses tokio::select and tokio::fs::File for reading and seeking.
I think there’s more nuance to “exactly the same” than you imagine. But in any case, you asked about how to improve the performance of an async-using program and this isn't an async-using program.
I might be missing some nuance indeed. After going through some tokio issues, I'm going to try this solution, using block_in_place. I will also try spawn_blocking and circle back on results.