So, I'm trying to load some data in parallel using futures_cpupool
. However, as expected, I'm having some errors related to futures and the spawning function. The code:
#![feature(test)]
extern crate netcdf;
#[macro_use(s)]
extern crate ndarray;
extern crate test;
extern crate futures;
extern crate futures_cpupool;
use netcdf::variable::Variable;
use ndarray::{ArrayD, ArrayViewD, ArrayViewMutD, IxDyn, Axis};
use test::Bencher;
use futures::Future;
use futures_cpupool::CpuPool;
fn spawn(mut split: ArrayViewMutD<f32>, var: &Variable, offset: usize, batch_size: usize, patches: &[[usize;2]]) -> Result<(), ()> {
for i in 0..batch_size / 4 {
split.slice_mut(s![i, .., .., ..])
.assign(&(&var).array_at(&[offset + i, 0, patches[i % 30][0], patches[i % 30][1]],
&[9, 1, 64, 64]).unwrap().remove_axis(Axis(1)));
}
Ok(())
}
fn get_batch_pool(pool: &CpuPool, var: &Variable, offset: usize, batch_size: usize, patches: &[[usize;2]]) -> ndarray::ArrayD<f32> {
let mut offset = offset;
let mut batch = ArrayD::<f32>::zeros(IxDyn(&[batch_size, 9, 64, 64]));
let (split0, split1) = batch.view_mut().split_at(Axis(0), batch_size / 2);
let (split00, split01) = split0.split_at(Axis(0), batch_size / 4);
let (split10, split11) = split1.split_at(Axis(0), batch_size / 4);
let future00 = pool.spawn(move || spawn(split00, var, offset, batch_size, patches));
offset += batch_size / 4;
let future01 = pool.spawn(move || spawn(split01, var, offset, batch_size, patches));
offset += batch_size / 4;
let future10 = pool.spawn(move || spawn(split10, var, offset, batch_size, patches));
offset += batch_size / 4;
let future11 = pool.spawn(move || spawn(split11, var, offset, batch_size, patches));
offset += batch_size / 4;
future00.join4(future01, future10, future11).wait().unwrap();
return batch
}
fn main() {
let batch_size = 100;
let patches = [[60, 144], [60, 204], [60, 264], [60, 324], [60, 384],
[60, 444], [60, 504], [60, 564], [60, 624], [120, 144],
[120, 204], [120, 264], [120, 324], [120, 384], [120, 444],
[120, 624], [180, 204], [180, 264], [180, 324], [180, 384],
[180, 444], [180, 504], [180, 564], [180, 624], [240, 324],
[240, 384], [240, 444], [240, 504], [240, 564], [240, 624]];
let file = netcdf::open("original_pre_train_0.nc").unwrap();
let temp = file.root.variables.get("thetao").unwrap();
println!("{} - {}", temp.dimensions[0].name, temp.dimensions[0].len);
println!("{} - {}", temp.dimensions[1].name, temp.dimensions[1].len);
println!("{} - {}", temp.dimensions[2].name, temp.dimensions[2].len);
println!("{} - {}", temp.dimensions[3].name, temp.dimensions[3].len);
let mut batches = Vec::new();
let pool = CpuPool::new_num_cpus();
for o in 0..300 {
println!("{}", o);
let batch = get_batch_pool(pool, temp, o * 5, batch_size, &patches[..]);
batches.push(batch);
}
println!("{:?}", batches[0].shape());
}
The issue is I'm getting this weird error, which I really do not understand why it comes, and given my not good knowledge of futures, I was hoping someone could help me understand it and how to fix my code:
error[E0277]: the trait bound `[closure@src/main.rs:60:31: 60:87 split00:_, var:_, offset:_, batch_size:_, patches:_]: futures::Future` is not satisfied
--> src/main.rs:60:25
|
60 | let future00 = pool.spawn(move || spawn(split00, var, offset, batch_size, patches));
| ^^^^^ the trait `futures::Future` is not implemented for `[closure@src/main.rs:60:31: 60:87 split00:_, var:_, offset:_, batch_size:_, patches:_]`
error[E0277]: the trait bound `[closure@src/main.rs:62:31: 62:87 split01:_, var:_, offset:_, batch_size:_, patches:_]: futures::Future` is not satisfied
--> src/main.rs:62:25
|
62 | let future01 = pool.spawn(move || spawn(split01, var, offset, batch_size, patches));
| ^^^^^ the trait `futures::Future` is not implemented for `[closure@src/main.rs:62:31: 62:87 split01:_, var:_, offset:_, batch_size:_, patches:_]`
error[E0277]: the trait bound `[closure@src/main.rs:64:31: 64:87 split10:_, var:_, offset:_, batch_size:_, patches:_]: futures::Future` is not satisfied
--> src/main.rs:64:25
|
64 | let future10 = pool.spawn(move || spawn(split10, var, offset, batch_size, patches));
| ^^^^^ the trait `futures::Future` is not implemented for `[closure@src/main.rs:64:31: 64:87 split10:_, var:_, offset:_, batch_size:_, patches:_]`
error[E0277]: the trait bound `[closure@src/main.rs:66:31: 66:87 split11:_, var:_, offset:_, batch_size:_, patches:_]: futures::Future` is not satisfied
--> src/main.rs:66:25
|
66 | let future11 = pool.spawn(move || spawn(split11, var, offset, batch_size, patches));
| ^^^^^ the trait `futures::Future` is not implemented for `[closure@src/main.rs:66:31: 66:87 split11:_, var:_, offset:_, batch_size:_, patches:_]`
error: aborting due to 4 previous errors