Tokio::broadcast::channel not working

use tokio::{sync::{broadcast::{Sender, Receiver, self}, oneshot, mpsc}, time};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::channel::<i32>(64);
    //let (tx, mut rx) = mpsc::channel::<i32>(32);

    tokio::spawn(async move {
        let mut i = 0;
        loop {
            tx.clone().send(i);
            i = i + 1;
            time::sleep(time::Duration::from_millis(1)).await;
        }
    });

    tokio::spawn(async move {
        loop {
            let mut result = rx.try_recv();
            while result.is_ok() {
                let msg_data = result.unwrap();
                println!("receive...{}", msg_data);
                result = rx.try_recv();
            }
            time::sleep(time::Duration::from_millis(2000)).await;
        }
    });

    loop {}
}

Why does this only print "receive...0"? Is tokio::sync::broadcast::channel not working?

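Your receiver is too slow: it sleeps for 2000 ms between drains, while the sender produces a message every 1 ms into a channel that holds only 64. If you remove the while result.is_ok() check and unwrap the result directly, your program will panic with the following message: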

thread 'tokio-runtime-worker' panicked at src/main.rs:27:38:
called `Result::unwrap()` on an `Err` value: Lagged(654)
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

Here is a playground that makes the lagging behaviour of your receiver a little clearer.

Broadcast channels have a feature called "lagging". From the documentation:

Lagging

As sent messages must be retained until all Receiver handles receive a clone, broadcast channels are susceptible to the “slow receiver” problem. In this case, all but one receiver are able to receive values at the rate they are sent. Because one receiver is stalled, the channel starts to fill up.

This broadcast channel implementation handles this case by setting a hard upper bound on the number of values the channel may retain at any given time. This upper bound is passed to the channel function as an argument.

If a value is sent when the channel is at capacity, the oldest value currently held by the channel is released. This frees up space for the new value. Any receiver that has not yet seen the released value will return RecvError::Lagged the next time recv is called.

Once RecvError::Lagged is returned, the lagging receiver’s position is updated to the oldest value contained by the channel. The next call to recv will return this value.

This behavior enables a receiver to detect when it has lagged so far behind that data has been dropped. The caller may decide how to respond to this: either by aborting its task or by tolerating lost messages and resuming consumption of the channel.

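You need to handle this error explicitly. In your code, while result.is_ok() stops at the first Err, so once the receiver has lagged, every wake-up sees Lagged, skips the loop, and goes back to sleep for another 2000 ms, by which time it has lagged again.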
