Problem sharing futures across threads when called from a warp filter

Hi, I am having trouble with futures and warp.
The idea of the function is to build a list of futures that each run a longish query, and to process them at the same time with join_all.
So first I put all the futures into a vector for processing.
Below is a simplified version.

pub async fn get_sensors_difference_data_between_range(
        &self,
        sensor_ids: &[i32]
    ) -> Result<Vec<SensorDataValue>, BmosError> {

        let mut return_results: Vec<SensorDataValue> = vec![];

        // rows is a vector of ids - removed for simplicity
        for chunk in &rows.into_iter().chunks(20) {

            let mut future_results: Vec<Pin<Box<dyn Future<Output = Result<f64, BmosError>>>>> = vec![];
            let mut sensors_results = vec![];

            for r in chunk.into_iter() {
                let sensor_id: i32 = r.get::<i32>(0);
                future_results.push(Box::pin(self.get_data(sensor_id)));  // get_data uses tokio-postgres to collect some row data
                sensors_results.push(sensor_id);
            }

            let joined_future_results = join_all(future_results).await;
            let iter = joined_future_results.iter().zip(sensors_results.iter());

            for (value, (sensor_id, sensor_type)) in iter {

                if value.is_err() {
                    continue;
                }

                let val = value.as_ref().unwrap();

                return_results.push(SensorDataValue::new(*sensor_id, &sensor_type, *val));
            }
         }

        Ok(return_results)
    }

This code works fine from my test case, i.e.

#[tokio::test]
async fn get_sensors_difference_data_between_range_test1() {
use super::*;

let result = bmos.get_sensors_difference_data_between_range(
    &[611720, 612073, 612071, 612072, 612069],
).await;

println!("Result: {:?}", result);
}

but when I call it from a warp filter in its and_then() method, I get errors:

.and_then(get_sensors_difference_data_between_range);
    |                        ^^^^^^^^ `std::cell::RefCell<itertools::groupbylazy::GroupInner<usize, std::vec::IntoIter<tokio_postgres::row::Row>, itertools::groupbylazy::ChunkIndex>>` cannot be shared between threads safely

If I comment out join_all, it compiles.

What's the issue here? I don't understand why the test compiles but calling the function from a warp filter does not.

I assume my Vec<Pin<Box<dyn Future<Output = Result<f64, BmosError>>>>> is the problem when sharing across threads.

How could I correct this?

Thanks

Regarding formatting, here are a few tips that make it easier for us to help you:

  1. Use backticks as described here instead of four spaces to make code blocks, as you then get syntax-highlighting.
  2. When including types in text, surround the type with a single backtick, e.g. `Vec<...>` becomes Vec<...>.
  3. Make sure that your code has the correct indentation, e.g. your test has no indentation at all.
  4. Include the full error message as printed by cargo build. The error you posted looks like it was copied from an IDE's popup; it doesn't contain much information and doesn't really help us that much.

If you use the BoxFuture alias from the futures crate, you will never run into trouble with missing + Send bounds on futures. Additionally, in this case you don't need to box the future, because the future always comes from calling the same function, which means all of the actual future objects have the same type.

let mut future_results = Vec::new();
for r in chunk.into_iter() {
    let sensor_id: i32 = r.get::<i32>(0);
    future_results.push(self.get_data(sensor_id));
    sensors_results.push(sensor_id);
}
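If the futures do come from different functions, they have different concrete types and boxing is needed after all. The following is a minimal sketch of that case, using only the standard library so it stands alone: the local `BoxFuture` alias mirrors what the alias in the futures crate expands to (note the `+ Send`), while `query_a`, `query_b`, `build`, and the tiny `run` poller are hypothetical names made up for illustration. In real code you would import `futures::future::BoxFuture` and hand the vector to `join_all` instead.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local copy of the shape of `futures::future::BoxFuture`.
// The `+ Send` bound is the part the hand-written type in the question lacks.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

// Two hypothetical queries; each `async fn` has its own anonymous future type.
async fn query_a(id: i32) -> Result<f64, String> {
    Ok(id as f64)
}

async fn query_b(id: i32) -> Result<f64, String> {
    Ok(id as f64 * 2.0)
}

// Boxing erases the concrete types so both kinds fit in one Vec.
fn build(ids: &[i32]) -> Vec<BoxFuture<'static, Result<f64, String>>> {
    let mut futures: Vec<BoxFuture<'static, Result<f64, String>>> = Vec::new();
    for &id in ids {
        if id % 2 == 0 {
            futures.push(Box::pin(query_a(id)));
        } else {
            futures.push(Box::pin(query_b(id)));
        }
    }
    futures
}

// Compiles only if `T` is `Send`.
fn assert_send<T: Send>(_: &T) {}

// Waker that does nothing; enough to poll futures that never actually wait.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Toy stand-in for an executor: polls the future until it is ready.
fn run<T>(mut fut: BoxFuture<'_, T>) -> T {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

fn main() {
    let futures = build(&[1, 2, 3]);
    assert_send(&futures); // the whole Vec can move to another thread
    let results: Vec<_> = futures.into_iter().map(run).collect();
    println!("{:?}", results);
}
```

Removing `+ Send` from the alias makes the `assert_send` call fail to compile, which is the same kind of error a multi-threaded runtime reports.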

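As for your error message, it complains about there being a RefCell inside the chunks combinator from the itertools crate (it shares its implementation with group_by, hence the GroupInner in the type). Unfortunately, this means that a borrowed chunks or group_by iterator cannot be held across an .await in a future that has to be Send, which is what warp requires.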
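This is also why the test compiles: warp's and_then() requires the returned future to be Send, whereas awaiting the function directly inside a #[tokio::test] imposes no such bound. The loop `for chunk in &rows.into_iter().chunks(20)` keeps a shared reference to the itertools adaptor alive across the `.await`, and a shared reference is only Send if its target is Sync, which a RefCell never is. One possible workaround, sketched below with only the standard library, is to collect the items into a Vec and use the slice method `chunks`, which hands out plain sub-slices. The names `get_data`, `sum_in_chunks`, `require_send`, and `run` are hypothetical, and the sketch awaits each query in turn to stay self-contained; real code would push the futures of each chunk into a Vec and pass it to `join_all`.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for `self.get_data(sensor_id)`.
async fn get_data(sensor_id: i32) -> Result<f64, String> {
    Ok(sensor_id as f64 * 0.5)
}

// Processes the ids in chunks of 20 and returns (number of chunks, sum of values).
async fn sum_in_chunks(ids: Vec<i32>) -> (usize, f64) {
    let mut chunk_count = 0;
    let mut total = 0.0;
    // `slice::chunks` yields `&[i32]` sub-slices and contains no RefCell,
    // so holding the iterator across an `.await` keeps the future `Send`.
    for chunk in ids.chunks(20) {
        chunk_count += 1;
        for &id in chunk {
            if let Ok(value) = get_data(id).await {
                total += value;
            }
        }
    }
    (chunk_count, total)
}

// Compiles only if the future is `Send`, similar to the bound warp places on handlers.
fn require_send<F: Future + Send>(fut: F) -> F {
    fut
}

// Waker that does nothing; enough to poll futures that never actually wait.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Toy stand-in for an executor: polls the future until it is ready.
fn run<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

fn main() {
    let ids: Vec<i32> = (1..=50).collect();
    let (chunk_count, total) = run(require_send(sum_in_chunks(ids)));
    println!("{} chunks, total {}", chunk_count, total);
}
```

The trade-off is that the rows have to be in a Vec (or slice) up front, which is usually already the case for query results.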

Thanks, I will read up on BoxFuture.

I need the box because in my real function the futures come from calling two different functions, but I tried to simplify the code.

I solved this by using BoxFuture and removing

chunk.into_iter()

It is unfortunate that something like rows.into_iter().chunks(20) does not work in this situation.

Hopefully it will be addressed as async code becomes more widespread.

Thanks again
