Implement a runtime-agnostic async mpsc channel based on std's mpsc

I'm practicing by implementing runtime-agnostic async primitives that we use in our applications, to see whether they can contribute to the ecosystem.

After reading the existing implementations, such as tokio's, futures-channel's, flume's, and async-channel's, I realized that we can build directly on std's mpsc channel. This is essentially what futures-channel does, except that it reimplements a sync mpsc channel based on this blog.
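To make the idea concrete, here is a minimal sketch (not the code from the PR) of an unbounded async channel layered on std's mpsc. The names `Shared`, `WakeOnDrop`, `unbounded`, `poll_recv`, and the tiny `block_on` executor are assumptions made for illustration. It stores a single receiver waker behind a Mutex and does not clear that waker when the receiver is dropped.

```rust
use std::future::{poll_fn, Future};
use std::sync::mpsc::{self, SendError, TryRecvError};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Slot for the waker of the task currently waiting in `recv` (hypothetical design).
struct Shared(Mutex<Option<Waker>>);

impl Shared {
    fn wake(&self) {
        // Take the waker out first so the lock is released before waking.
        let waker = self.0.lock().unwrap().take();
        if let Some(waker) = waker {
            waker.wake();
        }
    }
}

// Wakes the receiver when dropped.
struct WakeOnDrop(Arc<Shared>);

impl Drop for WakeOnDrop {
    fn drop(&mut self) {
        self.0.wake();
    }
}

pub struct UnboundedSender<T> {
    tx: mpsc::Sender<T>, // fields drop in declaration order: disconnect first,
    guard: WakeOnDrop,   // then wake, so the receiver can observe the disconnect
}

pub struct UnboundedReceiver<T> {
    rx: mpsc::Receiver<T>,
    shared: Arc<Shared>,
}

pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
    let (tx, rx) = mpsc::channel();
    let shared = Arc::new(Shared(Mutex::new(None)));
    let guard = WakeOnDrop(shared.clone());
    (UnboundedSender { tx, guard }, UnboundedReceiver { rx, shared })
}

impl<T> Clone for UnboundedSender<T> {
    fn clone(&self) -> Self {
        UnboundedSender { tx: self.tx.clone(), guard: WakeOnDrop(self.guard.0.clone()) }
    }
}

impl<T> UnboundedSender<T> {
    pub fn send(&self, value: T) -> Result<(), SendError<T>> {
        self.tx.send(value)?;
        self.guard.0.wake();
        Ok(())
    }
}

impl<T> UnboundedReceiver<T> {
    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        // Register the waker before checking the queue so a concurrent send is not missed.
        *self.shared.0.lock().unwrap() = Some(cx.waker().clone());
        match self.rx.try_recv() {
            Ok(value) => Poll::Ready(Some(value)),
            Err(TryRecvError::Disconnected) => Poll::Ready(None),
            Err(TryRecvError::Empty) => Poll::Pending,
        }
    }

    pub fn recv(&mut self) -> impl Future<Output = Option<T>> + '_ {
        poll_fn(move |cx| self.poll_recv(cx))
    }
}

// Tiny thread-parking executor, only to show that no particular runtime is required.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}
```

Inside any executor the receiving side would simply be `rx.recv().await`; a `None` result means every sender has been dropped and the queue is drained.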

My patch above doesn't seem to regress performance significantly (it is even better in some cases). Actually, flume takes a lock on the channel state at the very beginning and still claims good performance.

Reviews and comments are welcome if you see any caveats or performance considerations.

I left a couple of comments on your PR, but the TLDR is that you have a couple of leak issues with the way you store the Waker.

I found one unfortunate regression from using std's mpsc: its Receiver is not Sync, because recv and try_recv take &self rather than &mut self.

It won't hurt in most scenarios, but it can prevent a future from being Send when a parent structure holds an UnboundedReceiver.
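The effect can be reproduced without any async code. The sketch below uses a hypothetical `Parent` struct and an `assert_send` helper (both made up for illustration) to show that the struct itself is Send while a shared reference to it is not.

```rust
use std::sync::mpsc::{channel, Receiver};

// Compile-time probe: only accepts values whose type is `Send`.
fn assert_send<T: Send>(_: &T) {}

// Hypothetical parent structure holding a std receiver.
struct Parent {
    rx: Receiver<u32>,
}

pub fn demo() -> u32 {
    let (tx, rx) = channel();
    let parent = Parent { rx };

    // `Parent` itself is `Send`, so moving it to another thread is fine.
    assert_send(&parent);

    // A shared reference is `Send` only if the referent is `Sync`. Uncommenting
    // the next line fails to compile because `Receiver<u32>` is not `Sync`.
    // An `async fn` taking `&self` stores exactly such a `&Parent` in its future.
    // assert_send(&&parent);

    tx.send(7).unwrap();
    parent.rx.recv().unwrap()
}
```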

I don't know why std's Receiver is designed this way and has impl !Sync. Perhaps I need to go back to 1024cores - Non-intrusive MPSC node-based queue again.

UPDATE - Not quite. Since UnboundedReceiver::recv and UnboundedReceiver::try_recv take &mut self, I should be able to write unsafe impl<T: Send> Sync for UnboundedReceiver<T>.
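A minimal sketch of that idea, assuming a wrapper whose only access to the inner std Receiver goes through &mut self (the `new` constructor and `assert_sync` helper are hypothetical). The unsafe impl is only justified as long as no &self method, including derived trait impls, ever touches the inner receiver.

```rust
use std::sync::mpsc::{self, Receiver, TryRecvError};

pub struct UnboundedReceiver<T> {
    rx: Receiver<T>,
}

// SAFETY (assumption of this sketch): a `&UnboundedReceiver<T>` exposes no way
// to reach `rx`, so sharing the reference across threads cannot cause
// concurrent access to the inner receiver.
unsafe impl<T: Send> Sync for UnboundedReceiver<T> {}

impl<T> UnboundedReceiver<T> {
    pub fn new(rx: Receiver<T>) -> Self {
        UnboundedReceiver { rx }
    }

    // All receiving methods require exclusive access.
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        self.rx.try_recv()
    }
}

// Compile-time probe: only accepts values whose type is `Sync`.
fn assert_sync<T: Sync>(_: &T) {}

pub fn demo() -> Result<u32, TryRecvError> {
    let (tx, rx) = mpsc::channel();
    let mut rx = UnboundedReceiver::new(rx);
    assert_sync(&rx); // compiles only because of the unsafe impl above
    tx.send(1).unwrap();
    rx.try_recv()
}
```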

FYI you can do this safely using the (unstable) std::sync::Exclusive

Thank you! I just tried using Mutex and get_mut, but it introduces some overhead for checking the poison bit.
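For reference, the Mutex variant looks roughly like the sketch below (the wrapper layout and the `new` and `assert_sync` helpers are assumptions). `Mutex::get_mut` never locks because `&mut self` already proves exclusive access, but it still returns a `LockResult`, which is where the poison check comes from.

```rust
use std::sync::mpsc::{self, Receiver, TryRecvError};
use std::sync::Mutex;

pub struct UnboundedReceiver<T> {
    // `Mutex<Receiver<T>>` is `Sync` whenever `Receiver<T>` is `Send`.
    rx: Mutex<Receiver<T>>,
}

impl<T> UnboundedReceiver<T> {
    pub fn new(rx: Receiver<T>) -> Self {
        UnboundedReceiver { rx: Mutex::new(rx) }
    }

    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        // No locking happens here, only the poison check; a poisoned mutex
        // is simply ignored in this sketch.
        let rx = self.rx.get_mut().unwrap_or_else(|poisoned| poisoned.into_inner());
        rx.try_recv()
    }
}

// Compile-time probe: only accepts values whose type is `Sync`.
fn assert_sync<T: Sync>(_: &T) {}

pub fn demo() -> Result<u32, TryRecvError> {
    let (tx, rx) = mpsc::channel();
    let mut rx = UnboundedReceiver::new(rx);
    assert_sync(&rx); // no unsafe code needed
    tx.send(5).unwrap();
    rx.try_recv()
}
```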

I'll bookmark this struct and switch to it when it gets stabilized.