I have constructed a proof of concept where tokio::io::copy hangs forever when switching between cellular and WiFi networks if the reader is a reqwest::async_impl::Response wrapped into a tokio::io::AsyncRead using FuturesAsyncReadCompatExt.
async fn response_to_file(response: Response, path: PathBuf) -> Result<(), ApiError> {
    let download = response.bytes_stream();
    let download = download
        .map_err(|e| futures::io::Error::new(futures::io::ErrorKind::Other, e))
        .into_async_read();
    let download = download.compat();

    // Wrap download to be able to get progress in the terminal; this is not
    // essential to the PoC but helps diagnose it.
    let mut download = ProgressReadAdapter::new(download);

    let temp_file = tokio::task::spawn_blocking(NamedTempFile::new)
        .await
        .wrap_api_err()?
        .wrap_api_err()?;

    let mut outfile = tokio::fs::File::create(temp_file.path())
        .await
        .wrap_api_err()?;

    // Code hangs here forever after a network switch
    tokio::io::copy(&mut download, &mut outfile)
        .await
        .wrap_api_err()?;
    outfile.flush().await.wrap_api_err()?;

    let _persisted_file: File = tokio::task::spawn_blocking(move || temp_file.persist(path))
        .await
        .wrap_api_err()?
        .wrap_api_err()?;

    Ok(())
}
Is there a way to make tokio::io::copy return an error instead of hanging forever?
This is a general hazard of using TCP; by default there is no detection of the difference between “no data received” and “sender is unreachable or crashed without sending any packets”.
This can be changed by setting the socket option SO_KEEPALIVE. reqwest has a method for this: ClientBuilder::tcp_keepalive().
I managed to make the proof of concept smaller by experimenting a bit:
#[tokio::main]
async fn main() {
    let client = ClientBuilder::default()
        // Doesn't seem to help
        .tcp_keepalive(Some(Duration::from_secs(1)))
        // Doesn't seem to help
        .connect_timeout(Duration::from_secs(1))
        .build()
        .unwrap();

    let response = client.get("http://distribution.bbb3d.renderfarming.net/video/mp4/bbb_sunflower_native_60fps_stereo_abl.mp4").send().await.unwrap();

    // Hangs here as soon as the WiFi network is switched, and
    // then resumes when switching back.
    // Confirm using a network monitor, e.g. bmon.
    let _bytes = response.bytes().await.unwrap();
}
It turns out tokio::io::copy isn't necessary to reproduce the hang. It's also not reproducible on a wired network: when I disable the network card (with WiFi fallback), it errors out just fine.
I don't have a Linux laptop so I can't try to reproduce it there unfortunately.
I have no idea what macOS does when switching between WiFi and LTE, but if the traffic on the existing connection is blackholed instead of generating an error indication on the socket level, the switch may be difficult to detect. Connection timeout is irrelevant here, coming into play only when the connection is being established. That leaves operation-level timeouts, which makes it necessary to switch the download to streaming mode, since it's impossible to have a one-size-fits-all timeout for all payloads.
To use streaming in reqwest, you need the stream feature and the StreamExt trait from futures-util, as well as the time feature in Tokio if you're not using the full feature set. The code might look like this:
use std::time::Duration;

use futures_util::StreamExt;
use tokio::time;

...

let mut stream = response.bytes_stream();

while let Ok(Some(chunk)) = time::timeout(Duration::from_secs(5), stream.next()).await {
    let _chunk = chunk.unwrap();
}
(The set-up for response can be the same as in the existing code.) This reliably breaks from the while let loop when I start dropping the traffic from the server with packet filtering. This is on Linux; try it on macOS and see.
Thank you @inejge, but I no longer think this is something that is easy to resolve from inside my application by using reqwest.
I am using a variant of what you suggest already, as you can see in my original post, through the tokio_util::compat::FuturesAsyncReadCompatExt trait and then tokio::io::copy.
This is the solution I reached: I wrap the AsyncRead with another AsyncRead that detects whether the Response is stalled, and stops the download by raising an error if that happens.
use std::io::{ErrorKind, Result};
use std::ops::Add;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

use pin_project::pin_project;
use tokio::io::{AsyncRead, ReadBuf};
use tokio::time::{interval_at, Instant, Interval};

/// Error raised when the wrapped reader stops making progress.
#[derive(Debug)]
pub struct StalledError {}

impl std::fmt::Display for StalledError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "read stalled: no data received during a full monitoring interval")
    }
}

impl std::error::Error for StalledError {}

/// This monitor can wrap an [AsyncRead] and make sure that it is making progress.
/// If the inner reader isn't making progress, we can stop the download.
/// The monitoring is done by keeping an [Interval] and measuring progress
/// by counting the number of bytes read during each interval.
///
/// Please note that this monitor won't stop the download after _exactly_
/// five seconds of inactivity, but rather five seconds after the last interval
/// that had data. So the worst case is 10 seconds, and the average is 7.5 seconds.
#[pin_project]
pub struct StalledReadMonitor<R: AsyncRead> {
    #[pin]
    inner: R,
    interval: Interval,
    interval_bytes: usize,
}

impl<R: AsyncRead> StalledReadMonitor<R> {
    pub fn new(inner: R) -> Self {
        Self {
            inner,
            interval: interval_at(
                Instant::now().add(Duration::from_millis(5_000)),
                Duration::from_millis(5_000),
            ),
            interval_bytes: 0,
        }
    }
}

impl<R: AsyncRead> AsyncRead for StalledReadMonitor<R> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<Result<()>> {
        let this = self.project();

        let before = buf.filled().len();
        let mut result = this.inner.poll_read(cx, buf);
        let after = buf.filled().len();
        *this.interval_bytes += after - before;

        match this.interval.poll_tick(cx) {
            Poll::Pending => {}
            Poll::Ready(_) => {
                // No bytes arrived during the whole interval: abort the read.
                if *this.interval_bytes == 0 {
                    println!("Rate is too low, aborting fetch");
                    result = Poll::Ready(Err(std::io::Error::new(
                        ErrorKind::TimedOut,
                        StalledError {},
                    )))
                }
                *this.interval_bytes = 0;
            }
        };
        result
    }
}