Passing messages between tokio async blocks

I am trying to send messages from one part of the program to another (a kind of simple observer), where both parts run in loops (a Redis subscriber and a WebSocket listener) and both run in tokio async blocks. What would be the best approach, and is it feasible? I have had no luck with any implementation so far. At first I wanted to send event structs, but now I would be happy just to send strings.
Something like this (but working):

async fn b(mut rx:Receiver<&str>) {
    while let res = rx.recv().await {
        if res != None {
            println!("{:?}",res);
        } 
    }
} 
async fn a(mut rx:Receiver<&str>){   
    for i in 0..2 {
        tokio::spawn(b(rx));
    }; 
} 
#[tokio::main]
async fn main() -> Result<()> { 
    let (tx,mut rx) = channel(1);  
    tokio::spawn(a(rx));  
    tokio::spawn(async move {  tx.send("Hello!!!!").await; });
    tokio::time::sleep(Duration::from_secs(2)).await;
    Ok(())
}

My current error is: rx has an anonymous lifetime '_ but it needs to satisfy a 'static lifetime. I also ran into a lot of other errors with different approaches (using Arc, Mutex, etc.).

You can't use &str as the message type. Change it to String.

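(&'static str will also compile, but it only works for hard-coded string constants. tokio::spawn requires everything moved into the task to be 'static, which is why a plain &str with an elided lifetime is rejected.)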
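The ownership point can be reproduced without any dependencies. The following is only a sketch: it swaps in std::thread::spawn and std::sync::mpsc for their tokio counterparts (an assumption made so that it compiles with the standard library alone). Both spawn functions require 'static data, which is why an owned String can cross the boundary while a &str borrowed from a local value cannot. The function name send_owned and the message text are made up for illustration.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper: builds messages at runtime and sends them from a
// spawned thread. std::thread::spawn stands in for tokio::spawn here; both
// require the moved-in data to be 'static, i.e. owned.
fn send_owned(names: &[&str]) -> Vec<String> {
    let (tx, rx) = mpsc::channel::<String>();

    // Owned Strings created at runtime. A &str pointing into one of these
    // could not be moved into the thread, but the String itself can.
    let messages: Vec<String> = names
        .iter()
        .map(|n| format!("Hello, {}!", n))
        .collect();

    let producer = thread::spawn(move || {
        for msg in messages {
            // send only fails if the receiver has been dropped
            tx.send(msg).expect("receiver dropped");
        }
        // tx is dropped when the closure returns, which closes the channel
    });

    // The iterator ends once every sender has been dropped.
    let received: Vec<String> = rx.iter().collect();
    producer.join().expect("producer panicked");
    received
}
```

Calling send_owned(&["redis", "websocket"]) from main should return the two greetings in sending order. With tokio the shape stays the same, with tx.send(msg).await on the sending side and rx.recv().await on the receiving side.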

Also, you can't have a function like a. Tokio's mpsc channel is multi-producer, single-consumer: the Receiver cannot be cloned, and moving rx into b on the first loop iteration leaves nothing for the second, so the code will not compile.
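What an mpsc channel does allow is the opposite arrangement: many cloned senders feeding one receiver. Here is a rough sketch of that shape, again using std::sync::mpsc and threads as a dependency-free stand-in for tokio::sync::mpsc and tasks (that substitution is an assumption of this example, and fan_in is a hypothetical name).

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper: spawns `producers` threads that each send one
// message into the same channel, then collects everything on one receiver.
fn fan_in(producers: usize) -> Vec<String> {
    let (tx, rx) = mpsc::channel::<String>();
    let mut handles = Vec::new();

    for id in 0..producers {
        // The sending half can be cloned freely; the receiving half cannot.
        let tx = tx.clone();
        handles.push(thread::spawn(move || {
            tx.send(format!("hello from producer {}", id))
                .expect("receiver dropped");
        }));
    }

    // Drop the original sender so the channel closes once the clones finish.
    drop(tx);

    // The single consumer: iteration ends when all senders are gone.
    let mut received: Vec<String> = rx.iter().collect();
    for handle in handles {
        handle.join().expect("producer panicked");
    }

    // Arrival order across producers is not deterministic, so sort for a
    // stable result.
    received.sort();
    received
}
```

Forgetting the drop(tx) line is a common pitfall in this pattern: the original sender would keep the channel open and the collecting loop would never finish.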

The async-channel crate is mpmc, so it will allow it. However, you would need to clone the receiver in that case, and each message is delivered to only one of the cloned receivers, not to all of them:

// Receiver here is async_channel::Receiver, not tokio's
async fn a(rx: Receiver<String>) {
    for _ in 0..2 {
        // each spawned task gets its own clone of the receiver
        tokio::spawn(b(rx.clone()));
    }
}

The correct way to write the while let loop is like this:

async fn b(mut rx: Receiver<String>) {
    while let Some(res) = rx.recv().await {
        println!("{:?}",res);
    }
}
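The difference matters because `while let res = ...` is an irrefutable pattern: it matches the "channel closed" value too, so the loop never ends. A refutable pattern stops the loop as soon as the channel is closed, which also means the consumer can be joined instead of waiting on a fixed sleep. A minimal sketch of that behaviour, assuming std::sync::mpsc and a thread in place of tokio's channel and task (std's recv returns a Result, so the pattern is Ok(..) rather than Some(..)); consume and run are hypothetical names.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;

// Counterpart of `b`: collects messages until the channel is closed.
fn consume(rx: Receiver<String>) -> Vec<String> {
    let mut seen = Vec::new();
    // Refutable pattern: the loop exits on the first Err, which recv
    // returns once every sender has been dropped and the queue is empty.
    while let Ok(msg) = rx.recv() {
        seen.push(msg);
    }
    seen
}

// Hypothetical driver: sends two messages, closes the channel, and waits
// for the consumer to finish instead of sleeping.
fn run() -> Vec<String> {
    let (tx, rx) = mpsc::channel::<String>();
    let consumer = thread::spawn(move || consume(rx));

    tx.send("Hello!!!!".to_string()).expect("consumer stopped early");
    tx.send("Goodbye".to_string()).expect("consumer stopped early");

    // Dropping the only sender closes the channel and ends the loop above.
    drop(tx);

    consumer.join().expect("consumer panicked")
}
```

In the tokio version the same idea would be to keep the JoinHandle returned by tokio::spawn and await it after dropping the sender.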

Thank you very much. With this approach, I can even implement a global sender and receiver (which was my initial need).

lazy_static! { 
    pub static ref RXTX : (Sender<String>,Receiver<String>) = async_channel::unbounded(); 
} 
...
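async fn c() {
    // async_channel's recv returns a Result, so match on Ok(..);
    // a bare `while let res = ...` would match errors as well
    while let Ok(res) = RXTX.1.recv().await {
        println!("{:?}", res);
    }
}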
...
#[tokio::main]
async fn main() -> Result<()> {  
    tokio::spawn(c());  
    tokio::spawn(async move {  RXTX.0.send("Hello!!!!".to_string()).await; });
    tokio::time::sleep(Duration::from_secs(2)).await;
    Ok(())
}
