Need some help debugging tokio/mio/hyper code (deadlock)

I have a block of code that makes bunch of HTTP requests to the server in chunks of 5 (concurrency == 5), which looks like the following:

  let mut core = tokio_core::reactor::Core::new().unwrap();
  let handle = core.handle();
  let items = ... // long list of Hyper requests
  let client = Client::new(&handle);
  let requests = stream::iter_result(items)
    .chunks(concurrency)
    .for_each(|reqs| {
      info!("Running {} tasks ...", concurrency);
      let vec = reqs
        .into_iter()
        .map(|(path, request)| process_request(&client, &path, request))
        .collect::<Vec<_>>();
      future::join_all(vec).map(|_results| ())
    });

  core.run(requests).unwrap();

The problem I am observing is that after a while the process gets stuck and simply does nothing. Even stopping the server does not unblock the client (i.e, it seem not to care). I connected to the client process via lldb and the stack trace looks like the following:

* thread #1, queue = 'com.apple.main-thread', stop reason = signal SIGSTOP
  * frame #0: 0x0000000106b7bd96 libsystem_kernel.dylib`kevent + 10
    frame #1: 0x0000000104f94c03 piston`mio::sys::unix::kqueue::Selector::select::h5908d164b552abc8(self=0x0000000106e33020, evts=0x00007fff5bca9f48, awakener=(__0 = 18446744073709551615), timeout=<unavailable>) at kqueue.rs:84
    frame #2: 0x0000000104f8e2fa piston`mio::poll::Poll::poll2::h47cfb2893a485481(self=0x0000000106e33020, events=0x00007fff5bca9f48, timeout=Option<std::time::duration::Duration> @ 0x00007fff5bca8ce8) at poll.rs:1161
    frame #3: 0x0000000104f8e026 piston`mio::poll::Poll::poll::h1c5a4889decc5479(self=0x0000000106e33020, events=0x00007fff5bca9f48, timeout=Option<std::time::duration::Duration> @ 0x00007fff5bca91a0) at poll.rs:1125
    frame #4: 0x0000000104f6a90b piston`tokio_core::reactor::Core::poll::h355d296d2c61bccc(self=0x00007fff5bca9f48, max_wait=Option<std::time::duration::Duration> @ 0x00007fff5bca9e18) at mod.rs:276
    frame #5: 0x0000000104354499 piston`tokio_core::reactor::Core::run::h3e432d0bd23cd991(self=0x00007fff5bca9f48, f=<unavailable>) at mod.rs:241
    frame #6: 0x000000010481faad piston`piston::submit::execute::h35469783a8e181fc(matches=0x0000000106e7c018) at submit.rs:231

(the last line is the line calling core.run())

So it looks to me that it peacefully waits for the next event, which never comes for whatever reason.

The log file ends with the following:

 TRACE tokio_core::reactor        > event Readable Token(9)
 TRACE hyper::proto::h1::dispatch > Dispatcher::poll
 TRACE hyper::proto::h1::conn     > Conn::read_keep_alive
 DEBUG hyper::proto::h1::io       > read 0 bytes
 TRACE hyper::proto::h1::conn     > try_empty_read; found EOF on connection: State { reading: Init, writing: Init, keep_alive: Idle, error: None, read_task: Some(Task) }
 TRACE hyper::proto::h1::conn     > State::close_read()
 TRACE hyper::proto::h1::conn     > maybe_notify; no task to notify
 TRACE hyper::proto::h1::conn     > flushed State { reading: Closed, writing: Init, keep_alive: Disabled, error: None, read_task: None }
 TRACE hyper::proto::h1::conn     > shut down IO
 TRACE hyper::proto::h1::dispatch > Dispatch::poll done
 DEBUG tokio_core::reactor        > loop process - 1 events, Duration { secs: 0, nanos: 333734 }
 DEBUG tokio_core::reactor        > loop poll - Duration { secs: 0, nanos: 6936 }
 DEBUG tokio_core::reactor        > loop time - Instant { t: 376044919947936 }
 TRACE tokio_core::reactor        > event Readable Token(0)
 DEBUG tokio_core::reactor        > consuming notification queue
 DEBUG tokio_core::reactor        > dropping I/O source: 2
 DEBUG tokio_core::reactor        > loop process - 1 events, Duration { secs: 0, nanos: 150870 }
 DEBUG tokio_core::reactor        > loop poll - Duration { secs: 0, nanos: 7879 }
 DEBUG tokio_core::reactor        > loop time - Instant { t: 376044920136964 }
 DEBUG tokio_core::reactor        > loop process - 0 events, Duration { secs: 0, nanos: 68928 }
 DEBUG tokio_core::reactor        > loop poll - Duration { secs: 0, nanos: 66135666 }
 DEBUG tokio_core::reactor        > loop time - Instant { t: 376044986364766 }
 TRACE tokio_core::reactor        > event Readable | Hup Token(12)
 DEBUG tokio_core::reactor        > notifying a task handle
 DEBUG tokio_core::reactor        > loop process - 1 events, Duration { secs: 0, nanos: 297320 }
 DEBUG tokio_core::reactor        > loop poll - Duration { secs: 0, nanos: 27044 }
 DEBUG tokio_core::reactor        > loop time - Instant { t: 376044986724160 }
 TRACE tokio_core::reactor        > event Readable Token(3)
 TRACE hyper::proto::h1::dispatch > Dispatcher::poll
 TRACE hyper::proto::h1::conn     > Conn::read_keep_alive
 DEBUG hyper::proto::h1::io       > read 0 bytes
 TRACE hyper::proto::h1::conn     > try_empty_read; found EOF on connection: State { reading: Init, writing: Init, keep_alive: Idle, error: None, read_task: Some(Task) }
 TRACE hyper::proto::h1::conn     > State::close_read()
 TRACE hyper::proto::h1::conn     > maybe_notify; no task to notify
 TRACE hyper::proto::h1::conn     > flushed State { reading: Closed, writing: Init, keep_alive: Disabled, error: None, read_task: None }
 TRACE hyper::proto::h1::conn     > shut down IO
 TRACE hyper::proto::h1::dispatch > Dispatch::poll done
 DEBUG tokio_core::reactor        > loop process - 1 events, Duration { secs: 0, nanos: 477553 }
 DEBUG tokio_core::reactor        > loop poll - Duration { secs: 0, nanos: 10227 }
 DEBUG tokio_core::reactor        > loop time - Instant { t: 376044987273115 }
 TRACE tokio_core::reactor        > event Readable Token(0)
 DEBUG tokio_core::reactor        > consuming notification queue
 DEBUG tokio_core::reactor        > dropping I/O source: 5
 DEBUG tokio_core::reactor        > loop process - 1 events, Duration { secs: 0, nanos: 148844 }
 DEBUG tokio_core::reactor        > loop poll - Duration { secs: 0, nanos: 6562 }
 DEBUG tokio_core::reactor        > loop time - Instant { t: 376044987455246 }
 DEBUG tokio_core::reactor        > loop process - 0 events, Duration { secs: 0, nanos: 68577 }

Any ideas what could be wrong here?

Ok, so the intermediate results are the following:

Just before the client is stuck, there is one token that gets event like "event Readable | Writable | Error | Hup Token(4)" and doesn't get anything after that -- and it's the one that I not see in "dropping I/O source: x". Other 4 sources are "dropped" (I process in chunks of 5).

P.S. Seems like events of interest are:

// Got HUP from the other side? Seems like when this event is processed, Hyper does not "rearm" reading task?
TRACE tokio_core::reactor               > event Readable | Hup Token(2)
DEBUG tokio_core::reactor               > TKIO: is readable writer.is_none() == true, reader.is_none() == false
// Client writes some data to the connection (token 0 is the same as Token(2) above, as (2-2)/2 = 0
DEBUG tokio_core::net::tcp              > DBG: WRITE BUFS: (token 0) Ok(63817) <-- this one I added to AsyncWrite.write_buf
DEBUG hyper::proto::h1::io              > flushed 63817 bytes
// Finally, this is where error is returned from mio (error code is 54, ECONNRESET)
TRACE tokio_core::reactor               > event Writable Token(6) <-- this is normal token and it has reader task
DEBUG tokio_core::reactor               > TKIO: is writable writer.is_none() == true, reader.is_none() == false
TRACE tokio_core::reactor               > event Readable | Writable | Error | Hup Token(2) <-- this is the failing token (same as "token 0" above, as (2-2)/2 == 0)
DEBUG tokio_core::reactor               > TKIO: is writable writer.is_none() == true, reader.is_none() == true <-- no reader task to notify

What is the content of process_request. To me one of the futures it returns never resolves.

I tracked the issue down to Hyper's keep-alive: https://github.com/hyperium/hyper/issues/1439

It looks like bug in Hyper (disabling keep-alive for connection seem to fix the issue). I think, it loses event sometimes.

P.S. This is my process_request:

fn process_request(
  client: &Client<HttpConnector>,
  path: &Path,
  request: Request,
) -> Box<Future<Item = (), Error = Error>> {
  info!(
    "Submitting resource at '{}' loaded from '{}'...",
    request.uri(),
    path.display()
  );
  trace!("Request: {:?}", request);
  let f = client
    .request(request)
    .and_then(|resp| {
      debug!("Response status: {}", resp.status());
      debug!("Headers: {}", resp.headers());
      resp
        .body()
        .concat2()
        .map(|chunk| debug!("{}", String::from_utf8_lossy(&chunk)))
    })
    .map(|_| info!("...done."))
    .map_err(Error::from);
  Box::new(f)
}
1 Like