Task vs Thread for program live job

Hello!
I am creating a listener and a sender for my app, and I am struggling with what I should use.
The listener will listen for messages and then handle them. The sender will check the queue for messages to send. This sounds rather simple, but I am not sure which approach fits.
Is it correct to create a separate thread for listening for messages, and then a new tokio runtime inside it for handling messages concurrently?
Simplified example:

thread::spawn(|| {
    // new Runtime

    tokio::spawn(async move { handle_message(message).await });
});

I am afraid that if I use multiple tokio runtimes, there will be a problem with the number of available threads.

Thanks in advance!

I have been struggling with this question recently.

In my case I have clients connecting via websockets and requesting whatever data streams they are interested in. I use tokio-tungstenite to handle the web sockets; it is all asynchronous tokio code there.

Meanwhile I use the nats crate to communicate to my NATS messaging server, where all those streams come from. At the time I did this nats did not support async.

That makes for a bit of a problem: how to communicate between those async and sync worlds? I solved it by having mpsc channels communicate between my tokio world of async tasks doing the web socket stuff and my normal world of synchronous threads running the nats stuff.

This is not satisfactory as I end up having to start a real thread, to handle nats, for every web socket connection.
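
For illustration, the thread side of such a bridge can be sketched with only the standard library. This is a minimal sketch, not my actual code: `blocking_fetch` is a made-up stand-in for a synchronous client call (it is not the nats API), and `run_bridge` is a hypothetical name. On the async side one would use tokio's own mpsc channel instead of `std::sync::mpsc`.

```rust
use std::sync::mpsc;
use std::thread;

/// Stand-in for a blocking client call (hypothetical, not the nats API).
fn blocking_fetch(id: u32) -> String {
    format!("message {}", id)
}

/// Runs the blocking work on one dedicated thread and returns everything
/// the consumer side received through the channel.
fn run_bridge(count: u32) -> Vec<String> {
    let (tx, rx) = mpsc::channel::<String>();

    let worker = thread::spawn(move || {
        for id in 0..count {
            // Stop early if the receiving side has gone away.
            if tx.send(blocking_fetch(id)).is_err() {
                break;
            }
        }
        // `tx` is dropped here, which closes the channel.
    });

    // Iteration ends once the sender has been dropped.
    let received: Vec<String> = rx.iter().collect();
    worker.join().expect("worker thread panicked");
    received
}
```

Calling `run_bridge(3)` collects three messages from the worker thread. The cost is visible in the sketch: every bridge needs its own OS thread, which is what made this approach unsatisfactory with one thread per web socket connection.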

Luckily, there is now a new version of the nats crate that supports async.

With that in place my entire program becomes async, all managed by tokio.

There would be only one tokio runtime: the one that runs my main() function asynchronously from the get go. Everything else becomes async tokio tasks spawned from there.

So, it seems to me the ideal situation is to go all async throughout if at all possible.

It also seems that if one really needs real threads in one's code, it's still preferable to have only one tokio runtime. One can run blocking work on real threads from async tasks with tokio::task::spawn_blocking.

But how do you keep constantly listening for new connections? My whole app is about sending and reading data from a serial port. I can use tokio-serial to open the port and send data when I need to, but what about continually checking for new data? This still sounds like I need a new thread for listening. As far as I know, task::spawn should be used for tasks that will finish after some time. I also do not see anything like subscribing to a serial port.

I'm not sure what you mean by "connections" with respect to a serial port. A serial port just receives and transmits raw bytes. This is a point to point connection between two machines. The concept of "connections" as in a TCP/IP socket does not exist.

Anyway, my work with serial ports also uses tokio-serial. I have separate tokio tasks for receiving and sending on a serial port. The magic here is tokio::io::split() which returns a reader and a writer 'handle' for a single serial port. I can then pass those reader and writer 'handles' to whatever functions/tasks I like.

My serial port handling code then starts out like this:

    let mut settings = tokio_serial::SerialPortSettings::default();
    settings.baud_rate = 115200;

    let port = tokio_serial::Serial::from_path(&tty_path, &settings)?;

    // Make separate serial port reader and writer streams.
    let (serial_reader, serial_writer) = tokio::io::split(port);
    let serial_reader = Arc::new(Mutex::new(serial_reader));
    let serial_writer = Arc::new(Mutex::new(serial_writer));

You may or may not need to wrap them in Arc like that. I do because I am about to pass them to other tokio-tasks.

Then I start a serial listener task passing it the serial reader handle like so:

    // Run the threads!
    tokio::select! {
        _ = serial_reader_task(serial_reader.clone(), clients.clone(), nc.clone()) => {
            Err(Box::new(Ser2NetError::ThreadFailure{code:51}))
        }
        // Other threads here.
    }

The serial writer handle is passed to other tasks that will wait on other things and write to the serial port. I use select! here because all my tasks are expected to run forever. The select! catches any task failure and exits the whole program.

My serial reader task is then a loop that runs forever and looks like this:

fn serial_reader_task(
    serial_reader: SerialReader,
    // Other params...
) -> tokio::task::JoinHandle<Result<(), MyError>> {
    tokio::spawn(async move {
        loop {
            let buf: &mut [u8] = &mut [0; 1024];
            let mut serial_reader = serial_reader.lock().await;
            let n = match serial_reader.read(buf).await {
                Ok(n) => n,
                Err(e) => {
                    println!("Error reading serial port {:?}", e);
                    break;
                }
            };
            // println!("Serial got: {:?}", &buf[0..n]);

            // Do something with serial bytes.
        }
        // Reached only after a read error. The select! in the caller
        // treats any task exit as a failure.
        Ok(())
    })
}

I oversimplified with these "connections".
I am looking at your answer and I was so damn close.
I was booed on another forum for using task::spawn for such a thing, and they suggested I use thread::spawn instead. That is why I keep asking about how to compose such a thing. I currently have the same solution as you presented, but made in C#: reader, writer, separate task for handling it all, etc.
So is it okay to use a task for such a long-running thing as a serial read?

When you are inside async code (i.e. inside tokio::spawn), you cannot use blocking IO. Non-blocking IO is fine. This is probably the basis of you being booed.

The tokio-serial crate provides non-blocking IO. What were you using to read from the serial port?

It has nothing to do with it being short or long running.


I was planning to use tokio-serial to stay in tokio. Finally everything is clear. Thank you all for the help. Rust has an awesome community!!
