Sending data over a non-blocking socket corrupts data

The assert in this code block fails on a random iteration. Why? It fails more reliably on Linux than on Mac for some reason, which is how I found it. Am I doing something wrong in my usage of the lib?

bytes = "1.4"
active toolchain

nightly-aarch64-apple-darwin (default)
rustc 1.73.0-nightly (439d066bc 2023-08-10)

use std::time::Instant;

use bytes::{Bytes, BytesMut};

pub fn random_bytes(size: usize) -> &'static [u8] {
    let data = (0..size).map(|_| rand::random::<u8>()).collect::<Vec<_>>();
    let leaked_ref: &'static [u8] = Box::leak(data.into_boxed_slice());
    leaked_ref
}

const TEST_SEND_FRAME_SIZE: usize = 128;
const WRITE_N_TIMES: usize = 100_000_00;
pub struct MsgFramer;
impl MsgFramer {
    fn get_frame(bytes: &mut BytesMut) -> Option<Bytes> {
        if bytes.len() < TEST_SEND_FRAME_SIZE {
            return None;
        } else {
            let frame = bytes.split_to(TEST_SEND_FRAME_SIZE);
            return Some(frame.freeze());
        }
    }
}

fn main() {
    use rand::Rng;
    let mut rng = rand::thread_rng();
    let send_bytes = random_bytes(TEST_SEND_FRAME_SIZE);

    println!("send_bytes: {:?}", send_bytes);

    let mut buffer = BytesMut::with_capacity(TEST_SEND_FRAME_SIZE);

    let start = Instant::now();
    for i in 0..WRITE_N_TIMES {
        let split = rng.gen_range(0..TEST_SEND_FRAME_SIZE);
        loop {
            buffer.extend_from_slice(&send_bytes[..split]);
            println!("iteration: {}, split: {}, buffer.len(): {} ", i, split, buffer.len());
            let frame = MsgFramer::get_frame(&mut buffer);
            match frame {
                Some(frame) => {
                    let recv_bytes = &frame[..];
                    assert_eq!(recv_bytes, send_bytes, "iteration: {}", i);
                    drop(frame);
                    break;
                }
                None => {
                    buffer.extend_from_slice(&send_bytes[split..]);
                }
            }
        }
    }
    let elapsed = start.elapsed();
    println!(
        "elapsed per write: {:?}, elapsed total: {:?}",
        elapsed / WRITE_N_TIMES as u32,
        elapsed,
    );
}

What are you trying to do, and how exactly is it failing?


By the way, leaking the random data looks suspicious – not because it should directly cause your error, but because you might be misunderstanding something. You could just return the Vec by-value.

The leak is irrelevant; I am just creating a slice type.

The program emulates a continuous stream of byte frames from a network socket and appends each slice to the BytesMut as it arrives. Each time some bytes are appended, it checks whether the BytesMut has enough bytes to make up a frame; if so, it splits the frame off and consumes it.

The frame size is fixed, so get_frame is supposed to just cut off a frame as soon as N bytes are available. The assert checks that the resulting frame is identical to the one that was appended to the buffer. However, it is not, and the assert fails.

  • i = 0
    • let split = 80
    • loop
      • You append the first 80 send bytes to the buffer
      • Not large enough: you append the trailing 48 send bytes to the buffer and go to top of loop
      • n.b.: buffer == send_bytes at this point
      • ⚠️ You append the first 80 send bytes to the buffer
      • Large enough: you consume the first 128 bytes of buffer
      • n.b.: Remaining contents of buffer are first 80 send bytes
      • assert succeeds, you break the loop
  • i = 1
    • let split = 100
    • loop
      • You append the first 100 send bytes to the buffer
      • n.b. buffer contents now first 80 send bytes + first 100 send bytes
      • Large enough: you consume the first 128 bytes of buffer
      • assert fails: (first 80 + first 48 send bytes) != (all 128 send bytes)

It works if you move the appending of the first send bytes to outside of the inner loop, but I don't know if that's what you meant to emulate or not.
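
For illustration, here is a minimal sketch of that restructured loop. It is not the original program: it uses a plain Vec<u8> instead of BytesMut and caller-chosen split points instead of rand, so that it builds with std alone. The names FRAME_SIZE and send_in_two_parts are made up for this sketch.

```rust
const FRAME_SIZE: usize = 128;

/// Splits one complete frame off the front of `buffer`, if enough bytes have arrived.
fn get_frame(buffer: &mut Vec<u8>) -> Option<Vec<u8>> {
    if buffer.len() < FRAME_SIZE {
        return None;
    }
    // Keep everything after the first frame in `buffer` and return the frame itself.
    let rest = buffer.split_off(FRAME_SIZE);
    Some(std::mem::replace(buffer, rest))
}

/// Feeds `frame` into `buffer` in two pieces, cut at `split`, and returns the
/// frame that comes out the other side. (Hypothetical helper for this sketch.)
fn send_in_two_parts(buffer: &mut Vec<u8>, frame: &[u8], split: usize) -> Vec<u8> {
    assert_eq!(frame.len(), FRAME_SIZE);
    assert!(split <= FRAME_SIZE);

    // The first part is appended exactly once, outside the retry loop.
    buffer.extend_from_slice(&frame[..split]);
    loop {
        match get_frame(buffer) {
            Some(received) => return received,
            // Not enough bytes yet: the rest of the frame "arrives".
            None => buffer.extend_from_slice(&frame[split..]),
        }
    }
}
```

With the first append outside the loop, nothing is left over in the buffer after a frame is consumed, so the next iteration starts from a clean state.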

Thanks, I realized I had a bug around the loop and had to move the append. Now I need to come up with a different example to try to replicate the problem. I have a project that seems to randomly fail a benchmark with the same assert, except that it is actually writing via a socket. It only fails on Linux, not on macOS, which is very strange: after reading about 1m messages via a socket, something goes wrong and recv_frame is no longer equal to send_frame. If I can't figure out how to extract a repeatable example, I will post the link to the project with a benchmark run command. The benchmark does exactly what the example was meant to do, which is read over a non-blocking TcpStream and hence populate the BytesMut gradually.

OK, so I have extracted a "minimum" amount of code to replicate it and posted it here, with details of the Cargo.toml and the actual error message. This is quite a bit of code to go through, but here is the overall structure:

  1. This has one example file called /examples/bytes_bug.rs

  2. The file has two #[test]'s so you can run each one independently

    a) test #1

    called test_emulated(). It writes a fixed-size frame of random bytes by arbitrarily splitting it in "half", and then tries to reconstruct the frame by checking whether enough bytes are available in the bytes::BytesMut buffer. It seems to never fail on either Linux or Mac.

    b) test #2

    called test_socket(). This test has two threads:

    a new thread, which runs the server socket; it reads bytes, tries to reassemble them into frames, and counts the frames.

    the main thread, which has a client socket and writes N frames; N has to be set pretty high to get a failure.

    Note: in this test, both read_frame and write_frame go through a helper struct which indicates whether the I/O operation completed or the socket was not ready. This is because the socket is intentionally non-blocking.

  3. Post:

After playing around with the code a little bit, I found that the issue is that you can't use write_all with non-blocking sockets like this. write_all may write some of the bytes to the socket and then hit the WouldBlock error. In this case, it can't unwrite the bytes it already sent, nor can it communicate the number of bytes it already wrote so that the sender can try sending the rest of the frame. So the receiver ends up with a partial frame, and that causes the mismatch.
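
To make that concrete, here is a small sketch that shows the effect without any real socket. LimitedWriter is a made-up stand-in that accepts a fixed number of bytes and then reports WouldBlock, which is roughly what a non-blocking socket with a full send buffer does; send_with_write_all is likewise a hypothetical helper.

```rust
use std::io::{self, ErrorKind, Write};

/// Hypothetical stand-in for a non-blocking socket: it accepts at most
/// `capacity` bytes in total and then reports `WouldBlock`.
struct LimitedWriter {
    accepted: Vec<u8>,
    capacity: usize,
}

impl Write for LimitedWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let room = self.capacity - self.accepted.len();
        if room == 0 {
            return Err(ErrorKind::WouldBlock.into());
        }
        // Accept as many bytes as still fit, like a short write on a socket.
        let n = room.min(buf.len());
        self.accepted.extend_from_slice(&buf[..n]);
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Tries to send one frame with `write_all` and returns the error kind, if any.
/// Note that the return value carries no byte count at all.
fn send_with_write_all(writer: &mut LimitedWriter, frame: &[u8]) -> Option<ErrorKind> {
    writer.write_all(frame).err().map(|e| e.kind())
}
```

Sending a 128-byte frame into a LimitedWriter with capacity 80 yields Some(ErrorKind::WouldBlock), yet 80 bytes have already gone out. The caller only sees the error, so retrying the whole frame would put those 80 bytes on the wire twice.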

Try replacing write_frame with something like this:

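pub fn write_frame(&mut self, mut bytes: &[u8]) -> Result<WriteStatus, Box<dyn Error>> {
    // Number of bytes of this frame accepted by the socket so far.
    let mut wrote = 0;
    while !bytes.is_empty() {
        match self.writer.write(bytes) {
            // A zero-length write with a non-empty buffer means no progress can be made.
            Ok(0) => {
                panic!("failed to write");
            }
            // Partial progress: drop the bytes that were written and keep going.
            Ok(n) => {
                wrote += n;
                bytes = &bytes[n..];
            }
            // The socket is not ready: stop and report how far we got.
            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break,
            // Interrupted by a signal: retry the write.
            Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
            Err(e) => return Err(e.into()),
        }
    }
    if bytes.is_empty() {
        Ok(WriteStatus::Completed)
    } else {
        // The caller must resend the remaining bytes, starting at offset `wrote`.
        Ok(WriteStatus::Partial(wrote))
    }
}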

and change the sender to account for partial writes.
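
One possible shape for the sender side is sketched below. It is only an illustration under some assumptions: write_frame is written as a free function over any Write, the WriteStatus enum is guessed from how it is used above, and MockSocket and send_whole_frame are invented for this example. The on_would_block callback marks the place where a real program would wait for the socket to become writable again.

```rust
use std::io::{self, ErrorKind, Write};

/// Outcome of one send attempt (shape assumed from the snippet above).
#[derive(Debug, PartialEq)]
enum WriteStatus {
    Completed,
    /// Bytes of the frame written before the writer reported `WouldBlock`.
    Partial(usize),
}

/// Writes as much of `bytes` as the writer accepts right now.
fn write_frame<W: Write>(writer: &mut W, mut bytes: &[u8]) -> io::Result<WriteStatus> {
    let mut wrote = 0;
    while !bytes.is_empty() {
        match writer.write(bytes) {
            Ok(0) => return Err(ErrorKind::WriteZero.into()),
            Ok(n) => {
                wrote += n;
                bytes = &bytes[n..];
            }
            Err(e) if e.kind() == ErrorKind::WouldBlock => {
                return Ok(WriteStatus::Partial(wrote));
            }
            Err(e) if e.kind() == ErrorKind::Interrupted => {}
            Err(e) => return Err(e),
        }
    }
    Ok(WriteStatus::Completed)
}

/// Hypothetical sender helper: keeps sending the unsent tail of `frame`
/// until the whole frame is out, calling `on_would_block` between attempts.
fn send_whole_frame<W: Write>(
    writer: &mut W,
    frame: &[u8],
    mut on_would_block: impl FnMut(&mut W),
) -> io::Result<()> {
    let mut sent = 0;
    loop {
        match write_frame(writer, &frame[sent..])? {
            WriteStatus::Completed => return Ok(()),
            WriteStatus::Partial(n) => {
                // Remember the progress so the next attempt resumes mid-frame.
                sent += n;
                on_would_block(writer);
            }
        }
    }
}

/// Hypothetical stand-in for a non-blocking socket with a small send buffer.
struct MockSocket {
    received: Vec<u8>,
    /// Bytes it will still accept before reporting `WouldBlock`.
    budget: usize,
}

impl Write for MockSocket {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        if self.budget == 0 {
            return Err(ErrorKind::WouldBlock.into());
        }
        let n = self.budget.min(buf.len());
        self.received.extend_from_slice(&buf[..n]);
        self.budget -= n;
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
```

The important part is that the offset into the frame survives between attempts, so the receiver sees each byte exactly once. In a real program the callback would wait on the poller rather than spin.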

I was also able to reproduce the issue without bytes or mio. If you suspect an issue in a library, it's a good idea to try to reproduce it without them. In this case, it was easy to refactor the code to use Vec<u8> and std's networking types and confirm bytes and mio aren't at fault. Of course, when it comes to very widely used libraries like bytes or mio, it's overwhelmingly likely that there's something wrong with your own code.

By the way, this is undefined behaviour:

let mut buf: [u8; MAX_MESSAGE_SIZE] = unsafe { MaybeUninit::uninit().assume_init() };

Please make sure to read the docs very carefully when using unsafe functions, or better yet, don't use unsafe when it's not necessary.

https://doc.rust-lang.org/std/mem/union.MaybeUninit.html#initialization-invariant

uninitialized memory is special in that it does not have a fixed value (“fixed” meaning “it won’t change without being written to”). Reading the same uninitialized byte multiple times can give different results. This makes it undefined behavior to have uninitialized data in a variable even if that variable has an integer type

let x: i32 = unsafe { MaybeUninit::uninit().assume_init() }; // undefined behavior! ⚠️
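
A safe alternative, as a rough sketch: zero-initialize the buffer and only look at the bytes the read actually filled in. The value of MAX_MESSAGE_SIZE and the read_once helper are assumptions made for this example, not taken from the project.

```rust
use std::io::Read;

// Illustrative value; the real constant lives in the project.
const MAX_MESSAGE_SIZE: usize = 128;

/// Reads once from `reader` into a zero-initialized stack buffer and returns
/// the bytes that were actually read. (Hypothetical helper for this sketch.)
fn read_once<R: Read>(reader: &mut R) -> std::io::Result<Vec<u8>> {
    // No `unsafe` needed: every byte has a defined value before the read.
    let mut buf = [0u8; MAX_MESSAGE_SIZE];
    let n = reader.read(&mut buf)?;
    // Only the first `n` bytes came from the reader.
    Ok(buf[..n].to_vec())
}
```

Zeroing a small array like this is normally cheap, so it is a reasonable default unless profiling says otherwise.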

I also wanted to mention that, for a bug reproduction, functions like find_available_port and random_bytes aren't really necessary (unless you're unable to reproduce the issue with a fixed port and a fixed set of bytes, of course). You can just hard-code them, and if someone happens to be using the port you chose, they can change it.

@Heliozoa, thanks for your valuable feedback. I also just now realized, by messing with my setup, that it is in fact the data read from the socket that is already corrupted, not the bytes buffer. Your pointing out that the write_all API is not compatible with non-blocking sockets is very helpful, as I would not have realized it so quickly.

This raises a very interesting std lib question: is it a known fact that write_all should not be used with non-blocking sockets? From a pure API perspective, it is not particularly obvious.

It may be a good idea to highlight this in the docs somewhere, though I'm not sure where or how best to word it. You could create an issue in the rust-lang/rust repository on GitHub, in the "Documentation problem" category. Even if they decide the docs don't need to be updated and the issue is closed, it may be helpful to someone else who hits this issue.

Cross-referencing with an issue raised for the documentation.

Oh, the undefined behaviour is limited to your incorrect usage of unsafe with MaybeUninit. Using write_all with non-blocking sockets isn't UB; it just doesn't work as you might expect, because write_all is likely to write only part of the data and then error with WouldBlock. It's not really possible to handle that error properly, because the knowledge of how much of the data was written is lost.
