Passing wrapper of async function to stream of futures

Hi I am having an issue with buffered futures.
I have code something like

let mut futures:  Vec<_>  = vec![];

for d in devices.iter() {
        // read_device returns vecor of f64
        let future: impl futures::Future<Output = PointValueResults> = read_device(d);
        futures.push(future);
}

let stream_of_futures = stream::iter(futures);
let mut buffered = stream_of_futures.buffer_unordered(2000);

while let Some(point_values) = buffered.next().await {
        for pr in point_values.iter() {
                // Do something with point
        }
}

The above works fine. However, I would like to wrap read_device so I can associate a flag with each call.

I was thinking of something like

async fn read_device_wrapper(flag: String, device_id: &32) -> (String, impl futures::Future<Output = PointValueResults>) {
   (site_group, house_modbus::read_device(device_id)
}

let future = read_device_wrapper("some flag".to_string(), d);
futures.push(future);

I can add these async wrapper functions the same way but when I iterate the buffered results I have issues

It gives

for pr in point_values.1.iter() {
    |                    ^^^^ method not found in `impl futures::Future<Output = PointValueResults>>`
    
help: consider `await`ing on the `Future` and calling the method on its `Output`
    |
236 |             for pr in point_values.1.await.iter() {

I guess the stream of futures does not like the tuple

Any idea how I can do this. Maybe the is a better way ?

I did this simply by using

let future = read_device(d);
let mapped_future = async { ("some flag".to_string(),, future.await) };

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.