I'm probably missing something obvious so please bear with me. I'm working on a project with Tokio which can be thought of as having two main parts:
Periodically downloads files from a remote server with reqwest.
Runs a small axum server to report status.
I'm using the multi-threaded runtime but was actually testing on a single-CPU VM when I noticed that during the downloads the axum server was basically unresponsive.
The full code is a bit large, but the obvious question is: could the download be blocking the thread?
Everything is running as async code, and this is the section that downloads the file:
```rust
// `.next()` on the byte stream needs StreamExt in scope; `with_context` is anyhow's.
use anyhow::Context as _;
use futures_util::StreamExt;

let mut output = sender.new_file(file.name(), hash).await?;

let data_response = self
    .get_file(file)
    .await
    .with_context(|| format!("Failed to get file {file:?}"))?;

// Stream the response body chunk by chunk through the output wrapper.
let mut stream = data_response.bytes_stream();
while let Some(chunk) = stream.next().await {
    output.write(chunk?.as_ref()).await?;
}
output.close().await?;
```
Output here is a wrapper around two things: writing to a tokio::fs::File and updating an MD5 hash.
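Simplified, it's roughly this shape (just a sketch, not the real code; I'm using the md5 crate's Context here as a stand-in for the actual hasher, and the real field and method names differ a bit):

```rust
use tokio::{fs::File, io::AsyncWriteExt};

// Rough sketch of the wrapper.
struct Output {
    file: File,
    hasher: md5::Context, // assuming the `md5` crate's streaming hasher
}

impl Output {
    async fn write(&mut self, chunk: &[u8]) -> std::io::Result<()> {
        self.hasher.consume(chunk);      // update the running MD5 state
        self.file.write_all(chunk).await // write the same chunk to disk
    }

    async fn close(mut self) -> std::io::Result<md5::Digest> {
        self.file.flush().await?;        // flush buffered data to the file
        Ok(self.hasher.compute())        // finalize the digest
    }
}
```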
As I write this, my new theory is that the hash update is blocking the thread (although I'd expect that to be cheap for a single chunk). Would block_in_place be the answer here? Or am I just expecting too much from a single-threaded runtime?
I just tried a quick change to wrap the hasher in spawn_blocking, but I think it's going to require copying the data chunk (I need to look into the Bytes type).
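Roughly what I tried (just a sketch; it assumes the hasher type is Send + 'static, and from a quick look Bytes seems to be reference-counted, so moving a chunk into the closure shouldn't copy the underlying data):

```rust
use bytes::Bytes;

// Sketch: move the hasher and the chunk into the blocking task, then take
// the hasher back once the update is done. `Bytes` is a cheap handle, so
// no payload copy should be involved.
async fn hash_on_blocking_pool(
    mut hasher: md5::Context,
    chunk: Bytes,
) -> Result<md5::Context, tokio::task::JoinError> {
    tokio::task::spawn_blocking(move || {
        hasher.consume(&chunk); // runs on Tokio's blocking thread pool
        hasher
    })
    .await
}
```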
My other thought is whether I should be yielding on each chunk. Because everything is async, I get the impression that shouldn't be necessary, or does the next stream read jump the scheduler queue if it's already ready?
Are you hashing gigabytes of data? My general recommendation is that you should yield every 10 to 100 microseconds (not milli-), so you could measure how long you spend on computing the hash and use that to inform your decision.
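If it helps, a rough sketch of what I mean (made-up variable names; put it wherever the hash update actually happens):

```rust
use std::time::{Duration, Instant};

// Time a single hash update; log or print `elapsed` while testing.
let start = Instant::now();
hasher.consume(chunk.as_ref());
let elapsed = start.elapsed();

// If it regularly exceeds ~100µs, give other tasks (like the axum
// handlers) a chance to run before processing the next chunk.
if elapsed > Duration::from_micros(100) {
    tokio::task::yield_now().await;
}
```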
No, the total file size is around 3MB (but it could occasionally be 10x that). I'll take some measurements though and see what is going on at that level.
At least it sounds like my basic thinking is correct!
Ah, I've found my mistake! The download process holds a mutex for the status information that the monitoring endpoint uses. With weird problems it almost always turns out you were looking in the wrong place.
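Simplified, the shape of it was roughly this (made-up names, shown here with an async mutex):

```rust
let mut status = self.status.lock().await; // guard acquired up front
status.state = State::Downloading;

self.download_file(&file).await?;          // the whole download runs here

status.state = State::Idle;
// The guard is only dropped here, so the axum status handler, which also
// needs `status.lock().await`, sits waiting for the entire download.
```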
Just in case anyone else comes across this, the hashing was taking around 2µs, so it's definitely not an issue from a blocking point of view.
Thanks for the fast responses everyone (and the great article @alice, I've referred to that a few times during this project!)
If you're using a tokio::sync::Mutex, replace it with std::sync::Mutex. This will cause a compile error if you try to hold it locked across an await in a spawned task, which is a rather blunt policy but one that ensures that you can never have a problem like this as long as your tasks are otherwise cooperating well.
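For example (with a hypothetical Status type), the pattern looks like this: lock, do the quick synchronous update, and drop the guard before the next .await:

```rust
use std::sync::{Arc, Mutex};

// Hypothetical status shared between the downloader and the axum handler.
#[derive(Default)]
struct Status {
    bytes_downloaded: u64,
}

async fn download(status: Arc<Mutex<Status>>) {
    // ...await a chunk here, with no lock held...

    {
        // Lock only for the brief synchronous update.
        let mut s = status.lock().unwrap();
        s.bytes_downloaded += 1024;
    } // guard dropped here, before any further .await

    // ...await the file write here...
}
```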
Also, it can be useful to keep your usage of mutexes as encapsulated as possible: write a struct that contains the mutex, with methods that use the mutex for particular operations, so that the mutex guard can never escape them and be held for longer than intended.
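For example, something along these lines (again with hypothetical types):

```rust
use std::sync::Mutex;

// The mutex never escapes this struct: callers can only update or snapshot
// the status through methods, so a guard can't accidentally be held across
// an .await somewhere else in the code.
#[derive(Default)]
pub struct SharedStatus {
    inner: Mutex<Status>,
}

#[derive(Default, Clone)]
pub struct Status {
    pub bytes_downloaded: u64,
    pub current_file: Option<String>,
}

impl SharedStatus {
    pub fn record_progress(&self, bytes: u64) {
        self.inner.lock().unwrap().bytes_downloaded += bytes;
    }

    pub fn snapshot(&self) -> Status {
        self.inner.lock().unwrap().clone()
    }
}
```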
Of course, not every use actually fits either of these patterns; sometimes you really do need a mutex that's held for a long operation. When that happens, write documentation for all the relevant fields and functions, reminding yourself and other maintainers of how the mutex should be used properly.