Looking for an async, bounded & observable single-producer channel

I am looking for a channel that is bounded and supports a standard style of send and receive:

// For Sender<T>
pub async fn send(&self, msg: T) -> Result<(), T>;
// For Receiver<T>
pub async fn recv(&self) -> Option<T>;

send waits on a full channel until there is space to send (Ok(())) or all receivers have been dropped (Err(msg)).

recv waits on an empty channel until a message is sent into the channel (Some(msg)) or the sender is dropped (None).

Since this is a single-producer queue, I need the sender to be able to observe the status of the channel:

pub fn len(&self) -> usize;

and, by closing the channel, take ownership of all the in-flight messages once all the ongoing recv() calls have finished:

pub async fn close(self) -> impl Iterator<Item = T>;

Is there an existing crate that provides a channel with functionality close to the above? Thanks!

Have a look at the futures library, tokio-sync, async-std, futures-intrusive.

Are none of these suitable for your requirements? There are probably more implementations out there that I am not aware of.

The most difficult part of this is close. You can do the len part using an atomic integer that you increment on every send and decrement on every receive.
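As a rough illustration of the atomic-counter idea, here is a minimal sketch that wraps the blocking std::sync::mpsc::sync_channel rather than an async channel. The names CountingSender, CountingReceiver and counting_channel are hypothetical and do not come from any crate; the same pattern should carry over to an async channel, but that is an assumption.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{sync_channel, Receiver, SendError, SyncSender};
use std::sync::Arc;

/// Hypothetical sender wrapper that counts messages not yet received.
pub struct CountingSender<T> {
    inner: SyncSender<T>,
    len: Arc<AtomicUsize>,
}

/// Hypothetical receiver wrapper sharing the same counter.
pub struct CountingReceiver<T> {
    inner: Receiver<T>,
    len: Arc<AtomicUsize>,
}

/// Creates a bounded std channel plus a shared counter (illustrative helper).
pub fn counting_channel<T>(cap: usize) -> (CountingSender<T>, CountingReceiver<T>) {
    let (tx, rx) = sync_channel(cap);
    let len = Arc::new(AtomicUsize::new(0));
    (
        CountingSender { inner: tx, len: Arc::clone(&len) },
        CountingReceiver { inner: rx, len },
    )
}

impl<T> CountingSender<T> {
    /// Blocks while the channel is full; returns the message if the receiver is gone.
    pub fn send(&self, msg: T) -> Result<(), T> {
        // Increment first so a fast receiver can never decrement below zero.
        self.len.fetch_add(1, Ordering::SeqCst);
        self.inner.send(msg).map_err(|SendError(msg)| {
            // The message never entered the channel, so undo the increment.
            self.len.fetch_sub(1, Ordering::SeqCst);
            msg
        })
    }

    /// Number of messages sent (or being sent) but not yet received.
    pub fn len(&self) -> usize {
        self.len.load(Ordering::SeqCst)
    }
}

impl<T> CountingReceiver<T> {
    /// Blocks while the channel is empty; returns None once the sender is dropped.
    pub fn recv(&self) -> Option<T> {
        let msg = self.inner.recv().ok()?;
        self.len.fetch_sub(1, Ordering::SeqCst);
        Some(msg)
    }
}
```

Because the counter is updated separately from the queue itself, len() is only a snapshot: it also counts a message whose send is still blocked, and it may lag slightly behind a concurrent recv.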

Upon a second look, I think the close should probably be async so that the messages are only returned after the last recv() finishes. I will update the original post accordingly. Hope this makes it less difficult.
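To make the intended close semantics concrete, here is a simplified, lock-based sketch. It is not async: try_send and try_recv are non-blocking stand-ins for send and recv, and all names are hypothetical. Because every operation takes the same mutex, close can only take the remaining messages once any in-progress receive has released the lock, which is roughly the ordering described above.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// State shared by both ends (illustrative only).
struct Shared<T> {
    queue: VecDeque<T>,
    closed: bool,
}

pub struct Sender<T> {
    shared: Arc<Mutex<Shared<T>>>,
    cap: usize,
}

pub struct Receiver<T> {
    shared: Arc<Mutex<Shared<T>>>,
}

pub fn channel<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
    let shared = Arc::new(Mutex::new(Shared { queue: VecDeque::new(), closed: false }));
    (Sender { shared: Arc::clone(&shared), cap }, Receiver { shared })
}

impl<T> Sender<T> {
    /// Non-blocking stand-in for `send`: hands the message back when full.
    pub fn try_send(&self, msg: T) -> Result<(), T> {
        let mut s = self.shared.lock().unwrap();
        if s.queue.len() >= self.cap {
            return Err(msg);
        }
        s.queue.push_back(msg);
        Ok(())
    }

    /// Exact length, since the queue is read under the lock.
    pub fn len(&self) -> usize {
        self.shared.lock().unwrap().queue.len()
    }

    /// Consumes the sender and returns every message still in flight.
    pub fn close(self) -> impl Iterator<Item = T> {
        // Acquiring the lock waits for any receive that is in progress.
        let mut s = self.shared.lock().unwrap();
        s.closed = true;
        let drained = std::mem::take(&mut s.queue);
        drained.into_iter()
    }
}

impl<T> Receiver<T> {
    /// Non-blocking stand-in for `recv`: None when empty or closed.
    pub fn try_recv(&self) -> Option<T> {
        self.shared.lock().unwrap().queue.pop_front()
    }

    /// True once the sender has called `close`.
    pub fn is_closed(&self) -> bool {
        self.shared.lock().unwrap().closed
    }
}
```

A real async version would additionally need to wake pending send and recv futures, which this sketch leaves out entirely.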

The senders in futures, tokio and futures-intrusive cannot see anything in the channel, not even the len().

The async_std Sender can see the channel's len() and capacity(), but unfortunately its send() is infallible, so there is apparently no way to wait for errors happening on the receiving end. This API is still marked "unstable", but there are very good reasons (async-std/issues/212) why they designed their Sender this way, so I don't think send() will become fallible in the future.
