I have the following message bus with a buffer for writing.
pub struct MemoryBus {
buffer: std::sync::Arc<SharedBuffer>,
ptr: ConstPtr<u8>,
}
struct SharedBuffer {
_buffer: std::sync::Mutex<PinnedBuffer>,
write_head: std::sync::atomic::AtomicUsize,
read_head: std::sync::atomic::AtomicUsize,
}
struct PinnedBuffer {
_buffer: std::pin::Pin<std::boxed::Box<[u8; BUFFER_SIZE]>>,
_pin: std::marker::PhantomPinned,
}
struct ConstPtr<T: ?Sized> {
ptr: usize,
_phantom: std::marker::PhantomData<T>,
}
I return a reference to one part of the buffer in the read implementation using
std::slice::from_raw_parts
:
impl traits::Reader for MemoryBus {
fn read(&self, position: usize) -> Option<&[u8]> {
let pos = self
.buffer
.read_head
.load(std::sync::atomic::Ordering::Acquire);
if position >= pos {
return None;
}
let wrapped_pos = position % WRAP_SIZE;
unsafe {
let ptr_offset = self.ptr.as_ptr().add(wrapped_pos);
let buffer = std::slice::from_raw_parts(ptr_offset, messenger::HEADER_SIZE);
let hdr = bincode::deserialize::<messenger::Header>(buffer).unwrap();
let len = messenger::HEADER_SIZE + hdr.size as usize;
let buffer = std::slice::from_raw_parts(ptr_offset, len);
_print_buffer(&buffer);
Some(buffer)
}
}
}
While holding this reference I'm writing new items to the message bus which also takes a
&self
as first argument and a std::slice::from_raw_parts_mut
impl traits::Writer for MemoryBus {
fn write<M: traits::Message, H: traits::Handler>(&self, message: M) {
let size = messenger::align_to_usize(bincode::serialized_size(&message).unwrap() as usize);
let position = self.buffer.write_head.fetch_add(
messenger::HEADER_SIZE + size as usize,
std::sync::atomic::Ordering::Relaxed,
);
let wrapped_pos = position % WRAP_SIZE;
unsafe {
let ptr_offset = self.ptr.as_ptr_mut().add(wrapped_pos);
let len = messenger::HEADER_SIZE + size;
let buffer = std::slice::from_raw_parts_mut(ptr_offset, len);
buffer.fill(0);
bincode::serialize_into(
&mut buffer[..messenger::HEADER_SIZE],
&messenger::Header {
source: H::ID.into(),
message_id: M::ID.into(),
size: size as u16,
},
)
.unwrap();
bincode::serialize_into(&mut buffer[messenger::HEADER_SIZE..], &message).unwrap();
}
let new_read_head = position + messenger::HEADER_SIZE + size;
loop {
match self.buffer.read_head.compare_exchange_weak(
position,
new_read_head,
std::sync::atomic::Ordering::Release,
std::sync::atomic::Ordering::Relaxed,
) {
Ok(_) => break,
_ => {}
}
}
}
}
Given that I'm sure that the write and read function will not operate on the same part of the array.
Can this implementation still cause Undefined Behavior?
Is it safe to have write take &self
instead of &mut self
in this case?
FYI the goal is to have zero copy deserialization.