How to bound `futures::select_all`?

Say that I have a stream of futures:

use futures::future::BoxFuture;
use futures::stream::BoxStream;
pub type BatchFuture = BoxFuture<'static, Result<Vec<i32>, String>>;
pub type BatchStream = BoxStream<'static, BatchFuture>;

and one of my operators is a `select_all` over multiple streams:

use futures::stream::select_all;

pub fn union(streams: Vec<BatchStream>) -> Result<BatchStream, String> {
    Ok(Box::pin(select_all(streams)))
}

Is there a mechanism to bound the `select_all`, e.g. by a maximum number of active streams? In other words, can I control how many streams are polled concurrently?

Would a wrapper type like this work? (Warning: I have not tested this myself.)

use futures::{
    future::BoxFuture,
    stream::{BoxStream, SelectAll},
    Stream,
};
use std::{
    pin::Pin,
    task::{Context, Poll},
};

pub type BatchFuture = BoxFuture<'static, Result<Vec<i32>, String>>;
pub type BatchStream = BoxStream<'static, BatchFuture>;

pub fn union(streams: Vec<BatchStream>) -> Result<BatchStream, String> {
    struct Union {
        streams: Vec<BatchStream>,
        select: SelectAll<BatchStream>,
    }
    impl Stream for Union {
        type Item = BatchFuture;
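        fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            loop {
                // Top up the active set so that at most 8 streams are polled at once.
                while self.select.len() < 8 {
                    match self.streams.pop() {
                        Some(stream) => self.select.push(stream),
                        None => break,
                    }
                }
                match Pin::new(&mut self.select).poll_next(cx) {
                    // Every active stream finished, but some are still waiting:
                    // refill and poll again instead of ending the union early.
                    Poll::Ready(None) if !self.streams.is_empty() => continue,
                    other => return other,
                }
            }
        }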
    }
    Ok(Box::pin(Union {
        streams,
        select: SelectAll::new(),
    }))
}