Packets missing on reading from a Unix Domain Socket

Missing bytes when reading from a Unix Domain Socket

I have the following program that reads from a Unix Domain Socket.
I have a program that writes a file to the Unix Domain Socket.

The file size is about 375 KB . If I set the chunk size to 400KB the whole file is read
and the data is proper

But if the chunk size is set to say 1024 then a lot of packets are skipped. I am not able to find out why that is

fn main() -> io::Result<()> {
    info!(
        " Starting with the following values Chunk Size : {:?}, Socket Name : {:?} ",
        chunk_size, addr
    );
    // Create a socket and listen to it
    let list01 = UnixListener::bind(&addr).unwrap();

    for stream in list01.incoming() {
        match stream {
            Ok(stream) => {
                loop {
                    let mut chunk = Vec::with_capacity(chunk_size);
                    let reference = BufReader::new(&stream);
                    let bytes_read = reference.take(chunk_size as u64).read_to_end(&mut chunk)?;
                    match bytes_read {
                        0 => {
                            info!("bytes_read ==0 Ending file with iteration {:?}", i);
                            break;
                        }

                        _ => {
                            info!("bytes read == {:?} for iteration {:?}", bytes_read, i);
                            let buf1 = Bytes::from(chunk);
                            info!("Buffer read from stream is : {:#?}", buf1);
                        }
                    }
                }
            }
            _ => {
                error!("Error {:?}", stream);
            }
        }
    }

    Ok(())
}

Every time your BufReader goes out of scope, any remaining data in its buffer is lost. You should reuse it.

2 Likes

i moved the BufReader outside the loop but I get the following error (i.e.)

 for stream in list01.incoming() {
      let reference = BufReader::new(&stream);

But the compiler now says trait not satisfied

error[E0277]: the trait bound `&std::result::Result<UnixStream, std::io::Error>: std::io::Read` is not satisfied
  --> src/read-pipe.rs:45:40
   |
45 |         let reference = BufReader::new(&stream);
   |                                        ^^^^^^^ the trait `std::io::Read` is not implemented for `&std::result::Result<UnixStream, std::io::Error>`
   |
   = note: required by `BufReader::<R>::new`

error[E0599]: the method `take` exists for struct `BufReader<&std::result::Result<UnixStream, std::io::Error>>`, but its trait bounds were not satisfied
  --> src/read-pipe.rs:51:44
   |
51 |                 let bytes_read = reference.take(chunk_size as u64).read_to_end(&mut chunk).unwrap();
   |                                            ^^^^ method cannot be called on `BufReader<&std::result::Result<UnixStream, std::io::Error>>` due to unsatisfied trait bounds
   |
  ::: /home/hari/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/io/buffered/bufreader.rs:46:1
   |
46 | pub struct BufReader<R> {
   | -----------------------
   | |
   | doesn't satisfy `_: Buf`
   | doesn't satisfy `_: Iterator`
   | doesn't satisfy `_: std::io::Read`
   |
   = note: the following trait bounds were not satisfied:
           `&std::result::Result<UnixStream, std::io::Error>: std::io::Read`
           which is required by `BufReader<&std::result::Result<UnixStream, std::io::Error>>: std::io::Read`
           `BufReader<&std::result::Result<UnixStream, std::io::Error>>: Buf`
           which is required by `&mut BufReader<&std::result::Result<UnixStream, std::io::Error>>: Buf`
           `BufReader<&std::result::Result<UnixStream, std::io::Error>>: std::io::Read`
           which is required by `&mut BufReader<&std::result::Result<UnixStream, std::io::Error>>: std::io::Read`
           `BufReader<&std::result::Result<UnixStream, std::io::Error>>: Iterator`
           which is required by `&mut BufReader<&std::result::Result<UnixStream, std::io::Error>>: Iterator`

That's because you are no longer checking for errors. The compiler error tells you this by using the Result type where the UnixStream type should be.

Thanks . I had to use the by_ref to fix the error that compiler threw on reusing the moved variable. This is what it looks like

fn main() -> io::Result<()> {
    info!(
        " Starting with the following values Chunk Size : {:?}, Socket Name : {:?} ",
        chunk_size, addr
    );
    // Create a socket and listen to it . We should unbind if the socket exists
    let list01 = UnixListener::bind(&addr).unwrap();

    let mut i = 0;
    let mut current_pointer: usize = 0;
    let mut rewind: usize = 0;
    let mut new_buffer = Bytes::new();
    for stream in list01.incoming() {
        match stream {
            Ok(stream) => {
                let reference = std::io::Read::by_ref(&mut stream);
                loop {
                    let mut chunk = Vec::with_capacity(chunk_size);
                    let bytes_read = reference.take(chunk_size as u64).read_to_end(&mut chunk)?;
                    match bytes_read {
                        0 => {
                            info!("bytes_read ==0 Ending file with iteration {:?}", i);
                            break;
                        }

                        _ => {
                            info!("bytes read == {:?} for iteration {:?}", bytes_read, i);
                            /* Align the pointer to the previous frame */
                            let buf1 = Bytes::from(chunk);
                            info!("Buffer read from stream is : {:#?}", buf1);
                        }
                    }
                }
            }
            _ => {
                error!("Error {:?}", stream);
            }
        }
    }

    Ok(())
}

If you make the following change, then you can take away the temporary reference.

-let bytes_read = reference.take(chunk_size as u64).read_to_end(&mut chunk)?;
+let bytes_read = stream.by_ref().take(chunk_size as u64).read_to_end(&mut chunk)?;

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.