Data become corrupted through tcp

I'm trying to record from micro using cpal, and then transfer recorded sound through the tcp, and put it to sound output in other programm reading from tcp. But looks like that data is corrupting while data is transfering.

it's the receiver code:

extern crate anyhow;
extern crate cpal;

use cpal::traits::{DeviceTrait, EventLoopTrait, HostTrait};

use std::io::Read;
use std::net::{TcpListener, TcpStream};

fn handle_connection(mut stream: TcpStream) {
    
    loop {
        let mut buffer : [u8; 5] = [0, 0, 0, 0, 0];
        let mut res : f32 = 0.0;
        let mut recorded = [0.0; 1024].to_vec();
        for ind in 0..1024 {
            match stream.read(&mut buffer) {
                Ok(s) => {
                    println!("{:?}", s);
                    println!("{:?}", buffer.clone());
                    let decoded : Option<f32> = bincode::deserialize(&buffer[..]).unwrap();

                    res = match decoded {
                        Some(x) => x,
                        _ => 0.0
                    };
                    recorded[ind] = res;
                    println!("{:?}", res);
                },
                _ =>  panic!("failed") 
            }
        }

        let read_from = recorded.clone();
        std::thread::spawn(move || {
            let host = cpal::default_host();
            let device = host.default_output_device().expect("failed to find a default output device");
            let event_loop = host.event_loop();
            let format = device.default_output_format().unwrap();
            let stream_id = event_loop.build_output_stream(&device, &format).unwrap();
            event_loop.play_stream(stream_id.clone()).unwrap();

            event_loop.run(move |id, result| {
                
                let sample_rate = format.sample_rate.0 as f32;
                let mut ind = 0;

                let data = match result {
                    Ok(data) => data,
                    Err(err) => {
                        eprintln!("an error occurred on stream {:?}: {}", id, err);
                        return;
                    }
                };
        
                match data {
                    cpal::StreamData::Output { buffer: cpal::UnknownTypeOutputBuffer::F32(mut buf) } => {
                        for sample in buf.chunks_mut(format.channels as usize) {
                            for out in sample.iter_mut() {
                                if ind == 1024 { return; }
                                *out = read_from[ind];
                                ind += 1;
                            }
                            return;
                        }
                    },
                    _ => (),
                }
                return;
            });
            event_loop.destroy_stream(stream_id);
            return;
        }); 
    } 
}

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        handle_connection(stream);
    }
}

it's the sender code:

extern crate anyhow;
extern crate cpal;
extern crate ringbuf;

use cpal::traits::{DeviceTrait, EventLoopTrait, HostTrait};
use ringbuf::RingBuffer;

use std::io::prelude::*;
use std::net::TcpStream;

const LATENCY_MS: f32 = 150.0;

fn main() -> Result<(), anyhow::Error> {
    let host = cpal::default_host();
    let event_loop = host.event_loop();

    let mut stream = TcpStream::connect("127.0.0.1:7878")?;
    let input_device = host.default_input_device().expect("failed to get default input device");

    let mut format = input_device.default_input_format().unwrap();
    format.data_type = cpal::SampleFormat::F32;

    let input_stream_id = event_loop.build_input_stream(&input_device, &format).unwrap();

    let latency_frames = (LATENCY_MS / 1_000.0) * format.sample_rate.0 as f32;
    let latency_samples = latency_frames as usize * format.channels as usize;

    let ring = RingBuffer::new(latency_samples * 2);
    let (mut producer, mut consumer) = ring.split();

    for _ in 0..latency_samples {
        producer.push(0.0).unwrap();
    }

    event_loop.play_stream(input_stream_id.clone()).unwrap();

    std::thread::spawn(move || {
        event_loop.run(move |id, result| {
            let data = match result {
                Ok(data) => data,
                Err(err) => {
                    eprintln!("an error occurred on stream {:?}: {}", id, err);
                    return;
                }
            };

            match data {
                cpal::StreamData::Input { buffer: cpal::UnknownTypeInputBuffer::F32(buffer) } => {
                    assert_eq!(id, input_stream_id);
                    let mut output_fell_behind = false;
                    for &sample in buffer.iter() {
                        //println!("{:?}", sample);
                        let obj : Option<f32> = Some(sample); 
                        let encoded : Vec<u8> = bincode::serialize(&obj).unwrap();
                        let mut msg : [u8; 5] = [0,0,0,0,0]; 
                        for (i, item) in encoded.iter().enumerate() {
                            msg[i] = *item;
                            //stream.write(&[*item]);
                        }
                        println!("{:?}", msg.clone());
                        stream.write(&msg).unwrap();
                        if producer.push(sample).is_err() {
                            output_fell_behind = true;
                        }
                    }
                    if output_fell_behind {
                        eprintln!("output stream fell behind: try increasing latency");
                    }
                },
                _ => panic!("error"),
            }
        });
    });

    println!("Playing for 10 seconds... ");
    std::thread::sleep(std::time::Duration::from_secs(10));
    println!("Done!");
    Ok(())
} 

buffer's at the println does not match. Do you have any ideas what am i doing wrong? Or may be there are a better way to transfer audio/floats through tcp? Thank you in advance!

P.S.
it;s cargo.toml dependecies of both programms

[dependencies]
anyhow = "1.0.12"
cpal = "0.10.0"
ringbuf = "0.1.6"
bincode = "1.2.0"

[target.'cfg(target_os = "windows")'.dependencies]
winapi = { version = "0.3", features = ["audiosessiontypes", "audioclient", "coml2api", "combaseapi", "debug", "devpkey", "handleapi", "ksmedia", "mmdeviceapi", "objbase", "std", "synchapi", "winbase", "winuser"] }

You print the full contents of the receiving buffer everytime. I'm going to assume that stream.read(buffer) can and is reading fewer than five bytes sometimes.

Change your receiving print statements to only print the bytes read from the stream. That may fix the "corruption" you're seeing due to stale bytes at the end of the buffer printing again at later points in the stream.

match stream.read(&mut buffer) {
    Ok(s) => {
        println!("{:?}", s);
        println!("{:?}", &buffer[..s]);
        // ...

If that's the fix, then update your buffer deserialize likewise.

let decoded : Option<f32> = bincode::deserialize(&buffer[..s]).unwrap();

If the problem persists, you can create a gist with samples of the sending and receiving stdout, that would help us, too.

1 Like

changed code as you said, but it's not solving the problem, the s value is always 5, and its read exactly 5 bytes, but their values doesn't match
it's the last 5 sent buffers

[1, 30, 231, 31, 189]
[1, 157, 212, 34, 189]
[1, 102, 206, 37, 189]
[1, 184, 39, 39, 189]
[1, 52, 148, 38, 189]

and its is the last 5 received

[1, 2, 249, 127, 189]
[1, 29, 164, 126, 189]
[1, 27, 108, 122, 189]
[1, 109, 218, 123, 189]
[1, 102, 169, 117, 189]

here is a gist with code

  • TCPStream.write does not guarantee writing all bytes before returning, you might want to use write_all instead
  • TCPStream.read does not guarantee filling the buffer, you need to pay attention to its return value (the number of bytes read), or use read_exact
2 Likes

Thanks for you reply, will try this soon.

UPD. Changed to write_all and read_exact, it;s still remains corrupteed :frowning:

I tested your example (updated with read_exact and write_all), and found the last three values printed were garbage.

I created a simpler example to test the TCP transfer with bincode [1]. I found no corruption here.

If I had to venture a guess, I'd say the way your sender program exits and kills the spawned loop may interrupt the write_all and send garbage instead. Only way I know of to verify, is update the sender to exit gracefully.

Try using a channel to send a "quit" message from the main thread to the spawned thread, and have the spawned thread exit cleanly when the message is received. The main thread should wait on the spawned thread's JoinHandle after sending the "quit" message.

[1]: https://gist.github.com/boxofrox/d3e55e7e68ce5e39ff675d999e990e67

2 Likes