Thread 'tokio-runtime-worker'

Why I'm getting this bug on the tokio runtime?

thread 'tokio-runtime-worker' panicked at 'called `Result::unwrap()` on an `Err` value: SendError(4)', src/main.rs:38:36
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
thread 'tokio-runtime-worker' panicked at 'called `Result::unwrap()` on an `Err` value: SendError(9)', src/main.rs:38:36
thread 'tokio-runtime-worker' panicked at 'called `Result::unwrap()` on an `Err` value: SendError(25)', src/main.rs:38:36

Code :

/*
[dependencies]
futures-preview = "0.3.0-alpha.19"
*/
use tokio::sync::{mpsc, Mutex};
use std::sync::Arc;

type MutexRecv = Arc<Mutex<mpsc::Receiver<i32>>>;

async fn gen(nums: Vec<i32>) -> MutexRecv {
    let (sender, receiver) = mpsc::channel(1);
    tokio::spawn(async move {
        for num in nums {
            let sender = sender.clone();
            sender.send(num).await.unwrap();
        }
    });
    Arc::new(Mutex::new(receiver))
}

async fn sqr(c: MutexRecv) -> MutexRecv {
    let (sender, receiver) = mpsc::channel(1);
    tokio::spawn(async move {
        while let Some(num) = c.lock().await.recv().await {
            let sender = sender.clone();
            sender.send(num * num).await.unwrap();
        }
    });
    Arc::new(Mutex::new(receiver))
}

async fn merge(cs: Vec<MutexRecv>) -> mpsc::Receiver<i32> {
    let (sender, receiver) = mpsc::channel(1);
    
    let output = |c: MutexRecv| async move {
        while let Some(num) = c.lock().await.recv().await {
            let sender = sender.clone();
            sender.send(num).await.unwrap();
        }
    };
    
    
    for num in cs {
        let output = output.clone();
        tokio::spawn(output(num));
    }
    
    receiver
}

#[tokio::main]
async fn main() {
    // set up pipeline
    let c = gen(vec![1, 2, 3, 4, 5]).await;
    let copy_c = c.clone();
    let out1 = sqr(copy_c).await;
    let out2 = sqr(c).await;
    
    // pipeline response
    while let Some(x) = merge(vec![out1.clone(), out2.clone()]).await.recv().await {
        println!("{}", x);
    }
    
}

code on play.rust-lang.org

Golang version

Minimized example showing the same error:

use tokio::sync::mpsc;

async fn send_twice() -> mpsc::Receiver<()> {
    let (sender, receiver) = mpsc::channel(1);
    tokio::spawn(async move {
        sender.send(()).await.unwrap();
        sender.send(()).await.unwrap();
    });
    receiver
}

#[tokio::main]
async fn main() {
    while let Some(x) = send_twice().await.recv().await {
        println!("{:?}", x);
    }
}

To understand what's going on, we can desugar the while let loop inside main; according to the reference, it is equivalent to the following:

loop {
    match send_twice().await.recv().await {
        Some(x) => {
            println!("{:?}", x);
        }
        _ => break,
    }
}

Here the problem is clear:

  • we're creating a new receiver at each iteration of the loop, pull one item out of it, then discard;
  • however, the sender tries to send more, therefore failing and panicking.

To fix the problem, we have to create the receiver before looping:

let mut receiver = send_twice().await;
while let Some(x) = receiver.recv().await {
    println!("{:?}", x);
}

Playground


Side note: that's why it's usually better to use expect and not unwrap. Here, the only way to see the exact erroring place is to enable stacktraces, which are quite verbose here, due to them containing Tokio runtime stack frames together with your own ones. Having different messages for different expects would, at least, allow to immediately find out what operation exactly has failed.

1 Like

Please be aware that just because the error message contains "tokio-runtime-worker", that does not make it a bug in Tokio. It just means that something went wrong in some asynchronous code running inside Tokio, which in this case is your own code.

2 Likes