Why I'm getting this bug on the tokio runtime?
thread 'tokio-runtime-worker' panicked at 'called `Result::unwrap()` on an `Err` value: SendError(4)', src/main.rs:38:36
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
thread 'tokio-runtime-worker' panicked at 'called `Result::unwrap()` on an `Err` value: SendError(9)', src/main.rs:38:36
thread 'tokio-runtime-worker' panicked at 'called `Result::unwrap()` on an `Err` value: SendError(25)', src/main.rs:38:36
Code :
/*
[dependencies]
futures-preview = "0.3.0-alpha.19"
*/
use tokio::sync::{mpsc, Mutex};
use std::sync::Arc;
type MutexRecv = Arc<Mutex<mpsc::Receiver<i32>>>;
async fn gen(nums: Vec<i32>) -> MutexRecv {
let (sender, receiver) = mpsc::channel(1);
tokio::spawn(async move {
for num in nums {
let sender = sender.clone();
sender.send(num).await.unwrap();
}
});
Arc::new(Mutex::new(receiver))
}
async fn sqr(c: MutexRecv) -> MutexRecv {
let (sender, receiver) = mpsc::channel(1);
tokio::spawn(async move {
while let Some(num) = c.lock().await.recv().await {
let sender = sender.clone();
sender.send(num * num).await.unwrap();
}
});
Arc::new(Mutex::new(receiver))
}
async fn merge(cs: Vec<MutexRecv>) -> mpsc::Receiver<i32> {
let (sender, receiver) = mpsc::channel(1);
let output = |c: MutexRecv| async move {
while let Some(num) = c.lock().await.recv().await {
let sender = sender.clone();
sender.send(num).await.unwrap();
}
};
for num in cs {
let output = output.clone();
tokio::spawn(output(num));
}
receiver
}
#[tokio::main]
async fn main() {
// set up pipeline
let c = gen(vec![1, 2, 3, 4, 5]).await;
let copy_c = c.clone();
let out1 = sqr(copy_c).await;
let out2 = sqr(c).await;
// pipeline response
while let Some(x) = merge(vec![out1.clone(), out2.clone()]).await.recv().await {
println!("{}", x);
}
}