I'm playing with some unsafe Rust code. The following is an incomplete one-way channel that passes data from a writing thread to a reading thread. I want to keep the execution cost low by staying lock-free and using as little atomic synchronization as possible. The general idea is a channel backed by a ring buffer.
```rust
use std::sync::Arc;
use core::sync::atomic::AtomicUsize;
use core::sync::atomic::Ordering::Relaxed;
use core::pin::Pin;
use core::task::{Context, Poll};
use futures::task::AtomicWaker;
use futures::{Stream, StreamExt};
// *****************************************************************************
pub struct Sender<T> {
    pub(crate) buf: Arc<[T; 5]>,
    pub(crate) waker: Arc<AtomicWaker>,
    pub(crate) rindex: Arc<AtomicUsize>,
    pub(crate) windex: Arc<AtomicUsize>,
    pub(crate) closed: Arc<bool>,
}
impl<T> Sender<T> {
    pub fn push(&mut self, data: T) {
        let windex = self.windex.load(Relaxed);
        let ptr = Arc::as_ptr(&self.buf) as *mut T;
        unsafe {
            ptr.add(windex).write(data);
        }
        self.windex.fetch_add(1, Relaxed);
        self.waker.wake();
    }
    pub fn close(&mut self) {
        let ptr = Arc::as_ptr(&self.closed) as *mut bool;
        unsafe {
            ptr.write(true);
        }
    }
}
// *****************************************************************************
pub struct Receiver<T: Copy> {
    pub(crate) buf: Arc<[T; 5]>,
    pub(crate) waker: Arc<AtomicWaker>,
    pub(crate) rindex: Arc<AtomicUsize>,
    pub(crate) windex: Arc<AtomicUsize>,
    pub(crate) closed: Arc<bool>,
}
impl<T: Copy> Receiver<T> {
    pub fn pop(&mut self) -> Option<T> {
        let rindex = self.rindex.load(Relaxed);
        let windex = self.windex.load(Relaxed);
        if windex > rindex {
            self.rindex.fetch_add(1, Relaxed);
            Some(self.buf.get(rindex).unwrap().to_owned())
        } else {
            None
        }
    }
    pub fn is_closed(&self) -> bool {
        let rindex = self.rindex.load(Relaxed);
        let windex = self.windex.load(Relaxed);
        *self.closed && rindex == windex
    }
}
impl<T: Copy> Stream for Receiver<T> {
    type Item = T;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        self.waker.register(cx.waker());
        if self.is_closed() {
            Poll::Ready(None)
        } else if let Some(item) = self.pop() {
            Poll::Ready(Some(item))
        } else {
            Poll::Pending
        }
    }
}
// *****************************************************************************
#[tokio::main]
async fn main() {
    let buf = Arc::new([
        [0u8, 1], [2, 3], [4, 5], [6, 7], [8, 9],
    ]);
    let waker = Arc::new(AtomicWaker::new());
    let rindex = Arc::new(AtomicUsize::new(0));
    let windex = Arc::new(AtomicUsize::new(0));
    let closed = Arc::new(false);
    let mut sender = Sender {
        buf: buf.clone(),
        waker: waker.clone(),
        rindex: rindex.clone(),
        windex: windex.clone(),
        closed: closed.clone(),
    };
    let mut receiver = Receiver {
        buf: buf,
        waker: waker,
        rindex,
        windex,
        closed,
    };
    let s = tokio::spawn(async move {
        let feed = [
            [100, 101],
            [102, 103],
            [104, 105],
            [106, 107],
            [108, 109],
        ];
        for msg in feed {
            tokio::time::sleep(std::time::Duration::from_millis(300)).await;
            sender.push(msg);
        }
        sender.close();
    });
    let r = tokio::spawn(async move {
        while let Some(data) = receiver.next().await {
            println!("{:?}", data);
        }
    });
    s.await.unwrap();
    r.await.unwrap();
}
```
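For reference, `push` writes to `ptr.add(windex)` with no bounds check, and `windex` only ever grows. A sixth `push` would therefore write past the end of the five-element array. The demo stays in bounds only because it sends exactly five messages. A ring buffer usually lets both counters grow and maps them onto slots with a modulo. The following is a minimal sketch of just that index arithmetic. `CAP`, `slot`, `len`, `is_empty`, and `is_full` are hypothetical helpers and are not part of the code above.

```rust
/// Capacity, assumed to match the `[T; 5]` buffer in the question.
const CAP: usize = 5;

/// Maps an ever-growing read or write counter onto a slot index.
/// Note: this stays continuous across `usize` overflow only if CAP is a power of two.
fn slot(counter: usize) -> usize {
    counter % CAP
}

/// Number of buffered items; `wrapping_sub` keeps this right even if the counters overflow.
fn len(rindex: usize, windex: usize) -> usize {
    windex.wrapping_sub(rindex)
}

/// Reader side: there is nothing to pop.
fn is_empty(rindex: usize, windex: usize) -> bool {
    len(rindex, windex) == 0
}

/// Writer side: pushing now would overwrite a slot the reader has not consumed yet.
fn is_full(rindex: usize, windex: usize) -> bool {
    len(rindex, windex) == CAP
}
```

With helpers like these, `push` would refuse (or wait) while `is_full` holds and write to `slot(windex)` rather than `windex`. `pop` would read from `slot(rindex)`.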
The code works on my Mac, but I'm not sure it would work correctly on other machines. In particular, I doubt that mutating memory directly through a pointer, as `push` does, really makes the new data immediately visible to the other thread(s). I assume `waker`, `rindex`, and `windex` need to be atomic types here, since reads could otherwise be corrupted. `closed`, on the other hand, only ever changes in one direction (from `false` to `true`), and it is always read after the write in the other thread has completed. The `buf` items are never accessed by both threads at the same time: in terms of indexes, the writer leads and the reader follows.
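On the visibility question, the following is a minimal sketch of the usual way to get that guarantee. It assumes exactly one producer thread and one consumer thread. The slots are written through `UnsafeCell`, because writing through `Arc::as_ptr(..) as *mut T` to data behind a shared `Arc` is undefined behavior on any platform. The writer publishes each slot with a `Release` store to `windex`. The reader touches a slot only after an `Acquire` load of `windex` has seen the new value. `Relaxed` alone does not order the slot write before the index update as seen by the other thread. `closed` becomes an `AtomicBool` and uses the same Release/Acquire pairing. `Shared`, `try_push`, `try_pop`, `recv_spin`, `run_demo`, and `CAP` are hypothetical names. To stay std-only, the sketch spins on plain threads at the points where the question's async code would register and wake its `AtomicWaker`.

```rust
use std::cell::UnsafeCell;
use std::hint::spin_loop;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

const CAP: usize = 5;

/// Shared state; `UnsafeCell` is what allows writing to slots through a shared `Arc`.
struct Shared<T> {
    buf: [UnsafeCell<T>; CAP],
    rindex: AtomicUsize, // stored only by the consumer
    windex: AtomicUsize, // stored only by the producer
    closed: AtomicBool,
}

// SAFETY (assumption): one producer and one consumer, so the indices ensure a slot
// is never read and written at the same time.
unsafe impl<T: Send> Sync for Shared<T> {}

impl<T: Copy + Default> Shared<T> {
    fn new() -> Self {
        Shared {
            buf: std::array::from_fn(|_| UnsafeCell::new(T::default())),
            rindex: AtomicUsize::new(0),
            windex: AtomicUsize::new(0),
            closed: AtomicBool::new(false),
        }
    }

    /// Producer side; hands the value back if the buffer is full.
    fn try_push(&self, value: T) -> Result<(), T> {
        let w = self.windex.load(Ordering::Relaxed); // our own counter
        // Acquire: the consumer's reads of old slots happen-before we overwrite them.
        let r = self.rindex.load(Ordering::Acquire);
        if w.wrapping_sub(r) == CAP {
            return Err(value);
        }
        unsafe { *self.buf[w % CAP].get() = value };
        // Release: publishes the slot write above to the consumer.
        self.windex.store(w.wrapping_add(1), Ordering::Release);
        Ok(())
    }

    fn close(&self) {
        // Release: every earlier push is visible to a thread that reads `true`.
        self.closed.store(true, Ordering::Release);
    }

    /// Consumer side; None if nothing has been published yet.
    fn try_pop(&self) -> Option<T> {
        let r = self.rindex.load(Ordering::Relaxed); // our own counter
        // Acquire: pairs with the Release store in `try_push`.
        let w = self.windex.load(Ordering::Acquire);
        if r == w {
            return None;
        }
        let value = unsafe { *self.buf[r % CAP].get() };
        // Release: hands the slot back to the producer.
        self.rindex.store(r.wrapping_add(1), Ordering::Release);
        Some(value)
    }

    /// Spins until an item arrives; None once the channel is closed and drained.
    fn recv_spin(&self) -> Option<T> {
        loop {
            if let Some(v) = self.try_pop() {
                return Some(v);
            }
            if self.closed.load(Ordering::Acquire) {
                // Re-check: pushes made before `close()` are now guaranteed visible.
                return self.try_pop();
            }
            spin_loop();
        }
    }
}

/// Sends `items` from a second thread and collects them on this one.
fn run_demo(items: Vec<[u8; 2]>) -> Vec<[u8; 2]> {
    let shared = Arc::new(Shared::<[u8; 2]>::new());
    let tx = Arc::clone(&shared);
    let producer = thread::spawn(move || {
        for mut item in items {
            while let Err(back) = tx.try_push(item) {
                item = back; // buffer full: retry
                spin_loop();
            }
        }
        tx.close();
    });
    let mut out = Vec::new();
    while let Some(v) = shared.recv_spin() {
        out.push(v);
    }
    producer.join().unwrap();
    out
}
```

For example, calling `run_demo` with twenty `[u8; 2]` items should return them unchanged and in order. Twenty is more than `CAP`, so both counters wrap around the buffer several times.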