Convert UnboundedReceiver<T> into IntoStream<T>

https://docs.rs/futures/0.3.5/futures/channel/mpsc/struct.UnboundedReceiver.html

i'm interested in taking this struct:

pub struct PeerChannel {
    server_remote: HdpServerRemote,
    target_cid: u64,
    vconn_type: VirtualConnectionType,
    channel_id: Ticket,
    security_level: SecurityLevel,
    // When the associated virtual conn drops, this gets flipped off, and hence, data won't be sent anymore
    is_alive: Arc<AtomicBool>,
    receiver: UnboundedReceiver<Vec<u8>>
}

and splitting it into a SendHalf and RecvHalf. With the RecvHalf, i have this structure and stream impl:

#[derive(Debug)]
pub struct RecvHalf<S: Stream<Item=Vec<u8>>> {
    // when the state container removes the vconn, this will get closed
    receiver: S,
    target_cid: u64,
    vconn_type: VirtualConnectionType,
    channel_id: Ticket,
    is_alive: Arc<AtomicBool>
}

impl<S: Stream<Item=Vec<u8>>> Stream for RecvHalf<S> {
    type Item = Vec<u8>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if !self.is_alive.load(Ordering::SeqCst) {
            // close the stream
            log::info!("[POLL] closing PeerChannel RecvHalf");
            Poll::Ready(None)
        } else {
            self.receiver.poll_next(cx)
        }
    }
}

What is the best way to treat the UnboundedReceiver<Vec<u8>> as a stream? Something about the way I'm doing it doesn't seem idiomatic nor correct. How should I go about splitting the above structure idiomatically/properly?

Here is the split fn:

    pub fn split(self) -> (SendHalf, RecvHalf<IntoStream<UnboundedReceiver<Vec<u8>>>>) {
        let send_half = SendHalf {
            server_remote: self.server_remote,
            target_cid: self.target_cid,
            vconn_type: self.vconn_type,
            channel_id: self.channel_id,
            security_level: self.security_level,
            is_alive: self.is_alive.clone()
        };
        
        let recv_half = RecvHalf {
            receiver: self.receiver.into_stream(),
            target_cid: self.target_cid,
            vconn_type: self.vconn_type,
            channel_id: self.channel_id,
            is_alive: self.is_alive.clone()
        };

        (send_half, recv_half)
    }

Not sure what the structure is of the rest of the program, but instead of tracking an is_alive variable, is it possible to keep a copy of the UnboundedSender where the is_alive variable gets flipped and instead just send None to the receiver

EDIT: Never mind. I thought you might be able to manually close the channel that way. I guess using this approach, you would instead need to make sure all the send halves of the mpsc channel drop instead of setting is_alive to false.

1 Like

Yeah, I ended up solving it by using a oneshot transmitter (Sender<Waker>) to send the context's waker within the poll_next function to the end which sends to the receiver. Then, when the Vec<u8> transmitter sends the data to the UnboundedReceiver<Vec<u8>>, it wakes the Waker. Works like a charm now

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.