I don't understand why tx1.closed() doesn't get triggered in this async select code

I asked two different LLMs this question, but they're bullshitting me with made-up explanations and non-functional hallucinated code. So I decided to try "BI" instead of "AI": Brain Intelligence, or plan B Intelligence.

When the first task, which holds the inner select!, gets dropped early, the tx1.closed() branch does not get executed.

use std::time::Duration;

use tokio::sync::oneshot;

async fn some_operation() -> String {
    tokio::time::sleep(Duration::from_millis(500)).await;

    String::from("Some operation finished.")
}

#[tokio::main]
async fn main() {
    let (mut tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    tokio::spawn(async {
        tokio::select! {
            val = some_operation() => {
                let _ = tx1.send(val);
            }
            _ = tx1.closed() => {
                println!("Some operation cancelled");
            }
        }
    });

    tokio::spawn(async {
        tokio::time::sleep(Duration::from_millis(500)).await;
        let _ = tx2.send("two");
    });

    tokio::select! {
        val = rx1 => {
            println!("rx1 completed first with {:?}", val);
        }
        val = rx2 => {
            println!("rx2 completed first with {:?}", val);
        }
    }
}

My guess is: when the first branch in the main select! wins, it's normal that tx1.closed() doesn't get executed. When the second branch wins, then because the whole channel 1 (tx1 and rx1) gets dropped, everything in it is also dropped. And drops don't cascade inwards.

One question I have, though: what is the idiomatic way to get the tx1.closed() branch to also run when the drop happens? Ideally without any extra polling.

Thank you.

Your code returns from main() immediately after the “main select” completes. When that happens, the Tokio runtime is dropped (by the code generated by #[tokio::main]), and when the runtime is dropped, it cancels all spawned tasks.

If you want your spawned tasks to get the chance to complete, you have to await them before returning from main(), or in some other way signal when they are done.

When the main thread exits, all other worker threads are stopped and, as a consequence, the tasks they are running never complete. So if the second channel receives its message first, main just drops both receivers and exits. Thus the first task you spawned does not finish. If you want to explicitly wait for those tasks, you can join! them. For example:

#[tokio::main]
async fn main() {
    let (mut tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    let task1 = tokio::spawn(async {
        tokio::select! {
            val = some_operation() => {
                let _ = tx1.send(val);
            }
            _ = tx1.closed() => {
                println!("Some operation cancelled");
            }
        }
    });

    let task2 = tokio::spawn(async {
        tokio::time::sleep(Duration::from_millis(400)).await;
        let _ = tx2.send("two");
    });

    tokio::select! {
        val = rx1 => {
            println!("rx1 completed first with {:?}", val);
        }
        val = rx2 => {
            println!("rx2 completed first with {:?}", val);
        }
    }
    
    let _ = tokio::join!(task1, task2);

    println!("THE END");
}

If you remove the race by shortening how long the second task sleeps, you get the following output:

rx2 completed first with Ok("two")
Some operation cancelled
THE END
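
The same "wait before you return" rule can be seen without Tokio at all. The sketch below is only an analogy using plain std threads, not the Tokio code above: the helper name worker_finished_after_join and the 50 ms sleep are made up for illustration. It joins the worker before checking whether it finished, which is the role tokio::join! plays for the spawned tasks.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical helper: spawns a worker thread, joins it, and reports
/// whether the worker had finished its work by the time we return.
fn worker_finished_after_join() -> bool {
    let done = Arc::new(AtomicBool::new(false));
    let done_in_worker = Arc::clone(&done);

    let handle = thread::spawn(move || {
        // Stand-in for some slow operation.
        thread::sleep(Duration::from_millis(50));
        done_in_worker.store(true, Ordering::SeqCst);
    });

    // Without this join, the caller could return (and the process could
    // exit) before the worker ever stores `true`.
    handle.join().expect("worker thread panicked");

    done.load(Ordering::SeqCst)
}

fn main() {
    println!("worker finished: {}", worker_finished_after_join());
}
```

Because the flag is read only after join() returns, this prints "worker finished: true".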

When I reduce the sleep of the second task to 400 ms, I do see the cancellation of the first. But why doesn't it work if the second task wins the race when both sleep for 500 ms?

It won't even work if I explicitly await the tasks sequentially:

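The message gets sent before the drop. With both sleeps at 500 ms, some_operation() most likely finishes and tx1.send(val) runs before main drops rx1, so the first task leaves its select! through the first branch and never gets to tx1.closed(). The main select! does not perform both receives: it completes only one branch (here rx2), and the value already sent on channel 1 is dropped unread together with rx1.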
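
To make the "sent before the drop" ordering concrete, here is a minimal sketch. It is an analogy only: it swaps in std::sync::mpsc for Tokio's oneshot channel so that it runs without a runtime, and the helper name send_succeeds is made up. std has no counterpart to closed(), so only the send side is shown. A send that happens while the receiver is still alive succeeds even if nobody ever reads the value, and the sender sees an error only when it sends after the receiver is gone.

```rust
use std::sync::mpsc;

/// Hypothetical helper: sends one value and reports whether `send` succeeded.
/// If `drop_receiver_first` is true, the receiver is dropped before sending.
fn send_succeeds(drop_receiver_first: bool) -> bool {
    let (tx, rx) = mpsc::channel::<&str>();
    if drop_receiver_first {
        // The receiver is already gone, so the sender can notice it.
        drop(rx);
        tx.send("one").is_ok()
    } else {
        // The receiver is still alive at send time, so the send succeeds...
        let ok = tx.send("one").is_ok();
        // ...and the value is simply discarded unread when `rx` is dropped.
        drop(rx);
        ok
    }
}

fn main() {
    println!("send before drop: {}", send_succeeds(false));
    println!("send after drop: {}", send_succeeds(true));
}
```

This prints "send before drop: true" and then "send after drop: false". In the 500 ms versus 500 ms case the first task appears to be in the first situation: its send has already succeeded, so there is nothing left to cancel.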
