How to lazily split a non-cloneable stream into two streams

incorrect_stream_split has a bug: the tx moved into the FnMut closure is never dropped, so st2 never terminates.
And correct_stream_split is not lazy: the spawned task drives the source even when nobody polls the output streams.

#![allow(warnings)]

use futures; // 0.3.31
use futures::{SinkExt, Stream, StreamExt, stream, pin_mut};
use tokio::sync::mpsc::channel;
use tokio_stream; // 0.1.17
use tokio_stream::wrappers::ReceiverStream;
use rand;


fn pred<T>(_value : &T) -> bool {
    rand::random()
}

fn incorrect_stream_split<T>(
    stream: impl Stream<Item = T>,
) -> (impl Stream<Item = T>, impl Stream<Item = T>) {
    let (tx, rx) = channel(1024);
    // Bug: `tx` is owned by the closure for as long as `st1` exists,
    // so the channel never closes and `st2` never sees end-of-stream.
    let st1 = stream.filter_map(move |x| {
        let tx_c = tx.clone();
        async move {
            if pred(&x) {
                Some(x)
            } else {
                tx_c.send(x).await.unwrap();
                None
            }
        }
    });
    let st2 = ReceiverStream::new(rx);

    (st1, st2)
}

fn correct_stream_split<T>(
    mut stream: impl Stream<Item = T> + Send + Unpin + 'static,
) -> (impl Stream<Item = T>, impl Stream<Item = T>) 
where
    T : Send + 'static
{
    let (tx1, rx1) = channel(1024);
    let (tx2, rx2) = channel(1024);
    // Eager: this task polls the source as fast as the channel capacity
    // allows, whether or not anyone polls the returned streams.
    tokio::spawn(async move {
        while let Some(value) = stream.next().await {
            if pred(&value) {
                tx1.send(value).await.unwrap();    
            } else {
                tx2.send(value).await.unwrap();
            }
        }
    });
    
    
    (
        ReceiverStream::new(rx1),
        ReceiverStream::new(rx2)
    )
}

#[tokio::main]
async fn main() {
    let data = stream::iter(vec![1, 2, 3, 4, 5, 6]);
    
    let (s1, mut s2) = incorrect_stream_split(data);
    pin_mut!(s1);
    while let Some(num) = s1.next().await {
        println!("get num {} from s1", num);
    }
    // After the buffered items drain, this hangs forever: the `tx` inside
    // s1's closure is never dropped, so s2 never sees end-of-stream.
    while let Some(num) = s2.next().await {
        println!("get num {} from s2", num);
    }
}


What do you mean by lazy? Could you maybe write some tests for us to better understand your goal?

The point of "laziness" is the same as with iterators: if the resulting stream is not polled, the incoming stream should not be polled either (so that it propagates backpressure).
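For intuition, here is that property in the plain-iterator world, as a std-only sketch (the counter and the function name `pulls_needed_for_two_evens` are invented for illustration): nothing is pulled from the source until the consumer actually asks for items.

```rust
use std::cell::Cell;

// Count how many items the source yields while two even values are
// consumed lazily; building the pipeline alone pulls nothing.
fn pulls_needed_for_two_evens() -> usize {
    let pulled = Cell::new(0);
    // Instrument the source so every pull is observable.
    let source = (0..6).inspect(|_| pulled.set(pulled.get() + 1));
    let mut evens = source.filter(|x| x % 2 == 0);
    assert_eq!(pulled.get(), 0); // lazy: nothing pulled yet
    let _ = evens.next(); // yields 0: one pull
    let _ = evens.next(); // yields 2: pulls 1 (discarded) and 2
    pulled.get()
}

fn main() {
    assert_eq!(pulls_needed_for_two_evens(), 3);
    println!("pulled 3 items for 2 evens");
}
```

A stream split with the same property would only pull from the source when one of its halves is polled.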

It's certainly possible to do a wildly suboptimal solution where, to poll one half of the stream, you create a oneshot channel.
The stream_split function would then await the channel's transmit handle, then await a message from the source stream, and send it over the channel.

Another problem with incorrect_stream_split is that calling st2.next() doesn't make anything happen; it just blocks. Only st1.next() actually drives any activity upstream.

The bounded channels seem tricky. Tell me if this is a problem:

Say child stream 1 is polled first. It gets a bunch of items from the source stream, but none of them pass the predicate. So it keeps sending them through a channel to child stream 2. Eventually the channel gets full and this blocks. What happens to the last item?

I guess child stream 1 should hold on to it and send it later. But child stream 1 might not get polled again for a long time, for whatever reason---its owner just doesn't need more values for a while. Then child stream 2 could consume its pending items and get stuck waiting for someone to poll child stream 1! That seems wrong.
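That starvation scenario can be reproduced with plain threads and bounded std channels, which stand in for the routing side and its capacity-limited outputs (a rough sketch; `spawn_router`, the capacities, and the timeout are invented for illustration):

```rust
use std::sync::mpsc::{sync_channel, Receiver, RecvTimeoutError};
use std::thread;
use std::time::Duration;

// A thread standing in for whoever routes items: 1s go into one
// capacity-1 channel and 0s into another, blocking when a channel is full.
fn spawn_router(items: Vec<u8>) -> (Receiver<u8>, Receiver<u8>) {
    let (tx_zero, rx_zero) = sync_channel(1);
    let (tx_one, rx_one) = sync_channel(1);
    thread::spawn(move || {
        for x in items {
            let tx = if x == 1 { &tx_one } else { &tx_zero };
            if tx.send(x).is_err() {
                return; // receiver dropped
            }
        }
    });
    (rx_zero, rx_one)
}

fn main() {
    // The source starts with several 0s before the first 1.
    let (rx_zero, rx_one) = spawn_router(vec![0, 0, 0, 1]);

    // Consume one 0, then stop polling that side for a while.
    assert_eq!(rx_zero.recv().unwrap(), 0);

    // The router is now stuck sending a 0 nobody reads, so the 1 side
    // starves even though a 1 exists further along in the source.
    let stalled = rx_one.recv_timeout(Duration::from_millis(200));
    assert!(matches!(stalled, Err(RecvTimeoutError::Timeout)));
    println!("the 1 side starved");
}
```

The router ends up blocked on a send nobody is reading, so the other half observes a timeout even though its next item exists in the source.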

I tried a couple things and didn't get there.

failed attempt 1 using stream::unfold

This is never going to work because the lifetimes in the type signature of unfold are too strict. I don't see a better tool in futures to turn an AsyncFnMut() -> Option<T> into a stream, though.

failed attempt 2 using pin_project!

I'm sure this one would get there, if you kept pushing long enough.

Could you write the wildly suboptimal solution to explain?

Sorry, I misread the requirement (I thought we needed one half for sending and another for receiving). I meant approximately this (does not work):

use tokio::sync::mpsc::{channel, Receiver, Sender};

fn chan_stream_split<T: Send + 'static>(
    stream: impl Stream<Item = T> + Send + 'static,
    buffer: usize,
) -> (impl Stream<Item = T>, impl Stream<Item = T>) {
    let (tx_want_false, mut rx_want_false) = channel::<()>(1);
    let (tx_want_true,  mut rx_want_true)  = channel::<()>(1);
    let (tx_false, rx_false) = channel(buffer);
    let (tx_true,  rx_true)  = channel(buffer);
    
    tokio::spawn(async move {
        tokio::pin!{stream};
        
        loop {
            tokio::select!{
                _ = rx_want_false.recv() => {},
                _ = rx_want_true.recv()  => {},
            }
            let Some(value) = stream.next().await else {break};
            
            if pred(&value) {
                let _ = tx_true.send(value).await;
            } else {
                let _ = tx_false.send(value).await;
            }
        }
    });
    
    struct WaitNotifierReceiver<T>(Sender<()>, Receiver<T>, bool);
    impl<T> Stream for WaitNotifierReceiver<T> {
        type Item = T;
        fn poll_next(
            mut self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>
        ) -> core::task::Poll<Option<T>> {
            // Notify the task that we want an item. Use `try_send`, not
            // `blocking_send`: blocking inside `poll_next` would panic on
            // the async runtime, and with capacity 1 a `Full` error just
            // means a notification is already pending.
            if !self.2 {
                use tokio::sync::mpsc::error::TrySendError;
                match self.0.try_send(()) {
                    Ok(()) | Err(TrySendError::Full(())) => self.2 = true,
                    Err(TrySendError::Closed(())) => {
                        return core::task::Poll::Ready(None);
                    }
                }
            }
            
            // Try receiving.
            let r = self.1.poll_recv(cx);
            
            // If we got an item, the next poll requires a new notification.
            if r.is_ready() {
                self.2 = false;
            }
            
            r
        }
        fn size_hint(&self) -> (usize, Option<usize>) {
            (0, None) // unknown; the source may end at any time
        }
    }
    
    (
        WaitNotifierReceiver(tx_want_true,  rx_true,  false),
        WaitNotifierReceiver(tx_want_false, rx_false, false),
    )
}

Though really, if the source yields 0000001010110 and you do not poll the 0 half but do poll the 1 half, then all those 0 items at the beginning have to be buffered somewhere.
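That observation already holds in the synchronous world. A std-only sketch of a lazy split of a plain iterator (the name `iter_split` and the whole shape are invented for illustration) needs an unbounded buffer per half: polling only the 1 half forces every leading 0 into the other half's buffer.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;

// Shared state: the source iterator plus a pending-item buffer per half.
struct Split<I: Iterator> {
    source: I,
    buf_true: VecDeque<I::Item>,
    buf_false: VecDeque<I::Item>,
}

// One half of the split; `want` is the predicate outcome this half yields.
struct Half<I: Iterator, P> {
    state: Rc<RefCell<Split<I>>>,
    pred: Rc<P>,
    want: bool,
}

impl<I: Iterator, P: Fn(&I::Item) -> bool> Iterator for Half<I, P> {
    type Item = I::Item;
    fn next(&mut self) -> Option<I::Item> {
        let mut s = self.state.borrow_mut();
        // Drain anything a previous poll of the other half buffered for us.
        let own = if self.want { &mut s.buf_true } else { &mut s.buf_false };
        if let Some(x) = own.pop_front() {
            return Some(x);
        }
        // Otherwise pull the source, stashing items meant for the other half.
        while let Some(x) = s.source.next() {
            if (*self.pred)(&x) == self.want {
                return Some(x);
            } else if self.want {
                s.buf_false.push_back(x);
            } else {
                s.buf_true.push_back(x);
            }
        }
        None
    }
}

// Split an iterator into (items matching pred, items not matching).
fn iter_split<I, P>(source: I, pred: P) -> (Half<I, P>, Half<I, P>)
where
    I: Iterator,
    P: Fn(&I::Item) -> bool,
{
    let state = Rc::new(RefCell::new(Split {
        source,
        buf_true: VecDeque::new(),
        buf_false: VecDeque::new(),
    }));
    let pred = Rc::new(pred);
    (
        Half { state: Rc::clone(&state), pred: Rc::clone(&pred), want: true },
        Half { state, pred, want: false },
    )
}

fn main() {
    let (mut ones, mut zeros) =
        iter_split(vec![0, 0, 0, 1, 0, 1].into_iter(), |x: &i32| *x == 1);
    // Polling only the "1" half buffers all the leading zeros.
    assert_eq!(ones.next(), Some(1));
    assert_eq!(zeros.next(), Some(0));
    println!("ok");
}
```

The async version has exactly the same shape, except that "pull the source" becomes a pending poll that both halves must coordinate on; the unbounded `VecDeque`s are where the 0s wait.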
