Kill second thread using cloned sender channel

Hi!
I'm learning Rust, so sorry if this is a silly question.

The goal of this fn is simple, spawn a new thread for each string that stream receives, killing the previous thread by dropping the relative thread_tx channel. I would like not to use another channel to do so.

The problem is in the line previous_tx = Some(thread_tx.clone()); because, using the thread_tx.clone() creates a completely new sender channel, and if I drop it, the thread is obviously not blocked, as the reference in previous_tx is not used anywhere.

If I remove the .clone(), the compiler gives me the following error: use of moved value: thread_tx``.

How may I proceed with the code?

Thanks a lot for any suggestions!

ps. The internal thread is an example.

This is the code:

fn stream_from_stream(mut stream: mpsc::Receiver<String>) -> mpsc::Receiver<String> {
    let (tx, rx) = mpsc::channel::<String>();

    thread::spawn(move || {
        let mut previous_tx: Option<Sender<String>> = None;

        for str in stream {
            if let Some(tx) = previous_tx.take() {
                drop(tx); // Drop the previous sender
            }

            let thread_tx = tx.clone();
            previous_tx = Some(thread_tx.clone());

            thread::spawn(move || {
                for y in 0..50 {
                    if thread_tx.send(format!("{}:{} ", str, y)).is_err() {
                        println!("dropping the sender for string {}..", str);
                        break;
                    }
                    thread::sleep(Duration::from_millis(500));
                }
            });
        }
    });

    rx
}

First, I'm not sure which MPSC channel you're using, as there are lots of different implementations in different crates (std library, crossbeam, futures::channel, etc). The have different behavior, but in what I think you're asking, they behave in a similar manner.

You're using an "MPSC" channel, meaning it can have multiple producers (senders), but only one consumer (receiver). That means you can clone the tx side, but not the rx side. Typically you would clone the tx and move each clone to a different thread and all those threads could send messages to the one-and-only receiver thread. The receiver would receive all the messages in the order sent, regardless of which thread sent it. The magic of the channel is that it handles all the thread synchronization and signaling.

The point to remember in your example is that you are only creating one single channel. When you clone a new sender it is still referencing the same channel. That channel will remain open as long as the receiver is alive, and as long as at least one sender is alive.

So only when the last transmitter goes out of scope would the received be notified that the channel was closed.

By keeping a "previous" transmitter alive to clone the next one, you're keeping the channel open.

If you truly want to spawn a thread to handle a single message, then you need to create a new, separate channel for each thread, send one message from the one original transmitter, and then close that transmitter. The receiver will detect it and the thread can shut down.

Note that a channel that is meant to be used to transmit a single message is often called a "one shot". Some crates have a (slightly optimized) implementation specifically for this use case, but any channel would work if you follow the usage pattern.

1 Like

thread_tx is moved inside of the thread, you don't get to control it; other than it is informed when the sender is dropped.

You don't have to use another channel to signal each when to stop.
An Arc<AtomicUsize> can be a counter that gets incremented. Pass the current value and a clone to each thread.

I don't see the design point though, as there is no parallelism and just churn creating new threads.

@fpagliughi, @jonh
Thanks so much for your responses.

@fpagliughi I have created a rust playground to be more clear about channels and what I'm trying to accomplish.

You write:
The point to remember in your example is that you are only creating one single channel. When you clone a new sender it is still referencing the same channel. That channel will remain open as long as the receiver is alive, and as long as at least one sender is alive.

I think I wasn't clear enough in my post, sorry for the misunderstanding. The goal is to stop the second thread from sending new elements as the relative cloned Sender channel is dropped. So to be more clear, I'm not trying to drop the whole channel, which will continue to be alive indefinitely (in my example), but only to drop the cloned sender part of the channel, so the relative spawned thread will stop.

Anyway, the solution you suggested with adding a channel is something that I already have tried and is working, but I'm trying to push forward and trying to not use this additional channel.

@jonh,

You write:
An Arc<AtomicUsize> can be a counter that gets incremented. Pass the current value and a clone to each thread.

Could you give me an example of how to use Arc?

You write:
I don't see the design point though, as there is no parallelism and just churn creating new threads.

I think there is parallelism because I need to start a separate thread to generate (and push to the main channel) String until the new str is getting from stream. I'm not sure how may rewrite it without using a thread. If you have an example, please write it down and I will study it. :slight_smile:

So if I understand, you're considering a scenario where you have three threads; a sender, receiver, and a third thread to signal the sender when it's time to shut down?

If so, then no, you can't do that with the sender. I think your mental model of the structure is incorrect. Each sender that you clone is a separate object that references the channel; you can't communicate from one sender to another, and you can't drop an object in one thread if it's being used in another (at least not in safe Rust).

You probably need a different way for the tx thread to figure out that it is finished, close its sender, and exit cleanly.

But I agree with @jonh that you're threading model may be overkill. Spinning up a thread is pretty heavyweight. You typically wouldn't create a new one for each piece of data that you want to process, unless, maybe, they were massive time-consuming tasks. But even then, a dynamic thread pool would be better to limit the maximum number that can exist at one time.

1 Like

Hi @fpagliughi!

Thanks again for your reply.

After some days of testing, I was able to achieve the goal, using an Arc<Mutex<String>> and saving there the current phrase (from the stream), and checking each time if it is still the same. When different, break the loop and exit from the thread and in the meantime, another thread was created with the new phrase.

Anyway, the solution, as you and @jonh already pointed out, is expensive, not useful and not practical. I rewrote everything using an async/await with Tokio. It was easier than I was thinking.

Thanks again for your time.

PS. @fpagliughi I will select your last comment as a solution as you stuck with me for a longer time :wink:

1 Like

No worries. This did just seem like one of those times where you need to iterate through some design ideas to find the right one.