Merging a list of mutable items for Rayon


#1

I have something pretty similar to this:

use pbr;
use rayon::prelude::*;

pub struct Bring {
    pub repositories: Vec<String>,
}

impl Bring {
    pub fn execute(&self) {
        let mut multibar = pbr::MultiBar::on(stderr());
        self.repositories.par_iter().for_each(|repository| {
            let mut bar = multibar.create_bar(100);
        });
    }
}

This doesn’t compile:

error[E0277]: `std::sync::mpsc::Sender<pbr::multi::WriteMsg>` cannot be shared between threads safely
  --> src/cli/bring/mod.rs:27:38
   |
27 |         self.repositories.par_iter().for_each(|repository| {
   |                                      ^^^^^^^^ `std::sync::mpsc::Sender<pbr::multi::WriteMsg>` cannot be shared between threads safely
   |
   = help: within `[closure@src/cli/bring/mod.rs:27:47: 52:10 multibar:&mut pbr::MultiBar<std::io::Stderr>]`, the trait `std::marker::Sync` is not implemented for `std::sync::mpsc::Sender<pbr::multi::WriteMsg>`
   = note: required because it appears within the type `(std::sync::mpsc::Sender<pbr::multi::WriteMsg>, std::sync::mpsc::Receiver<pbr::multi::WriteMsg>)`
   = note: required because it appears within the type `pbr::MultiBar<std::io::Stderr>`
   = note: required because it appears within the type `&mut pbr::MultiBar<std::io::Stderr>`
   = note: required because it appears within the type `[closure@src/cli/bring/mod.rs:27:47: 52:10 multibar:&mut pbr::MultiBar<std::io::Stderr>]`

pbr::MultiBar is Send but it is !Sync. pbr::ProgressBar is Send and Sync.

I think that in order for this to work, I’ll need to somehow zip together a collection of ProgressBars which are created ahead of time so that my for_each will have a signature of

fn map(repository: String, progress_bar: ProgressBar<Pipe>)

How can I 1) create a list of ProgressBar<Pipe> objects which can be consumed and 2) zip these two lists together so that I can get a mutable reference to a ProgressBar<Pipe> within my for_each invocation? Is this the right approach?


#2

Maybe something like this ?

// Create one progress bar per repository on the main thread.
let pbars = self.repositories.iter()
                             .map(|_| multibar.create_bar(100))
                             .collect::<Vec<_>>();

// Rayon-based repository processing
self.repositories.par_iter().zip(pbars.par_iter())
                            .for_each(|(repository, mut bar)| {
    // Do what you want 'cause a pbar is free...
});

Given pbr’s MultiBar interface, I think this is the cleanest possible approach if you want one progress bar per repository and Rayon-based parallel repository processing. OTOH I’m not sure if processing “repositories”, which from name sounds I/O-intensive and blocking, is something which Rayon will shine at :confused:


#3

Thanks! I ended up finding a way forward like this, based on your input:

let mut multibar = MultiBar::on(stderr());

let bars: Vec<ProgressBar<Pipe>> = [0 .. self.repository.len()].iter().map(|_| {
    multibar.create_bar(100)
}).collect();

self.repositories.par_iter().zip(bars).for_each(|(repository, mut bar)| {
    // compiles
});

multibar.listen();

I’m not sure if this is a deadlock on the multibar.listen(), but I’ll have to try and see if it works.

As far as using Rayon for this, this is a CLI tool so I’m just bolting on concurrency for when I need to clone multiple Git repositories simultaneously, which admittedly isn’t that often, I’m just being petty and using the twelve cylinders that Rust provides.


#4

This will not work, because rayon’s for_each blocks until the loop is finished. So your progress bars will not be updated until the end of the computation.

As an alternative, you can start a dedicated thread which calls multibar.listen() before running the Rayon iteration.


#5

Yeah that’s probably what I’ll have to do.


#6
use rayon::prelude::*;

use api::project::Project;

use git2::Progress;

use pbr::MultiBar;
use pbr::Pipe;
use pbr::ProgressBar;

use std::cmp::min;
use std::error::Error;
use std::io::stderr;
use std::process;
use std::thread;

#[derive(Debug,StructOpt)]
pub struct Bring {
    /// A list of repositories to fetch.
    pub repositories: Vec<String>,
}

impl Bring {

    pub fn execute(&self) {
        let mut multibar = MultiBar::on(stderr());

        // create an array of bars
        let bars: Vec<ProgressBar<Pipe>> = [0 .. self.repositories.len()].iter().map(|_| {
            multibar.create_bar(100)
        }).collect();

        let finish = thread::spawn(move || {
            multibar.listen();
        });

        println!("{:?}", self.repositories);

        self.repositories.par_iter().zip(bars).for_each(|(repository, mut bar)| {
            println!("{}", repository);

            let project = match Project::from(&repository) {
                Ok(project) => project,
                Err(_)      => {
                    error!("Unable to parse repository URL: {}", repository);
                    process::exit(1);
                }
            };

            // set prefix
            bar.message(&format!("{}/{}: ", project.owner(), project.repository()));

            // clone the repository; this is idempotent - will only clone if repo doesn't exist
            info!("Cloning remote repository {}", &project);
            let clone_result = project.clone(|progress| {
                bar.set(self.progress(&progress));
                true
            });

            // always finish
            bar.finish_print(&format!("{}/{}: {}", project.owner(), project.repository(), match clone_result {
                Ok(_)  => "done",
                Err(_) => "failed",
            }));

            // deal with errors or success
            match clone_result {
                Ok(_) => debug!("Successfully cloned repository."),
                Err(e) => {
                    error!("Unable to clone repository {}: {}", &project.url(), e.description());
                    process::exit(1);
                }
            };

            // always install hooks, regardless of whether we cloned or not
            match project.configure() {
                Ok(_)  => debug!("Installed hooks successfully."),
                Err(e) => error!("Failed to install Git hooks: {}", e),
            };
        });

        finish.join().unwrap_or(());
    }

    /// Convert the progress of a Git clone into a value between 0 and 100.
    fn progress(&self, progress: &Progress) -> u64 {
        let (received, indexed, total) = (
            progress.received_objects(),
            progress.indexed_objects(),
            progress.total_objects()
        );

        // get download progress
        let download_progress = if min(received, total) == 0 {
            0.0
        } else {
            ((received as f64) / (total as f64)) * 100.0
        };

        // get index progress
        let index_progress = if min(indexed, total) == 0 {
            0.0
        } else {
            ((indexed as f64) / (total as f64)) * 100.0
        };


        ((download_progress * 0.5) + (index_progress * 0.5)).round() as u64
    }
}

Sorry for wall of code, but something weird is happening; the body of the for_each is only called once. Everything works, but maybe the zip is screwing things up? If I print repositories, I see all repositories that I have passed, but based on logs coming out of the for_each loop, I’m only seeing it process one repository.

Any ideas?


#7

Can you check what is the length of “bars” ? I suspect that this [0 .. self.repositories.len()] expression does not do what you think it does. To be more precise, I think that is an array of one Range iterator, which means that after iterating over that array and mapping that single array element into a ProgressBar I would expect bars to be of length one.

By the way, any particular reason why you didn’t like my initial self.repositories.iter().map() version ?


#8

Trying it out presently, I just didn’t try yours because I thought my version was working :stuck_out_tongue:


#9

@HadrienG thanks so much for your help, all is working and ready for merge: https://github.com/naftulikay/nfty/pull/4