Using Tokio's `sync::broadcast` or `async_broadcast`, create a broadcast channel in which the receivers signal backpressure to the sender.

I used `async_broadcast`. The idea is that the receivers tell the sender to speed up or slow down: the moment the channel is 75% full, the sender sleeps 400 ms between sends; if it is 50% full, the sleep drops to 200 ms; and if it is below 50% full, the sleep drops to 0 ms.

extern crate async_broadcast;

use crate::async_broadcast::broadcast;
use async_broadcast::RecvError;
use crossbeam_channel::{never, select, tick, unbounded, Receiver, Sender};
use std::collections::VecDeque;
use std::ops::Deref;
use std::rc::Rc;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use tokio::sync::broadcast;

/// Messages carried on the broadcast channel from the source to the receivers.
#[derive(Debug, Clone)]
enum SourceMsg {
    /// A unit of work sent by the source to every receiver.
    Work(i64),
    /// The source has definitively stopped producing.
    Stopped,
    /// The tick was adjusted in response to a regulation message.
    TickAdjusted,
}

/// Regulation requests sent from the receiving side back to the source to
/// change how often it produces work.
#[derive(Debug)]
enum RegulateSourceMsg {
    /// Ask the source to lengthen its tick interval (produce less often).
    SlowDown,
    /// Ask the source to shorten its tick interval; also restarts a stopped source.
    SpeedUp,
    /// Ask the source to stop ticking altogether.
    Stop,
}

fn is_lagging(err: broadcast::error::RecvError) -> bool {
    matches!(err, broadcast::error::RecvError::Lagged(1))
}

/// Asserts that `$e` is an error reporting a lag of exactly `$n` messages.
///
/// `$e` is first unwrapped with `assert_err!`; the resulting error must be
/// `TryRecvError::Lagged(n)` with `n == $n`, otherwise the macro panics.
macro_rules! assert_lagged {
    ($e:expr, $n:expr) => {
        if let broadcast::error::TryRecvError::Lagged(n) = assert_err!($e) {
            assert_eq!(n, $n);
        } else {
            panic!("did not lag");
        }
    };
}

#[tokio::main]
async fn main() {
    let (tx, mut rx1) = broadcast(10);
    let mut rx2 = rx1.clone();
    let mut tick_adjusted = Arc::new(Mutex::new(true));
    let (from_processor_sender, from_processor_receiver) = crossbeam_channel::unbounded();
    from_processor_sender.send(RegulateSourceMsg::SlowDown);
    from_processor_receiver.recv();
    let h1 = tokio::spawn(async move {
        loop {
            let recieved_value = rx1.recv().await;
            if recieved_value.is_err() {
                match recieved_value {
                    Ok(_) => {}
                    Err(e) => println!("{:?} ", e),
                }
            } else {
                match recieved_value {
                    Ok(SourceMsg::Work(value)) => {
                        println!("Recieved from RX1  {:?}", value);
                        //   let length=tx.len().clone();
                        // check_buffer_size(length, &from_processor_sender, tick_adjusted.clone());
                    }
                    Err(_) => {}
                    _ => {}
                }
            }
        }
    });

    let h2 = tokio::spawn(async move {
        loop {
            let recieved_value = rx2.recv().await;
            if recieved_value.is_err() {
                match recieved_value {
                    Ok(_) => {}
                    Err(e) => println!("{:?}", e),
                }
            } else {
                match recieved_value {
                    Ok(SourceMsg::Work(value)) => {
                        println!("Recieved from RX2  {:?}", value);
                        //   let length=tx.len();
                        //        check_buffer_size(length, &from_processor_sender, tick_adjusted.clone());
                    }
                    Err(_) => {}
                    _ => {}
                }
                //  check_buffer_size(tx.len(), &from_processor_sender, tick_adjusted.clone());
            }
        }
    });

    let mut current_ticker_duration = Some(20);
    let mut ticker = Arc::new(Mutex::new(tick(Duration::from_millis(
        current_ticker_duration.unwrap(),
    ))));

    let _ = thread::spawn(move || {
        loop {
            select! {
                recv(from_processor_receiver) -> msg => {
                    match msg {
                        Ok(RegulateSourceMsg::SlowDown) => {
                            current_ticker_duration = match current_ticker_duration {
                                Some(tick) => {
                                    if tick > 100 {
                                        Some(100)
                                    } else {
                                        Some(tick * 2)
                                    }
                                },
                                None => continue,
                            };
                            let mut c=ticker.clone().lock().unwrap();
                            *c=tick(Duration::from_millis(current_ticker_duration.unwrap()));
                        },
                        Ok(RegulateSourceMsg::SpeedUp) => {
                            current_ticker_duration = match current_ticker_duration {
                                Some(tick) if tick > 2 => Some(tick / 2),
                                Some(tick) => Some(tick),
                                // If we're in "stopped" mode, re-start slowly.
                                None => Some(100),
                            };

                            let mut c=ticker.lock().unwrap();
                            *c=tick(Duration::from_millis(current_ticker_duration.unwrap()));
                        },
                        Ok(RegulateSourceMsg::Stop) => {
                            current_ticker_duration = None;
                            let mut c=ticker.lock().unwrap();
                            *c=never();
                        },
                        _ => panic!("Error receiving a RegulateSourceMsg."),
                    }
                }
            }
        }
    });

    let b = thread::spawn(move || loop {
        select! {
            recv(ticker.clone().lock().unwrap()) -> msg => {
        }
        }
        tx.try_broadcast(SourceMsg::Work(1));
    });
    b.join();
    h1.await.unwrap();
    h2.await.unwrap();
}

/// Check the current fill level of the buffer, and modulate the source accordingly.
///
/// * `channel_occupied` - how full the channel is. The bands below read as a
///   percentage (0-100); anything above 100 requests a stop.
/// * `sender` - channel on which the regulation request is sent to the source.
/// * `tick_adjusted` - gate that is `true` while a new request may be sent.
///   It is cleared after a request has been sent; this function never sets it
///   back to `true`.
///
/// Bands: below 50 asks the source to speed up, 50 up to (but not including)
/// 75 leaves the rate alone and keeps the gate armed, and 75 through 100 asks
/// it to slow down.
///
/// # Panics
///
/// Panics if the `tick_adjusted` mutex is poisoned.
fn check_buffer_size(
    channel_occupied: usize,
    sender: &Sender<RegulateSourceMsg>,
    tick_adjusted: Arc<Mutex<bool>>,
) {
    let mut may_adjust = tick_adjusted.lock().unwrap();

    // A previous request has not been acknowledged yet; do not send another.
    if !*may_adjust {
        return;
    }

    // The bands are disjoint so that exactly 50 falls in the "leave alone"
    // band and exactly 75 already triggers a slowdown.
    let request = match channel_occupied {
        0..=49 => RegulateSourceMsg::SpeedUp,
        // Comfortable fill level: nothing to send, and the gate stays armed.
        50..=74 => return,
        75..=100 => RegulateSourceMsg::SlowDown,
        _ => RegulateSourceMsg::Stop,
    };

    // Best effort: a disconnected source simply means nobody is left to regulate.
    let _ = sender.send(request);
    *may_adjust = false;
}

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.