Async task does not run

I am struggling implementing an async solution for a quite simple problem. I think I kind of mix sync and async concepts, but I cannot wrap my head around how to solve that.

Basically, what I aim to do is:

  • have a task that opens a device, and send the events obtained into a channel.
  • in the main task, wait for these events on the other side of the channel
  • at the same time, use a separate channel to signal the "worker task" when it should shut down

When I try to implement this asynchronously, the issue is that my async task does not run. I understand that this is because I wait on the channel, not the task, but it is unclear to me how to solve this. If I could implement something like generators in python that "yield", I could await the task itself, but I think that is not possible (?)

Here is a simple MWE that illustrates what I try to do:

use std::process::ExitCode;
use tokio::fs::File;
use tokio;
use tokio::time::{Duration, sleep};
use tokio::sync::{oneshot, mpsc};

async fn task(file_name: &String,
              ev_tx: mpsc::Sender<u16>,
              stop_rx: oneshot::Receiver<()>)
{
    eprintln!("task started: {}", file_name);
    match File::open(file_name).await {
        Ok(handle) => {
            eprintln!("Success opening {}", file_name);

            loop {
                // ultimately: select on handle.read_exact and stop_tx
                ev_tx.send(1).await;
                tokio::time::sleep(Duration::from_millis(1000));
            }
        },
        Err(_) => {
            eprintln!("Failed opening {}", file_name);
        }
    }
}


#[tokio::main]
async fn main() -> ExitCode {
    let devname = "/some/device/name";

    let (mut ev_tx, mut ev_rx) = mpsc::channel::<u16>(10);
    let (stop_tx, stop_rx) = oneshot::channel::<()>();

    let task_handle = tokio::spawn(async move {
        task(&devname.to_string(), ev_tx, stop_rx).await;
    });

    loop {
        // in loop: eventually, send to stop_rx and break
        let ev = ev_rx.recv().await;
        if let Some(ev) = ev {
            eprintln!("ev={}", ev);
        } else {
            eprintln!("No ev received");
            break;
        }
    }

    task_handle.await.unwrap();
    ExitCode::SUCCESS
}

So my questions is:

  • How can I transform the above example to work as async fn?

What does the program print, if anything? Does it exit? What happens when you run it?

Usually futures don't do anything at all until you .await them. However, tokio::spawn is an exception, and it runs the future itself immediately.

So I don't see an issue with this code. You don't need to await task_handle for task() to run.

If you need to awiat multiple futures, there's futures::join! and futures::join_all.


BTW: in function arguments use &str, never &String. The &String type doesn't make sense in Rust, because String is an owning type and can grow, but & forbids taking ownership and forbids mutation, so you're both requiring and forbidding these things at the same time. &str is simply a temporary permission to use any string-like type.

1 Like

I feel stupid, now everything works as it should. I don't know what the problem was before :frowning:

1 Like

As disasters go, this is a small one. Not like when I deleted the "wrong" AWS disk recently!

Note that sleep returns a Future you need to .await, or it won't run. This might or might not have been the problem.

2 Likes

Another comment on the code, usually with channels you want to loop on them and exit the loop when you get an error. You get an error when the holder of the other half of the channel drops it, so this is a natural way to terminate async tasks (applies to sync channels as well for that matter). You don't generally need explicit messages to stop ( in my experience ).

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.