let (tx, rx) = mpsc::sync_channel(hpartition * vpartition);
let tx = Arc::new(Mutex::new(tx));
(0..hpartition).into_par_iter().for_each(|i| {
(0..vpartition).into_par_iter().for_each(|j| {
let mut chunk = Chunk {};
self.render_chunk(&mut chunk);
let tx = tx.lock().unwrap();
tx.send(chunk).unwrap();
});
});
for chunk in rx {
// Fill sdl2::texture with the data from chunk and present it to canvas.
}
How do I correctly drop mpsc sender so the channel will close at just the right time? Right now, Since I am not closing the channel, My program just gets stuck forever because mpsc Receiver is still expecting values and it just blocks.
I am using rayon to create the parallel iterators.
I have checked the list of methods on IntoParallelIterator in rayon and I didn't find anything that can call tx at the end. So, I am quite confused as to how I can drop mpsc sender.
fn main() {
// No mutex or arc is required
let (tx, rx) = mpsc::sync_channel(100000);
(0..100).into_par_iter().for_each(|i| {
tx.send(i).unwrap();
});
// Drop it here
drop(tx);
for chunk in rx {
println!("{}", chunk);
}
}
Note that for_each will block the sequential code until it's done, so you're going to be sending everything into the channel before you receive any of it. You could wrap that in a spawn to run separately, or a join with one side sending and the other receiving.
I was thinking, If I put drop there, It would close the sender and compiler will just complain about it. So, I just didn't even try it but it just worked.
Thanks, I'll have to do something else then. I want to draw a chunk as soon as it's computed rather than waiting for all of them to be computed. This code with for_each makes it wait until all the chunks were computed.
And, I can not call the parallel iterators in a std::thread::spawn because I call self.render_chunk within and rustc complains about self not having 'static lifetime.
If you use spawn you don't need to use the parallel iterators. You just for loop and spawn the work:
let (tx, rx) = mpsc::sync_channel(hpartition * vpartition);
for i in 0..hpartition {
for j in 0..vpartition {
let tx_ = tx.clone();
rayon::spawn(move || {
let mut chunk = Chunk {};
self.render_chunk(&mut chunk);
tx_.send(chunk).unwrap();
}
}
}
for chunk in rx {
// Fill sdl2::texture with the data from chunk and present it to canvas.
}
That will render all of your chunks on the thread pool, and in the thread that you started off on, it will loop through the chunks as they are rendered.
Rayon docs do say that parallel iterators can be more efficient than manually spawning, so maybe the join technique would be even better. I think that would look something like this:
let (tx, rx) = mpsc::sync_channel(hpartition * vpartition);
rayon::join(
move || {
// Render chunks in one thread
(0..hpartition).into_par_iter().for_each(|i| {
(0..vpartition).into_par_iter().for_each(|j| {
let mut chunk = Chunk {};
self.render_chunk(&mut chunk);
tx.send(chunk).unwrap();
});
});
},
// Recieve chunks in another
move || {
for chunk in rx {
// Fill sdl2::texture with the data from chunk and present it to canvas.
}
}
);
I can not use rayon::spawn or std::spawnbecause I callself.render_chunkand rustc complains aboutselfnot having'static` lifetime.
I can not use rayon::join because that requires all objects to have Send marker trait and inside for chunk in rx I access sdl2::texture::Texture and sdl2::render::Canvas<Window> and both of which don't implement Send marker trait.
let (tx, rx) = mpsc::sync_channel(hpartition * vpartition);
rayon::scope(move |s| {
s.spawn(move |_| {
// Render chunks in one thread
(0..hpartition).into_par_iter().for_each(|i| {
(0..vpartition).into_par_iter().for_each(|j| {
let mut chunk = Chunk {};
self.render_chunk(&mut chunk);
tx.send(chunk).unwrap();
});
});
});
// Recieve chunks in another
s.spawn(move || {
for chunk in rx {
// Fill sdl2::texture with the data from chunk and present it to canvas.
}
});
});
The way this gets around the 'static requirement is with the scope. The scope makes sure that all of the spawned threads have exited before the rayon::scope() function call exits, which means that the reference to &self is guaranteed to be valid for the whole duration of the threads that reference it. This might require that self be Sync, though... I'm not sure. And it might not fix your sdl2 types not implementing Send.
We've had requests to let scope not require Send, which would let you do a scope.spawn for the parallel work and then do your receive work directly. I'm not sure about changing that by default, but I've been working on a raw_scope alternate to enable this.
OK, then maybe we just have to make sure that we don't spawn a new thread when handling the chunks:
let (tx, rx) = mpsc::sync_channel(hpartition * vpartition);
rayon::scope(move |s| {
for i in 0..hpartition {
for j in 0..vpartition {
let tx_ = tx.clone();
s.spawn(move |_| {
let mut chunk = Chunk {};
self.render_chunk(&mut chunk);
tx_.send(chunk).unwrap();
}
}
}
});
for chunk in rx {
// Fill sdl2::texture with the data from chunk and present it to canvas.
}
This could work, but only if the first closure isn't run in another thread, but unfortunately it is so I don't think that helps and you would still have the Send issue:
rayon::scope(move |s| {
for i in 0..hpartition {
for j in 0..vpartition {
let tx_ = tx.clone();
s.spawn(move |_| {
let mut chunk = Chunk {};
self.render_chunk(&mut chunk);
tx_.send(chunk).unwrap();
}
}
}
for chunk in rx {
// Fill sdl2::texture with the data from chunk and present it to canvas.
}
});