Synchronize data in several threads with very different update rates

I have a compute thread which produces new data of type WaveForm = [f32; 1024] at a variable rate of roughly 30 to 300 fps. Then there is an audio thread running at a much faster rate of 48 kHz, which should receive the data whenever it is new to create sound from it. Finally, there is a UI thread running at 60 or 120 fps (depending on the OS), which should also receive the data whenever it is new to visualize it.

Currently I'm using channels

let (tx_comp2audio, rx_comp2audio) = channel::<WaveForm>();
let (tx_comp2ui, rx_comp2ui) = channel::<WaveForm>();

and send the data whenever it is produced in the compute thread

tx_comp2audio.send(wave_form).unwrap();
tx_comp2ui.send(wave_form).unwrap();

In the audio thread I check for new data like this

while let Ok(data) = rx_comp2audio.try_recv() {
    audio_wave_form = data;
}

And the same in the UI thread

while let Ok(data) = rx_comp2ui.try_recv() {
    visual_wave_form = data;
}

I do the while loop to drain the channel and to make sure that the audio and UI threads always have the newest data. It works fine (as far as I can hear and see the results).

Questions:

  1. Is this a proper way to do this or is there a better way performance-wise?
  2. For example, I looked into RwLock but couldn't find a way to only read the data in case it has been updated (except for sending an additional update signal). Also, at least the compute and audio threads should never be blocked.

Especially for the audio thread, it is desirable to receive new data as fast as possible.

Your while loop could be written like this, which in my opinion expresses the intent of “get the most recent data sent” better:

if let Some(data) = rx_comp2audio.try_iter().last() {
    audio_wave_form = data;
}

Both this and your loop should probably be the same performance-wise though.
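To make the "keep only the newest item" idea concrete, here is a minimal sketch that wraps the drain in a helper. The names `latest` and `demo` are made up for illustration; only `std::sync::mpsc` is used.

```rust
use std::sync::mpsc::{channel, Receiver};

type WaveForm = [f32; 1024];

/// Drains everything currently queued and returns only the newest item,
/// or `None` if nothing arrived since the last call. Never blocks.
fn latest(rx: &Receiver<WaveForm>) -> Option<WaveForm> {
    rx.try_iter().last()
}

/// Hypothetical demo: queue three frames, then fetch only the newest one.
fn demo() -> (Option<f32>, bool) {
    let (tx, rx) = channel::<WaveForm>();
    for i in 0..3 {
        tx.send([i as f32; 1024]).unwrap();
    }
    // The first call skips the two older frames and yields the third.
    let newest = latest(&rx).map(|w| w[0]);
    // The second call finds the channel empty.
    let empty_afterwards = latest(&rx).is_none();
    (newest, empty_afterwards)
}
```

Every queued frame is still moved out of the channel and discarded, so this is about readability rather than doing less work.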

Yes, channels are a fine way to do this (your code feels like the actor pattern, where channels are the solution).

You could try using a different channel library, but I don’t think the channels are a bottleneck in your application.

This problem feels like you want a watch channel, but the most used one is tokio::sync::watch, which would be overkill for your synchronous SPSC program. The tiny[1] watch crate exists[2], but I’ve never used it and it has just 2 reverse dependencies (it is by Alice Ryhl, tokio maintainer)


  1. 200 LoC and no dependencies

  2. bonus: you can make both threads connect to one sender by cloning the receiver, because WatchReceiver: Clone
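For readers unfamiliar with the concept, the idea behind a watch channel can be sketched with std only: one shared value plus a version counter, where each receiver remembers the last version it saw. This is not the API of the watch crate or of tokio::sync::watch; `Publisher`, `Watcher`, `watch_pair`, `publish` and `fetch_if_new` are hypothetical names, and the sketch uses a Mutex, so a reader can briefly wait on the writer.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

type WaveForm = [f32; 1024];

struct Shared {
    version: AtomicU64,         // bumped on every publish
    data: Mutex<Box<WaveForm>>, // the single, most recent value
}

struct Publisher {
    shared: Arc<Shared>,
}

#[derive(Clone)]
struct Watcher {
    shared: Arc<Shared>,
    seen: u64, // last version this watcher has copied out
}

fn watch_pair() -> (Publisher, Watcher) {
    let shared = Arc::new(Shared {
        version: AtomicU64::new(0),
        data: Mutex::new(Box::new([0.0; 1024])),
    });
    (Publisher { shared: shared.clone() }, Watcher { shared, seen: 0 })
}

impl Publisher {
    /// Overwrites the shared value and bumps the version while holding the lock.
    fn publish(&self, wave: &WaveForm) {
        let mut guard = self.shared.data.lock().unwrap();
        **guard = *wave;
        self.shared.version.fetch_add(1, Ordering::Release);
    }
}

impl Watcher {
    /// Copies the value into `out` and returns true only if it changed
    /// since this watcher's previous successful call.
    fn fetch_if_new(&mut self, out: &mut WaveForm) -> bool {
        if self.shared.version.load(Ordering::Acquire) == self.seen {
            return false; // cheap check, no lock taken
        }
        let guard = self.shared.data.lock().unwrap();
        *out = **guard;
        // Re-read under the lock so `seen` matches the data just copied.
        self.seen = self.shared.version.load(Ordering::Acquire);
        true
    }
}
```

Cloning the `Watcher` gives the audio and UI threads independent "seen" counters over the same sender, which is the property the second footnote refers to.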

Are there cases where the older data is needed? Otherwise, I would recommend Arc<Mutex<Option<Box<WaveForm>>>> or Arc<AtomicOptionBox<WaveForm>> from the atomicbox crate. This also saves you copying the 4KB of data.
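A minimal sketch of the Mutex variant, using only std. `Slot`, `publish` and `take_latest` are names invented for this example; the consumer uses `try_lock` so that it skips an update instead of waiting when the producer holds the lock.

```rust
use std::sync::{Arc, Mutex};

type WaveForm = [f32; 1024];
type Slot = Arc<Mutex<Option<Box<WaveForm>>>>;

/// Producer side: overwrite whatever is in the slot.
/// An older frame that was never read is simply dropped.
fn publish(slot: &Slot, wave: Box<WaveForm>) {
    *slot.lock().unwrap() = Some(wave);
}

/// Consumer side: take the box if there is one.
/// Returns `None` both when the slot is empty and when the lock is busy.
fn take_latest(slot: &Slot) -> Option<Box<WaveForm>> {
    match slot.try_lock() {
        Ok(mut guard) => guard.take(),
        Err(_) => None,
    }
}
```

Because `take` empties the slot, each consumer (audio and UI) would need its own slot in this scheme. Only the pointer moves through the slot; the 4 KB payload stays where `Box::new` put it.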

Maybe

use std::{
    cell::UnsafeCell,
    sync::atomic::{AtomicBool, AtomicU8, Ordering},
};

struct Buf {
    frame: UnsafeCell<[f32; 1024]>,
    old: UnsafeCell<(bool, bool)>, // .0: played; .1: displayed
    writing: AtomicBool,
    reading: AtomicU8,
}

unsafe impl Sync for Buf {}

static BUF: Buf = Buf {
    frame: UnsafeCell::new([0.0; 1024]),
    old: UnsafeCell::new((false, false)),
    writing: AtomicBool::new(false),
    reading: AtomicU8::new(0),
};

fn producer() {
    loop {
        let new_buf = [0.0; 1024]; // New data to write.

        // Announce the write first, then wait until no consumers are reading.
        // Checking `reading` before setting `writing` would let a consumer
        // slip in between the check and the store.
        BUF.writing.store(true, Ordering::SeqCst);
        while BUF.reading.load(Ordering::SeqCst) > 0 {
            std::hint::spin_loop();
        }

        unsafe {
            *BUF.frame.get() = new_buf;
            *BUF.old.get() = (false, false);
        }
        BUF.writing.store(false, Ordering::Release);
    }
}

fn consumer1() {
    loop {
        // Wait until the producer is not writing
        while BUF.writing.load(Ordering::Acquire) {
            std::hint::spin_loop();
        }

        BUF.reading.fetch_add(1, Ordering::SeqCst);

        // Re-check in case a write was announced right after incrementing reading.
        // SeqCst here pairs with the producer's SeqCst store/load.
        if BUF.writing.load(Ordering::SeqCst) || unsafe { (*BUF.old.get()).0 } {
            BUF.reading.fetch_sub(1, Ordering::SeqCst);
            continue;
        }

        let _buf = unsafe { *BUF.frame.get() };
        unsafe {
            (*BUF.old.get()).0 = true; // mark as played
        }
        BUF.reading.fetch_sub(1, Ordering::SeqCst);

        // Do something with _buf
    }
}

fn consumer2() {
    loop {
        // Wait until the producer is not writing
        while BUF.writing.load(Ordering::Acquire) {
            std::hint::spin_loop();
        }

        BUF.reading.fetch_add(1, Ordering::SeqCst);

        // Re-check in case a write was announced right after incrementing reading.
        // SeqCst here pairs with the producer's SeqCst store/load.
        if BUF.writing.load(Ordering::SeqCst) || unsafe { (*BUF.old.get()).1 } {
            BUF.reading.fetch_sub(1, Ordering::SeqCst);
            continue;
        }

        let _buf = unsafe { *BUF.frame.get() };
        unsafe {
            (*BUF.old.get()).1 = true; // mark as displayed
        }
        BUF.reading.fetch_sub(1, Ordering::SeqCst);

        // Do something with _buf
    }
}

I'm not sure if I used Ordering correctly...

@cod10129 Thanks for the insights about different types of channels!
I agree that .try_iter().last() is the better choice semantically and the iterator might even be a tiny bit faster, who knows ; )

@Bruecki Thanks! Indeed, the older data is never needed. So I tried the AtomicOptionBox (what a name!) and it works fine, too.

Regarding the copying of data, just to make sure I understand correctly, so in the compute thread when I do

atomic_option_box.store(Some(Box::new(wave_form)), Ordering::AcqRel);

atomic_option_box just points at the wave form data, so nothing gets copied.
Then in the audio thread when I do

if let Some(new_data) = atomic_option_box_clone.take(Ordering::AcqRel) {
    audio_wave_form = *new_data;
}

the data gets copied from wave_form to audio_wave_form, right?

How does this compare to the channel solution in terms of copying?
Is there one more copy operation involved for channels?

With the given boundary conditions the data is copied twice, the same as when using channels: once into the new Box (or into the channel) and once out of it. Depending on your use case it might make sense to keep the waveform boxed by default, i.e. you make audio_wave_form and wave_form of type Box<WaveForm>. Offhand, I cannot think of a reason not to do that, but there might be other things going on.

Additionally, you might want to re-use the allocation in the storing thread.

Right, boxing the wave_form and audio_wave_form is no problem. But how to

...re-use the allocation in the storing thread.

I couldn't figure out.
Generally, I thought about how to avoid creating new boxes (pointers) and just shuffling around the existing boxes.
The one idea I could come up with is to use 3 boxes, one in the atomic_option_box, one in the storing thread and one in the reading thread. Then I could use swap() in the storing thread

storing_box = ... // new data
storing_box = atomic_option_box.swap(Some(storing_box), ...)

and in the reading thread

if new_data_exists {
    reading_box = atomic_option_box.swap(Some(reading_box), ...)
}

but only if some additional new_data_exists signal is received.

But then I get a new problem in the reading thread, because this is an audio thread with an FnMut closure which doesn't allow the swap(Some(reading_box)): reading_box was moved into the closure and cannot be moved out. So I would have to create a new box in the reading thread each time, which is what I wanted to avoid.
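One way around the FnMut restriction is to never move reading_box out of the closure and instead exchange its contents through a `&mut` reference, for example with std::mem::swap. The sketch below shows the three-box shuffle with std only; it uses a Mutex plus an AtomicBool as the "new data" signal rather than AtomicOptionBox, and `Exchange`, `publish` and `make_audio_callback` are hypothetical names.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

type WaveForm = [f32; 1024];

struct Exchange {
    slot: Mutex<Box<WaveForm>>, // the "middle" box
    fresh: AtomicBool,          // true while the slot holds unread data
}

/// Storing thread: swap the freshly filled box into the slot and get the
/// previous box back for refilling. No allocation happens here.
fn publish(ex: &Exchange, storing_box: &mut Box<WaveForm>) {
    let mut slot = ex.slot.lock().unwrap();
    std::mem::swap(&mut *slot, storing_box);
    ex.fresh.store(true, Ordering::Release);
}

/// Builds a stand-in for the audio callback. `reading_box` is moved into the
/// closure once and afterwards only swapped in place, which FnMut permits.
fn make_audio_callback(ex: Arc<Exchange>) -> impl FnMut() -> f32 {
    let mut reading_box: Box<WaveForm> = Box::new([0.0; 1024]);
    move || {
        if ex.fresh.load(Ordering::Acquire) {
            // try_lock: skip this round instead of waiting on the producer.
            if let Ok(mut slot) = ex.slot.try_lock() {
                std::mem::swap(&mut *slot, &mut reading_box);
                ex.fresh.store(false, Ordering::Release);
            }
        }
        reading_box[0] // placeholder for "make sound from the waveform"
    }
}
```

The same trick works with an `Option<Box<WaveForm>>` captured by the closure: `Option::take` and `Option::replace` both operate through `&mut`, so nothing has to be moved out of the captured variable. With two consumers, each would need its own `Exchange`.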