Futures Stream or FuturesUnordered for porting to async

Hey,

I'm currently porting an API crate from blocking web requests to async.
Some API endpoints return a list of other API endpoints for the user to call.
Until now, this was implemented as a struct with a custom IntoIter implementation where calling next() made the web request.

Moving to async, I would prefer to keep the ability to represent this list of API endpoints in a way that can still be used with map/filter/collect etc.
To that end, I found Streams as well as FuturesUnordered, and I'm unsure which one I should use, given that they both seem to serve a similar purpose (at least here).

Ideally, I would like it to be trivial to collect/map/filter the futures concurrently (not necessarily in parallel, i.e. there should be multiple web requests waiting at the same time, but they need not be completed in parallel), if that helps narrow it down.

For reference, this is the code I already tried with streams. It seems to work, although it seems to only drive one future at a time.

struct ListIter<T> {...}
impl<T: Send + Sync + Unpin> ListIter<T> {
    #[async_recursion]
    async fn stream_next(&mut self) -> Option<crate::Result<T>> {
        ...
    }

    pub fn into_stream(self) -> impl Stream<Item = crate::Result<T>> + Unpin {
        Box::pin(stream::unfold(self, |mut state| async move {
            let item = state.stream_next().await;
            item.map(|val| (val, state))
        }))
    }
}

...

let cards: Vec<Card> = query
    .search()
    .await?
    .into_stream()
    // Drop failed requests and keep only the successful cards.
    .filter_map(|card| future::ready(card.ok()))
    .filter(|card| {
        future::ready(
            card.prices.usd.is_some() || (!card.nonfoil && card.prices.usd_foil.is_some()),
        )
    })
    .collect()
    .await;

FuturesUnordered is a building block - you feed it futures, it runs them concurrently and returns their output in completion order. It's up to you to keep the number of in-flight futures under control, and to cope with the reordering.

There are two higher-level options provided by StreamExt (with try_ versions provided by TryStreamExt): buffered and buffer_unordered. These consume a stream whose next().await returns an impl Future<Output = T>, and run a limited set of those futures concurrently, giving you a stream whose next().await returns a T. buffered has the added property that it maintains the original order for you: while the buffered set can complete out of order, it restores the original order before returning items.

All of these have the subexecutor problem: if you're not careful, you can deadlock, because the futures are not driven by the main executor. Instead, they only make progress when someone polls the FuturesUnordered (or the buffered stream), and that has to happen often enough for the futures to release any resources they're holding onto.
