Need help organizing multithreaded audio code - involving sync <-> async

Hi,

while working on my room impulse response measurement software raumklang I came up with this code for recording new measurements:

Unfortunately, with all the things like async, jacks real-time audio thread and the non-realtime notification thread together, I find it really hard to organize my code. I would really appreciate any help on this.

The basic workflow is:

  1. connect to jack audio server
  2. user chooses input and output ports
  3. calibrate volume with pink-noise
  4. playback sine-sweep and record the measurement

Let me know if you need further information.

There may not be much you can do about the shape of the code; you are already using channels as the synchronization primitives between threads, and the syntactic boilerplate from these threads/async tasks is not going away.

In general terms, the file you linked is above my threshold for "this needs to be split into multiple modules." And you even linked directly to an inline module. Moving this code to another file is a very easy way to start paying down the organizational costs.

The second observation I have is that half of this file is a GUI. It's iced, which is very modular, but you have what looks like everything piled into a single view with deep nesting. If you cannot break these nests into their own widgets, it might be worth turning the match statement into a dispatcher that calls functions. Even that much reintroduces modularity and the functions can be reorganized however you like.

Sorry I can't be more specific. I haven't seen anything in that code which would suggest the organizational problems are inherently related to threading or audio. A few broadly applicable suggestions might good enough to get you started, though.

3 Likes

Thank you very much for your help. I really appreciate it, because I know it's a lot of effort to look into others code - especially if it's such a mess like my current problem case.

That helps a lot, because I thought I did something totally wrong there. That also was the main reason I asked here before trying to refactor things.

Totally agree. I knew that the code has to be separated from the UI right from the beginning, but at the time I started I had no clue what I would need and how to organize it, so I at least put it into the sub-module to ease the follow up refactoring.

That is good to know and it really helped more than you might think.

It's not completely obvious to me what semantics you want from your channel, but when treating apart some other audio libraries recently I found one using

Which is a very efficient "use the latest version of this value" API.

Alternatively, you might just want to chunk your processing and use try_iter to read from your channels.

I don't know which channels you mean, I'm using a lot of them for different purposes. But, because you mentioned the triple_buffer together with stds sync::mpsc channel, be aware, that a mpsc channel might need to allocate and thus is not save to use in the real-time audio thread.

But allocation is paid for by the sender, which is normally fine; whatever channel library you're using should have an equivalent to pull only until exhausted without blocking. You want the iterator mostly so you can slap a take(limit) on there to avoid starving other sources. I'll mention that normally I see an explicit ring buffer library instead of a channel for sample data, which has a fairly different API, you'd just need to make sure you're avoiding a blocking call.

Be aware this should only be done if you expect there to almost always be data from someone to process, otherwise you're essentially going to spin in a loop and possibly starve other threads: you can mitigate that with a call to yield_now in std::thread - Rust

Unfortunately you can't really do much better if you have multiple independent synchronous sources and you want to block until there's work from any of them. The fix there is most often to move all your producers to share a single mpsc of "work items", or move to async.

I have done some refactoring of the audio code. The main idea is to get similar drop behavior like provided by channels.

Is there a way to get rid of the need to explicitly call drop for pop_state in the following code:

let mut pop_state = producer.pop_iter();
match pop_state {
    measurement::PopState::Some(ref mut signal) => {
        write_signal(signal);

        drop(pop_state);
        ProcessHandlerState::Measurement(producer)
    }
    measurement::PopState::Exhausted(ref mut signal) => {
        let buf_empty = write_signal(signal);

        drop(pop_state);
        if buf_empty {
            ProcessHandlerState::Idle
        } else {
            ProcessHandlerState::Measurement(producer)
        }
    }
    measurement::PopState::ConsumerDropped => {
        out_port.fill(0.0);
        ProcessHandlerState::Idle
    }
}

The PopState looks like this:

pub enum PopState<'a> {
    Some(PopIter<'a, HeapCons<f32>>),
    Exhausted(PopIter<'a, HeapCons<f32>>),
    ConsumerDropped,
}

impl Producer {
    pub fn pop_iter(&mut self) -> PopState {
        if self.state.consumer_dropped.load(atomic::Ordering::Acquire) {
            PopState::ConsumerDropped
        } else if self.state.signal_exhausted.load(atomic::Ordering::Acquire) {
            PopState::Exhausted(self.signal_cons.pop_iter())
        } else {
            PopState::Some(self.signal_cons.pop_iter())
        }
    }
}

Full code

Possibly not without extra complexity that you might not find worthwhile: the issue is that PopState both has an (automatic) Drop and burrows from producer, so you'll need to get rid of at least one of those. The easiest way is something like the rather heavy-weight approach of adding an Arc<Mutex<Inner>> shared by PopIter and producer to get some inner state, but all that does is mean you now deadlock if you're trying to do something with producer while pop_state lives; probably not what you're after.

(Might be worthwhile asking the mods to break this out to a new thread so other people think to answer... not sure what the etiquette is there though.)

For the record. The explicit drop felt like a code smell in my data model, so I re-thought my implementation and eventually put the processing code into the measurement producer itself. With this solution all of the atomic stuff representing the internal state of a measurement moved to a central place - the measurement.

To (try and) answer the question nonetheless, here are some local refactorings I could come up with.

E.g.: you could replace the whole take+transform+set logic with one that only does the state transformations when they actually change something. This way the relevant section of code could look e.g. like

match &mut self.state {
    ProcessHandlerState::Idle => {
        out_port.fill(0.0);
    }
    ProcessHandlerState::Measurement(producer) => {
        let in_port = self.in_port.as_slice(process_scope);
        producer.recording_prod.push_slice(in_port);

        let mut write_signal = |signal: &mut PopIter<'_, HeapCons<f32>>| {
            let mut buf_empty = false;
            for o in out_port.iter_mut() {
                if let Some(s) = signal.next() {
                    *o = s * self.amplitued;
                } else {
                    *o = 0.0;
                    buf_empty = true;
                }
            }

            buf_empty
        };

        let stop = match &mut producer.pop_iter() {
            measurement::PopState::Some(signal) => {
                write_signal(signal);
                false
            }
            measurement::PopState::Exhausted(signal) => {
                let buf_empty = write_signal(signal);
                buf_empty
            }
            measurement::PopState::ConsumerDropped => {
                out_port.fill(0.0);
                true
            }
        };
        if stop {
            self.state = ProcessHandlerState::Idle;
        }
    }
};

(this behaves different on panics, not setting the state to Idle then)

I'm also noticing that the measurement::PopState::Some and measurement::PopState::Exhausted variants are containing the same kind of field, produced the exact same way, and processed the exact same way. This might be more nice to work with as a tuple (Flag, PopIter) rather than two separate Flag1(PopIter), Flag2(PopIter) variants..

pub enum PopState<'a> {
    Some(PopStateFlag, PopIter<'a, HeapCons<f32>>),
    ConsumerDropped,
}
pub enum PopStateFlag {
    NonExhausted,
    Exhausted,
}

impl Producer {
    pub fn pop_iter(&mut self) -> PopState {
        if self.state.consumer_dropped.load(atomic::Ordering::Acquire) {
            PopState::ConsumerDropped
        } else {
            let flag = if self.state.signal_exhausted.load(atomic::Ordering::Acquire) {
                PopStateFlag::Exhausted
            } else {
                PopStateFlag::NonExhausted
            };
            PopState::Some(flag, self.signal_cons.pop_iter())
        }
    }
}

and

match &mut self.state {
    ProcessHandlerState::Idle => {
        out_port.fill(0.0);
    }
    ProcessHandlerState::Measurement(producer) => {
        let in_port = self.in_port.as_slice(process_scope);
        producer.recording_prod.push_slice(in_port);

        let mut write_signal = |signal: &mut PopIter<'_, HeapCons<f32>>| {
            let mut buf_empty = false;
            for o in out_port.iter_mut() {
                if let Some(s) = signal.next() {
                    *o = s * self.amplitued;
                } else {
                    *o = 0.0;
                    buf_empty = true;
                }
            }

            buf_empty
        };

        let stop = match &mut producer.pop_iter() {
            measurement::PopState::Some(flag, signal) => {
                write_signal(signal) && matches!(flag, PopStateFlag::Exhausted)
            }
            measurement::PopState::ConsumerDropped => {
                out_port.fill(0.0);
                true
            }
        };
        if stop {
            self.state = ProcessHandlerState::Idle;
        }
    }
};

Then you can even inline the write_signal logic again because of the deduplication :slight_smile:


The same kind of deduplication (but perhaps less nicely) without changing the enum could have also been achieved with an or-pattern, I presume :thinking:


Anyway.. deduplicated:

match &mut self.state {
    ProcessHandlerState::Idle => {
        out_port.fill(0.0);
    }
    ProcessHandlerState::Measurement(producer) => {
        let in_port = self.in_port.as_slice(process_scope);
        producer.recording_prod.push_slice(in_port);

        let stop = match &mut producer.pop_iter() {
            measurement::PopState::Some(flag, signal) => {
                let mut buf_empty = false;
                for o in out_port.iter_mut() {
                    if let Some(s) = signal.next() {
                        *o = s * self.amplitued;
                    } else {
                        *o = 0.0;
                        buf_empty = true;
                    }
                }
                buf_empty && matches!(flag, PopStateFlag::Exhausted)
            }
            measurement::PopState::ConsumerDropped => {
                out_port.fill(0.0);
                true
            }
        };
        if stop {
            self.state = ProcessHandlerState::Idle;
        }
    }
};

While I'm at it, looking at

measurement::PopState::Some(flag, signal) => {
    let mut buf_empty = false;
    for o in out_port.iter_mut() {
        if let Some(s) = signal.next() {
            *o = s * self.amplitued;
        } else {
            *o = 0.0;
            buf_empty = true;
        }
    }
    buf_empty && matches!(flag, PopStateFlag::Exhausted)
}

which works with signal: impl ExactSizeIterator, you could also turn the empty-ness check into a simple case of:

measurement::PopState::Some(flag, signal) => {
    for o in out_port.iter_mut() {
        if let Some(s) = signal.next() {
            *o = s * self.amplitued;
        } else {
            *o = 0.0;
        }
    }
    signal.len() == 0 && matches!(flag, PopStateFlag::Exhausted)
}

which then presents itself for further rewriting e.g. pulling out the *o =

measurement::PopState::Some(flag, signal) => {
    for o in out_port.iter_mut() {
        *o = if let Some(s) = signal.next() {
            s * self.amplitued
        } else {
            0.0
        };
    }
    signal.len() == 0 && matches!(flag, PopStateFlag::Exhausted)
}

then using some Option combinators

measurement::PopState::Some(flag, signal) => {
    for o in out_port.iter_mut() {
        *o = signal.next().map_or(0.0, |s| s * self.amplitued);
    }
    signal.len() == 0 && matches!(flag, PopStateFlag::Exhausted)
}

and then a neat slice-method:

measurement::PopState::Some(flag, signal) => {
    out_port.fill_with(|| signal.next().map_or(0.0, |s| s * self.amplitued));
    signal.len() == 0 && matches!(flag, PopStateFlag::Exhausted)
}

By the way, while looking through the code, I've found 2 issues with this code

pub fn update_from_iter<I>(&mut self, iter: I) -> bool
where
    I: IntoIterator<Item = f32>,
{
    let mut new_peak = false;

    for s in iter {
        new_peak = new_peak || self.update(s);
    }

    new_peak
}

pub fn update(&mut self, sample: f32) -> bool {
    let sample_squared = sample * sample;
    self.square_sum += sample_squared;

    let mut new_peak = false;
    if self.peak < sample {
        self.peak = sample;
        new_peak = true;
    }

    let removed = self.buf.push_overwrite(sample_squared);
    if let Some(r) = removed {
        self.square_sum -= r;
    }

    new_peak
}
  1. new_peak = new_peak || self.update(s)'s behavior when || short-circuits seems to be wrong and/or unintended
  2. handling floating-point values the way they are handled with the (repeated) use of self.square_sum += … and then (later) self.square_sum -= … could result in the running sum value (slowly) drifting away over time, becoming further and further removed from the actual current squared sum of all values in buf
2 Likes

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.