Store a Future in a Stream and poll it

Hi :wave:

I'm currently working on a hobby project implementing a packet radio receiver that should shift its frequency in the same way the transmitter does. To do that, the receiver and transmitter clocks must be synchronized. The project is built entirely on no_std Rust. Long story short, I arrived at a design where both a timer and a receiver are Stream implementations polled "in parallel" via futures_util::stream::select. The receiver has an async rx method that awaits an interrupt, reads the radio data into a buffer, and turns it into a packet. Here is how I'm trying to wrap it in a stream:

pin_project! {
    pub struct RxStream<'d, SPI> {
        receiver: Receiver<'d, SPI>,
    }
}

impl<'d, SPI: SpiDevice> RxStream<'d, SPI> {
    pub fn new(rx: Receiver<'d, SPI>) -> Self {
        Self { receiver: rx }
    }

    pub fn receiver(&mut self) -> &mut Receiver<'d, SPI> {
        &mut self.receiver
    }
}

impl<'d, SPI: SpiDevice> Stream for RxStream<'d, SPI> {
    type Item = RxEvent;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let projection = self.project();

        let fut = projection.receiver.rx();
        pin_mut!(fut);

        return match fut.poll_unpin(cx) {
            Poll::Ready(packet) => Poll::Ready(Some(RxEvent::Data(packet))),
            Poll::Pending => Poll::Pending,
        };
    }
}

The main problem is in the poll_next method: it creates a new Future from projection.receiver.rx() each time the RxStream is polled, which potentially drops packets mid-reception. I'm not using futures::stream::fold here because I need access to the moved Receiver after each RxEvent, whether it was generated by this stream or by the Clock, so I created a custom stream that exposes the Receiver it owns via the receiver() method.
I found this solution; however, it requires std, or at least alloc, which I'm trying to avoid since this is a pure no_std project.
How can I store fut in the RxStream, poll it until it is ready, and then create a new one, repeating this cycle?

it's not clear to me how or where packets are being lost. are you masking interrupts by default and only unmasking them to poll for events, and the events get dropped because you're not polling frequently enough? if that's the case, you should make your interrupt place events in a buffer, and only mask the interrupt when removing items from that buffer.

if your problem is that the simple act of creating the future causes the event to be consumed, try looking into making your future "cancellation safe", such that it only has side effects if it returns Ready. cancellation safety is generally required for select! to work properly anyway.
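
A minimal sketch of what "only has side effects if it returns Ready" can look like, using only std so it can be tried on a desktop. The names Recv, NoopWake and cancel_keeps_event are hypothetical, and the RefCell queue merely stands in for whatever buffer an interrupt handler would fill; a real no_std driver would need an interrupt-safe queue and would have to register the waker.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; sufficient for polling by hand in this sketch.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical receive future. It removes an event from the queue only in
/// the very poll that returns `Ready`, so dropping it earlier loses nothing.
struct Recv<'a> {
    queue: &'a RefCell<VecDeque<u32>>,
}

impl Future for Recv<'_> {
    type Output = u32;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> {
        match self.queue.borrow_mut().pop_front() {
            Some(event) => Poll::Ready(event),
            // A real implementation would store `_cx.waker()` here.
            None => Poll::Pending,
        }
    }
}

/// Cancels a pending `Recv` after an event arrived, then retries.
/// Returns (events still queued after the cancel, event seen by the retry).
fn cancel_keeps_event() -> (usize, Option<u32>) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let queue = RefCell::new(VecDeque::new());

    let mut first = Recv { queue: &queue };
    assert!(Pin::new(&mut first).poll(&mut cx).is_pending());
    queue.borrow_mut().push_back(7); // the "interrupt" delivers an event
    drop(first); // cancel without polling again

    let queued_after_cancel = queue.borrow().len();

    let mut second = Recv { queue: &queue };
    let received = match Pin::new(&mut second).poll(&mut cx) {
        Poll::Ready(event) => Some(event),
        Poll::Pending => None,
    };
    (queued_after_cancel, received)
}
```

The event delivered while the first future was pending is still in the queue after that future is dropped, and the second future picks it up.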

The problem is that the rx method is a wrapper around the device driver's rx that additionally does packet verification. A simplified version looks like this:

pub async fn rx(&mut self) -> Packet {
    loop {
        match self.device.rx(&mut self.rx_buffer).await {
            Ok((_, rssi)) => {
                if packet_invalid(&mut self.rx_buffer) {
                    continue;
                }
                self.tx_time = Instant::now().as_micros();

                return Packet::new(&mut self.rx_buffer);
            }
            Err(_) => continue,
        }
    }
}

The device.rx in turn waits for an interrupt. When it occurs, it reads the interrupt flags over SPI and resets them if any are RX-related. After that, in the same method, the data transfer occurs, which itself consists of several steps: get the payload length, read the FIFO buffer address, read the buffer. All of these operations are essentially read/write SPI transactions, and they are async as well. Since Futures are state machines, and receiver.rx is invoked anew in RxStream::poll_next on every poll, the previous future is dropped somewhere between those operations, which most likely leaves the packet reception incomplete. At least this is how I understand Rust async/await; of course I may be wrong due to lack of experience.
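
That reading can be checked on a desktop with a small std-only sketch. Everything in it is hypothetical: YieldOnce stands in for one async SPI transaction and rx for a two-step receive. Recreating the future on every poll restarts it from the top each time, while polling one stored future lets it advance through its steps.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; sufficient for polling by hand in this sketch.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical stand-in for one SPI transaction: pending once, then ready.
struct YieldOnce(bool);
impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Hypothetical two-step receive; `starts` counts how often it begins.
async fn rx(starts: &AtomicUsize) -> u8 {
    starts.fetch_add(1, Ordering::Relaxed);
    YieldOnce(false).await; // step 1, e.g. read and clear the flags
    YieldOnce(false).await; // step 2, e.g. read the FIFO
    42
}

/// Creates a fresh future on every poll, like the `poll_next` in question.
/// Returns (packet if any, number of times `rx` started from the top).
fn recreate_every_poll(polls: usize) -> (Option<u8>, usize) {
    let starts = AtomicUsize::new(0);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut packet = None;
    for _ in 0..polls {
        let fut = pin!(rx(&starts));
        if let Poll::Ready(p) = fut.poll(&mut cx) {
            packet = Some(p);
        }
        // `fut` is dropped here together with all of its progress.
    }
    (packet, starts.load(Ordering::Relaxed))
}

/// Keeps one future alive and polls it until it completes.
fn keep_one_future(polls: usize) -> (Option<u8>, usize) {
    let starts = AtomicUsize::new(0);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(rx(&starts));
    let mut packet = None;
    for _ in 0..polls {
        if let Poll::Ready(p) = fut.as_mut().poll(&mut cx) {
            packet = Some(p);
            break;
        }
    }
    (packet, starts.load(Ordering::Relaxed))
}
```

With five polls, the recreating variant starts rx five times and never yields a packet, whereas the stored future starts once and completes on the third poll.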

If the Receiver type does not provide a poll method and only lets you receive events using an async method, then you have to use something like futures::stream::unfold.

How about something like this?

async fn unfold_fn(rx: &mut Receiver) -> Option<(Item, &mut Receiver)> {
    let item = rx.rx().await; // the rx() shown above returns the packet itself, so no `?` is needed
    Some((item, rx))
}

fn receiver_stream(rx: &mut Receiver) -> impl Stream<Item = Item> + '_ {
    unfold(rx, unfold_fn)
}

The stream only takes a mutable reference to Receiver, so it does not consume it.

The pin_project macro has no effect if you don't mark any fields with #[pin]. Don't use it if that's the case.

Thank you for the proposed solution. Alas, in this case there is an error (sorry, I didn't mention that I need mutable access):

error[E0499]: cannot borrow `receiver` as mutable more than once at a time
   --> src/main.rs:127:63
    |
117 |     let rx_stream = rx_stream(&mut receiver);
    |                               ------------- first mutable borrow occurs here
...
127 |                         let timing_offsets = receiver.tick();
    |                                                               ^^^^^^^^ second mutable borrow occurs here
...
146 | }
    | - first borrow might be used here, when `rx_stream` is dropped and runs the destructor for type `impl Stream<Item = RxEvent> + '_`

Usage example:

let mut receiver = Receiver::new(device, config);
// Wait for the initial packet on a pre-defined channel
receiver.lock().await;

let ticker: SynchronizedTimer = SynchronizedTimer::new(Duration::from_hz(200));
let rx_stream = rx_stream(&mut receiver);
pin_mut!(ticker, rx_stream);
let mut combined = select(ticker, rx_stream);

loop {
    match combined.next().await {
        Some(event) => match event {
            RxEvent::Timer(phase) => match phase {
                TimerPhase::Tick => {
                    // Get access to the streams by reference; previously the receiver was accessed via the stream instance here
                    let (timer, _) = combined.get_mut();
                    // The error occurs here
                    let timing_offsets = receiver.tick();
                    timer.set_offsets(timing_offsets);
                }
                TimerPhase::Tock => {
                    receiver.tock().await;
                }
            },
            RxEvent::Data(_) => {
                receiver.hop().await;
            }
        },
        None => {}
    }

    receiver.update_connection_stats();
}

The future returned by rx holds a mutable reference on Receiver. This means that from the instant you call rx and until that future completes, the future has exclusive access to the Receiver. That is to say, calling methods on Receiver is not possible during a running rx future.

The only way to do that kind of thing is to cancel the rx future, then call your other methods, and then recreate the rx future. However, this only works if the rx function is cancel safe.

The simplified version of rx you shared is cancel safe if and only if self.device.rx is cancel safe. This is because cancellation always happens at an .await, and the only .await is the call to self.device.rx. Cancellation would mean that the function stops at whichever .await it has reached and starts over from the top.
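
The cancel-then-recreate pattern can be sketched as follows, again std-only and with hypothetical stand-ins: this Receiver, its never-completing rx, and run are not the real driver. The inner block is what ends the mutable borrow, because the future is dropped at the closing brace; whether doing this is acceptable in the real project still depends on device.rx being cancel safe.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

/// Waker that does nothing; sufficient for polling by hand in this sketch.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical receiver that only counts how often `tick` was called.
struct Receiver {
    ticks: u32,
}

impl Receiver {
    /// Never completes here: stands in for waiting on a radio interrupt.
    async fn rx(&mut self) -> u8 {
        std::future::pending().await
    }

    fn tick(&mut self) {
        self.ticks += 1;
    }
}

/// Each iteration: start `rx`, get "interrupted" by the timer, cancel `rx`,
/// then use the receiver directly. Returns the final tick count.
fn run(iterations: u32) -> u32 {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut receiver = Receiver { ticks: 0 };

    for _ in 0..iterations {
        {
            // The future holds the `&mut receiver` borrow while it lives.
            let fut = pin!(receiver.rx());
            // Poll once; pretend the timer fired before a packet arrived.
            let _ = fut.poll(&mut cx);
            // Calling `receiver.tick()` here is rejected by the borrow
            // checker: the future still owns the mutable borrow.
        } // the future is dropped (cancelled) here, releasing the borrow

        receiver.tick();
    }
    receiver.ticks
}
```

In the original loop, this would correspond to building the rx future inside each loop iteration instead of once up front, so that the borrow ends before tick, tock, or hop is called.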

Thank you for the explanation! It seems I need to discard my current design; I'll probably look into a more conventional approach with an ISR and mutex-based synchronization.