What is the idiomatic way to run a few "forever" threads in parallel?

I am trying to spawn several threads that each last "forever", with some sharing of messages between them as needed. In my case, one is a Tokio-NSQ message bus listener and another uses a Postgres connection, but they could be any long-running processes.

In a post from one of the Tokio authors, I read that it is preferable for things that "live forever" to have their own dedicated thread.

The code below, taken from the Rust Playground, simulates this approach for two "forever" processes, and it compiles. But I am not sure whether it is idiomatic. In particular, calling .join().unwrap() on both handles seems awkward. Also, why does the println!("DB conn joined") line never print when the "NSQ" process is clearly running? I tried using join!(message_listener_lives_forever, database_conn_lives_forever), but this did not compile. Any guidance toward a more idiomatic approach would be appreciated.

Code from playground (compiles):

use std::sync::mpsc;
use std::thread::sleep;

fn main() {
    let (tx, rx) = mpsc::channel();
    let mut rt = tokio::runtime::Runtime::new().unwrap();
    
    let database_conn_lives_forever = std::thread::spawn(move || {
        // Connect to Postgres here
        for message in rx {
            println!("I'm gonna go persist {}", message);
            // Do some stuff in postgres with message
        }
    });
    
    let message_listener_lives_forever = std::thread::spawn(move || {
        // simulate receiving message from NSQ
        rt.block_on(async {
            let mut message = 0u32;
            loop {
                sleep(std::time::Duration::from_millis(10));
                message += 1;
                println!("Simulated incoming message: {}", message);
                // do some logic and filter out select messages
                if message % 3 == 0 {
                    tx.send(message).unwrap();
                }
            }
        });
    }); 

    database_conn_lives_forever.join().unwrap();
    println!("DB conn joined"); // I never get printed???
    message_listener_lives_forever.join().unwrap();
}

Output:

Simulated incoming message: 1
Simulated incoming message: 2
Simulated incoming message: 3
I'm gonna go persist 3
Simulated incoming message: 4
Simulated incoming message: 5
Simulated incoming message: 6
I'm gonna go persist 6
...

The message_listener_lives_forever thread starts running as soon as you call spawn. The join method only returns after the associated thread exits, which is why your program never makes it to println!("DB conn joined"): the database_conn_lives_forever thread never exits, so database_conn_lives_forever.join() never returns.
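To make the join semantics concrete, here is a minimal sketch with made-up worker bodies: `join` blocks the caller until the thread has exited, then returns a `std::thread::Result` holding the closure's return value, or an `Err` if the thread panicked. That `Result` is what the `.unwrap()` in the question is unwrapping.

```rust
use std::thread;
use std::time::Duration;

// A worker that finishes on its own: `join` waits for it, then yields Ok(42).
fn finishes_normally() -> thread::Result<u32> {
    let handle = thread::spawn(|| {
        thread::sleep(Duration::from_millis(20));
        42
    });
    // Nothing after this line runs until the thread above has exited.
    handle.join()
}

// A worker that panics: the panic payload comes back as an `Err` from `join`.
fn panics() -> thread::Result<u32> {
    let handle = thread::spawn(|| -> u32 { panic!("worker failed") });
    handle.join()
}

fn main() {
    println!("normal worker: {:?}", finishes_normally().ok());
    println!("panicking worker returned Err: {}", panics().is_err());
}
```

A thread whose body never returns (like `database_conn_lives_forever`) never reaches that point, so its `join` never returns either.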


Thank you Cole for the quick reply. Your answer makes sense: I guess the database_conn_lives_forever.join().unwrap() is unnecessary, since the code still runs when that line is commented out. But I have one lingering confusion: commenting out both joins results in code that compiles but doesn't do anything, while leaving either join or both joins in produces the desired behavior.

Oh wait, no, I get it. All the "forever" threads are terminated as soon as main returns, so you just have to .join() any one of them to keep main from ever exiting. Thanks!


Right, you got it! I was a little confused about this point myself, hence the quickly-deleted second paragraph in my previous post.
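If joining the handles one by one feels awkward, one option is to collect them into a `Vec` and join them all in a loop. This is a sketch under assumptions: the worker names are hypothetical, and the bodies return immediately so the example terminates, whereas real workers would loop "forever" and thereby keep `main` alive.

```rust
use std::thread::{self, JoinHandle};

// Spawns one named thread per (hypothetical) worker and returns all handles.
fn spawn_workers() -> Vec<JoinHandle<&'static str>> {
    ["nsq-listener", "db-writer"]
        .iter()
        .map(|&name| {
            thread::Builder::new()
                .name(name.to_string())
                .spawn(move || {
                    // A real worker's long-running loop would go here.
                    name
                })
                .expect("failed to spawn worker thread")
        })
        .collect()
}

// Joins every handle in order; `main` cannot return (tearing down the
// process) until each worker has exited. A worker panic surfaces here.
fn join_all(handles: Vec<JoinHandle<&'static str>>) -> Vec<&'static str> {
    handles
        .into_iter()
        .map(|handle| handle.join().expect("worker thread panicked"))
        .collect()
}

fn main() {
    let finished = join_all(spawn_workers());
    println!("all workers exited: {:?}", finished);
}
```

One caveat: the handles are joined in order, so if the first worker runs forever, a panic in the second one is not observed. Having workers notify the main thread over a channel avoids that.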

If you want the main thread to keep going (so the whole program isn't cancelled) without joining the child threads, you can stick a loop {} at the end of main, or maybe loop { std::thread::yield_now(); } which should be more efficient. [edit: ignore this, bad advice]


Perhaps I'm missing something, but it looks to me as if a main thread running a loop, with or without the yield_now, is a thread that will keep being scheduled to run alongside the other threads you have.

But if the main thread hangs in a join it will never be scheduled until the thread it is joining terminates.

I would expect use of join to be more efficient.


Even better would be something like this:

loop {
    std::thread::sleep(...);
    if ! everything_still_ok() {
        break;
    }
}
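A runnable version of that polling loop could look like the sketch below. The shared `AtomicBool` is an assumption standing in for the hypothetical `everything_still_ok()`, and the simulated worker deliberately fails after about 50 ms so the example ends. Unlike a `yield_now` spin, sleeping between checks leaves the main thread idle most of the time.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

// Polls a shared health flag (hypothetical stand-in for `everything_still_ok()`)
// and returns how many checks the main thread made before seeing a failure.
fn supervise_until_unhealthy() -> u32 {
    let healthy = Arc::new(AtomicBool::new(true));

    let worker_flag = Arc::clone(&healthy);
    thread::spawn(move || {
        // Simulated worker that hits an unrecoverable error after ~50 ms.
        thread::sleep(Duration::from_millis(50));
        worker_flag.store(false, Ordering::SeqCst);
    });

    let mut checks = 0;
    loop {
        // Sleep between checks instead of spinning.
        thread::sleep(Duration::from_millis(10));
        checks += 1;
        if !healthy.load(Ordering::SeqCst) {
            break;
        }
    }
    checks
}

fn main() {
    let checks = supervise_until_unhealthy();
    println!("worker reported a problem after {} checks", checks);
}
```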

Or use a channel to notify the main thread of problems:

use std::sync::mpsc;

enum ControlMessage {
    Quit,
    // ...
}

fn main() {
    let (ctl_tx, ctl_rx) = mpsc::channel::<ControlMessage>();
    /* Spawn threads; let them have clones of ctl_tx */
    for ctl in ctl_rx {
        use ControlMessage::*;
        match ctl {
            Quit => { break; },
            // ...
        }
    }
}
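Filled out into something runnable, the control-channel pattern might look like this. It is a sketch, not a definitive design: the `Heartbeat` and `WorkerFailed` variants stand in for the `// ...` above, and the single simulated worker (sending three heartbeats, then either `Quit` or a failure) is hypothetical. The main thread blocks in the `for` loop on the receiver, so it is not scheduled while no messages arrive.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

enum ControlMessage {
    Quit,
    // Hypothetical extra variants standing in for the `// ...` above.
    Heartbeat,
    WorkerFailed(&'static str),
}

// Returns the number of heartbeats seen and the reason the supervisor stopped.
fn supervise(fail: bool) -> (u32, String) {
    let (ctl_tx, ctl_rx) = mpsc::channel::<ControlMessage>();

    // Each worker gets its own clone of `ctl_tx`.
    let listener_ctl = ctl_tx.clone();
    thread::spawn(move || {
        // Stand-in for the NSQ listener: a few heartbeats, then a final message.
        for _ in 0..3 {
            thread::sleep(Duration::from_millis(5));
            let _ = listener_ctl.send(ControlMessage::Heartbeat);
        }
        let last = if fail {
            ControlMessage::WorkerFailed("nsq-listener")
        } else {
            ControlMessage::Quit
        };
        let _ = listener_ctl.send(last);
    });
    // Drop main's own sender so the loop also ends if every worker exits.
    drop(ctl_tx);

    let mut heartbeats = 0;
    for ctl in ctl_rx {
        use ControlMessage::*;
        match ctl {
            Quit => return (heartbeats, "quit requested".to_string()),
            Heartbeat => heartbeats += 1,
            WorkerFailed(name) => return (heartbeats, format!("{} failed", name)),
        }
    }
    (heartbeats, "all workers exited".to_string())
}

fn main() {
    let (heartbeats, reason) = supervise(true);
    println!("{} heartbeats, then: {}", heartbeats, reason);
}
```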

Or you could just spawn one less child thread, and make the main thread do its work instead.
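Applied to the original program, that could mean spawning only the listener and letting the main thread run the database loop itself, as in the sketch below. The message `limit` is an assumption added so the example terminates: once the listener thread returns, its `tx` is dropped, the `for message in rx` loop ends, and the function returns.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Only the listener gets its own thread; the calling thread acts as
// `database_conn_lives_forever`, so no `join` is needed to keep it alive.
fn run_with_main_as_consumer(limit: u32) -> Vec<u32> {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        // Simulate receiving messages from NSQ and forwarding every third one.
        for message in 1..=limit {
            thread::sleep(Duration::from_millis(1));
            if message % 3 == 0 {
                tx.send(message).unwrap();
            }
        }
        // `tx` is dropped here, which ends the receiving loop below.
    });

    let mut persisted = Vec::new();
    for message in rx {
        println!("I'm gonna go persist {}", message);
        persisted.push(message);
    }
    persisted
}

fn main() {
    run_with_main_as_consumer(10);
}
```

With a truly endless listener, the `for` loop never ends and `main` never returns, which is exactly the "forever" behavior the question asks for.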
