I have something pretty similar to this:
use pbr;
use rayon::prelude::*;
use std::io::stderr;

pub struct Bring {
    pub repositories: Vec<String>,
}

impl Bring {
    pub fn execute(&self) {
        let mut multibar = pbr::MultiBar::on(stderr());
        self.repositories.par_iter().for_each(|repository| {
            let mut bar = multibar.create_bar(100);
        });
    }
}
This doesn't compile:
error[E0277]: `std::sync::mpsc::Sender<pbr::multi::WriteMsg>` cannot be shared between threads safely
  --> src/cli/bring/mod.rs:27:38
   |
27 |         self.repositories.par_iter().for_each(|repository| {
   |                                      ^^^^^^^^ `std::sync::mpsc::Sender<pbr::multi::WriteMsg>` cannot be shared between threads safely
   |
   = help: within `[closure@src/cli/bring/mod.rs:27:47: 52:10 multibar:&mut pbr::MultiBar<std::io::Stderr>]`, the trait `std::marker::Sync` is not implemented for `std::sync::mpsc::Sender<pbr::multi::WriteMsg>`
   = note: required because it appears within the type `(std::sync::mpsc::Sender<pbr::multi::WriteMsg>, std::sync::mpsc::Receiver<pbr::multi::WriteMsg>)`
   = note: required because it appears within the type `pbr::MultiBar<std::io::Stderr>`
   = note: required because it appears within the type `&mut pbr::MultiBar<std::io::Stderr>`
   = note: required because it appears within the type `[closure@src/cli/bring/mod.rs:27:47: 52:10 multibar:&mut pbr::MultiBar<std::io::Stderr>]`
pbr::MultiBar is Send but it is !Sync. pbr::ProgressBar is Send and Sync.
I think that in order for this to work, I'll need to somehow zip together a collection of ProgressBars which are created ahead of time so that my for_each will have a signature of
fn map(repository: String, progress_bar: ProgressBar<Pipe>)
How can I 1) create a list of ProgressBar<Pipe> objects which can be consumed and 2) zip these two lists together so that I can get a mutable reference to a ProgressBar<Pipe> within my for_each invocation? Is this the right approach?
Maybe something like this?
// Create one progress bar per repository on the main thread.
let pbars = self.repositories.iter()
    .map(|_| multibar.create_bar(100))
    .collect::<Vec<_>>();
// Rayon-based repository processing. into_par_iter() hands each bar to the
// closure by value, so methods taking &mut self can be called on it.
self.repositories.par_iter().zip(pbars.into_par_iter())
    .for_each(|(repository, mut bar)| {
        // Do what you want 'cause a pbar is free...
    });
Given pbr's MultiBar interface, I think this is the cleanest possible approach if you want one progress bar per repository and Rayon-based parallel repository processing. OTOH, I'm not sure that processing "repositories", which from the name sounds I/O-intensive and blocking, is something Rayon will shine at.
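For the closure to call `&mut self` methods on a bar, it has to receive each bar by value rather than by shared reference. Here is a minimal std-only sketch of that distinction; `Bar` and `process` are hypothetical stand-ins, not pbr or rayon types. With rayon, the owning counterpart of `iter()` is `into_par_iter()`, while `par_iter()` yields shared references.

```rust
/// Hypothetical stand-in for a progress bar; not pbr's type.
#[derive(Debug)]
struct Bar {
    value: u64,
}

impl Bar {
    /// Takes `&mut self`, as a progress bar setter typically would.
    fn set(&mut self, value: u64) -> u64 {
        self.value = value;
        self.value
    }
}

/// Pairs each repository with its own bar and returns the final bar values.
fn process(repositories: &[String]) -> Vec<(String, u64)> {
    // One bar per repository, created up front.
    let bars: Vec<Bar> = repositories.iter().map(|_| Bar { value: 0 }).collect();

    repositories
        .iter()
        // `zip(bars)` consumes the Vec, so the closure owns each bar.
        // `zip(bars.iter())` would yield `&Bar`, and `bar.set(..)` would not compile.
        .zip(bars)
        .map(|(repository, mut bar)| (repository.clone(), bar.set(100)))
        .collect()
}
```

Calling `process` with two repository names returns two pairs, each with its bar set to 100.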
Thanks! I ended up finding a way forward like this, based on your input:
let mut multibar = MultiBar::on(stderr());
let bars: Vec<ProgressBar<Pipe>> = [0 .. self.repositories.len()].iter().map(|_| {
    multibar.create_bar(100)
}).collect();
self.repositories.par_iter().zip(bars).for_each(|(repository, mut bar)| {
    // compiles
});
multibar.listen();
I'm not sure if this is a deadlock on the multibar.listen(), but I'll have to try and see if it works.
As for using Rayon here: this is a CLI tool, so I'm just bolting on concurrency for when I need to clone multiple Git repositories simultaneously. Admittedly that isn't often; I'm just being petty and using the twelve cylinders that Rust provides.
This will not work, because rayon's for_each blocks until the loop is finished, and multibar.listen() is only called after it returns. So your progress bars will not be updated until the end of the computation.
As an alternative, you can start a dedicated thread which calls multibar.listen() before running the Rayon iteration.
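To illustrate the ordering with the standard library only, here is a rough sketch in which a plain channel stands in for the one MultiBar holds internally (the compiler error above shows it contains a Sender/Receiver pair). It assumes listen() behaves like a loop that drains that channel; `Update` and `run_with_listener` are hypothetical names, and scoped threads play the role of the blocking Rayon loop.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical progress message; not a pbr type.
struct Update {
    worker: usize,
    percent: u64,
}

/// Runs `workers` workers that report progress while a dedicated listener
/// thread drains the updates as they arrive. Returns what the listener saw.
fn run_with_listener(workers: usize) -> Vec<Update> {
    let (tx, rx) = mpsc::channel::<Update>();

    // Start the listener first so updates are consumed while work is running.
    let listener = thread::spawn(move || {
        let mut seen = Vec::new();
        // This loop ends once every sender has been dropped.
        for update in rx {
            seen.push(update);
        }
        seen
    });

    // Blocking work phase; this call returns only after all workers finish.
    thread::scope(|scope| {
        for worker in 0..workers {
            let tx = tx.clone();
            scope.spawn(move || {
                for percent in [50u64, 100] {
                    tx.send(Update { worker, percent }).unwrap();
                }
            });
        }
    });

    // Drop the original sender so the listener's loop can terminate.
    drop(tx);
    listener.join().unwrap()
}
```

If the listener were started only after the work phase, nothing would read the channel until all workers were done, which is the delayed-update behaviour described above.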
Yeah that's probably what I'll have to do.
use rayon::prelude::*;
use api::project::Project;
use git2::Progress;
use pbr::MultiBar;
use pbr::Pipe;
use pbr::ProgressBar;
use std::cmp::min;
use std::error::Error;
use std::io::stderr;
use std::process;
use std::thread;

#[derive(Debug,StructOpt)]
pub struct Bring {
    /// A list of repositories to fetch.
    pub repositories: Vec<String>,
}

impl Bring {
    pub fn execute(&self) {
        let mut multibar = MultiBar::on(stderr());
        // create an array of bars
        let bars: Vec<ProgressBar<Pipe>> = [0 .. self.repositories.len()].iter().map(|_| {
            multibar.create_bar(100)
        }).collect();
        let finish = thread::spawn(move || {
            multibar.listen();
        });
        println!("{:?}", self.repositories);
        self.repositories.par_iter().zip(bars).for_each(|(repository, mut bar)| {
            println!("{}", repository);
            let project = match Project::from(&repository) {
                Ok(project) => project,
                Err(_) => {
                    error!("Unable to parse repository URL: {}", repository);
                    process::exit(1);
                }
            };
            // set prefix
            bar.message(&format!("{}/{}: ", project.owner(), project.repository()));
            // clone the repository; this is idempotent - will only clone if repo doesn't exist
            info!("Cloning remote repository {}", &project);
            let clone_result = project.clone(|progress| {
                bar.set(self.progress(&progress));
                true
            });
            // always finish
            bar.finish_print(&format!("{}/{}: {}", project.owner(), project.repository(), match clone_result {
                Ok(_) => "done",
                Err(_) => "failed",
            }));
            // deal with errors or success
            match clone_result {
                Ok(_) => debug!("Successfully cloned repository."),
                Err(e) => {
                    error!("Unable to clone repository {}: {}", &project.url(), e.description());
                    process::exit(1);
                }
            };
            // always install hooks, regardless of whether we cloned or not
            match project.configure() {
                Ok(_) => debug!("Installed hooks successfully."),
                Err(e) => error!("Failed to install Git hooks: {}", e),
            };
        });
        finish.join().unwrap_or(());
    }

    /// Convert the progress of a Git clone into a value between 0 and 100.
    fn progress(&self, progress: &Progress) -> u64 {
        let (received, indexed, total) = (
            progress.received_objects(),
            progress.indexed_objects(),
            progress.total_objects()
        );
        // get download progress
        let download_progress = if min(received, total) == 0 {
            0.0
        } else {
            ((received as f64) / (total as f64)) * 100.0
        };
        // get index progress
        let index_progress = if min(indexed, total) == 0 {
            0.0
        } else {
            ((indexed as f64) / (total as f64)) * 100.0
        };
        ((download_progress * 0.5) + (index_progress * 0.5)).round() as u64
    }
}
Sorry for wall of code, but something weird is happening; the body of the for_each is only called once. Everything works, but maybe the zip is screwing things up? If I print repositories, I see all repositories that I have passed, but based on logs coming out of the for_each loop, I'm only seeing it process one repository.
Any ideas?
Can you check what the length of "bars" is? I suspect that this [0 .. self.repositories.len()] expression does not do what you think it does. To be more precise, I think that it is an array containing a single Range, which means that after iterating over that array and mapping its one element into a ProgressBar, I would expect bars to be of length one.
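The difference is easy to check with the standard library alone. A small sketch, with hypothetical helper names:

```rust
/// Counts the items produced by iterating an array that contains one range.
fn array_of_range_len(n: usize) -> usize {
    // `[0..n]` has type `[Range<usize>; 1]`: one element, whatever `n` is.
    [0..n].iter().count()
}

/// Counts the items produced by iterating the range itself.
fn range_len(n: usize) -> usize {
    (0..n).count()
}
```

So `array_of_range_len(5)` is 1 while `range_len(5)` is 5. Since zip stops at the shorter of its two inputs, a one-element bars list limits the loop to a single repository.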
By the way, any particular reason why you didn't like my initial self.repositories.iter().map() version?
Trying it out presently. I just didn't try yours because I thought my version was working.
@HadrienG thanks so much for your help, all is working and ready for merge: https://github.com/naftulikay/nfty/pull/4