I gave the async approach a go, and it turned out to be more-or-less straightforward. I tried to avoid additional allocations, and I had to work around some limitations of HRTB closures that return futures (or rather, around the poor closure type inference for them). My current code doesn't handle the Context/Wakers properly yet; there would probably be a problem if the closure passed its |iter| argument into, e.g., a separately spawned task. It should already work correctly as long as the closure awaits the stream's elements directly in the same context. Anyway, I won't continue working on this before tomorrow; here's what it currently looks like. It also uses the stream extension traits from async_std in main, because they're the most similar to the standard library's Iterator API:
#[allow(clippy::redundant_closure)]
fn main() {
    let my_collection = [(), (), ()];
    let (a, b) = futures::executor::block_on(
        stream::from_iter(my_collection.iter().map(|_x| some_expensive_computation()))
            .broadcast()
            .consume_with(
                |iter| {
                    iter.filter(|x| some_condition(x))
                        .map(|x| some_transformation(x))
                        .max()
                },
                |iter| {
                    iter.map(|x| some_other_transformation(x))
                        .filter(|x| some_other_condition(x))
                        .count()
                },
            ),
    );
    dbg!(a, b);
}
The .broadcast().consume_with(_, _) call is two steps because the second method takes &mut self, which allows the passed closures to not be generic over a lifetime. The code uses a shared struct on the stack for the different futures to communicate, and this data can live in the struct returned by broadcast, which removes the need for HRTBs while also avoiding allocation. It uses Cell, but could be changed to support Sync/Send. The name "broadcast" is inspired by the fact that you could (probably) achieve the same kind of functionality, presumably less efficiently, by using a tokio::sync::broadcast channel.
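As a side note on the &mut self design: the point is that the closure can then take a receiver type with one concrete (inferred) lifetime instead of having to be generic over all lifetimes. A minimal sketch of that shape, with hypothetical, stripped-down types (no streams involved):

```rust
// Hypothetical, simplified illustration: `consume_with` borrows `self` for a
// caller-chosen lifetime 'a, so `F` only needs to accept the one concrete
// `Receiver<'a>` -- no `for<'r> FnOnce(Receiver<'r>)` bound is required.
struct Receiver<'a>(&'a i32);

struct Broadcast(i32);

impl Broadcast {
    fn consume_with<'a, F, R>(&'a mut self, f: F) -> R
    where
        F: FnOnce(Receiver<'a>) -> R,
    {
        f(Receiver(&self.0))
    }
}

fn main() {
    let mut b = Broadcast(21);
    // Closure type inference works fine here, since the lifetime is fixed.
    let doubled = b.consume_with(|r| *r.0 * 2);
    assert_eq!(doubled, 42);
}
```

With a for<'r> bound instead, a |r| ... closure would often fail to infer its argument type, which is one of the inference problems mentioned above.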
Here's the full code:
/*
[dependencies]
async-std = { version = "1.10.0", features = ["unstable"] }
futures = "0.3.19"
pin-project = "1.0.8"
*/
#![warn(clippy::todo)]
#![allow(clippy::type_complexity)]

use std::{cell::Cell, task::Poll};

use async_std::{prelude::*, stream};
use futures::{future::MaybeDone, ready};
use pin_project::pin_project;

struct StreamBroadcast<S: Stream> {
    stream: Option<S>,
    items: Cell<[Poll<Option<S::Item>>; 2]>,
}

trait StreamBroadcastExt: Sized + Stream {
    fn broadcast(self) -> StreamBroadcast<Self>;
}

impl<S> StreamBroadcastExt for S
where
    S: Stream,
    S::Item: Clone,
{
    fn broadcast(self) -> StreamBroadcast<Self> {
        StreamBroadcast {
            stream: Some(self),
            items: Cell::new([Poll::Pending, Poll::Pending]),
        }
    }
}

// todo: does not properly notify the context, so it won't work in spawned
// tasks and the like right now.
/// Note: Does not work anymore if it escapes the `consume_with` call.
struct BroadcastReceiver<'a, Item>(&'a Cell<Poll<Option<Item>>>);

impl<'a, Item> Stream for BroadcastReceiver<'a, Item> {
    type Item = Item;
    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // Take whatever the driving `BroadcastFuture` left in our slot,
        // leaving `Pending` behind as the "empty" marker.
        self.0.replace(Poll::Pending)
    }
}

impl<S> StreamBroadcast<S>
where
    S: Stream,
    S::Item: Clone,
{
    async fn consume_with<'a, F1, F2, Fut1, Fut2>(
        &'a mut self,
        f1: F1,
        f2: F2,
    ) -> (Fut1::Output, Fut2::Output)
    where
        F1: FnOnce(BroadcastReceiver<'a, S::Item>) -> Fut1,
        F2: FnOnce(BroadcastReceiver<'a, S::Item>) -> Fut2,
        Fut1: Future,
        Fut2: Future,
    {
        let items = Cell::as_slice_of_cells(&self.items);
        BroadcastFuture {
            stream: self.stream.take().expect(
                "StreamBroadcast::consume_with must not be called multiple times on the same value",
            ),
            items: Some(&self.items),
            fut1: MaybeDone::Future(f1(BroadcastReceiver(&items[0]))),
            fut2: MaybeDone::Future(f2(BroadcastReceiver(&items[1]))),
        }
        .await
    }
}

#[pin_project]
struct BroadcastFuture<'a, S, Fut1, Fut2>
where
    S: Stream,
    Fut1: Future,
    Fut2: Future,
{
    #[pin]
    stream: S,
    items: Option<&'a Cell<[Poll<Option<S::Item>>; 2]>>,
    #[pin]
    fut1: MaybeDone<Fut1>,
    #[pin]
    fut2: MaybeDone<Fut2>,
}

impl<'a, S, Fut1, Fut2> Future for BroadcastFuture<'a, S, Fut1, Fut2>
where
    S: Stream,
    S::Item: Clone,
    Fut1: Future,
    Fut2: Future,
{
    type Output = (Fut1::Output, Fut2::Output);
    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        let mut this = self.project();
        Poll::Ready(loop {
            // Bitwise `&` rather than `&&` on purpose: both futures must be
            // polled every time, even if the first one is already ready.
            if this.fut1.as_mut().poll(cx).is_ready() & this.fut2.as_mut().poll(cx).is_ready() {
                match (this.fut1.take_output(), this.fut2.take_output()) {
                    (Some(r1), Some(r2)) => break (r1, r2),
                    _ => panic!("BroadcastFuture was polled after completion"),
                }
            }
            // `items` becomes `None` once the underlying stream is exhausted.
            if let Some(items_ref) = *this.items {
                let items = items_ref.replace([Poll::Pending, Poll::Pending]);
                if matches!(items, [Poll::Pending, Poll::Pending]) {
                    // Both receivers have consumed their slots, so fetch the
                    // next item from the underlying stream.
                    match ready!(this.stream.as_mut().poll_next(cx)) {
                        None => {
                            items_ref.set([Poll::Ready(None), Poll::Ready(None)]);
                            *this.items = None;
                        }
                        Some(next_item) => items_ref.set([
                            Poll::Ready(Some(next_item.clone())),
                            Poll::Ready(Some(next_item)),
                        ]),
                    }
                }
            }
        })
    }
}

fn some_expensive_computation() -> u32 {
    println!("expensive!!");
    42
}
fn some_condition(n: &u32) -> bool {
    n % 2 == 0
}
fn some_other_condition(n: &u32) -> bool {
    n % 2 == 1
}
fn some_transformation(n: u32) -> u32 {
    n + 100
}
fn some_other_transformation(n: u32) -> u32 {
    n + 1000
}

#[allow(clippy::redundant_closure)]
fn main() {
    let my_collection = [(), (), ()];
    let (a, b) = futures::executor::block_on(
        stream::from_iter(my_collection.iter().map(|_x| some_expensive_computation()))
            .broadcast()
            .consume_with(
                |iter| {
                    iter.filter(|x| some_condition(x))
                        .map(|x| some_transformation(x))
                        .max()
                },
                |iter| {
                    iter.map(|x| some_other_transformation(x))
                        .filter(|x| some_other_condition(x))
                        .count()
                },
            ),
    );
    dbg!(a, b);
}
expensive!!
expensive!!
expensive!!
[src\main.rs:181] a = Some(
    142,
)
[src\main.rs:181] b = 0
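By the way, the hand-off mechanism the receivers rely on is just a single-slot mailbox built from a Cell: the driving future deposits a Poll::Ready(...) value, and each receiver's poll_next takes it back out with replace(Poll::Pending). In isolation (using a concrete u32 item type purely for illustration), that behaves like this:

```rust
use std::cell::Cell;
use std::task::Poll;

fn main() {
    // One slot per receiver; `Pending` doubles as "empty".
    let slot: Cell<Poll<Option<u32>>> = Cell::new(Poll::Pending);

    // The driving future deposits the next stream item...
    slot.set(Poll::Ready(Some(7)));

    // ...and the receiver's `poll_next` takes it, leaving `Pending` behind.
    assert_eq!(slot.replace(Poll::Pending), Poll::Ready(Some(7)));
    // A second poll finds the slot empty again.
    assert_eq!(slot.replace(Poll::Pending), Poll::Pending);
}
```

Since Cell is neither Sync nor usable across threads, this is exactly the part that would need to change (e.g. to a Mutex or atomics) to make the whole thing Send/Sync.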