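// I used `async_broadcast`. The idea is that the receivers regulate the sender's speed by
// sending it "speed up" / "slow down" messages: as soon as the channel is 75% full, the sender
// sleeps 400 ms between messages; when it is 50% full the sleep drops to 200 ms; and when it is
// below 50% full the sleep drops to 0 ms.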
extern crate async_broadcast;
use crate::async_broadcast::broadcast;
use async_broadcast::RecvError;
use crossbeam_channel::{never, select, tick, unbounded, Receiver, Sender};
use std::collections::VecDeque;
use std::ops::Deref;
use std::rc::Rc;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use tokio::sync::broadcast;
/// Messages carried on the broadcast channel from the source to its receivers.
#[derive(Debug, Clone)]
enum SourceMsg {
/// A unit of work sent to the broadcaster.
Work(i64),
/// The source has definitively stopped producing.
Stopped,
/// The tick was adjusted in response to a message.
TickAdjusted,
}
/// Control messages sent back to the source to regulate how fast it produces work.
#[derive(Debug)]
enum RegulateSourceMsg {
// Ask the source to lengthen its tick interval (produce less often).
SlowDown,
// Ask the source to shorten its tick interval, or to restart if it was stopped.
SpeedUp,
// Ask the source to stop ticking altogether.
Stop,
}
fn is_lagging(err: broadcast::error::RecvError) -> bool {
matches!(err, broadcast::error::RecvError::Lagged(1))
}
/// Asserts that `$e` failed with `TryRecvError::Lagged(n)` and that `n == $n`.
///
/// Panics with "did not lag" when the error is any other variant.
///
/// NOTE(review): `assert_err!` is not defined or imported anywhere in the
/// visible source — presumably it is meant to come from a test-helper crate;
/// confirm before invoking this macro, otherwise the expansion will not compile.
macro_rules! assert_lagged {
($e:expr, $n:expr) => {
match assert_err!($e) {
broadcast::error::TryRecvError::Lagged(n) => {
assert_eq!(n, $n);
}
_ => panic!("did not lag"),
}
};
}
#[tokio::main]
async fn main() {
let (tx, mut rx1) = broadcast(10);
let mut rx2 = rx1.clone();
let mut tick_adjusted = Arc::new(Mutex::new(true));
let (from_processor_sender, from_processor_receiver) = crossbeam_channel::unbounded();
from_processor_sender.send(RegulateSourceMsg::SlowDown);
from_processor_receiver.recv();
let h1 = tokio::spawn(async move {
loop {
let recieved_value = rx1.recv().await;
if recieved_value.is_err() {
match recieved_value {
Ok(_) => {}
Err(e) => println!("{:?} ", e),
}
} else {
match recieved_value {
Ok(SourceMsg::Work(value)) => {
println!("Recieved from RX1 {:?}", value);
// let length=tx.len().clone();
// check_buffer_size(length, &from_processor_sender, tick_adjusted.clone());
}
Err(_) => {}
_ => {}
}
}
}
});
let h2 = tokio::spawn(async move {
loop {
let recieved_value = rx2.recv().await;
if recieved_value.is_err() {
match recieved_value {
Ok(_) => {}
Err(e) => println!("{:?}", e),
}
} else {
match recieved_value {
Ok(SourceMsg::Work(value)) => {
println!("Recieved from RX2 {:?}", value);
// let length=tx.len();
// check_buffer_size(length, &from_processor_sender, tick_adjusted.clone());
}
Err(_) => {}
_ => {}
}
// check_buffer_size(tx.len(), &from_processor_sender, tick_adjusted.clone());
}
}
});
let mut current_ticker_duration = Some(20);
let mut ticker = Arc::new(Mutex::new(tick(Duration::from_millis(
current_ticker_duration.unwrap(),
))));
let _ = thread::spawn(move || {
loop {
select! {
recv(from_processor_receiver) -> msg => {
match msg {
Ok(RegulateSourceMsg::SlowDown) => {
current_ticker_duration = match current_ticker_duration {
Some(tick) => {
if tick > 100 {
Some(100)
} else {
Some(tick * 2)
}
},
None => continue,
};
let mut c=ticker.clone().lock().unwrap();
*c=tick(Duration::from_millis(current_ticker_duration.unwrap()));
},
Ok(RegulateSourceMsg::SpeedUp) => {
current_ticker_duration = match current_ticker_duration {
Some(tick) if tick > 2 => Some(tick / 2),
Some(tick) => Some(tick),
// If we're in "stopped" mode, re-start slowly.
None => Some(100),
};
let mut c=ticker.lock().unwrap();
*c=tick(Duration::from_millis(current_ticker_duration.unwrap()));
},
Ok(RegulateSourceMsg::Stop) => {
current_ticker_duration = None;
let mut c=ticker.lock().unwrap();
*c=never();
},
_ => panic!("Error receiving a RegulateSourceMsg."),
}
}
}
}
});
let b = thread::spawn(move || loop {
select! {
recv(ticker.clone().lock().unwrap()) -> msg => {
}
}
tx.try_broadcast(SourceMsg::Work(1));
});
b.join();
h1.await.unwrap();
h2.await.unwrap();
}
/// Check how full the buffer is and modulate the source accordingly.
///
/// * `channel_occupied` - fill level of the channel as a percentage (`0..=100`).
/// * `sender` - channel on which regulation requests are sent to the source.
/// * `tick_adjusted` - handshake flag: `true` while the source has applied the previous
///   request. Nothing is sent while it is `false`, and it is cleared after a request is sent.
///
/// Thresholds (disjoint, matching the design note at the top of the file):
/// below 50% asks the source to speed up, 50% up to (but excluding) 75% leaves the speed
/// alone, 75% to 100% asks it to slow down, and anything above 100% asks it to stop.
///
/// Send failures are ignored on purpose: regulation is best effort.
///
/// # Panics
///
/// Panics if the `tick_adjusted` mutex is poisoned.
fn check_buffer_size(
    channel_occupied: usize,
    sender: &Sender<RegulateSourceMsg>,
    tick_adjusted: Arc<Mutex<bool>>,
) {
    let mut armed = tick_adjusted.lock().expect("tick_adjusted mutex poisoned");
    if !*armed {
        // The previous request has not been acknowledged yet.
        return;
    }
    let request = match channel_occupied {
        0..=49 => RegulateSourceMsg::SpeedUp,
        // Comfortable band: send nothing and keep the handshake flag armed.
        50..=74 => return,
        75..=100 => RegulateSourceMsg::SlowDown,
        _ => RegulateSourceMsg::Stop,
    };
    let _ = sender.send(request);
    *armed = false;
}