I can't find the original question to give credit. Edit: I adapted the struct below from this discussion to measure a future's poll time:
/// Default per-poll warning threshold used by `Measured::wrap`: one millisecond,
/// expressed in nanoseconds.
const THRESHOLD_NANOS: u128 = 1_000 * 1_000;
/// Instrumenting wrapper around a future.
///
/// Each poll of the inner future is timed; a warning is logged when a single poll
/// takes longer than `threshold` nanoseconds, and lifetime statistics are reported
/// when the wrapper is dropped.
#[pin_project(PinnedDrop)]
pub struct Measured<F> {
    /// The instrumented future; `None` only after `into_inner` has moved it out.
    #[pin]
    fut: Option<F>,

    // --- configuration ---
    /// Name included in every log line emitted for this future.
    label: String,
    /// Single-poll duration, in nanoseconds, above which a warning is logged.
    threshold: u128,
    /// When set, creation, every poll and the drop are logged unconditionally.
    force_log: bool,

    // --- statistics ---
    /// Monotonic timestamp taken when the wrapper was constructed.
    created: Instant,
    /// Number of times the inner future has been polled.
    count: AtomicUsize,
    /// Wall-clock time of the most recent poll start (initially the creation time),
    /// in milliseconds since the Unix epoch; 0 if the clock could not be read.
    last_poll: AtomicUsize,
    /// Total time spent inside the inner future's `poll`, in nanoseconds.
    total_poll: AtomicUsize,
}
impl<F> Debug for Measured<F> {
    /// Prints the wrapper's bookkeeping only; the inner future is not printed, so no
    /// `F: Debug` bound is needed.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let age_ms = self.created.elapsed().as_millis();
        let polls = self.count.load(Ordering::SeqCst);
        // Reported under the name "last_yield", but this is the epoch-millis
        // timestamp of the most recent poll start.
        let last_poll_epoch_ms = self.last_poll.load(Ordering::SeqCst);

        let mut out = f.debug_struct("Measured");
        out.field("label", &self.label);
        out.field("created_ms_ago", &age_ms);
        out.field("threshold", &self.threshold);
        out.field("count", &polls);
        out.field("last_yield", &last_poll_epoch_ms);
        out.finish()
    }
}
#[pinned_drop]
impl<F> PinnedDrop for Measured<F> {
    // Reports on the wrapper's lifetime when it is dropped.
    //
    // - Nothing is reported once the inner future has been moved out by `into_inner`.
    // - With `force_log` set, a single `info!` summary (age, accumulated poll time in
    //   milliseconds, poll count) is emitted and the warning checks are skipped.
    // - Otherwise, a warning is emitted if the wrapper lived longer than one second,
    //   and another if it lived longer than 50 ms without ever being polled.
    //
    // This hook runs before the struct's fields are dropped, so the inner future's
    // own destructor is not included in `elapsed`.
    fn drop(self: Pin<&mut Self>) {
        // The projection gives safe access to the unpinned fields.
        let fields = self.project();
        if fields.fut.is_none() {
            // `into_inner` already handed the future to the caller.
            return;
        }

        let label: &str = &**fields.label;
        let elapsed = fields.created.elapsed();
        let polls = fields.count.load(Ordering::SeqCst);
        let polled_nanos = fields.total_poll.load(Ordering::SeqCst);

        if *fields.force_log {
            let poll_ms = polled_nanos as f64 / 1_000_000.0;
            info!(?elapsed, label = %label, poll_ms = %poll_ms, count = %polls, "Future dropped!");
            return;
        }

        if elapsed > Duration::from_secs(1) {
            warn!(?elapsed, label = %label, "Long lived future dropped!");
        }
        let never_polled = polls == 0;
        if never_polled && elapsed.as_millis() > 50 {
            warn!(?elapsed, label = %label, "Long lived future dropped without polling");
        }
    }
}
impl<F> Measured<F> {
    /// Instruments `fut` under `label`, using the default threshold
    /// (`THRESHOLD_NANOS`, i.e. 1 ms) for the per-poll warning.
    pub fn wrap(fut: F, label: impl ToString) -> Self {
        let threshold = THRESHOLD_NANOS;
        Self::wrap_t(fut, label, threshold)
    }

    /// Switches on unconditional logging for this future.
    ///
    /// A "Future created" line is logged immediately, and the poll and drop hooks
    /// then emit `info!` lines instead of only threshold-based warnings.
    pub fn with_drop_log(mut self) -> Self {
        let elapsed = self.created.elapsed();
        let label = &self.label;
        info!(?elapsed, label = %label, "Future created");
        self.force_log = true;
        self
    }

    /// Returns the wrapped future, discarding the instrumentation.
    ///
    /// The drop hook sees the emptied slot and logs nothing.
    pub fn into_inner(mut self) -> F {
        self.fut.take().unwrap()
    }

    /// Instruments `fut` under `label` with a custom per-poll warning threshold,
    /// given in nanoseconds.
    pub fn wrap_t(fut: F, label: impl ToString, threshold: u128) -> Self {
        // Seed the "last poll" timestamp with the creation time (epoch millis) so the
        // first poll measures the gap since creation; 0 if the clock is unusable.
        let epoch_millis = match SystemTime::now().duration_since(UNIX_EPOCH) {
            Ok(since_epoch) => since_epoch.as_millis() as usize,
            Err(_) => 0,
        };
        let created = Instant::now();
        Self {
            fut: Some(fut),
            label: label.to_string(),
            threshold,
            force_log: false,
            created,
            count: AtomicUsize::new(0),
            last_poll: AtomicUsize::new(epoch_millis),
            total_poll: AtomicUsize::new(0),
        }
    }
}
impl<F: Future> Future for Measured<F> {
    type Output = F::Output;

    /// Polls the inner future once, timing the call and logging anomalies.
    ///
    /// Before delegating, the current wall-clock time is compared with the previous
    /// poll start. On the very first poll, that is the creation time. A warning is
    /// logged if the first poll came more than 10 ms after creation, or if more than
    /// 1 s passed between two poll starts.
    ///
    /// After delegating, the poll duration is added to `total_poll`. An `info!` line
    /// is logged when `force_log` is set; otherwise a warning is logged if the
    /// duration exceeds `threshold` nanoseconds.
    ///
    /// The measured duration covers the whole inner `poll` call. That includes any
    /// destructors that run when an inner `async` block completes.
    ///
    /// # Panics
    /// Panics if the inner future is missing, which `into_inner` makes impossible by
    /// consuming the wrapper.
    fn poll(self: Pin<&mut Self>, ctx: &mut TaskContext<'_>) -> Poll<F::Output> {
        let this = self.project();
        let force_log = *this.force_log;
        // Borrow rather than clone: the label is only needed for logging.
        let label: &str = &**this.label;
        if force_log {
            info!(label = %label, "Future poll started");
        }

        // `fetch_add` returns the value *before* the increment, so `previous == 0`
        // identifies the first poll. (Testing `previous + 1 == 0` could never be true.)
        let previous = this.count.fetch_add(1, Ordering::SeqCst);
        let count = previous + 1;

        let last_poll = this.last_poll.load(Ordering::SeqCst);
        let now_millis = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_millis() as usize);
        if let Ok(now) = now_millis {
            // A stored 0 means the clock could not be read when it was recorded.
            if last_poll != 0 {
                // The wall clock can step backwards; saturate instead of underflowing.
                let gap = now.saturating_sub(last_poll);
                if previous == 0 {
                    if gap > 10 {
                        warn!(elapsed = %gap, %label, "Create to poll too long");
                    }
                } else if gap > 1000 {
                    warn!(elapsed = %gap, %label, %count, "No polling for too long");
                }
            }
            this.last_poll.store(now, Ordering::SeqCst);
        }

        let threshold = *this.threshold;
        let start = Instant::now();
        let res = this
            .fut
            .as_pin_mut()
            .expect("Measured polled after its inner future was taken")
            .poll(ctx);
        let measured = start.elapsed();
        this.total_poll
            .fetch_add(measured.as_nanos() as usize, Ordering::SeqCst);

        if force_log {
            info!(sec = %measured.as_secs_f64(), %label, %count, "Future polled");
        } else if measured.as_nanos() > threshold {
            warn!(sec = %measured.as_secs_f64(), %label, %count, "future took too long!");
        }
        res
    }
}
The problematic future is then started like this:
// Build one instrumented, boxed send job per element of `producer_events`, spawn each
// job on `self.tokio_handle`, and await every job's result through a oneshot channel.
let jobs = producer_events
.into_iter()
.map(|tuple| {
// unique ID to compare poll timer with progress logs
let uuid = uuid::Uuid::new_v4();
let (tx, rx) = tokio::sync::oneshot::channel();
// cloning of local variables and fields
// NOTE(review): whatever is cloned here is moved into the `async move` block below.
// Those values are dropped when the block returns, inside its final poll, so a slow
// destructor (e.g. of a producer handle; confirm what is cloned) is counted in the
// outer `Measured`'s "Future polled" time.
let future = Measured::wrap(
async move {
let start = Instant::now();
for message in messages {
// NOTE(review): as far as I know `tryhard::retry_fn` expects a closure that builds
// a fresh future per attempt, but a single future is passed here. Confirm this is
// what the real code does; a single future cannot be re-run on retry.
let result = Measured::wrap(
tryhard::retry_fn(
Measured::wrap(async {
info!(elapsed=?start.elapsed(), %uuid, "send_event start");
let result = producer.send(&message).await;
info!(elapsed=?start.elapsed(), %uuid, "send_event done");
result
}, "producer_send"))
.retries(3)
.fixed_backoff(Duration::from_secs(5))
.on_retry(/* this doesn't trigger in reality */)
),
"publish_with_retry",
)
.await;
if let Err(err) = result {
info!(elapsed=?start.elapsed(), %uuid, "send_event transmit");
// `send` fails only if `rx` was dropped; this `unwrap` panics in that case.
tx.send(Err(err)).unwrap();
info!(elapsed=?start.elapsed(), %uuid, "send_event error fin");
return;
}
}
info!(elapsed=?start.elapsed(), %uuid, "send_event transmit");
tx.send(Ok(())).unwrap();
// NOTE(review): `messages` was consumed by the `for` loop above unless it is `Copy`
// or a reference; confirm what is actually logged here.
// Everything between this event and "Future polled" happens inside the same poll:
// writing this log line (if the subscriber's writer blocks) and dropping the block's
// remaining captured state.
info!(elapsed=?start.elapsed(), %uuid, %messages, "send_event fin");
},
format!("send_event {uuid}"),
)
.with_drop_log()
.boxed();
(future, rx)
})
.fold(JoinSet::new(), |mut set, (job, result)| {
// `tokio_handle` is a separate runtime I was advised to try.
// The returned JoinHandle is discarded, so the job runs detached; its outcome is
// observed only through the oneshot receiver awaited in `set`.
self.tokio_handle.spawn(job);
// If a job drops `tx` without sending (e.g. it panicked), this `unwrap` panics.
set.spawn(async { result.await.unwrap() });
set
})
.join_all();
// Time the wait for all results, then collect them into a single `anyhow::Result<()>`
// (collection stops at the first `Err`, which `?` propagates).
Measured::wrap(jobs, "publish_all")
.await
.into_iter()
.collect::<anyhow::Result<()>>()?;
In the logs, `send_event fin` appears about 100 ms before the corresponding `Future polled` entry, e.g.:
2025-12-19T13:48:50.239096Z INFO Future created elapsed=250ns label=send_event e5e4c5bf-f3c3-4fd8-961d-2f579e108ef0
2025-12-19T13:48:50.438939Z INFO Future poll started label=send_event e5e4c5bf-f3c3-4fd8-961d-2f579e108ef0
2025-12-19T13:48:50.438946Z INFO send_event start elapsed=2.55µs uuid=e5e4c5bf-f3c3-4fd8-961d-2f579e108ef0
...
2025-12-19T13:48:50.439075Z INFO send_event fin elapsed=131.721µs uuid=e5e4c5bf-f3c3-4fd8-961d-2f579e108ef0 messages=1
2025-12-19T13:48:50.539116Z INFO Future polled sec=0.100171739 label=send_event e5e4c5bf-f3c3-4fd8-961d-2f579e108ef0 count=1
2025-12-19T13:48:50.539134Z INFO Future dropped! elapsed=300.038387ms label=send_event e5e4c5bf-f3c3-4fd8-961d-2f579e108ef0 poll_ms=100.171739 count=1
So this future waited 200 ms before its first poll, the inner work finished about 131 µs into that poll, and yet the poll call kept running for another 100 ms. Perhaps something dropped at the end of the async block is slow: those destructors run inside the final `poll`, so they count toward `poll_ms`. I'll try to check whether that is the case.
I'm now considering reorganising the code to spawn the futures immediately instead of boxing them first. However, the boxing happens after `Measured` is constructed, so it shouldn't affect how the inner future is polled.