Tracking the state of an async operation

Hey All,

I have an async messaging library that lets you publish messages to a remote host (MQTT). It has an async publish() function something like this:

pub async fn publish(&self, msg: Message) -> Result<()> { ... }

The message will go through several states to complete:

  • Queued - Packet serialized and placed in a local channel
  • Written - The packet was successfully placed on the wire
  • Finished - The packet was ACK'ed by the remote host

If the first step fails, it would happen immediately before publish returns (meaning the Future is complete upon return from the call). The next two steps might not finish for minutes, hours, or even days - if the client is disconnected and waiting to automatically reconnect later.

A user of the library is asking for a way to determine if the first step fails, without blocking on an await, which could take days to complete.

Some ideas:

  • await with a timeout?
  • Something akin to a try_await (non-blocking zero sec timeout)
  • Return the Future in a timeout, in a non-async function, like:
pub fn queue(&self, msg: Message) -> Result<impl Future<Output=Result<()>> { ... }

Do any of these make sense? How do other libraries handle this type of thing?

Thanks.

Returning result-wrapped future makes a lot of sense for things that may fail synchronously.

Another option is to taking some form of callback or channel as an argument that will be notified about progress to each stage.

Or you could return an object that requires user to call and await each stage: e.equeue().await?; e.write().await?; e.finish().await?.

Oh, yeah. That last one is an interesting; hadn't thought of that.

It could be good because the actual, current, implementation does already return an object that implements Future, but could pretty easily implement the calls for intermediate stages while keeping to current behavior that the object itself implements the overall finish(), so doesn't break the existing API.

Meaning:

e.queue().await?;  // Optional
e.write().await?;  // Optional
e.await?;

I had an API with a similar situation, but I just had the future immediately return the error if the preparation failed. It's possible to poll a future once to check if a result is immediately available.

I personally like this option the most... Especially since the underlying C lib already supports callbacks, so it should be reasonably simple to use those.

This option gives you both immediate feedback (is the initial queue successful) and will notify you with the end result (was the message eventually published successfully) without having to add and manage long running tasks yourself.

1 Like

That's what I was thinking by the "try" suggestion, because that's the way that the Future object is implemented, internally, already. But I wasn't sure how to do that from the app. So, umm, how do you poll a future once?

You can use poll_fn.

let res = poll_fn(|cx| Poll::Ready(future.poll(cx))).await;

or now_or_never (on a mutable reference to not consume it)

use futures::future::FutureExt;
let res = (&mut future).now_or_never();

in either case, the future object will still exist even if it completes, by trying to await it afterwards will fail if it completed on your one poll.

Oh, that's awesome!

So I think the combination of the two techniques would be the general solution...

  • The single poll after creation would indicate that the initial creation/serialization/queuing failed, then...
  • Individual Futures for different stages of the operation exposed via individual "stage functions"

I like it. Thanks @alice & @kornel!