Help: Reqwest .bytes() hangs when switching WiFi network on macOS / iOS [title updated]

I have constructed a proof of concept where tokio::io::copy will hang forever when switching between Cellular / WiFi networks, if the reader is a reqwest::async_impl::Response wrapped into a tokio::io::AsyncRead using tokio_util::compat::FuturesAsyncReadCompatExt.

The full code is here: GitHub - bes/network-switch-hang: reqwest / tokio network switch hang

Here is the offending function:

async fn response_to_file(response: Response, path: PathBuf) -> Result<(), ApiError> {
    let download = response.bytes_stream();

    let download = download
        .map_err(|e| futures::io::Error::new(futures::io::ErrorKind::Other, e))
        .into_async_read();

    let download = download.compat();

    // Wrap download to be able to get progress in terminal, this is not
    // essential to the PoC but helps diagnose it.
    let mut download = ProgressReadAdapter::new(download);

    let temp_file = tokio::task::spawn_blocking(NamedTempFile::new)
        .await
        .wrap_api_err()?
        .wrap_api_err()?;

    let mut outfile = tokio::fs::File::create(temp_file.path())
        .await
        .wrap_api_err()?;

    // Code hangs here forever after a network switch
    tokio::io::copy(&mut download, &mut outfile)
        .await
        .wrap_api_err()?;

    outfile.flush().await.wrap_api_err()?;

    let _persisted_file: File = tokio::task::spawn_blocking(move || temp_file.persist(path))
        .await
        .wrap_api_err()?
        .wrap_api_err()?;

    Ok(())
}

Is there a way to make tokio::io::copy return an error instead of hanging forever?

Thanks.

This is a general hazard of using TCP; by default there is no detection of the difference between “no data received” and “sender is unreachable or crashed without sending any packets”.

This can be changed by setting the socket option SO_KEEPALIVE. reqwest has a method for this: ClientBuilder::tcp_keepalive().


Thank you, but unfortunately setting tcp_keepalive didn't help.

    let client = ClientBuilder::default()
        .tcp_keepalive(Some(Duration::from_secs(1)))
        .build()
        .unwrap();

It still just keeps hanging forever.

A connect timeout might work.

Thank you, I tried that as well, and it did not work :C

Current state:

    let client = ClientBuilder::default()
        .tcp_keepalive(Some(Duration::from_secs(1)))
        .connect_timeout(Duration::from_secs(1))
        .build()
        .unwrap();

(not working = still hangs forever when switching networks)

I managed to make the proof-of-concept smaller by experimenting a bit:

#[tokio::main()]
async fn main() {
    let client = ClientBuilder::default()
        // Doesn't seem to help
        .tcp_keepalive(Some(Duration::from_secs(1)))
        // Doesn't seem to help
        .connect_timeout(Duration::from_secs(1))
        .build()
        .unwrap();
    let response = client.get("http://distribution.bbb3d.renderfarming.net/video/mp4/bbb_sunflower_native_60fps_stereo_abl.mp4").send().await.unwrap();

    // Hangs here as soon as switching WiFi network, and
    // then resumes when switching back.
    // Confirm using a network monitor, e.g. bmon
    let _bytes = response.bytes().await.unwrap();
}

It turns out tokio::io::copy isn't necessary to reproduce. It's also not reproducible on a wired network: if I disable the network card (with WiFi fallback), the request errors out just fine.

I don't have a Linux laptop so I can't try to reproduce it there unfortunately.

I can easily reproduce on macOS & iOS though.

I have no idea what macOS does when switching between WiFi and LTE, but if the traffic on the existing connection is blackholed instead of generating an error indication on the socket level, the switch may be difficult to detect. Connection timeout is irrelevant here, coming into play only when the connection is being established. That leaves operation-level timeouts, which makes it necessary to switch the download to streaming mode, since it's impossible to have a one-size-fits-all timeout for all payloads.

To use streaming in reqwest, you need the stream feature and the StreamExt trait from futures-util, as well as the time feature in Tokio if you're not using the full feature set. The code might look like this:

use std::time::Duration;
use futures_util::StreamExt;
use tokio::time;
...
let mut stream = response.bytes_stream();
// Exits the loop either when the stream ends (Ok(None)) or when no
// chunk arrives within 5 seconds (timeout returns Err).
while let Ok(Some(chunk)) = time::timeout(Duration::from_secs(5), stream.next()).await {
    let _chunk = chunk.unwrap();
}

(The set-up for response can be the same as in the existing code.) This reliably breaks from the while let loop when I start dropping the traffic from the server with packet filtering. This is on Linux; try it on macOS and see.

Thank you @inejge, but I no longer think this is something that is easy to resolve from inside my application by using reqwest.

I am using a variant of what you suggest already, as you can see in my original post, through the tokio_util::compat::FuturesAsyncReadCompatExt trait and then tokio::io::copy.

I found this libcurl issue: libcurl gets stuck after a network interruption · Issue #8345 · curl/curl · GitHub, which suggests to me that the network stack (in my case on macOS / iOS) is to blame. There is no combination of parameters that can detect this from a TCP / reqwest point of view.

The suggestion in libcurl is to use CURLOPT_LOW_SPEED_LIMIT and CURLOPT_LOW_SPEED_TIME, which would help me but those aren't available in reqwest.

Maybe I can file a feature request, but since this can be implemented at the application level, I don't know whether reqwest would want it.

This is the solution I reached: I wrap the AsyncRead in another AsyncRead that detects whether the Response has stalled and, if so, stops the download by raising an error.

// (StalledError is a small custom error type defined elsewhere in the PoC.)
use std::io::{ErrorKind, Result};
use std::ops::Add;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

use pin_project::pin_project;
use tokio::io::{AsyncRead, ReadBuf};
use tokio::time::{interval_at, Instant, Interval};

/// This monitor can wrap an [AsyncRead] and make sure that it is making progress.
/// If the inner reader isn't making progress, we can stop the download.
/// The monitoring is done by keeping an [Interval] and measuring progress
/// by counting the number of bytes during each interval.
///
/// Please note that this monitor won't stop the download after _exactly_
/// five seconds of inactivity, but rather at the first tick after the last
/// interval that had data. So the worst case is 10 seconds of inactivity,
/// and the average is 7.5 seconds.
#[pin_project]
pub struct StalledReadMonitor<R: AsyncRead> {
    #[pin]
    inner: R,
    interval: Interval,
    interval_bytes: usize,
}

impl<R: AsyncRead> StalledReadMonitor<R> {
    pub fn new(inner: R) -> Self {
        Self {
            inner,
            interval: interval_at(
                Instant::now().add(Duration::from_millis(5_000)),
                Duration::from_millis(5_000),
            ),
            interval_bytes: 0,
        }
    }
}

impl<R: AsyncRead> AsyncRead for StalledReadMonitor<R> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<Result<()>> {
        let this = self.project();

        let before = buf.filled().len();
        let mut result = this.inner.poll_read(cx, buf);
        let after = buf.filled().len();

        *this.interval_bytes += after - before;
        match this.interval.poll_tick(cx) {
            Poll::Pending => {}
            Poll::Ready(_) => {
                if *this.interval_bytes == 0 {
                    println!("Rate is too low, aborting fetch");
                    result = Poll::Ready(Err(std::io::Error::new(
                        ErrorKind::TimedOut,
                        StalledError {},
                    )))
                }
                *this.interval_bytes = 0;
            }
        };
        result
    }
}

StackOverflow answer: rust - Why does a reqwest Response hang when switching WiFi networks on macOS / iOS? - Stack Overflow

PoC + solution: GitHub - bes/network-switch-hang: reqwest / tokio network switch hang

I can't tell you it's the best solution, but it does work for me.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.