Parallel load error - the trait `futures::Future` is not implemented for closure

So, I'm trying to load some data in parallel using futures_cpupool. However, as expected, I'm having some errors related to futures and the spawning function. The code:

#![feature(test)]
extern crate netcdf;
#[macro_use(s)]
extern crate ndarray;
extern crate test;
extern crate futures;
extern crate futures_cpupool;

use netcdf::variable::Variable;
use ndarray::{ArrayD, ArrayViewD, ArrayViewMutD, IxDyn, Axis};
use test::Bencher;
use futures::Future;
use futures_cpupool::CpuPool;

fn spawn(mut split: ArrayViewMutD<f32>, var: &Variable, offset: usize, batch_size: usize, patches: &[[usize;2]]) -> Result<(), ()> {
    for i in 0..batch_size / 4 {
        split.slice_mut(s![i, .., .., ..])
            .assign(&(&var).array_at(&[offset + i, 0, patches[i % 30][0], patches[i % 30][1]],
                                        &[9, 1, 64, 64]).unwrap().remove_axis(Axis(1)));
    }
    Ok(())
}

fn get_batch_pool(pool: &CpuPool,  var: &Variable, offset: usize, batch_size: usize, patches: &[[usize;2]]) -> ndarray::ArrayD<f32> {
    let mut offset = offset;
    let mut batch = ArrayD::<f32>::zeros(IxDyn(&[batch_size, 9, 64, 64]));
    let (split0, split1) = batch.view_mut().split_at(Axis(0), batch_size / 2);
    let (split00, split01) = split0.split_at(Axis(0), batch_size / 4);
    let (split10, split11) = split1.split_at(Axis(0), batch_size / 4);
    let future00 = pool.spawn(move || spawn(split00, var, offset, batch_size, patches));
    offset += batch_size / 4;
    let future01 = pool.spawn(move || spawn(split01, var, offset, batch_size, patches));
    offset += batch_size / 4;
    let future10 = pool.spawn(move || spawn(split10, var, offset, batch_size, patches));
    offset += batch_size / 4;
    let future11 = pool.spawn(move || spawn(split11, var, offset, batch_size, patches));
    offset += batch_size / 4;
    future00.join4(future01, future10, future11).wait().unwrap();
    return batch
}

fn main() {
    let batch_size = 100;
    let patches = [[60, 144], [60, 204], [60, 264], [60, 324], [60, 384],
        [60, 444], [60, 504], [60, 564], [60, 624], [120, 144],
        [120, 204], [120, 264], [120, 324], [120, 384], [120, 444],
        [120, 624], [180, 204], [180, 264], [180, 324], [180, 384],
        [180, 444], [180, 504], [180, 564], [180, 624], [240, 324],
        [240, 384], [240, 444], [240, 504], [240, 564], [240, 624]];
    let file = netcdf::open("original_pre_train_0.nc").unwrap();
    let temp = file.root.variables.get("thetao").unwrap();
    println!("{} - {}", temp.dimensions[0].name, temp.dimensions[0].len);
    println!("{} - {}", temp.dimensions[1].name, temp.dimensions[1].len);
    println!("{} - {}", temp.dimensions[2].name, temp.dimensions[2].len);
    println!("{} - {}", temp.dimensions[3].name, temp.dimensions[3].len);
    let mut batches = Vec::new();
    let pool = CpuPool::new_num_cpus();
    for o in 0..300 {
        println!("{}", o);
        let batch = get_batch_pool(pool, temp, o * 5, batch_size, &patches[..]);
        batches.push(batch);
    }
    println!("{:?}", batches[0].shape());
}

The issue is that I'm getting this error, which I really do not understand, and given my limited knowledge of futures I was hoping someone could help me understand it and fix my code:

error[E0277]: the trait bound `[closure@src/main.rs:60:31: 60:87 split00:_, var:_, offset:_, batch_size:_, patches:_]: futures::Future` is not satisfied
  --> src/main.rs:60:25
   |
60 |     let future00 = pool.spawn(move || spawn(split00, var, offset, batch_size, patches));
   |                         ^^^^^ the trait `futures::Future` is not implemented for `[closure@src/main.rs:60:31: 60:87 split00:_, var:_, offset:_, batch_size:_, patches:_]`

error[E0277]: the trait bound `[closure@src/main.rs:62:31: 62:87 split01:_, var:_, offset:_, batch_size:_, patches:_]: futures::Future` is not satisfied
  --> src/main.rs:62:25
   |
62 |     let future01 = pool.spawn(move || spawn(split01, var, offset, batch_size, patches));
   |                         ^^^^^ the trait `futures::Future` is not implemented for `[closure@src/main.rs:62:31: 62:87 split01:_, var:_, offset:_, batch_size:_, patches:_]`

error[E0277]: the trait bound `[closure@src/main.rs:64:31: 64:87 split10:_, var:_, offset:_, batch_size:_, patches:_]: futures::Future` is not satisfied
  --> src/main.rs:64:25
   |
64 |     let future10 = pool.spawn(move || spawn(split10, var, offset, batch_size, patches));
   |                         ^^^^^ the trait `futures::Future` is not implemented for `[closure@src/main.rs:64:31: 64:87 split10:_, var:_, offset:_, batch_size:_, patches:_]`

error[E0277]: the trait bound `[closure@src/main.rs:66:31: 66:87 split11:_, var:_, offset:_, batch_size:_, patches:_]: futures::Future` is not satisfied
  --> src/main.rs:66:25
   |
66 |     let future11 = pool.spawn(move || spawn(split11, var, offset, batch_size, patches));
   |                         ^^^^^ the trait `futures::Future` is not implemented for `[closure@src/main.rs:66:31: 66:87 split11:_, var:_, offset:_, batch_size:_, patches:_]`

error: aborting due to 4 previous errors

The CPU pool's spawn method expects a future, not a function. For the purpose of asynchronously running a closure on the CPU pool, you can use pool.spawn_fn() instead.
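
In your code that means replacing pool.spawn(move || spawn(...)) with pool.spawn_fn(move || spawn(...)) at the four call sites. For reference, here is a minimal, self-contained sketch of the pattern (assuming futures 0.1 and futures_cpupool 0.1; the work function is just a stand-in for the real per-chunk loading):

extern crate futures;
extern crate futures_cpupool;

use futures::Future;
use futures_cpupool::CpuPool;

// Stand-in for the real per-chunk loading work; returning a Result is enough,
// because spawn_fn only needs the closure's return value to convert into a future.
fn work(offset: usize) -> Result<usize, ()> {
    Ok(offset * 2)
}

fn main() {
    let pool = CpuPool::new_num_cpus();
    // spawn_fn takes a closure, runs it on the pool, and hands back a future.
    let f0 = pool.spawn_fn(move || work(0));
    let f1 = pool.spawn_fn(move || work(25));
    // Wait for both results, just like join4().wait() in the original code.
    let (a, b) = f0.join(f1).wait().unwrap();
    println!("{} {}", a, b);
}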

Thanks, that indeed seems to solve the issue with the trait. Now, however, I have this:

error[E0621]: explicit lifetime required in the type of `var`
  --> src/main.rs:60:25
   |
54 | fn get_batch_pool(pool: &CpuPool,  var: &Variable, offset: usize, batch_size: usize, patches: &[[usize;2]]) -> ndarray::ArrayD<f32> {
   |                                    --- consider changing the type of `var` to `&'static netcdf::variable::Variable`
...
60 |     let future00 = pool.spawn_fn(move || spawn(split00, var, offset, batch_size, patches));
   |                         ^^^^^^^^ lifetime `'static` required

Given that I explicitly wait for the completion of all the work inside the same function, I assume it is safe to change the lifetime of var to 'static? Or is there something deeper I'm missing?

PS: Actually, how does one cast a &T to a &'static T, even with unsafe?

If you have a look at CpuPool's API contract, you will find that it expects the closure passed as a parameter to be 'static.

This basically means that your closure should not capture any data which has a finite lifetime. The rationale being that the Rust compiler cannot prove that said data will not go out of scope before the CpuPool is done with it.

Here, you are passing a reference of finite lifetime from file.root.variables.get("thetao"), which is why the compiler is not happy: this reference is not supported by CpuPool's interface contract.
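
To see the same contract outside of futures: here is a small standalone sketch with a hypothetical helper (run_detached is made up for illustration, it is not part of any library) that imposes the same Send + 'static bound. The closure may own its captures, but it may not hold a borrow of local data:

use std::thread;

// Hypothetical helper mimicking CpuPool's requirement: the closure must be
// Send + 'static, so it cannot capture a plain reference to local data.
fn run_detached<F>(f: F) -> thread::JoinHandle<()>
where
    F: FnOnce() + Send + 'static,
{
    thread::spawn(f)
}

fn main() {
    let owned = String::from("thetao");
    // OK: the closure takes ownership of `owned`, so nothing borrowed can dangle.
    run_detached(move || println!("{}", owned)).join().unwrap();

    // Would NOT compile: `&local` is a borrow with a finite lifetime.
    // let local = String::from("thetao");
    // run_detached(|| println!("{}", &local));
}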

Just out of curiosity, why are you using futures_cpupool here? Do you intend to make use of futures in your workflow later on? If not, a lifetime-aware parallelization library like Rayon or crossbeam's scoped thread pool might be a better fit for your needs.

Well, that was actually the first thing I found that seemed to fit. In Rayon I did not find the right abstraction for spawning an arbitrary function on a thread pool. Most of what I saw revolves around mapping simple lambdas over arrays, while here I need to call an external library. More or less I need the most basic thing you can think of when you hear "thread pool": execute a bunch of things in parallel with no crossing over. It's almost SIMD. Yet I'm finding it a nightmare to do in Rust.

Use std::mem::transmute:

fn hide<T>(x: &T) -> &'static T {
    // Erase the lifetime; the caller must guarantee the referent outlives every use.
    unsafe { std::mem::transmute(x) }
}

You can also put the type into an Arc and avoid a reference altogether. In addition, I think the crossbeam crate has scoped threads, which let you share stack references across the (scoped) threads.
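
As a rough sketch of the Arc approach (the Dataset type is a made-up stand-in, and this assumes the wrapped data is Send + Sync, which may or may not be true of netcdf's Variable):

use std::sync::Arc;
use std::thread;

// Stand-in for the data you would normally borrow from the netcdf file.
struct Dataset {
    name: String,
}

fn main() {
    let data = Arc::new(Dataset { name: "thetao".to_string() });
    let mut handles = Vec::new();
    for i in 0..4 {
        // Each worker gets its own owned handle, so no finite-lifetime
        // reference is captured and the 'static bound is satisfied.
        let data = Arc::clone(&data);
        handles.push(thread::spawn(move || {
            println!("worker {} reads {}", i, data.name);
        }));
    }
    for h in handles {
        h.join().unwrap();
    }
}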

OK, almost there... now the borrow checker is complaining that batch does not live long enough. How can I force the compiler to accept this? An unsafe block still does not help:

  --> src/main.rs:61:23
   |
61 |         let (split10, split11) = split1.split_at(Axis(0), batch_size / 4);
   |                       ^^^^^^^
   |
   = note: to avoid this warning, consider using `_split11` instead

error[E0597]: `batch` does not live long enough
  --> src/main.rs:59:32
   |
59 |         let (split0, split1) = batch.view_mut().split_at(Axis(0), batch_size / 2);
   |                                ^^^^^ borrowed value does not live long enough
...
79 | }
   | - borrowed value only lives until here
   |
   = note: borrowed value must be valid for the static lifetime...

I guess this is because the closure might live longer than the batch?

Here is a minimal modification of your solution based on crossbeam's scoped threads:

#![feature(test)]
extern crate netcdf;
#[macro_use(s)]
extern crate ndarray;
extern crate crossbeam;
extern crate test;

use netcdf::variable::Variable;
use ndarray::{ArrayD, ArrayViewD, ArrayViewMutD, IxDyn, Axis};
use test::Bencher;

fn spawn(mut split: ArrayViewMutD<f32>, var: &Variable, offset: usize, batch_size: usize, patches: &[[usize;2]]) {
    for i in 0..batch_size / 4 {
        split.slice_mut(s![i, .., .., ..])
            .assign(&var.array_at(&[offset + i, 0, patches[i % 30][0], patches[i % 30][1]],
                                        &[9, 1, 64, 64]).unwrap().remove_axis(Axis(1)));
    }
}

fn get_batch_pool(var: &Variable, offset: usize, batch_size: usize, patches: &[[usize;2]]) -> ndarray::ArrayD<f32> {
    let mut offset = offset;
    let mut batch = ArrayD::<f32>::zeros(IxDyn(&[batch_size, 9, 64, 64]));

    {

        let (split0, split1) = batch.view_mut().split_at(Axis(0), batch_size / 2);
        let (split00, split01) = split0.split_at(Axis(0), batch_size / 4);
        let (split10, split11) = split1.split_at(Axis(0), batch_size / 4);

        crossbeam::scope(|scope| {
            scope.spawn(move || spawn(split00, var, offset, batch_size, patches));
            offset += batch_size / 4;
            scope.spawn(move || spawn(split01, var, offset, batch_size, patches));
            offset += batch_size / 4;
            scope.spawn(move || spawn(split10, var, offset, batch_size, patches));
            offset += batch_size / 4;
            scope.spawn(move || spawn(split11, var, offset, batch_size, patches));
        });

    }

    batch
}

fn main() {
    let batch_size = 100;
    let patches = [[60, 144], [60, 204], [60, 264], [60, 324], [60, 384],
        [60, 444], [60, 504], [60, 564], [60, 624], [120, 144],
        [120, 204], [120, 264], [120, 324], [120, 384], [120, 444],
        [120, 624], [180, 204], [180, 264], [180, 324], [180, 384],
        [180, 444], [180, 504], [180, 564], [180, 624], [240, 324],
        [240, 384], [240, 444], [240, 504], [240, 564], [240, 624]];
    let file = netcdf::open("original_pre_train_0.nc").unwrap();
    let temp = file.root.variables.get("thetao").unwrap();
    println!("{} - {}", temp.dimensions[0].name, temp.dimensions[0].len);
    println!("{} - {}", temp.dimensions[1].name, temp.dimensions[1].len);
    println!("{} - {}", temp.dimensions[2].name, temp.dimensions[2].len);
    println!("{} - {}", temp.dimensions[3].name, temp.dimensions[3].len);
    let mut batches = Vec::new();
    for o in 0..300 {
        println!("{}", o);
        let batch = get_batch_pool(temp, o * 5, batch_size, &patches[..]);
        batches.push(batch);
    }
    println!("{:?}", batches[0].shape());
}

Now, this compiles (and hopefully works, you tell me), but it can still use improvements. For example, breaking up data manually into a fixed number of tasks like this is a bit of a multithreading antipattern as it results in less readable repetitive code, and will not scale with growing numbers of CPU cores.

Which is why I would rather spend extra time trying to use rayon's fork-join API in order to get code which naturally scales to an arbitrary number of CPUs.
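
For a flavour of that style, here is a minimal sketch built on rayon::join, recursively splitting a mutable slice; the per-chunk body is only a stand-in for the real netcdf loading code:

extern crate rayon;

// Fill a chunk of the output, splitting recursively until the pieces are small.
fn fill(chunk: &mut [f32], offset: usize) {
    if chunk.len() <= 25 {
        for (i, x) in chunk.iter_mut().enumerate() {
            *x = (offset + i) as f32; // stand-in for the netcdf read
        }
    } else {
        let mid = chunk.len() / 2;
        let (left, right) = chunk.split_at_mut(mid);
        // rayon::join runs both halves, potentially in parallel.
        rayon::join(|| fill(left, offset), || fill(right, offset + mid));
    }
}

fn main() {
    let mut batch = vec![0.0f32; 100];
    fill(&mut batch, 0);
    println!("{:?}", &batch[..5]);
}

Rayon's work-stealing scheduler decides how much of that recursion actually runs in parallel, so the same code adapts to however many cores are available.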

EDIT : Fixed crossbeam scoped thread API usage

Thanks for this, I really appreciate it. Technically I don't see why you can't detect the number of cores and split the batch accordingly. As for Rayon, that indeed might be a better solution; however, I have not seen any examples of filling up data coming from an arbitrary external library call. Most of what I've seen revolves around standard numerical mappings over arrays. I'm also not sure whether I can use ndarray there. If there were good examples of how to do what I want, I would use them, but otherwise I'm going to stick with the dirtier solution, as I now have to interface with Python and so on, and really don't have that much time to spend on optimizing this further (it took me quite a while just going from Python to Rust anyway).

There is a crate called ndarray_parallel which implements Rayon-based automatic ndarray parallelization. But with it, you are probably going to face the same issue that you had before: this style of parallelization assumes that you have direct access to all the data structures that are being parallelized upon, and can let the parallelization library transparently split them for you. Whereas here you are accessing an external netcdf array in an index-driven fashion, and need to keep track of which indices you are accessing.

Where I think Rayon could strike back here is that this library uses a layered design, of which parallel container iteration is only the topmost layer. If that high-level interface does not fit your need, you can quite easily go lower-level as much as required, for example by implementing your own parallel iterator (which is basically a splittable task, similar to what can be seen in Intel TBB if you have used that library before), or using a more recursive fork-join programming style.

In any case, glad I could help!

So, I looked through the rayon docs a bit, and it turns out that there is actually a nice pre-made convenience function for defining a custom parallel iterator out of splittable data. Can you tell me if the following code works as expected?

#![feature(test)]
extern crate netcdf;
#[macro_use(s)]
extern crate ndarray;
extern crate rayon;
extern crate test;

use netcdf::variable::Variable;
use ndarray::{ArrayD, ArrayViewD, ArrayViewMutD, IxDyn, Axis};
use rayon::prelude::*;
use test::Bencher;

fn spawn(mut split: ArrayViewMutD<f32>, var: &Variable, offset: usize, patches: &[[usize;2]]) {
    for i in 0..split.len_of(Axis(0)) {
        split.slice_mut(s![i, .., .., ..])
            .assign(&var.array_at(&[offset + i, 0, patches[i % 30][0], patches[i % 30][1]],
                                        &[9, 1, 64, 64]).unwrap().remove_axis(Axis(1)));
    }
}

fn get_batch_pool(var: &Variable, offset: usize, batch_size: usize, patches: &[[usize;2]]) -> ndarray::ArrayD<f32> {
    let mut batch = ArrayD::<f32>::zeros(IxDyn(&[batch_size, 9, 64, 64]));
    {
        // We will parallelize the inner loop using a TBB-like splittable task
        struct Splittable<'a> {
            view: ArrayViewMutD<'a, f32>,
            offset: usize,
        }
        let split_data = Splittable { view: batch.view_mut(), offset };

        // Here is how we split one of our tasks
        fn split_fun(s: Splittable) -> (Splittable, Option<Splittable>) {
            // We split data along the first axis, if it has >= 2 items
            let size = s.view.len_of(Axis(0));
            if size < 2 { return (s, None); }
            let half_size = size / 2;
            let (view1, view2) = s.view.split_at(Axis(0), half_size);

            // The offset is preserved across splits for NetCDF interaction
            let s1 = Splittable { view: view1, offset: s.offset };
            let s2 = Splittable { view: view2, offset: s.offset + half_size };
            (s1, Some(s2))
        }

        // Run a parallel loop using this strategy
        rayon::iter::split(split_data, split_fun).for_each(|s| {
            spawn(s.view, var, s.offset, patches);
        });
    }
    batch
}

fn main() {
    let batch_size = 100;
    let patches = [[60, 144], [60, 204], [60, 264], [60, 324], [60, 384],
        [60, 444], [60, 504], [60, 564], [60, 624], [120, 144],
        [120, 204], [120, 264], [120, 324], [120, 384], [120, 444],
        [120, 624], [180, 204], [180, 264], [180, 324], [180, 384],
        [180, 444], [180, 504], [180, 564], [180, 624], [240, 324],
        [240, 384], [240, 444], [240, 504], [240, 564], [240, 624]];
    let file = netcdf::open("original_pre_train_0.nc").unwrap();
    let temp = file.root.variables.get("thetao").unwrap();
    println!("{} - {}", temp.dimensions[0].name, temp.dimensions[0].len);
    println!("{} - {}", temp.dimensions[1].name, temp.dimensions[1].len);
    println!("{} - {}", temp.dimensions[2].name, temp.dimensions[2].len);
    println!("{} - {}", temp.dimensions[3].name, temp.dimensions[3].len);
    let mut batches = Vec::new();
    for o in 0..300 {
        println!("{}", o);
        let batch = get_batch_pool(temp, o * 5, batch_size, &patches[..]);
        batches.push(batch);
    }
    println!("{:?}", batches[0].shape());
}

EDIT: Also, I can't help but notice that the way you access the "patches" array in the spawn() function depends on the parallelization scheme that is being used, since it is based on the local index i of the active data slice, which is ultimately a parallelization detail. Is that behaviour really expected? Or did you actually mean to use the global index instead, i.e. to access patches[(i + offset) % 30]?

I'll test it when I get back. And yes, there is an actual global index for the patches. You can basically imagine that the patches length is the same as batch_size, but for testing purposes I did it this way.

Here's a version with fixed patch indexing, then:

#![feature(test)]
extern crate netcdf;
#[macro_use(s)]
extern crate ndarray;
extern crate rayon;
extern crate test;

use netcdf::variable::Variable;
use ndarray::{ArrayD, ArrayViewD, ArrayViewMutD, IxDyn, Axis};
use rayon::prelude::*;
use test::Bencher;

fn spawn(mut split: ArrayViewMutD<f32>, var: &Variable, offset: usize, patches: &[[usize;2]]) {
    for loc_i in 0..split.len_of(Axis(0)) {
        let glob_i = loc_i + offset;
        split.slice_mut(s![loc_i, .., .., ..])
             .assign(&var.array_at(&[glob_i, 0, patches[glob_i % 30][0], patches[glob_i % 30][1]],
                                   &[9, 1, 64, 64]).unwrap().remove_axis(Axis(1)));
    }
}

fn get_batch_pool(var: &Variable, offset: usize, batch_size: usize, patches: &[[usize;2]]) -> ndarray::ArrayD<f32> {
    let mut batch = ArrayD::<f32>::zeros(IxDyn(&[batch_size, 9, 64, 64]));
    {
        // We will parallelize the inner loop using a TBB-like splittable task
        struct Splittable<'a> {
            view: ArrayViewMutD<'a, f32>,
            offset: usize,
        }
        let split_data = Splittable { view: batch.view_mut(), offset };

        // Here is how we split one of our tasks
        fn split_fun(s: Splittable) -> (Splittable, Option<Splittable>) {
            // We split data along the first axis, if it has >= 2 items
            let size = s.view.len_of(Axis(0));
            if size < 2 { return (s, None); }
            let half_size = size / 2;
            let (view1, view2) = s.view.split_at(Axis(0), half_size);

            // The offset is preserved across splits for NetCDF interaction
            let s1 = Splittable { view: view1, offset: s.offset };
            let s2 = Splittable { view: view2, offset: s.offset + half_size };
            (s1, Some(s2))
        }

        // Run a parallel loop using this strategy
        rayon::iter::split(split_data, split_fun).for_each(|s| {
            spawn(s.view, var, s.offset, patches);
        });
    }
    batch
}

fn main() {
    let batch_size = 100;
    let patches = [[60, 144], [60, 204], [60, 264], [60, 324], [60, 384],
        [60, 444], [60, 504], [60, 564], [60, 624], [120, 144],
        [120, 204], [120, 264], [120, 324], [120, 384], [120, 444],
        [120, 624], [180, 204], [180, 264], [180, 324], [180, 384],
        [180, 444], [180, 504], [180, 564], [180, 624], [240, 324],
        [240, 384], [240, 444], [240, 504], [240, 564], [240, 624]];
    let file = netcdf::open("original_pre_train_0.nc").unwrap();
    let temp = file.root.variables.get("thetao").unwrap();
    println!("{} - {}", temp.dimensions[0].name, temp.dimensions[0].len);
    println!("{} - {}", temp.dimensions[1].name, temp.dimensions[1].len);
    println!("{} - {}", temp.dimensions[2].name, temp.dimensions[2].len);
    println!("{} - {}", temp.dimensions[3].name, temp.dimensions[3].len);
    let mut batches = Vec::new();
    for o in 0..300 {
        println!("{}", o);
        let batch = get_batch_pool(temp, o * 5, batch_size, &patches[..]);
        batches.push(batch);
    }
    println!("{:?}", batches[0].shape());
}