Hi I am having trouble with futures and warp.
The idea of the function is that I wanted a list of futures that all do a longish query and processess them at the same time with join_all.
So first I put all the futures into a vector forprocessing.
Below is a simplified version.
pub async fn get_sensors_difference_data_between_range(
&self,
sensor_ids: &[i32]
) -> Result<Vec<SensorDataValue>, BmosError> {
let mut return_results: Vec<SensorDataValue> = vec![];
// Rows is a vector of ids - romoved for simplicity
for chunk in &rows.into_iter().chunks(20) {
let mut furtue_results: Vec<Pin<Box<dyn Future<Output = Result<f64, BmosError>>>>> = vec![];
let mut sensors_results = vec![];
for r in chunk.into_iter() {
let sensor_id: i32 = r.get::<i32>(0);
furtue_results.push(Box::pin(self.get_data(sensor_id))); // get_data uses tokio-postgres to just collect some row data
sensors_results.push(sensor_id);
}
let joined_furture_results = join_all(furtue_results).await;
let iter = joined_furture_results.iter().zip(sensors_results.iter());
for (value, (sensor_id, sensor_type)) in iter {
if value.is_err() {
continue;
}
let val = value.as_ref().unwrap();
return_results.push(SensorDataValue::new(*sensor_id, &sensor_type, *val));
}
}
Ok(return_results)
}
This code works fine from my test case. ie
#[tokio::test]
async fn get_sensors_difference_data_between_range_test1() {
use super::*;
let result = bmos.get_sensors_difference_data_between_range(
&[611720, 612073, 612071, 612072, 612069],
).await;
println!("Result: {:?}", result);
}
but when I call it from a warp filter in its and_then() method I get errors
.and_then(get_sensors_difference_data_between_range);
| ^^^^^^^^ `std::cell::RefCell<itertools::groupbylazy::GroupInner<usize, std::vec::IntoIter<tokio_postgres::row::Row>, itertools::groupbylazy::ChunkIndex>>` cannot be shared between threads safely
if I comment join_all then I can compile.
Whats the issue here? I don't understand why I can compile the test but not when calling from a watrp filter.
I assume my Vec<Pin<Box<dyn Future<Output = Result<f64, BmosError>>>>> is the problem sharing across threads.
How could I correct this ?
Thanks