Data access in audio callback

Could someone explain how to access the data source, which is a ring buffer, from within the audio callback? I have a simple requirement, which is to stream to the default output device. I chose to use cpal for this module. I think the answer may be to use a closure, but I'm not sure how to do that.

use cpal::{Data, Sample, SampleFormat};
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use std::vec;

use crate::app::common::ringb;

//==================================================================================
// Audio output 
pub struct AudioData<'a> {
    rb_audio : &'a ringb::SyncByteRingBuf,
}

impl AudioData<'_> {
	// Create a new instance and initialise the default data
	pub fn new(rb_audio : & ringb::SyncByteRingBuf) -> AudioData {
		
        AudioData {
            rb_audio : rb_audio,
        }
    }

    // Create an audio output stream
    pub fn init_audio(&mut self) {
        let host = cpal::default_host();
        let device = host.default_output_device().expect("no output device available");

        let mut supported_configs_range = device.supported_output_configs()
        .expect("error while querying configs");
        let supported_config = supported_configs_range.next()
        .expect("no supported config?!")
        .with_max_sample_rate();

        let err_fn = |err| eprintln!("an error occurred on the output audio stream: {}", err);
        let sample_format = supported_config.sample_format();
        let config = supported_config.into();
        let stream = match sample_format {
        SampleFormat::F32 => device.build_output_stream(&config, write_audio::<f32>, err_fn),
        SampleFormat::I16 => device.build_output_stream(&config, write_audio::<i16>, err_fn),
        SampleFormat::U16 => device.build_output_stream(&config, write_audio::<u16>, err_fn),
        }.unwrap();

        // Start the default stream
        stream.play().unwrap();
    }

}

// Callback when the audio output needs more data
fn write_audio<T: Sample>(data: &mut [T], _: &cpal::OutputCallbackInfo) {
    let mut data: Vec<f32> = vec![0.0; data.len()];
    let mut i = 0;
    // Check the ring buffer for data
    let audio_data = self.rb_audio.read().read(&mut data);
    match audio_data {
        Ok(_sz) => {
            for sample in data.iter_mut() {
                *sample = data[i];
                i += 1;
            }
        }
        Err(e) => println!("Read error on rb_iq {:?}. Skipping cycle.", e),
    }
}

You'll need to take ownership of the data you need in the callback somehow. Without knowing more about how your ringbuffer type works, it's hard to know whether this will work exactly as written, but here's one potential way to fix it:

use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use cpal::{Data, Sample, SampleFormat};
use std::vec;

use crate::app::common::ringb;

//==================================================================================
// Audio output
pub struct AudioData {
    rb_audio: ringb::SyncByteRingBuf,
}

impl AudioData {
    // Create a new instance and initialise the default data
    pub fn new(rb_audio: ringb::SyncByteRingBuf) -> AudioData {
        AudioData { rb_audio }
    }

    // Create an audio output stream
    pub fn init_audio(self) {
        let host = cpal::default_host();
        let device = host
            .default_output_device()
            .expect("no output device available");

        let mut supported_configs_range = device
            .supported_output_configs()
            .expect("error while querying configs");
        let supported_config = supported_configs_range
            .next()
            .expect("no supported config?!")
            .with_max_sample_rate();

        let err_fn = |err| eprintln!("an error occurred on the output audio stream: {}", err);
        let sample_format = supported_config.sample_format();
        let config = supported_config.into();
        let stream = match sample_format {
            SampleFormat::F32 => device.build_output_stream(
                &config,
                move |data, info| self.write_audio::<f32>(data, info),
                err_fn,
            ),
            SampleFormat::I16 => device.build_output_stream(
                &config,
                move |data, info| self.write_audio::<i16>(data, info),
                err_fn,
            ),
            SampleFormat::U16 => device.build_output_stream(
                &config,
                move |data, info| self.write_audio::<u16>(data, info),
                err_fn,
            ),
        }
        .unwrap();

        // Start the default stream
        stream.play().unwrap();
    }

    // Callback when the audio output needs more data
    fn write_audio<T: Sample>(&mut self, data: &mut [T], _: &cpal::OutputCallbackInfo) {
        let mut data: Vec<f32> = vec![0.0; data.len()];
        let mut i = 0;

        let audio_data = self.rb_audio.read().read(&mut data);
        match audio_data {
            Ok(_sz) => {
                for sample in data.iter_mut() {
                    *sample = data[i];
                    i += 1;
                }
            }
            Err(e) => println!("Read error on rb_iq {:?}. Skipping cycle.", e),
        }
    }
}

The closure that build_output_stream accepts is FnMut, so if we move the data into that closure we can mutate it freely. If your ringbuffer type needs to be shared, you'll have to use Arc and interior mutability[1] to share the buffer with the closure.
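
As a rough illustration of the Arc plus interior mutability option, here is a std-only sketch with no cpal involved. The names `SharedBuf` and `make_callback` are hypothetical, and a `Mutex<VecDeque<f32>>` stands in for the real ring buffer, which may well do its own synchronisation already.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for the real ring buffer type.
type SharedBuf = Arc<Mutex<VecDeque<f32>>>;

// Build a callback that owns its own handle to the shared buffer.
// The returned closure is `FnMut`, the same kind of closure that
// `build_output_stream` accepts for its data callback.
fn make_callback(buf: SharedBuf) -> impl FnMut(&mut [f32]) + Send + 'static {
    move |out: &mut [f32]| {
        // Interior mutability: lock to get mutable access through the Arc.
        let mut queue = buf.lock().unwrap();
        for sample in out.iter_mut() {
            // Fall back to silence if the producer has not kept up.
            *sample = queue.pop_front().unwrap_or(0.0);
        }
    }
}
```

The producer keeps another clone of the `Arc` and pushes samples into the queue. Locking a mutex inside a real audio callback can cause glitches, so treat this as a way to see the ownership pattern rather than a recommended design.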

Note: This solution moves AudioData when you call init_audio, so you won't be able to use the AudioData instance again after your code calls init_audio.

You could also just move the ringbuffer instead of moving the whole AudioData struct into the closure.


  1. You might already be using interior mutability, since you only had a shared reference to the buffer.

Thanks. I can see that the callback is now within the impl block, so self can be used. I've only used move with threads up to now, and there the moved values are passed in explicitly. Here, however, we only seem to be moving the parameters to the callback. How does that implicitly move AudioData, and if I wanted to move just the ring buffer, would I add that into the closure? Moving the whole thing will prevent a tidy close, as well as the pause and play calls which I will need.

If we use this closure as a callback:

move |data, info| self.write_audio::<f32>(data, info)

data and info are not variables captured by move - they are sent as parameters from the audio backend when it uses the callback to get more data. We are, however, capturing self (the AudioData) because it is implicitly used by write_audio as the &mut self parameter (i.e. it's a method call).
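
A minimal std-only sketch of that distinction, assuming a hypothetical `Player` type in place of AudioData and a simplified callback signature with no cpal types:

```rust
// Hypothetical stand-in for `AudioData`; `calls` just gives the method
// something to mutate through `&mut self`.
struct Player {
    calls: u32,
}

impl Player {
    fn write_audio(&mut self, data: &mut [f32]) {
        self.calls += 1;
        for sample in data.iter_mut() {
            *sample = 0.0;
        }
    }
}

// `mut player` is needed because the closure calls a `&mut self` method
// on the value it captured.
fn into_callback(mut player: Player) -> impl FnMut(&mut [f32]) -> u32 {
    // `player` is captured (and moved, because of `move`).
    // `data` is not captured: the caller supplies it on every call.
    move |data: &mut [f32]| {
        player.write_audio(data);
        player.calls
    }
}
```

After `into_callback` returns, the `Player` lives inside the closure and nothing outside can reach it, which is the same reason the AudioData instance is unusable after init_audio in the version above.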

You could go back to write_audio as a free function and send only the ringbuffer as an extra parameter (as noted above):

fn write_audio<T: Sample>(data: &mut [T], _: &cpal::OutputCallbackInfo, ringbuf: SyncByteRingBuf)

Then you could write the callback like:

move |data, info| write_audio::<f32>(data, info, some_cloned_value /* Edited out: self.rb_audio */)

As a side note, write_audio shadows the data variable here:

fn write_audio<T: Sample>(data: &mut [T], _: &cpal::OutputCallbackInfo) {
    let mut data: Vec<f32> = vec![0.0; data.len()];

That means you are dropping the reference to the buffer that the audio backend sent to be filled, and it won't get any data. You could just rename the new data variable to tmp_buf to fix that.
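
A sketch of the renamed version, using a hypothetical `read_source` function in place of the ring buffer read and plain f32 samples instead of the generic Sample type:

```rust
// Hypothetical source standing in for the ring buffer read; it fills
// `buf` and reports how many samples it wrote.
fn read_source(buf: &mut [f32]) -> usize {
    for (i, slot) in buf.iter_mut().enumerate() {
        *slot = i as f32 * 0.25;
    }
    buf.len()
}

fn write_audio(data: &mut [f32]) {
    // A separate name keeps `data` (the backend's buffer) reachable.
    let mut tmp_buf = vec![0.0_f32; data.len()];
    let filled = read_source(&mut tmp_buf);
    // Copy what was read into the buffer the backend asked us to fill.
    data[..filled].copy_from_slice(&tmp_buf[..filled]);
    // Anything that was not filled becomes silence.
    for sample in data[filled..].iter_mut() {
        *sample = 0.0;
    }
}
```

Allocating a Vec on every callback is kept here only to mirror the original code; reading straight into `data`, where the types allow it, would avoid the temporary buffer altogether.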

Thanks. I can probably figure out the various options now, but I have a different problem. With a simple example the callback fires, but not with the real code. I've made both runnable using cpal = "0.14.0".

use std::io::{stdin, stdout, Read, Write};
use cpal::{Data, Sample, SampleFormat};
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};

fn main() {
    let host = cpal::default_host();
    let device = host.default_output_device().expect("no output device available");

    
    let mut supported_configs_range = device.supported_output_configs()
    .expect("error while querying configs");
    let supported_config = supported_configs_range.next()
    .expect("no supported config?!")
    .with_max_sample_rate();
    println!("{:?}",supported_config);

    let err_fn = |err| eprintln!("an error occurred on the output audio stream: {}", err);
    let sample_format = supported_config.sample_format();
    println!("{:?}", sample_format);
    let config = supported_config.into();
    println!("{:?}", config);
    let stream = match sample_format {
    SampleFormat::F32 => device.build_output_stream(&config, write_silence::<f32>, err_fn),
    SampleFormat::I16 => device.build_output_stream(&config, write_silence::<i16>, err_fn),
    SampleFormat::U16 => device.build_output_stream(&config, write_silence::<u16>, err_fn),
    }.unwrap();
    stream.play().unwrap();
    pause();
}

fn write_silence<T: Sample>(data: &mut [T], _: &cpal::OutputCallbackInfo) {
    println!("Callback");
    for sample in data.iter_mut() {
        *sample = Sample::from(&0.0);
    }
}

fn pause() {
    let mut stdout = stdout();
    stdout.write(b"\nPress Enter to close...\n\n").unwrap();
    stdout.flush().unwrap();
    stdin().read(&mut [0]).unwrap();
}

However, with the suggested changes, in whatever configuration I try, the callback never fires.

use std::io::{stdin, stdout, Read, Write};
use cpal::{Data, Sample, SampleFormat};
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use std::vec;
use std::sync::Arc;


//==================================================================================
// Audio output 
pub struct AudioData {

}

impl AudioData {
	// Create a new instance and initialise the default data
	pub fn new() ->  AudioData { 
		
        AudioData {
            
        }
    }


    // Create an audio output stream
    pub fn init_audio(mut self) {
        let host = cpal::default_host();
        let device = host.default_output_device().expect("no output device available");

        let mut supported_configs_range = device.supported_output_configs()
        .expect("error while querying configs");
        let supported_config = supported_configs_range.next()
        .expect("no supported config?!")
        .with_max_sample_rate();

        let err_fn = |err| eprintln!("an error occurred on the output audio stream: {}", err);
        let sample_format = supported_config.sample_format();
        let config = supported_config.into();
        let stream = match sample_format {
        SampleFormat::F32 => device.build_output_stream(&config, write_audio::<f32>, err_fn),
        SampleFormat::I16 => device.build_output_stream(&config, write_audio::<i16>, err_fn),
        SampleFormat::U16 => device.build_output_stream(&config, write_audio::<u16>, err_fn),
        }.unwrap();

        // Start the default stream
        println!("Streaming");
        stream.play().unwrap();
    }

}

// Callback when the audio output needs more data
fn write_audio<T: Sample>(data: &mut [T], _: &cpal::OutputCallbackInfo) {
    println!("Callback");
    for sample in data.iter_mut() {
        *sample = Sample::from(&0.0);
    }
}

fn main() {
    // Create the local audio
    let i_local_audio = AudioData::new();
    i_local_audio.init_audio();
    pause();
}

fn pause() {
    let mut stdout = stdout();
    stdout.write(b"\nPress Enter to close...\n\n").unwrap();
    stdout.flush().unwrap();
    stdin().read(&mut [0]).unwrap();
}

I guess stream is going out of scope, which is probably not helping. I will fix that and try again.


I've put everything in the struct, but it makes no difference. Any ideas?

It looks like stream is still being dropped at the end of init_audio, so you can store it in the struct instead:

pub struct AudioData {
    stream: Option<cpal::Stream>,
}

and then

    // Start the default stream
    println!("Streaming");
    stream.play().unwrap();
    self.stream = Some(stream);

But then init_audio(mut self) still consumes the AudioData, and the stream stored in it is dropped along with it, so change the signature to pub fn init_audio(&mut self).
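
The effect of where the handle lives can be seen without any audio hardware. The sketch below uses a hypothetical `FakeStream` that flips a flag when it is dropped, on the assumption that dropping a real cpal::Stream likewise stops playback:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Hypothetical stand-in for `cpal::Stream`: "running" until dropped.
struct FakeStream {
    running: Arc<AtomicBool>,
}

impl Drop for FakeStream {
    fn drop(&mut self) {
        self.running.store(false, Ordering::SeqCst);
    }
}

struct AudioData {
    stream: Option<FakeStream>,
}

impl AudioData {
    // Taking `&mut self` leaves the caller's `AudioData` alive afterwards.
    fn init_audio(&mut self, running: Arc<AtomicBool>) {
        running.store(true, Ordering::SeqCst);
        let stream = FakeStream { running };
        // Without this line `stream` would be dropped at the closing brace.
        self.stream = Some(stream);
    }

    // Dropping the handle is what stops the fake stream.
    fn close_audio(&mut self) {
        drop(self.stream.take());
    }
}
```

With `init_audio(mut self)` instead, the whole AudioData, including the stored handle, would be dropped when the method returns, so the flag would go back to false straight away.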

Sorry to be a pain. I think I'm getting there. In the simple example it now calls the callback, although I don't know why just storing the stream didn't work and it had to be stored after the play.
In the real code it's throwing an error I don't understand.

use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use cpal::{Data, Sample, SampleFormat, PlayStreamError};
use std::vec;
use std::io::Read;
use std::sync::Arc;

use crate::app::common::ringb;

//==================================================================================
// Audio output
pub struct AudioData {
    rb_audio: Arc<ringb::SyncByteRingBuf>,
    stream: Option<cpal::Stream>,
}

impl AudioData {
    // Create a new instance and initialise the default data
    pub fn new(rb_audio: Arc<ringb::SyncByteRingBuf>) -> AudioData {
        AudioData {
            rb_audio: rb_audio,
            stream: None,
        }
    }

    // Create an audio output stream
    pub fn init_audio(mut self) {
        println!("Initialising local audio...");
        let host = cpal::default_host();
        let device = host
            .default_output_device()
            .expect("no output device available");

        let mut supported_configs_range = device
            .supported_output_configs()
            .expect("error while querying configs");
        let supported_config = supported_configs_range
            .next()
            .expect("no supported config?!")
            .with_max_sample_rate();

        let err_fn = |err| eprintln!("an error occurred on the output audio stream: {}", err);
        let sample_format = supported_config.sample_format();
        let config = supported_config.into();
        let stream = match sample_format {
            SampleFormat::F32 => device.build_output_stream(
                &config,
                move |data, info| write_audio::<f32>(data, info, self.rb_audio),
                err_fn,
            ),
            SampleFormat::I16 => device.build_output_stream(
                &config,
                move |data, info| write_audio::<i16>(data, info, self.rb_audio),
                err_fn,
            ),
            SampleFormat::U16 => device.build_output_stream(
                &config,
                move |data, info| write_audio::<u16>(data, info, self.rb_audio),
                err_fn,
            ),
        }
        .unwrap();

        println!("Starting audio stream");
        stream.play().unwrap();
        self.stream = Some(stream);

    }   
}

// Callback when the audio output needs more data
fn write_audio<T: Sample>(data: &mut [f32], _: &cpal::OutputCallbackInfo, rb_audio: Arc<ringb::SyncByteRingBuf>) {
    // Byte data from ring buffer
    println!("Len {}", data.len());
    
    let mut rb_data: Vec<u8> = vec![0; data.len()*4];
    let mut in_data: Vec<f32> = vec![0.0; data.len()];
    let mut i = 0;

    let audio_data = rb_audio.read().read(&mut rb_data);
    match audio_data {
        Ok(_sz) => {
            for sample in data.iter_mut() {
                *sample = in_data[i];
                i += 1;
            }
        }
        Err(e) => println!("Read error on rb_iq {:?}. Skipping cycle.", e),
    }
    
}

// Convert a Vec:u8 to a Vec:f32
fn u8_to_f32() {
    // The U8 data in the ring buffer is ordered as LE 16 bit values

}

error[E0507]: cannot move out of `self.rb_audio`, a captured variable in an `FnMut` closure
  --> src\app\audio\audio_out.rs:84:66
   |
53 |     pub fn init_audio(mut self) {
   |                       -------- captured outer variable
...
84 |                 move |data, info| write_audio::<u16>(data, info, self.rb_audio),
   |                 -----------------                                ^^^^^^^^^^^^^ move occurs because `self.rb_audio` has type `Arc<SyncByteRingBuf>`, which does not implement the `Copy` trait
   |                 |
   |                 captured by this `FnMut` closure

error[E0507]: cannot move out of `self.rb_audio`, a captured variable in an `FnMut` closure
  --> src\app\audio\audio_out.rs:74:66
   |
53 |     pub fn init_audio(mut self) {
   |                       -------- captured outer variable
...
74 |                 move |data, info| write_audio::<f32>(data, info, self.rb_audio),
   |                 -----------------                                ^^^^^^^^^^^^^ move occurs because `self.rb_audio` has type `Arc<SyncByteRingBuf>`, which does not implement the `Copy` trait
   |                 |
   |                 captured by this `FnMut` closure

error[E0507]: cannot move out of `self.rb_audio`, a captured variable in an `FnMut` closure
  --> src\app\audio\audio_out.rs:79:66
   |
53 |     pub fn init_audio(mut self) {
   |                       -------- captured outer variable
...
79 |                 move |data, info| write_audio::<i16>(data, info, self.rb_audio),
   |                 -----------------                                ^^^^^^^^^^^^^ move occurs because `self.rb_audio` has type `Arc<SyncByteRingBuf>`, which does not implement the `Copy` trait
   |                 |
   |                 captured by this `FnMut` closure

Sorry - looks like my original advice to send self.rb_audio to write_audio is wrong. To get an owned value to write_audio, you need to clone the Arc first:

let cloned_rb_audio = self.rb_audio.clone();
let stream = match sample_format {
    SampleFormat::F32 => device.build_output_stream(
        &config,
        // Clone again inside the closure: an FnMut closure cannot give away the Arc it captured.
        move |data, info| write_audio::<f32>(data, info, cloned_rb_audio.clone()),
        err_fn,
    ),
...
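
For reference, the rule behind E0507 can be reproduced without cpal. In this sketch `drive` is a hypothetical stand-in for an API that, like build_output_stream, invokes an FnMut callback repeatedly, and `write_owned` and `write_borrowed` are made-up callback targets:

```rust
use std::sync::Arc;

// Hypothetical callback target that wants an owned `Arc`.
fn write_owned(data: &mut [u8], src: Arc<Vec<u8>>) {
    let n = data.len().min(src.len());
    data[..n].copy_from_slice(&src[..n]);
}

// Same job, but borrowing, so no reference count changes per call.
fn write_borrowed(data: &mut [u8], src: &[u8]) {
    let n = data.len().min(src.len());
    data[..n].copy_from_slice(&src[..n]);
}

// Stand-in for an API that calls an `FnMut` callback more than once.
fn drive<F: FnMut(&mut [u8])>(mut callback: F) -> [u8; 2] {
    let mut out = [0u8; 2];
    callback(&mut out);
    callback(&mut out);
    out
}

fn run_both(src: Arc<Vec<u8>>) -> ([u8; 2], [u8; 2]) {
    let for_owned = Arc::clone(&src);
    // Passing `for_owned` itself here would be E0507: the first call
    // would move it out and leave nothing for the second call.
    let a = drive(move |data| write_owned(data, Arc::clone(&for_owned)));
    // Borrowing from the captured Arc works on every call.
    let b = drive(move |data| write_borrowed(data, &src));
    (a, b)
}
```

Either form satisfies the FnMut requirement; borrowing is usually the simpler choice when the callee does not need to keep the Arc.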

This is so frustrating. I can never seem to fix everything. The clone got audio_out to compile. Now when I try to call self.i_local_audio.init_audio(); from my app module, it blows up because of this "can't move out of self" thing. If I change init_audio to take &mut self, that fixes app but blows up in audio_out again with the same error as before. I just don't understand why a simple method call with no parameters has to pass ownership at all. If you want to call it a day, I will quite understand.

You don't actually need ownership here. I took ownership in my original example because it didn't require an Arc and I didn't know anything more about how you wanted to use AudioData. Now that you have an Arc around the buffer, you can just take self by reference and return the stream rather than trying to store it on self.

use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use cpal::{Data, PlayStreamError, Sample, SampleFormat};
use std::io::Read;
use std::sync::{Arc, Mutex};
use std::vec;

mod ringb {
    use std::io;

    pub struct SyncByteRingBuf;

    impl SyncByteRingBuf {
        pub fn read(&self) -> impl io::Read {
            io::Cursor::new([0u8])
        }
    }
}

//==================================================================================
// Audio output
pub struct AudioData {
    rb_audio: Arc<ringb::SyncByteRingBuf>,
}

impl AudioData {
    // Create a new instance and initialise the default data
    pub fn new(rb_audio: Arc<ringb::SyncByteRingBuf>) -> AudioData {
        AudioData { rb_audio: rb_audio }
    }

    // Create an audio output stream
    // Return the stream instead of storing it somewhere.
    pub fn init_audio(&self) -> cpal::Stream {
        println!("Initialising local audio...");
        let host = cpal::default_host();
        let device = host
            .default_output_device()
            .expect("no output device available");

        let mut supported_configs_range = device
            .supported_output_configs()
            .expect("error while querying configs");
        let supported_config = supported_configs_range
            .next()
            .expect("no supported config?!")
            .with_max_sample_rate();

        let err_fn = |err| eprintln!("an error occurred on the output audio stream: {}", err);
        let sample_format = supported_config.sample_format();
        let config = supported_config.into();

        let rb_audio = self.rb_audio.clone();
        let stream = match sample_format {
            SampleFormat::F32 => device.build_output_stream(
                &config,
                move |data, info| write_audio::<f32>(data, info, &rb_audio),
                err_fn,
            ),
            SampleFormat::I16 => device.build_output_stream(
                &config,
                move |data, info| write_audio::<i16>(data, info, &rb_audio),
                err_fn,
            ),
            SampleFormat::U16 => device.build_output_stream(
                &config,
                move |data, info| write_audio::<u16>(data, info, &rb_audio),
                err_fn,
            ),
        }
        .unwrap();

        println!("Starting audio stream");
        stream.play().unwrap();
        stream
    }
}

// Placeholder for the struct you appear to be keeping `AudioData` in.
struct State {
    i_local_audio: AudioData,
}

// Test the usage of the init_audio method to simulate the error you were hitting.
fn check_init(state: &State) {
    let stream = state.i_local_audio.init_audio();
}

// Callback when the audio output needs more data
fn write_audio<T: Sample>(
    data: &mut [f32],
    _: &cpal::OutputCallbackInfo,
    rb_audio: &ringb::SyncByteRingBuf,
) {
    // Byte data from ring buffer
    println!("Len {}", data.len());

    let mut rb_data: Vec<u8> = vec![0; data.len() * 4];
    let mut in_data: Vec<f32> = vec![0.0; data.len()];
    let mut i = 0;

    let audio_data = rb_audio.read().read(&mut rb_data);
    match audio_data {
        Ok(_sz) => {
            for sample in data.iter_mut() {
                *sample = in_data[i];
                i += 1;
            }
        }
        Err(e) => println!("Read error on rb_iq {:?}. Skipping cycle.", e),
    }
}

// Convert a Vec:u8 to a Vec:f32
fn u8_to_f32() {
    // The U8 data in the ring buffer is ordered as LE 16 bit values
}
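
One possible shape for the u8_to_f32 stub is sketched below. It assumes the 16-bit values are signed PCM, which the comment above does not state, and the parameters and return value are illustrative choices rather than anything taken from the original code.

```rust
// Convert little-endian signed 16-bit PCM bytes into f32 samples in
// the range [-1.0, 1.0). A trailing odd byte, if any, is ignored.
// Returns the number of samples written to `out`.
fn u8_to_f32(bytes: &[u8], out: &mut [f32]) -> usize {
    let mut written = 0;
    for (pair, sample) in bytes.chunks_exact(2).zip(out.iter_mut()) {
        // Low byte first, because the data is little-endian.
        let value = i16::from_le_bytes([pair[0], pair[1]]);
        *sample = f32::from(value) / 32768.0;
        written += 1;
    }
    written
}
```

Inside write_audio this could be called with rb_data as the input and the output slice as the destination, at which point rb_data only needs two bytes per sample rather than four.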

Thank you for all the help. I finally got there. I thought I might have exactly the same issue with the stream. I had to store it as an Option in app, as it would go out of scope otherwise. Then I could call another method with it. Finally this worked.

// Close stream
pub fn close_audio(&mut self, stream: &cpal::Stream) {
    println!("Closed");
}

and called from app by

self.i_local_audio.close_audio(&(self.stream.as_ref().unwrap()));

Now I need to get some real data going down this path.