Selectively progressing a future

I'm having trouble making some async code work the way I need. I did ask around before, but after some bug-hunting I feel I'm clearer on what I really need, so I figured I'd ask again, hoping someone can set me on the right track :slight_smile:

I have an async receiver where I get messages, say of type enum Message { A, B }. I also have async functions handle_A and handle_B to handle those messages. I have the following code, which I've factored into 3 functions because that seems to facilitate what I need to do (but it's not really a requirement):

async fn run_loop() {
  loop {
    handle_receiver_next().await
  }
}

async fn handle_receiver_next() {
    match receiver.next().await {
      Some(m) => handle_message(m).await,
      None => panic!()
    }
}


async fn handle_message(m: Message) {
  match m {
    m @ Message::A => handle_A(m).await,
    m @ Message::B => {
      // What goes here?
    }
  }
}

Now here's what the code at the marked line needs to do:

  • 1 Run handle_B(m)
  • 2a If handle_B(m) finished, stick the result in another channel, and return
  • 2b If handle_B(m) did not finish, then wait for either receiver.next() or handle_B(m) (which are both futures) to be able to make progress (this should not block, there might be other futures scheduled that should be able to run as long as none of those can make further progress)
  • 2ba If receiver.next() is ready, run handle_message().await on the result. Once that finished (recursion might happen here), go to 2b.
  • 2bb if handle_B(m) is ready to make progress, go to 1. again.

This sounds somewhat not-too-complicated, but I just can't make it work. The structure of this code isn't important, I just put it up so I can somewhat explain what I want. The main point is looping over getting the message from an async channel and handling them in the way I described, and being able to recurse when handle_B(m) can't finish "on it's own".

I've tried with manually polling handle_B(m) in a loop, but I don't really have access to the Context here, so I'm not sure if that can be done. I've thought about making my own type Handle_B and implement Future for it, but I don't see how I can recurse into handle_message inside of it, or how I can suspend it while waiting for either receiver.next() or handle_b(m) to be able to make progress.

I'd be grateful for any hints on how to do this :slight_smile: It's library code, so I don't really want to tie it to a specific executor if at all possible. The real code is of course somewhat more complicated, but I guess I can handle the other aspects of it myself if I just understand how to handle the above in principle.

Thanks!

Recursive futures must be wrapped in Pin<Box<dyn Future<Output=sometype>>>. There's a helper for it:

return async move { … }.boxed();

Otherwise the type of the future would be the type of the entire (possibly infinite) chain of operations. It's the same reason why you can't have struct Foo { next: Foo }, but can have struct Foo { next: Box<Foo> }.

Yes, I did that, but that does not solve the problem I'm having. I have code that compiles and runs and that recurses, but it's not doing what I need as I described it above. Thanks though :slight_smile: I changed the threads title to something that's hopefully closer to what I need.

Ah, sorry :slight_smile:

I don't think futures have a high-level concept of "if is able to make progress". You can wait on two futures, or first of two futures, or abort after a timeout, etc. but it's all-or-nothing on the high level.

You could use handle.spawn(handle_B()) to fire-and-forget it, and continue reading from the channel. If you want to throttle processing or limit number of B handles in progress, then add tokio's Semaphore.

Hmm, thanks, but spawn is out of the question, as the messages need to be handled in the order they arrive, save for the mentioned recursion (i.e. while handling Message::B without being able to finish, there must be handling of A or B during it, but those need to finish before further progressing the handling of B).

This sounds a bit like a buffered stream to me - you want to buffer A messages, but not B messages?

Or you could use spawn, and then reorder processed messages. I've done that in sync iterator, but the same principle could work in async stream:

https://github.com/ImageOptim/gifski/blob/04e4723c8cb4c13048d02d842a4646a732ca9d7f/src/ordqueue.rs#L37-L41

When you say channel.next() are you referring to the channel receiver, or some other channel?

This sounds a bit like a buffered stream to me - you want to buffer A messages, but not B messages?

Uhh, I'm not sure, as far as I understand the world "buffering", it's not that, but I might simply be mistaken. The messages arrive from an external source, and need to be handled strictly in order (both A and B types). Just it might happen that we're handling a B message, and during that handling another message arrives. Then that new message needs to be handled first (it's easy to know this since handling B can't finish before), and only after that we can commence handling B... and this whole thing might nest, but it will be (and must be) strictly nested, not in an overlapping fashion. Hope I'm expressing myself somewhat clearly, I'm not a native speaker :slight_smile:

I'll study that, thanks for the hint!

Ah yes indeed, sorry for the confusion, I'll clear up the code.

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.