Say that I have a stream of futures:
use futures::future::BoxFuture;
use futures::stream::BoxStream;
pub type BatchFuture = BoxFuture<'static, Result<Vec<i32>, String>>;
pub type BatchStream = BoxStream<'static, BatchFuture>;
and one of my operators is a select_all over multiple streams:
use futures::stream::select_all;
pub fn union(streams: Vec<BatchStream>) -> Result<BatchStream, String> {
    Ok(Box::pin(select_all(streams)))
}
Is there a mechanism to bound the select_all, e.g. by a maximum number? That is, can I control how many concurrent polls are happening?
Would a wrapper type like this work? (Warning: I have not tested this myself.)
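use futures::{
    future::BoxFuture,
    stream::{BoxStream, SelectAll},
    Stream,
};
use std::{
    pin::Pin,
    task::{Context, Poll},
};

pub type BatchFuture = BoxFuture<'static, Result<Vec<i32>, String>>;
pub type BatchStream = BoxStream<'static, BatchFuture>;

pub fn union(streams: Vec<BatchStream>) -> Result<BatchStream, String> {
    // Upper bound on how many streams are polled at once.
    const MAX_ACTIVE: usize = 8;

    struct Union {
        // Streams that have not been started yet.
        streams: Vec<BatchStream>,
        // Streams currently being polled (at most MAX_ACTIVE).
        select: SelectAll<BatchStream>,
    }

    impl Stream for Union {
        type Item = BatchFuture;

        fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            loop {
                // Top up the active set from the waiting streams.
                while self.select.len() < MAX_ACTIVE {
                    match self.streams.pop() {
                        Some(stream) => self.select.push(stream),
                        None => break,
                    }
                }
                let polled = Pin::new(&mut self.select).poll_next(cx);
                // If active streams finished during this poll and others are
                // still waiting, refill and poll again. Otherwise the union
                // could end early or leave waiting streams unstarted.
                let can_refill = self.select.len() < MAX_ACTIVE && !self.streams.is_empty();
                match polled {
                    Poll::Ready(None) | Poll::Pending if can_refill => continue,
                    other => return other,
                }
            }
        }
    }

    Ok(Box::pin(Union {
        streams,
        select: SelectAll::new(),
    }))
}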
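
One thing to keep in mind: the wrapper above caps how many streams are polled at once, not how many of the yielded BatchFuture values run at once. For the latter, StreamExt::buffer_unordered(n) from the futures crate is, as far as I know, the usual tool. To make concrete what such a cap does, here is a std-only sketch. Everything in it (run_bounded, YieldOnce, NoopWake, demo) is a hypothetical name made up for illustration, and the busy-polling loop stands in for a real executor; it is not how the futures crate implements anything.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for a busy-polling demo loop.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical demo future: returns Pending once, then resolves to `value`.
/// It records how many instances have started but not yet finished.
pub struct YieldOnce {
    value: i32,
    started: bool,
    in_flight: Arc<AtomicUsize>,
    peak: Arc<AtomicUsize>,
}

impl Future for YieldOnce {
    type Output = i32;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<i32> {
        if !self.started {
            self.started = true;
            let now = self.in_flight.fetch_add(1, Ordering::SeqCst) + 1;
            self.peak.fetch_max(now, Ordering::SeqCst);
            return Poll::Pending;
        }
        self.in_flight.fetch_sub(1, Ordering::SeqCst);
        Poll::Ready(self.value)
    }
}

/// Drives all futures to completion, keeping at most `limit` of them started
/// at any time. Outputs are returned in completion order.
pub fn run_bounded<F>(mut pending: Vec<F>, limit: usize) -> Vec<F::Output>
where
    F: Future + Unpin,
{
    assert!(limit > 0, "limit must be at least 1");
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut active: Vec<F> = Vec::new();
    let mut done = Vec::new();

    while !pending.is_empty() || !active.is_empty() {
        // Top up the active set, never exceeding `limit`.
        while active.len() < limit {
            match pending.pop() {
                Some(fut) => active.push(fut),
                None => break,
            }
        }
        // Poll each active future once; drop the ones that finished.
        let mut i = 0;
        while i < active.len() {
            let polled = Pin::new(&mut active[i]).poll(&mut cx);
            match polled {
                Poll::Ready(out) => {
                    done.push(out);
                    drop(active.swap_remove(i));
                }
                Poll::Pending => i += 1,
            }
        }
    }
    done
}

/// Runs `n` demo futures under the given cap and returns the sorted outputs
/// together with the highest number of futures that were in flight at once.
pub fn demo(n: i32, limit: usize) -> (Vec<i32>, usize) {
    let in_flight = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));
    let futs: Vec<YieldOnce> = (0..n)
        .map(|value| YieldOnce {
            value,
            started: false,
            in_flight: in_flight.clone(),
            peak: peak.clone(),
        })
        .collect();
    let mut out = run_bounded(futs, limit);
    out.sort();
    (out, peak.load(Ordering::SeqCst))
}
```

Calling demo(5, 2) runs five futures with never more than two in flight. The refill step has the same shape as the while loop in the wrapper's poll_next, just applied to futures instead of streams.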