Need help finding thread starvation source

I have a service that experiences severe thread starvation, but I can't pin down the cause. Here are the details.

The service consists of the following parts, mainly:

  1. A tokio runtime without a specified flavour; I suspect it runs effectively single-threaded because of the k8s setup (manually setting a higher worker thread count did not remove the starvation, though I didn't check whether it reduced it)
  2. Runs in k8s with a CPU limit below 1, but actual CPU consumption is usually less than one percent of the requests or limits
  3. Several rdkafka consumers for separate brokers
  4. A single flume MPMC channel that was supposed to allow parallel processing
  5. A single consumer of that channel; for some messages this consumer makes network requests with reqwest
  6. Several rdkafka producers that publish all of the results from (5)

What I tried:

  1. Added measurements for all the futures in my code; they show that many take longer than 0.1 ms to complete, but practically none exceed 3 ms
  2. Measured poll delays in the same wrapper; those show that futures regularly wait more than 1 second to be polled, sometimes more than 10 seconds, and upwards of 1000 seconds in some cases
  3. Used tokio-console to look for very busy tasks, but the busiest ones showed 30s Busy vs 2h Idle, and they were the axum tasks serving Prometheus metrics
  4. tokio-console pointed to blocking DNS resolution that takes about 200 ms half of the time, but AFAIK reqwest performs that resolution on a separate thread so it doesn't block everything

Right now I'm trying to log when each future wrapper finishes a poll and correlate that with the moments other futures are woken, but this feels like the wrong direction: full tokio traces show very few wake-ups during the periods when many futures are waiting to be woken. It feels like there should be some task with a massive poll duration, but there is none.

An example: at some time T1, several future wrappers report that they were not polled for 3 seconds, but TRACE logs for the previous 3 seconds show only a handful of wake-ups, all for tasks with long numerical IDs. (I also can't connect those to my tasks, because the latter seem to have short two-digit IDs, not seven or more digits.) I can provide cleaned logs if needed, but right now I need advice on how to proceed.

Now, a long wait before being polled might be fine for I/O operations, but the main symptom that started this whole investigation is that network interactions sometimes behave like this (observed after wrapping the networking in tokio::time::timeout):

  1. A connection is created, and an await point is reached
  2. The 30-second timeout elapses
  3. Only then is the connection polled; some data is transmitted and received
  4. The body is about to be read from the result, but that's another await, and by this point we're past the timeout
  5. We get a timeout error, even though the actual network request has only just started

There was also a suspicious threshold of 3600 seconds, at which point network interactions without the timeout wrapper failed without a legible result. I suspect that's some internal maximum poll-wait timeout or something?

Have you considered increasing to at least 1 full CPU? O.O
Async can be bursty, and can exhaust your CPU quota, causing throttling that won't show in CPU averages.
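If you're on cgroup v2, you can check whether throttling is actually happening (the exact path may differ in your cluster):

```shell
# nr_throttled and throttled_usec grow when the CPU quota is exhausted,
# even if average CPU usage looks tiny
cat /sys/fs/cgroup/cpu.stat
```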

Are you talking about something consistent or sporadic?
One thing to check that you don't mention is having (in)sufficient memory.

I will try 1 CPU, but considering the load is around 0.01 CPU it seems unlikely to help.

The problem is consistent: every minute there is at least one poll that waits more than a second. Memory usage is around 10 MB out of a 300 MB request.

With 1 CPU the problem persists in the same way, from the very first minute the app runs.


Definitely sounds like you have some blocking tasks locking up the whole executor. Try double-checking that you are not calling blocking functions or holding mutex guards across awaits.

Yeah, I checked that already. There are three mutexes, all of them sync; I've logged their access and it shows no issues: they are locked for very short periods and can't be held across awaits.

Besides, tokio timeout timers fire practically on time (they miss by less than a second), so I assume it's not a single blocking task hogging everything; otherwise timeouts could only fire on time when they happened to align with the end of a blocking call (I think, at least).


To be honest, I can't really think of anything else apart from separating the reqwest/HTTP I/O into a runtime different from the main one. It shouldn't be that complex, probably something like:

    // Main runtime
    let main_rt = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(4)
        .thread_name("main-rt")
        .enable_all()
        .build()
        .expect("Failed to create main runtime");

    // Dedicated HTTP runtime
    let http_rt = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(2)
        .thread_name("http-rt")
        .enable_all()
        .build()
        .expect("Failed to create HTTP runtime");

If a potential conflict between rdkafka and reqwest is not the issue, then good luck finding it out :'D


Did this actually work? O.O

Don't know yet, I haven't had a chance to try it, but separating operations onto several runtimes to isolate the actual cause looks like something I can do to better understand what's happening there.


Let us know if you finally figure out the root cause, it'll probably be a good learning opportunity :'D

Yeah, I also wanted it to become a learning experience, but now I have mixed feelings about it XD

While trying to separate parts onto different executors (it turned out to require more effort than I expected, because lifetime requirements changed and a lot of things broke at once), I spotted that we used futures_util::future::join_all in two places, and one of them made a network call in one of the concurrent futures. futures_util::future::join_all internally creates a FuturesUnordered, and I remembered reading about issues with that a few months earlier: all of the futures are polled within a single task, so when one of them consumes significant wall time, it makes everyone else wait. There are some details here:

So, I migrated futures_util::future::join_all to tokio::task::JoinSet, but it didn't help much. The upside is that afterwards I was able to see a large group of tasks from one of the JoinSets each being busy for 100 ms with a total of 100 ms, so I moved all of them to a separate runtime, and that did reduce the number of warnings about polls happening too rarely.

The tasks in question are the ones publishing to Kafka; they have an internal queue so that callers don't wait for publishing to finish, so I'm still a bit uncertain how to proceed.

Right now I'm not sure whether wake-ups can normally be this sparse. No tasks in tokio-console show long Scheduled time, which I take to mean tasks are polled promptly once woken; they just sometimes aren't woken often. I'm also not sure why Kafka publication slows everything down; for that I might check the Kafka producer metrics.

Upd: the issue is not completely gone; groups of publication tasks with long busy times still appear sometimes, but they seem to have much less influence now that they're on a separate executor.


I saw that rdkafka has a ThreadedProducer that handles polling on its own dedicated thread. Maybe completely decoupling it from the tokio runtime would help? Great detective work anyway :'D


I don't see how to decouple it further for now: it's already on a separate thread, and it's wrapped so there's a non-blocking channel between the producing code and the actual producer.

What I see now is that the future I moved to another runtime often takes 100 ms, while producing to Kafka doesn't even take 50 ms of wall time, so I may be looking in the wrong place. At least it's tens of lines now, not tens of thousands.


Edit: I adapted the struct from this discussion to measure futures' poll time (I can't find the original question to give credit):


use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context as TaskContext, Poll};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

use pin_project::{pin_project, pinned_drop};
use tracing::{info, warn};

/// Below this poll duration (1 ms, in nanoseconds) no warning is logged.
const THRESHOLD_NANOS: u128 = 1_000_000;

/// Measures the future time of execution and logs a warning if it exceeds the threshold.
#[pin_project(PinnedDrop)]
pub struct Measured<F> {
    #[pin]
    fut: Option<F>,
    label: String,
    threshold: u128,
    count: AtomicUsize,
    last_poll: AtomicUsize,
    created: Instant,
    force_log: bool,
    total_poll: AtomicUsize,
}

impl<F> Debug for Measured<F> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Measured")
            .field("label", &self.label)
            .field("created_ms_ago", &self.created.elapsed().as_millis())
            .field("threshold", &self.threshold)
            .field("count", &self.count.load(Ordering::SeqCst))
            .field("last_poll", &self.last_poll.load(Ordering::SeqCst))
            .finish()
    }
}

#[pinned_drop]
impl<F> PinnedDrop for Measured<F> {
    fn drop(self: Pin<&mut Self>) {
        let this = self.project();
        if this.fut.is_none() {
            return;
        }
        // Access unpinned fields through the projection - no unsafe needed!
        let elapsed = this.created.elapsed();
        let count = this.count.load(Ordering::SeqCst);
        let total_poll = this.total_poll.load(Ordering::SeqCst);
        if *this.force_log {
            info!(?elapsed, label=%this.label, poll_ms=%total_poll as f64/1_000_000.0, %count, "Future dropped!");
            return;
        }
        if elapsed > Duration::from_secs(1) {
            warn!(?elapsed, label=%this.label, "Long lived future dropped!");
        }
        if count == 0 && elapsed.as_millis() > 50 {
            warn!(?elapsed, label=%this.label, "Long lived future dropped without polling");
        }
    }
}


impl<F> Measured<F> {
    /// Meter passed future's polls labelling with a given label
    pub fn wrap(fut: F, label: impl ToString) -> Self {
        Self::wrap_t(fut, label, THRESHOLD_NANOS)
    }

    /// Unconditionally log dropped futures
    pub fn with_drop_log(mut self) -> Self {
        let elapsed = self.created.elapsed();
        info!(?elapsed, label=%self.label, "Future created");
        self.force_log = true;
        self
    }

    /// Convert the wrapped future into an ordinary future
    pub fn into_inner(mut self) -> F {
        std::mem::take(&mut self.fut).unwrap()
    }

    /// Meter a passed future with a non-standard threshold in nanoseconds
    pub fn wrap_t(fut: F, label: impl ToString, threshold: u128) -> Self {
        let millis = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_millis() as usize)
            .unwrap_or_default();

        Self {
            fut: Some(fut),
            threshold,
            created: Instant::now(),
            label: label.to_string(),
            count: AtomicUsize::new(0),
            last_poll: AtomicUsize::new(millis),
            force_log: false,
            total_poll: AtomicUsize::new(0),
        }
    }
}

impl<F: Future> Future for Measured<F> {
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, ctx: &mut TaskContext<'_>) -> Poll<F::Output> {
        let this = self.project();
        let force_log = *this.force_log;

        if force_log {
            info!(label=%this.label, "Future poll started");
        }

        let count = this.count.fetch_add(1, Ordering::SeqCst) + 1;
        let label = this.label.clone();
        let last_poll = this.last_poll.load(Ordering::SeqCst);
        let millis = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_millis() as usize);
        if last_poll != 0 {
            if let Ok(millis) = millis {
                // `count` was already incremented above, so the first poll sees 1
                if count == 1 {
                    if millis - last_poll > 10 {
                        warn!(elapsed=%(millis - last_poll), %label, "Create to poll too long");
                    }
                } else if millis - last_poll > 1000 {
                    warn!(elapsed=%(millis - last_poll), %label, %count, "No polling for too long");
                }
            }
        }
        if let Ok(millis) = millis {
            this.last_poll.store(millis, Ordering::SeqCst);
        }
        let t = *this.threshold;
        let start = Instant::now();

        let fut = this.fut.as_pin_mut().unwrap();
        let res = fut.poll(ctx);
        let measured = start.elapsed();
        
        this.total_poll.fetch_add(measured.as_nanos() as usize, Ordering::SeqCst);

        if force_log {
            info!(sec=%measured.as_secs_f64(), %label, %count, "Future polled");
        } else if measured.as_nanos() > t {
            warn!(sec=%measured.as_secs_f64(), %label, %count, "future took too long!");
        }

        res
    }
}

And then the problematic future is started like this:

let jobs = producer_events
    .into_iter()
    .map(|tuple| {
        // unique ID to compare poll timer with progress logs
        let uuid = uuid::Uuid::new_v4();
        let (tx, rx) = tokio::sync::oneshot::channel();
        // cloning of local variables and fields
        let future = Measured::wrap(
            async move {
                let start = Instant::now();
                for message in messages {
                    let result = Measured::wrap(
                        tryhard::retry_fn(|| Measured::wrap(async {
                            info!(elapsed=?start.elapsed(), %uuid, "send_event start");
                            let result = producer.send(&message).await;
                            info!(elapsed=?start.elapsed(), %uuid, "send_event done");
                            result
                        }, "producer_send"))
                        .retries(3)
                        .fixed_backoff(Duration::from_secs(5))
                        .on_retry(/* this doesn't trigger in reality */),
                        "publish_with_retry",
                    )
                    .await;

                    if let Err(err) = result {
                        info!(elapsed=?start.elapsed(), %uuid, "send_event transmit");
                        tx.send(Err(err)).unwrap();
                        info!(elapsed=?start.elapsed(), %uuid, "send_event error fin");
                        return;
                    }
                }
                info!(elapsed=?start.elapsed(), %uuid, "send_event transmit");
                tx.send(Ok(())).unwrap();
                info!(elapsed=?start.elapsed(), %uuid, %messages, "send_event fin");
            },
            format!("send_event {uuid}"),
        )
        .with_drop_log()
        .boxed();
        (future, rx)
    })
    .fold(JoinSet::new(), |mut set, (job, result)| {
        // This is a separate runtime I was advised on trying
        self.tokio_handle.spawn(job);
        set.spawn(async { result.await.unwrap() });
        set
    })
    .join_all();

Measured::wrap(jobs, "publish_all")
    .await
    .into_iter()
    .collect::<anyhow::Result<()>>()?;

And the logs I get show send_event fin in the log about 100ms before the Future polled log, e.g.:

2025-12-19T13:48:50.239096Z INFO Future created elapsed=250ns label=send_event e5e4c5bf-f3c3-4fd8-961d-2f579e108ef0
2025-12-19T13:48:50.438939Z  INFO Future poll started label=send_event e5e4c5bf-f3c3-4fd8-961d-2f579e108ef0
2025-12-19T13:48:50.438946Z INFO send_event start elapsed=2.55µs uuid=e5e4c5bf-f3c3-4fd8-961d-2f579e108ef0
...
2025-12-19T13:48:50.439075Z  INFO send_event fin elapsed=131.721µs uuid=e5e4c5bf-f3c3-4fd8-961d-2f579e108ef0 messages=1
2025-12-19T13:48:50.539116Z  INFO Future polled sec=0.100171739 label=send_event e5e4c5bf-f3c3-4fd8-961d-2f579e108ef0 count=1
2025-12-19T13:48:50.539134Z  INFO Future dropped! elapsed=300.038387ms label=send_event e5e4c5bf-f3c3-4fd8-961d-2f579e108ef0 poll_ms=100.171739 count=1

So this specific future spent 200 ms until its first poll, the inner future then completed in 131 µs, but the poll was busy for another 100 ms. Maybe something being dropped took too long; I'll check whether that's the case.

I'm now thinking of reorganising the code to spawn the futures right away instead of boxing them, though boxing happens after Measured is constructed, so it shouldn't affect polling of the inner future.

The reason for the slow drop was that I used a Kafka producer wrapper that was Clone and implemented Drop, and on drop it flushed the inner rdkafka producer. It should have done that only when no other clones of the wrapper remained and we were really about to drop the underlying producer.

I think there really is a need for tools that find slow synchronous parts of a program without wrapping everything in measurements, or at least a tool that shows such parts exist in one's code. tokio-console already has part of that information, but it doesn't (and maybe can't) point at the specific slow part, the impl Drop in this case, and it doesn't show history (or I don't know how to get it).

Upd3: But the last issue I fixed was not affecting the original code, because the slow Drop implementation never fired there. It seems futures_util::future::join_all was affecting execution after all.

Upd2: Looks like tokio-detectors could have helped; maybe I'll try plugging that in later. I found it accidentally while searching for unrelated info: Tokio's spawn tasks and join handles - #11 by alice

Upd1: This is what I used to measure drop duration:


use std::ops::Deref;
use std::time::{Duration, Instant};

use tracing::warn;
use uuid::Uuid;

/// Logs a line when being dropped and measures the time it takes to drop the inner value.
#[derive(Debug, Clone)]
pub struct DropLogger<T> {
    inner: Option<T>,
    uuid: Uuid,
    id: &'static str,
}

impl<T> DropLogger<T> {
    /// Creates a new instance of [`DropLogger`] with the given inner value and UUID.
    pub fn new(inner: T, id: &'static str, uuid: Uuid) -> Self {
        Self {
            inner: Some(inner),
            uuid,
            id
        }
    }
}

impl<T> Deref for DropLogger<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        self.inner.as_ref().unwrap()
    }
}

impl<T> Drop for DropLogger<T> {
    fn drop(&mut self) {
        let start = Instant::now();
        drop(std::mem::take(&mut self.inner));
        let elapsed = start.elapsed();
        if elapsed > Duration::from_millis(1) {
            warn!(uuid=%self.uuid, id=%self.id, ?elapsed, "Drop impl takes too long");
        }
    }
}