Did I mention I actually would like this to work with async too?
Again, I found a solution with Box
(and Pin
)… (and Arc
and Mutex
)
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
async fn foo<C, Fut>(mut closure: C) -> i32
where
C: FnMut(Box<dyn FnMut() -> Pin<Box<dyn Future<Output = ()>>>>) -> Fut,
Fut: Future<Output = ()>,
{
let counter: Arc<Mutex<i32>> = Arc::new(Mutex::new(0));
let counter2 = counter.clone();
closure(Box::new(move || {
let counter3 = counter2.clone();
Box::pin(async move {
*counter3.lock().unwrap() += 1;
println!("Hello!");
tokio::task::yield_now().await;
})
})).await;
let x = *counter.lock().unwrap();
x
}
#[tokio::main]
async fn main() {
let retval = foo(|mut callback| {
async move {
println!("Calling callback.");
callback().await;
println!("Called callback.");
}
}).await;
println!("Callback used {} times.", retval)
}
(Playground)
Output:
Calling callback.
Hello!
Called callback.
Callback used 1 times.
Is there any way to make this mess more clean?
Edit: Perhaps this is pushing things too far and macros are a better alternative for me.
To see my actual use case, click here.
pub struct FreqShift {
sender: Sender<Chunk<Complex<f32>>>,
receiver: Receiver<Chunk<Complex<f32>>>,
freq: watch::Sender<(isize, isize)>,
}
impl FreqShift {
pub fn new(numerator: isize, denominator: isize) -> Self {
let sender = Sender::new(16); // TODO
let receiver = Receiver::<Chunk<Complex<f32>>>::new();
let (tx, mut rx) = watch::channel((numerator, denominator));
let mut input = receiver.stream();
let output = sender.clone();
spawn(async move {
const TAU: f32 = std::f32::consts::TAU;
assert!(denominator >= 0, "denominator must be non-negative");
let calculate_phase = |(numerator, denominator): (isize, isize)| {
let mut phase_vec: Vec<Complex<f32>> =
Vec::with_capacity(denominator.try_into().unwrap());
let mut i: isize = 0;
for _ in 0..denominator {
let (im, re) = <f32>::sin_cos(i as f32 / denominator as f32 * TAU);
phase_vec.push(Complex::new(re, im));
i += numerator;
i %= denominator;
}
let mut phase_iter = phase_vec.into_iter().cycle();
move || phase_iter.next().unwrap()
};
let mut next_phase = calculate_phase((numerator, denominator));
let mut buf_pool = ChunkBufPool::<Complex<f32>>::new();
loop {
match input.recv().await {
Ok(input_chunk) => {
if rx.has_changed().unwrap() {
let (numerator, denominator) = *rx.borrow_and_update();
next_phase = calculate_phase((numerator, denominator));
}
let mut output_chunk = buf_pool.get_with_capacity(input_chunk.len());
for sample in input_chunk.iter() {
output_chunk.push(sample * next_phase());
}
output.send(output_chunk.finalize());
}
Err(err) => {
output.forward_error(err);
if err == RecvError::Closed {
return;
}
}
}
}
});
FreqShift {
sender,
receiver,
freq: tx,
}
}
pub fn set_freq(&self, numerator: isize, denominator: isize) {
self.freq.send_replace((numerator, denominator));
}
}
impl Consumer<Chunk<Complex<f32>>> for FreqShift {
fn receiver(&self) -> &Receiver<Chunk<Complex<f32>>> {
&self.receiver
}
}
impl Producer<Chunk<Complex<f32>>> for FreqShift {
fn connector(&self) -> SenderConnector<Chunk<Complex<f32>>> {
self.sender.connector()
}
}
(Code is yet under development and just a rough sketch yet.)
I would like to avoid the boilerplate of:
loop {
match input.recv().await {
Ok(input_chunk) => { /* … */ }
Err(err) => {
output.forward_error(err);
/* … */
if err == RecvError::Closed {
return;
}
}
}
}
I thought to make a function which takes two async closures. But I end up in a mess of Box
, dyn
, Arc
, move
, async
, Future
, &mut
, etc. Hence why I tried to break this problem down to a simpler toy example.
Anyway, like I said, I feel like complexity explodes, so maybe just live with the boilerplate and/or consider a macro. (Unless it's easier than I think.)