Is my usage causing possible UB?

I tested the following with cargo miri test and it doesn't complain. Is Miri correct?

Note: I'm using this on a macbook

#[derive(Clone)]
struct AnonymousMmap {
    ptr: *mut libc::c_void,
    len: usize,
}

unsafe impl Sync for AnonymousMmap {}
unsafe impl Send for AnonymousMmap {}

impl AnonymousMmap {
    pub fn new(len: usize) -> Result<Self, std::io::Error> {
        let ptr = unsafe {
            libc::mmap(
                std::ptr::null_mut(),
                len,
                libc::PROT_READ | libc::PROT_WRITE,
                libc::MAP_ANON | libc::MAP_PRIVATE,
                -1,
                0,
            )
        };

        if ptr == libc::MAP_FAILED {
            return Err(std::io::Error::last_os_error());
        }

        Ok(Self { ptr, len })
    }
}

#[cfg(test)]
mod tests {

    use super::*;

    #[test]
    fn test_anonymous_mmap() {
        let len = 4096;
        let mmap = AnonymousMmap::new(len).unwrap();
        assert_eq!(mmap.len, len);
    }

    const ELEMENT_SIZE: usize = 2;
    #[derive(Clone)]
    struct TestMessageBus {
        mmap: AnonymousMmap,
        write_head: std::sync::Arc<std::sync::atomic::AtomicUsize>,
        read_head: std::sync::Arc<std::sync::atomic::AtomicUsize>,
    }

    impl TestMessageBus {
        fn new(len: usize) -> Self {
            let mmap = AnonymousMmap::new(len).unwrap();
            let write_head = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
            let read_head = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
            Self {
                mmap,
                write_head,
                read_head,
            }
        }

        fn read(&self, position: usize) -> Option<&[u8]> {
            let read_head_position = self.read_head.load(std::sync::atomic::Ordering::Acquire);
            if read_head_position == position {
                return None;
            }

            let ptr = self.mmap.ptr as *const u8;
            let ptr = unsafe { ptr.add(position) };

            Some(unsafe { std::slice::from_raw_parts(ptr as *const u8, ELEMENT_SIZE) })
        }

        fn write(&self, data: &[u8]) {
            let position = self
                .write_head
                .fetch_add(data.len(), std::sync::atomic::Ordering::Relaxed);

            let ptr = self.mmap.ptr as *mut u8;
            let ptr = unsafe { ptr.add(position) };

            unsafe {
                libc::memcpy(
                    ptr as *mut libc::c_void,
                    data.as_ptr() as *const libc::c_void,
                    data.len(),
                );
            }

            let new_read_head = position + data.len();

            loop {
                match self.read_head.compare_exchange_weak(
                    position,
                    new_read_head,
                    std::sync::atomic::Ordering::Release,
                    std::sync::atomic::Ordering::Relaxed,
                ) {
                    Ok(_) => break,
                    Err(_) => continue,
                }
            }
        }
    }

    #[test]
    fn test_multi_threaded_anonymous_mmap() {
        let run_task = |mb: TestMessageBus, start: usize, step: usize, stop: usize| {
            let mut position = 0;
            let mut test = 0;

            loop {
                if let Some(data) = mb.read(position) {
                    let val = u16::from_ne_bytes([data[0], data[1]]);
                    assert_eq!(val, test);
                    position += ELEMENT_SIZE;
                    test += 1;
                    // exit if val is greater than stop
                    if val as usize > stop {
                        break;
                    }
                    // loop until val % step == 0
                    if (val as usize + step - start) % step == 0 {
                        mb.write((val + 1).to_ne_bytes().as_ref());
                    }
                }
            }
        };

        let len = 4096;
        let mut handles = Vec::<std::thread::JoinHandle<()>>::new();
        let num_threads: usize = std::thread::available_parallelism()
            .expect("Failed to get number of available CPUs")
            .into();

        let stop = 2000;
        let message_bus = TestMessageBus::new(len);
        message_bus.write(0u16.to_ne_bytes().as_ref());

        for i in 0..num_threads {
            let mb = message_bus.clone();
            handles.push(std::thread::spawn(move || {
                run_task(mb, i, num_threads, stop)
            }));
        }

        for handle in handles {
            handle.join().unwrap();
        }
    }
}

The AnonymousMmap looks ok to me, except it doesn't have a Drop impl.

But the TestMessageBus has UB in its API. It is possible to write more than the mmap capacity and you can also request reads that are out-of-bounds. Neither method does bounds checking.

And because you can read past the write_head, you have mutable aliasing problems:

#[test]
fn mutable_aliasing() {
    let message_bus = TestMessageBus::new(4096);
    let x = message_bus.read(2).unwrap();
    message_bus.write(&[1, 2, 3, 4]);
    println!("{x:?}");
}
2 Likes

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.