Continuing futures in a specific order

I'm working on a library for a protocol where each protocol object can receive events. I thought it would be interesting to use async for this. I'm using async fns on objects for events, so to receive an event you just await on a method. An object can have multiple events, and I would like to have the ability for events that are not being listened for to just be dropped.

The issue I'm currently running into is once an event comes through, I want the event futures to be completed in the order that they are received, because event ordering in the protocol is important. For example, if you have these objects:

struct A;
impl A {
    async fn event1(&self);
    async fn event2(&self);
}

struct B;
impl B {
    async fn event1(&self);
}

And then you await them in whatever order:

use futures_lite::FutureExt;

a.event1().or(a.event2()).or(b.event1()).await

And you receive the events like:

  1. A::event2
  2. B::event1
  3. A::event1

Then the futures should also be completed in this order. The first thing that comes to mind is to have a queue/channel of events, signal a given event when its event is at the top of the queue, have the event signal back when it's pulled the event, and then move on to the next event. But this seems like a lot of switching back and forth, and maybe there's a more straightforward method of achieving this I'm not thinking of. Any ideas?

your code doesn't do what you described. the or() combinator will complete the first ready future and drop the rest futures.

since you are using futures-lite, you can check out the zip() combinator. it will complete all the futures in the order as they are ready [1].

btw, the equivalence of the zip() combinator in the futures crate is called join().


  1. implementation detail of polling order: if multiple sub-futures were ready when the zipped future was being polled, the first one has higher priority ↩︎

Yes I forgot the loop there, I'm fine with them being dropped

To await a collection of futures and receive the results in the order that they complete, you can use FuturesUnordered from futures or JoinSet from tokio. Note that these both require all the futures in question to have the same Output type.

I'm not trying to await futures in order, I'm trying to have futures wake up in order as a library.

It's not clear what you're after here: In Rust, awaiting a future is exactly making it available to wake, and they complete whenever they run out of things to do. If you're ok with them being dropped and running in a loop, you're probably interested in cancellation safety, which is a fiddly concept.

You can use a runtime spawn method so they become "detached" top level tasks that will execute whenever they are woken (and there are runtime threads available).

Alternatively, you may want use a JoinSet to read the result of spawned tasks as they complete

Maybe I'm not being clear enough: I want to return Futures to users (from a library) that will be completed in the order that the events are processed in. I don't need to handle the futures from the library. I would like to guarantee that the futures are completed in the correct order regardless of the order they are created in.

My current thought is I have to have the futures notify the event sender when it actually receives the event, but I'm wondering if there is a better approach here that doesn't involve opening a communication channel with every Future. Is this even a good use case for futures?

that's not possible in principle. it depends on how the user polls the futures.

Hmm.... I'm still a bit confused, but it might be because you're trying to do something that doesn't quite make sense?

Lets say the async fn for the events is morally something like:

impl A {
  async fn event1(&self) -> Result<Event1> {
    let rx = self.protocol.rx.clone();
    loop {
      if let Event::Event1(event) = rx.recv().await? {
        return Ok(event)
      }
    }
  }
}

where self.protocol is a tokio::sync::broadcast channel pair or similar.

This technically will let each "subscriber" (call to event()) return the next received event of the specified type, but as you can possibly tell from the linked tokio docs, the actual behavior is incredibly subtle and probably not what you actually want; leading to it being incredibly easy to lose events. You also need to consider what the behavior of multiple simultaneous calls to the same event type should be.

Often the most straightforward for the caller behavior is to just expose a raw stream of events, since they can just do:

spawn(async {
  while let Ok(event) = rx.recv().await? {
    ...

and dispatch however they like.

The closest I would suggest to what you're asking for here, with clearer semantics, is something like:

impl A {
  fn event1_iter() -> impl Stream<AEvent1> {
    self.protocol.event_rx.clone().filter_map(|event| {
      match event {
        Event::AEvent1(event) if event.target_id = self.id => Some(event),
        _ => None
      }
    })
  }
}

This means each call to event*_iter creates a fresh and separate stream of events of that type. This under the hood roughly does end up with the "queue/channel of events" (in the broadcast channel) "signal a given event, pull the next ..." (in the cloned receiver).

This still isn't great, since there is a limit on how many events can be "in the pipe" on each receiver:

let mut foo_events = a.event1_iter();
some_other_stuff().await;
foo_events.next().await; // possibly RecvError::Lagged

And there's inherently some difficulty with combining multiple event streams, that you would possibly need complex Rx-like combinators to resolve, but I feel those are lesser issues than deadlocks or lost events that trying to poll for single events would likely lead to.