Description
I'm trying to refactor my code to abstract away duplicated logic, but I can't quite figure out how to make it work. In the playground I linked, you can see how similar send_directory and send_bench are: each initializes some variables outside the loop and then runs slightly different logic inside the loop, potentially using those variables. Below, I've commented one of the functions to highlight the duplicated code I'm trying to abstract away.
async fn send_directory(rx: Receiver<Frame>, rx_events: Receiver<()>, path: PathBuf) -> Result<()> {
    // branch specific variable declaration
    let canon_path = path.canonicalize()?;
    let mut should_exit = false;                                            ///////////////
    loop {                                                                  //
        // check for any events                                             //
        if let Ok(event) = rx_events.try_recv() {                           // repeated code
            should_exit = true;                                             //
        }                                                                   //
                                                                            //
        match timeout(Duration::from_millis(100), rx.recv_async()).await? { //
            Ok(frame) => {                                                  ///////////////
                // branch specific stuff
                use_thing_dir(frame, &canon_path).await?;
            }                                                               ///////////////
            _ => {                                                          //
                // do stuff                                                 //
                if should_exit {                                            //
                    break;                                                  // repeated code
                }                                                           //
            }                                                               //
        }                                                                   //
    }                                                                       //
                                                                            //
    Ok(())                                                                  ///////////////
}
What I've Tried
At first I thought it wouldn't be too bad: create a closure that takes the frame parameter but still has access to the variables initialized above, then break the loop logic out into its own function, pass in my closure, and call it on every loop iteration. But I haven't quite been able to get it to work, because the action I'm trying to pass in ends up not being FnMut.
async fn send_directory(rx: Receiver<Frame>, rx_events: Receiver<()>, path: PathBuf) -> Result<()> {
    let canon_path = path.canonicalize()?;
    let action = |frame: Frame| async move {
        use_thing_dir(frame, &canon_path).await?;
        Ok::<(), anyhow::Error>(())
    };
    recv_loop(&config, &rx, &mut action).await
}
async fn recv_loop<A, Fut>(config: &Config, rx: &flume::Receiver<Frame>, action: &mut A) -> Result<()>
where
    A: FnMut(Frame) -> Fut,
    Fut: futures::Future<Output = Result<()>>,
{
    let mut event_reader = config::bus_reader(&config.event_bus)?;
    let recv_duration = Duration::from_millis(100);
    let mut should_exit = false;
    loop {
        match event_reader.try_recv() {
            Ok(Event::EXIT) => {
                log::info!("exiting recv_directory");
                break;
            }
            Ok(Event::CLEANUP(Component::Receiver)) => {
                should_exit = true;
            }
            _ => {}
        }
        match rx.recv_timeout(recv_duration) {
            Ok(frame) => {
                action(frame).await?;
            }
            _ => {
                if should_exit {
                    break;
                }
            }
        }
    }
    Ok(())
}
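For reference, here is a minimal sketch of one way the closure can satisfy FnMut. It rests on a reading of the attempt above: the `async move` block takes ownership of `canon_path`, so the first call would move the PathBuf into its future, leaving the closure callable only once (FnOnce). If the closure captures a shared reference instead, each call just copies that reference into a fresh future. Everything here that is not in the code above is an assumption made to keep the sketch dependency-free: `Frame` is a `u32` stand-in, a `Vec` replaces the flume channel and event bus, `String` replaces `anyhow::Error`, and `block_on`, `NoopWake`, and `demo` are hypothetical helpers.

```rust
use std::future::Future;
use std::path::{Path, PathBuf};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Stand-in for the real frame type (assumption for this sketch).
type Frame = u32;

// Branch-specific work; it only borrows the path.
async fn use_thing_dir(frame: Frame, path: &Path) -> Result<(), String> {
    if path.as_os_str().is_empty() {
        return Err(format!("no directory for frame {frame}"));
    }
    Ok(())
}

// Shared loop. A Vec stands in for the channel (assumption); the bounds on
// `action` are the same shape as in the attempt above.
async fn recv_loop<A, Fut>(frames: Vec<Frame>, mut action: A) -> Result<usize, String>
where
    A: FnMut(Frame) -> Fut,
    Fut: Future<Output = Result<(), String>>,
{
    let mut handled = 0;
    for frame in frames {
        action(frame).await?;
        handled += 1;
    }
    Ok(handled)
}

async fn send_directory(frames: Vec<Frame>, path: PathBuf) -> Result<usize, String> {
    let canon_path = path; // the original calls path.canonicalize()? here
    // Borrow once, outside the closure. `&Path` is Copy, so every call
    // copies the reference into a new future instead of moving the PathBuf.
    let canon_ref: &Path = &canon_path;
    let action = move |frame: Frame| async move { use_thing_dir(frame, canon_ref).await };
    recv_loop(frames, action).await
}

// Hypothetical minimal executor so the sketch runs without tokio.
// It busy-polls, which is fine here because nothing ever returns Pending.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Hypothetical usage: handle three frames against a non-empty path.
fn demo() -> Result<usize, String> {
    block_on(send_directory(vec![1, 2, 3], PathBuf::from("some/dir")))
}
```

Cloning `canon_path` inside the closure before the `async move` block would be another option; borrowing avoids the per-frame allocation, but it only works while the returned futures do not need to be `'static` (for example, it would not fit a spawned task).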