Tracking an Optional Future as state

I'm working on trying to clean up some code I've written to implement a custom/proprietary protocol over Bluetooth, and running into a challenge I'm not entirely sure how to deal with. The protocol consists of various commands wrapped in frames and sent over a serial link, where the framing layer includes things like sequence numbering, acknowledgement, checksumming.

I learned about tokio_util::codec::Framed and used it to implement the framing, so I now have a Stream + Sink of Frames. Next, I'd like to wrap this duplex stream to produce one working on Commands, taking care of the sequence numbers/acknowledging (and eventually resending with backoff) so that the rest of the code doesn't need to worry about the low-level details.


This is where I've run into a bit of an issue, as I'm trying to implement Stream in such a way that acknowledgements might be sent to the wrapped Sink<Frame> as part of reading from the stream.

I came across this post from three years ago, and via it the async-stream crate, but I don't think it's directly applicable to my use case, and looking at the implementation didn't help me see how to handle it either. Essentially I'm trying to implement the approach of keeping track of a "priority" Future and polling it as part of poll_next, not processing any other frames until we've acknowledged the current one.

Here's a playground with a reduced sketch of my current best attempt: Rust Playground

I'd love some guidance, ideally on both:

  • how I can track an optional Future like this and poll it when I have Some(future)
  • if there's a different, better way to approach this that's more idiomatic

Thanks!

For your Send in Option, you don't actually have to worry about it because futures::sink::Send implements Unpin, so you can just Pin::new(future).poll(ctx).
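As a minimal sketch of that polling pattern, using only the standard library: the names `Prio`, `poll_prio`, and `noop_waker` are hypothetical and not taken from the playground, and the stored future here owns its data instead of borrowing a sink, so this only illustrates the `Option` plus `Pin::new` part.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical holder for an optional "priority" future.
struct Prio<F> {
    prio_future: Option<F>,
}

impl<F: Future + Unpin> Prio<F> {
    /// Polls the stored future, if any.
    /// Returns `Ready(None)` when nothing is stored, `Ready(Some(out))` when
    /// the stored future finished (and clears it), `Pending` otherwise.
    fn poll_prio(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>> {
        let fut = match self.prio_future.as_mut() {
            Some(fut) => fut,
            None => return Poll::Ready(None),
        };
        // `F: Unpin`, so a plain `&mut F` can be wrapped in `Pin` safely.
        match Pin::new(fut).poll(cx) {
            Poll::Ready(out) => {
                self.prio_future = None;
                Poll::Ready(Some(out))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

/// A waker that does nothing, only needed to build a `Context` by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn noop_waker() -> Waker {
    Waker::from(Arc::new(NoopWake))
}

fn main() {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);

    // A future that is immediately ready gets polled once, then cleared.
    let mut ready = Prio { prio_future: Some(std::future::ready(7)) };
    assert_eq!(ready.poll_prio(&mut cx), Poll::Ready(Some(7)));
    assert_eq!(ready.poll_prio(&mut cx), Poll::Ready(None));

    // A future that never completes stays stored and keeps returning Pending.
    let mut stuck = Prio { prio_future: Some(std::future::pending::<i32>()) };
    assert_eq!(stuck.poll_prio(&mut cx), Poll::Pending);
    assert!(stuck.prio_future.is_some());
}
```

Inside a real poll_next you would return early on `Pending` and only move on to reading the next frame once the priority future has completed.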

However, there is a more fundamental problem, which happens to not show up until the borrow checking phase: you are attempting to build a self-referential structure (prio_future borrows write_half) out of just a pin_projected struct, and you can't actually do that. That's unfortunate, because self-reference is the whole point of pinning, isn't it? But there's no way in the language to write the appropriate lifetimes (your 'a cannot work, because a struct's lifetime parameter must refer to borrows which outlive the struct), and no way in the language to ensure that write_half is not mutated while it is borrowed by prio_future, and pin_project doesn't change either of those factors.

(Also, the futures::sink::Send has no owner, but fixing that won't help.)

There are libraries that can help with safely constructing self-referential data, such as ouroboros, yoke, and self_cell; but as far as I know, no one has written one that actually takes advantage of or cooperates with Pin,[1] so they all do some heap allocation, unfortunately. They'll still work for your futures::sink::Send since it is Unpin, but the stream and sink may be less happy. The alternative to using one of those libraries is to write your own unsafe lifetime-extending code, which can avoid an allocation but needs to be written very, very carefully.


  1. I’ve been thinking that such a library really ought to exist, but haven’t gotten around to successfully writing one myself. ↩︎


Hmm, that makes sense--I didn't think of how the prio_future retains a reference to the write_half while it remains alive since it's sending to it. I don't think I'm ready to go the full way of writing careful unsafe code to keep the pinned structures suitably pinned while self-referential...

I'll explore the mentioned crates a bit (currently mostly ouroboros; I think I like the philosophy of self_cell a bit more, but it doesn't make sense for my use case to not make the higher-level stream generic, and apparently self_cell doesn't support structs with type parameters), but otherwise might just leave it as a TODO for the future. My fallback would be to just make a struct with my own async functions that await the sending-the-ack before awaiting reading the stream--but of course it would be nicer to have a proper Stream that composes well with other utility functions etc.
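For reference, a rough sketch of what that fallback could look like, using only the standard library. Everything here is a stand-in I made up for illustration: `Frame`, `MockLink` (in place of the real Framed transport), `CommandLink`, `next_command`, and the busy-polling `block_on` (in place of a real executor such as tokio).

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical frame type; the real protocol's frames will differ.
#[derive(Debug, Clone, PartialEq)]
enum Frame {
    Data { seq: u8, payload: Vec<u8> },
    Ack { seq: u8 },
}

/// Hypothetical in-memory stand-in for the framed transport.
struct MockLink {
    incoming: VecDeque<Frame>,
    sent: Vec<Frame>,
}

impl MockLink {
    async fn send(&mut self, frame: Frame) {
        self.sent.push(frame);
    }

    async fn recv(&mut self) -> Option<Frame> {
        self.incoming.pop_front()
    }
}

/// Command-level wrapper exposing plain async functions instead of `Stream`.
struct CommandLink {
    link: MockLink,
}

impl CommandLink {
    /// Reads the next data frame, awaits sending its ack, then returns the payload.
    /// The future borrows `self` for its whole lifetime, so nothing self-referential
    /// has to be stored in the struct.
    async fn next_command(&mut self) -> Option<Vec<u8>> {
        loop {
            match self.link.recv().await? {
                Frame::Data { seq, payload } => {
                    self.link.send(Frame::Ack { seq }).await;
                    return Some(payload);
                }
                // A real implementation would match this against outstanding sends.
                Frame::Ack { .. } => continue,
            }
        }
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Tiny busy-polling executor, adequate only because the mock never really waits.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Drains the mock link and returns the received commands plus the frames sent back.
fn demo() -> (Vec<Vec<u8>>, Vec<Frame>) {
    let link = MockLink {
        incoming: VecDeque::from(vec![
            Frame::Data { seq: 0, payload: vec![1, 2] },
            Frame::Ack { seq: 9 },
            Frame::Data { seq: 1, payload: vec![3] },
        ]),
        sent: Vec::new(),
    };
    let mut commands_link = CommandLink { link };
    let mut commands = Vec::new();
    while let Some(cmd) = block_on(commands_link.next_command()) {
        commands.push(cmd);
    }
    (commands, commands_link.link.sent)
}

fn main() {
    let (commands, sent) = demo();
    assert_eq!(commands, vec![vec![1u8, 2], vec![3u8]]);
    assert_eq!(sent, vec![Frame::Ack { seq: 0 }, Frame::Ack { seq: 1 }]);
}
```

One caveat with this shape: if the caller drops the `next_command` future between receiving the frame and finishing the ack, that ack is lost, so a more careful version would probably remember the pending sequence number in the struct and retry on the next call.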

I'm still relatively new to Rust and this is pretty much the first time I'm using Pin and working on the "inside" of Futures (as opposed to using async/await syntax "from the outside"), so I think I'll read over the documentation a bit and try to digest it. Thanks for the pointers!
