Hyper requests timing out when host is reachable

I'm trying to perform multiple hyper requests, each with a timeout attached. The problem is that after a certain point a lot of the futures time out even though the hosts are actually reachable. Each request is to a different domain. Requests are streamed in, and once an element in the stream is ready, the request is spawned onto its own task and processed. Each such future wraps a hyper request in a Tokio timeout; if the timeout fires, the request is deemed a failure. The vast majority of the URLs streamed in do exist, and only a certain number are unreachable (those will legitimately time out).

URLs are streamed in as:

        let fetches = records_rx
            .take(1000)
            .map(|data| {
                tokio::spawn(Handler::request(
                    data.into(),
                    tx.clone(),
                    rq_tx.clone(),
                ))
            })
            .buffer_unordered(100)
            .collect::<Vec<_>>();
        fetches.await;

Received as:

...
        while let Some(req) = rx.recv().await {
            let fut = RequestHandler::do_request(req, client.clone(), request_timeout);
            let _ = tokio::spawn(fut);
        }
...

And handled as:

    async fn do_request(
        request: Request,
        client: Client<HttpsConnector<HttpConnector>>,
        request_timeout: Duration,
    ) {
        trace!("Performing request to: {:?}", request.uri);

        let req = client.get(request.uri.clone());
        let result = match tokio::time::timeout(request_timeout, req).await {
            Ok(r) => r.map_err(|e| RequestError::Misc {
                err: format!(
                    "Request failed: for URI {:?}. Err: {:?}",
                    request.uri, e
                ),
            }),
            Err(_) => Err(RequestError::TimedOut),
        };

        if let Err(e) = request.tx.send(result) {
            panic!("Failed to send response to caller. Err: {:?}", e);
        }
    }

The duration is set to 30 seconds and the number of domains is ~10,000. I can't quite figure out why this is happening. The only idea I've had is that the actual hyper request hasn't been spawned by the time the timeout expires, so it is counted as a failure. Any help would be appreciated!

Example sites that are timing out include Amazon and Costco; they work fine from a browser even while the application is running.

records_rx.take(1000)

That will take the first 1000 records and ignore any after that. You say you've got 10,000 URLs. Could that be the issue?

I probably should have amended that for this question, as I was trying it with fewer records. Unfortunately, that isn't the problem, as it still exists when run with 1000 records. I've also tried adding a user-agent string to the request headers (some requests fail without it), but the problem persists. The websites in question work perfectly fine as a single hyper request executed in the same fashion.

I've managed to fix some of the failing requests with the appropriate headers. However, some of the GET requests are still failing. A simple GET request to https://www.go.com/ fails, and doing the same request over curl as

curl -H "User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.89 Safari/537.36" https://go.com/ -v --verbose
*   Trying 23.236.60.174...
* TCP_NODELAY set

just hangs. So, hopefully, once I've resolved that this issue will also be resolved.

go.com doesn't have https. It only works with http

Ah, I must have got too caught up in what I was doing. I assumed that I'd get a more informative error. Stupid of me! A follow-up question: how can I check whether to perform an HTTP or an HTTPS request to some given URL then? Just try both?

You could make a future that tries http first and then tries https on redirect or error. Most web servers will upgrade http to https via a redirect which is why I would try http first.
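A minimal sketch of that ordering, assuming a hypothetical `candidates` helper that just rewrites the scheme (the actual request and redirect handling are left to the caller):

```rust
// Hypothetical helper: given a URL with or without a scheme, return the
// candidate URLs in the order they should be tried: plain HTTP first,
// then HTTPS if the first attempt errors out.
fn candidates(url: &str) -> Vec<String> {
    // "https://" does not start with "http://", so the two trims are safe
    // to apply in sequence.
    let bare = url
        .trim_start_matches("http://")
        .trim_start_matches("https://");
    vec![format!("http://{}", bare), format!("https://{}", bare)]
}
```

So `candidates("https://go.com/")` yields `["http://go.com/", "https://go.com/"]`, and the first entry would have succeeded where the HTTPS request hung.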

I'll give that a go, thanks! Silly mistake originally!

I've managed to resolve the host resolution issues that I was having, but I'm still unable to work out why requests time out when a large number are made in one go. I've looked at the hyper documentation for the maximum number of active connections, and it defaults to the maximum.

Does it work if you keep the number of simultaneous queries low for the same number of overall queries?

Maybe the server you are connecting to only allows so many requests from each ip?

If I run this part:

        let fetches = records_rx
            .take(1000)
            .map(|data| {
                tokio::spawn(Handler::request(
                    data.into(),
                    tx.clone(),
                    rq_tx.clone(),
                ))
            })
            .buffer_unordered(100)
            .collect::<Vec<_>>();
        fetches.await;

With a buffer_unordered value below 500 it is fine, but with, say, 1000, the requests mostly time out. Yet I can still access the websites from a browser absolutely fine while the application is running.
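As an aside, the cap that `buffer_unordered(n)` puts on concurrency can be illustrated with a std-only sketch (hypothetical `run_capped` helper, dummy jobs standing in for real requests); the idea is the same: never have more than `n` in flight at once:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

// Process `jobs` in chunks of at most `limit`, a synchronous analogue of
// `buffer_unordered(limit)`. Returns the peak number of jobs that were in
// flight at once, which can never exceed `limit`.
fn run_capped(jobs: Vec<u32>, limit: usize) -> usize {
    let in_flight = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));
    for chunk in jobs.chunks(limit) {
        let handles: Vec<_> = chunk
            .iter()
            .map(|_| {
                let in_flight = Arc::clone(&in_flight);
                let peak = Arc::clone(&peak);
                thread::spawn(move || {
                    let now = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                    peak.fetch_max(now, Ordering::SeqCst);
                    // ...a real request would run here...
                    in_flight.fetch_sub(1, Ordering::SeqCst);
                })
            })
            .collect();
        for handle in handles {
            handle.join().unwrap();
        }
    }
    peak.load(Ordering::SeqCst)
}
```

One difference: `buffer_unordered` refills the buffer as individual futures complete rather than waiting for a whole chunk, but the ceiling on simultaneous work is the same.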

What happens if you make a new hyper Client for every 100 requests?

Each request is to a separate website. It seems to be something in the way I'm using either Tokio or hyper. I wondered if it was how I was using tokio::timeout, but the timeout doesn't start counting until the future is first polled, so it isn't anything to do with the future being started too late.

I have wondered about how I'm doing the threading here, as one of my reasons for this project is to become more comfortable with it. At the moment, the website list is generated by parsing a large CSV file; once a record is parsed it is added to a stream, and the code in my previous reply streams it to a website processor, where this is executed:

        while let Some(req) = rx.recv().await {
            let fut = HostResolver::do_request(req, client.clone(), request_timeout);
            let _ = tokio::spawn(fut);
        }

So this allows each request to run on its own and not block any other request. Is this the most appropriate way of doing this?

Regarding your client comment: at the moment I create a single client at the start of the application and clone it for each request rather than creating a new one each time. I have wondered if this is part of the issue. I'll have a look at creating a new one every X requests. Do you think a new one every so often would be more suitable?

The reason you might want to make a new Client is if the Client has some sort of shared resource with an upper limit, and the timeout is due to this upper limit. Then maybe making a new Client would not hit that limit. One thing that they do share is the keep-alive connection to the server, but that shouldn't matter when there is a new domain per connection — maybe you can turn keep alive off since you don't need it anyway?

Here's another option: When hyper does dns requests, I'm pretty sure it does them inside spawn_blocking, which by default has a limit of 512 - num_cpus total tasks at the same time. Perhaps try increasing that limit?

There should be no issue with timeouts — I have previously started millions of timeouts at the same time for testing, and that worked fine.

Thanks for the advice! I did also look at the keep-alive value for the client and I have disabled it.

How exactly do I increase the limit? I have tried:

        let rt = runtime::Builder::new()
            .threaded_scheduler()
            .num_threads(1000)
            .build()
            .unwrap();

But I'm not quite sure what to do with it afterwards? How does one set the default runtime for a Tokio application?

Note that num_threads is deprecated, and you want max_threads. This should be fine:

    let rt = runtime::Builder::new()
        .threaded_scheduler()
        .max_threads(1000)
        .build()
        .unwrap();

There is no default Tokio runtime. If you are starting your application with #[tokio::main], you can also use

    #[tokio::main(max_threads = 1000)]
    async fn main() {
        ...
    }

The runtime that tokio::spawn uses is whatever runtime it is inside, either through a block_on call (which is what #[tokio::main] inserts for you), or through a spawn on that runtime.


Ah, I see, thank you. In what sort of scenarios would you need to build a new runtime? For limiting certain areas to their own executor?

Unfortunately, upping the max_threads hasn't solved it. I'll have a look at creating new client instances every X requests.

Typically you manually make a runtime in these scenarios:

  1. You want your main to be synchronous, so you can't use #[tokio::main].
  2. You want multiple runtimes, e.g. linkerd currently uses a pair of single-threaded runtimes, one for data-plane forwarding work and the other for control-plane tasks (serving metrics, driving service discovery lookups, and so on). This helps keep the two workloads isolated so neither can starve the other.
  3. You are the author of a crate with a module like reqwest::blocking, which spawns a single-threaded runtime to perform any requests in a blocking manner.
  4. You want to change a setting that can't be set directly on #[tokio::main].

The first being the most common.


Another thing to check: has your process hit the OS-imposed limit on the number of connections it can open?
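On a Unix-like system the relevant soft limit can be checked from the shell. Each open connection holds a file descriptor, and many Linux distributions default the limit to 1024, suspiciously close to the point (somewhere between 500 and 1000 simultaneous requests) where things start failing here:

```shell
# Soft limit on open file descriptors for the current process;
# raise it with e.g. `ulimit -n 4096` before running the application.
ulimit -n
```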