Where do I drop MPSC Sender to ensure receiver doesn't block forever?

Hi!

I have this snippet of code

let (tx, rx) = mpsc::sync_channel(hpartition * vpartition);
let tx = Arc::new(Mutex::new(tx));
(0..hpartition).into_par_iter().for_each(|i| {
  (0..vpartition).into_par_iter().for_each(|j| {
    let mut chunk = Chunk {};
    self.render_chunk(&mut chunk);
    let tx = tx.lock().unwrap();
    tx.send(chunk).unwrap();
  });
});

for chunk in rx {
// Fill sdl2::texture with the data from chunk and present it to canvas.
}

How do I correctly drop mpsc sender so the channel will close at just the right time? Right now, Since I am not closing the channel, My program just gets stuck forever because mpsc Receiver is still expecting values and it just blocks.

I am using rayon to create the parallel iterators.

I have checked the list of methods on IntoParallelIterator in rayon and I didn't find anything that can call tx at the end. So, I am quite confused as to how I can drop mpsc sender.

I think you want to do something like this:

fn main() {
    // No mutex or arc is required
    let (tx, rx) = mpsc::sync_channel(100000);
    
    (0..100).into_par_iter().for_each(|i| {
        tx.send(i).unwrap();
    });
    
    // Drop it here
    drop(tx);
    
    for chunk in rx {
        println!("{}", chunk);
    }
}

( playground )

3 Likes

Note that for_each will block the sequential code until it's done, so you're going to be sending everything into the channel before you receive any of it. You could wrap that in a spawn to run separately, or a join with one side sending and the other receiving.

1 Like

Thank you soo much.

I was thinking, If I put drop there, It would close the sender and compiler will just complain about it. So, I just didn't even try it but it just worked.

1 Like

Ah, yes, I just did this:

https://github.com/katharostech/cast2gif/blob/master/src/lib.rs#L59-L89

Oh, that's a good idea, too. I didn't think of that.

Thanks, I'll have to do something else then. I want to draw a chunk as soon as it's computed rather than waiting for all of them to be computed. This code with for_each makes it wait until all the chunks were computed.

And, I can not call the parallel iterators in a std::thread::spawn because I call self.render_chunk within and rustc complains about self not having 'static lifetime.

If you use spawn you don't need to use the parallel iterators. You just for loop and spawn the work:

let (tx, rx) = mpsc::sync_channel(hpartition * vpartition);
for i in 0..hpartition {
  for j in 0..vpartition {
    let tx_ = tx.clone();
    rayon::spawn(move || {
        let mut chunk = Chunk {};
        self.render_chunk(&mut chunk);
        tx_.send(chunk).unwrap();
    }
  }
}

for chunk in rx {
// Fill sdl2::texture with the data from chunk and present it to canvas.
}

That will render all of your chunks on the thread pool, and in the thread that you started off on, it will loop through the chunks as they are rendered.


Rayon docs do say that parallel iterators can be more efficient than manually spawning, so maybe the join technique would be even better. I think that would look something like this:

let (tx, rx) = mpsc::sync_channel(hpartition * vpartition);

rayon::join(
    move || {
        // Render chunks in one thread
        (0..hpartition).into_par_iter().for_each(|i| {
            (0..vpartition).into_par_iter().for_each(|j| {
                let mut chunk = Chunk {};
                self.render_chunk(&mut chunk);
                tx.send(chunk).unwrap();
            });
        });
    },
    // Recieve chunks in another
    move || {
        for chunk in rx {
        // Fill sdl2::texture with the data from chunk and present it to canvas.
        }
    }
);

I can not use rayon::spawn or std::thread::spawnbecause I callself.render_chunkand rustc complains aboutselfnot having'static` lifetime.

I can not use rayon::join because that requires all objects to have Send marker trait and inside for chunk in rx I access sdl2::texture::Texture and sdl2::render::Canvas<Window> and both of which don't implement Send marker trait.

OK, then I think you can use a thread scope to spawn. Stand by.

OK, this should fix the self not 'static problem:

let (tx, rx) = mpsc::sync_channel(hpartition * vpartition);

rayon::scope(move |s| {
    s.spawn(move |_| {
        // Render chunks in one thread
        (0..hpartition).into_par_iter().for_each(|i| {
            (0..vpartition).into_par_iter().for_each(|j| {
                let mut chunk = Chunk {};
                self.render_chunk(&mut chunk);
                tx.send(chunk).unwrap();
            });
        });
    });
    
    // Recieve chunks in another
    s.spawn(move || {
        for chunk in rx {
            // Fill sdl2::texture with the data from chunk and present it to canvas.
        }
    });
});

The way this gets around the 'static requirement is with the scope. The scope makes sure that all of the spawned threads have exited before the rayon::scope() function call exits, which means that the reference to &self is guaranteed to be valid for the whole duration of the threads that reference it. This might require that self be Sync, though... I'm not sure. And it might not fix your sdl2 types not implementing Send.

1 Like

Thanks!

It does fixes the 'static life time requirement but I still have the Send marker trait related issues with sdl2 types.

We've had requests to let scope not require Send, which would let you do a scope.spawn for the parallel work and then do your receive work directly. I'm not sure about changing that by default, but I've been working on a raw_scope alternate to enable this.

2 Likes

OK, then maybe we just have to make sure that we don't spawn a new thread when handling the chunks:

let (tx, rx) = mpsc::sync_channel(hpartition * vpartition);

rayon::scope(move |s| {
    for i in 0..hpartition {
        for j in 0..vpartition {
            let tx_ = tx.clone();
            s.spawn(move |_| {
                let mut chunk = Chunk {};
                self.render_chunk(&mut chunk);
                tx_.send(chunk).unwrap();
            }
        }
    }
});

for chunk in rx {
    // Fill sdl2::texture with the data from chunk and present it to canvas.
}

A scope has to wait for all of its spawns to complete, just like the issue with for_each.

2 Likes

:man_facepalming: Duh. Yeah, that won't help.

This could work, but only if the first closure isn't run in another thread, but unfortunately it is so I don't think that helps and you would still have the Send issue:

rayon::scope(move |s| {
    for i in 0..hpartition {
        for j in 0..vpartition {
            let tx_ = tx.clone();
            s.spawn(move |_| {
                let mut chunk = Chunk {};
                self.render_chunk(&mut chunk);
                tx_.send(chunk).unwrap();
            }
        }
    }
    
    for chunk in rx {
        // Fill sdl2::texture with the data from chunk and present it to canvas.
    }
});

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.