Why does async_channel::Sender not implement Sink?

Context: I want to use (stream.forward(...)), passing in a Sender from async_channel::bounded(...) as the sink.

When sending needs to wait for space in the channel, you need to put the waker for the waiter somewhere. Currently, the channel puts it in the future returned by send. However, the sink API would require you to store the waker inside the sender struct instead.

So I'm guessing they didn't want to add an additional field that's only used when you use it as a sink.

As a comparison, the Tokio mpsc channel doesn't implement Sink either, but tokio-util provides a wrapper type (PollSender) with extra fields to store the waker and related state, so you can use the wrapper to get a sink.

I don't know if async-channel has a similar wrapper, but you could write one by storing the send future in the wrapper.
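
To make that last idea a bit more concrete, here is a rough, std-only sketch of a wrapper that keeps the in-flight send future in a field. Everything in it is made up for illustration: SendFutureSink, make_send, in_flight, NoopWake and demo_send_future_sink are hypothetical names, and poll_ready/start_send only mimic the shape of the futures::Sink methods (no Pin, no poll_flush or poll_close). With the real crates, make_send would presumably be a closure that clones the Sender and awaits send(item) on the clone, which is an assumption rather than something taken from async-channel's docs.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Boxed send future, so the wrapper can name its type in a field.
type BoxedSend<E> = Pin<Box<dyn Future<Output = Result<(), E>>>>;

// Hypothetical wrapper: `make_send` turns an item into a send future,
// and `in_flight` keeps that future alive between polls.
struct SendFutureSink<F, E> {
    make_send: F,
    in_flight: Option<BoxedSend<E>>,
}

impl<F, E> SendFutureSink<F, E> {
    fn new(make_send: F) -> Self {
        Self { make_send, in_flight: None }
    }

    // Ready once the previous send (if any) has finished.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), E>> {
        if let Some(fut) = self.in_flight.as_mut() {
            let result = match fut.as_mut().poll(cx) {
                Poll::Ready(result) => result,
                // The stored future has registered `cx`'s waker for us.
                Poll::Pending => return Poll::Pending,
            };
            self.in_flight = None;
            if let Err(err) = result {
                return Poll::Ready(Err(err));
            }
        }
        Poll::Ready(Ok(()))
    }

    // Only creates the future; later `poll_ready` calls drive it.
    fn start_send<T, Fut>(&mut self, item: T)
    where
        F: FnMut(T) -> Fut,
        Fut: Future<Output = Result<(), E>> + 'static,
    {
        let fut: BoxedSend<E> = Box::pin((self.make_send)(item));
        self.in_flight = Some(fut);
    }
}

// Waker that does nothing; enough to poll by hand in the demo.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Demo: push two items through the wrapper and return what the
// stand-in "channel" (a shared Vec) received.
fn demo_send_future_sink() -> Vec<u32> {
    use std::cell::RefCell;
    use std::rc::Rc;

    let received = Rc::new(RefCell::new(Vec::new()));
    let log = Rc::clone(&received);
    // Stand-in for a closure that would call `send` on a cloned Sender.
    let mut sink: SendFutureSink<_, ()> = SendFutureSink::new(move |item: u32| {
        let log = Rc::clone(&log);
        async move {
            log.borrow_mut().push(item);
            Ok::<(), ()>(())
        }
    });

    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    for item in [1, 2] {
        // Same call order a forwarding loop uses: readiness first, then the item.
        assert!(matches!(sink.poll_ready(&mut cx), Poll::Ready(Ok(()))));
        sink.start_send(item);
    }
    // One more readiness poll drives the last in-flight send to completion.
    assert!(matches!(sink.poll_ready(&mut cx), Poll::Ready(Ok(()))));

    let out = received.borrow().clone();
    out
}
```

Note that the boxed future in this sketch is not Send, so a wrapper meant to be moved into tokio::spawn would need a `+ Send` bound on the boxed future type.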

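tokio::spawn(async move {
    // Pull items from `src` and push them into `dst` until `src` is exhausted.
    while let Some(x) = src.next().await {
        // Stop forwarding if the send fails (e.g. the channel was closed).
        if dst.send(x).await.is_err() {
            break;
        }
    }
});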

Does this have something to do with 'holding' the x while waiting for dst.send(x).await?

I assume you are talking about the forward method from the futures::StreamExt extension trait. The async-channel crate is part of the smol ecosystem. It is perfectly fine to use it with other libraries such as futures or tokio, but it is designed with the smol family of crates in mind, notably futures-lite. By design, as the name suggests, futures-lite only provides basic combinators as building blocks, and users are expected to build their own abstractions on top of them.

futures-lite doesn't provide a forward method on its StreamExt trait, nor does it have a Sink trait like futures does, but you can emulate forward yourself. The Forward future, when polled, just polls the next item from the stream and sends it to the sink, with a single-item buffer in between.

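// emulate the Forward future
use std::pin::pin;
use async_channel::{SendError, Sender};
use futures_lite::{Stream, StreamExt};

async fn forward<S, T>(stream: S, sink: Sender<T>) -> Result<(), SendError<T>>
where
    S: Stream<Item = T>,
{
    // pin the stream so `next()` can be called even if `S` is not `Unpin`
    let mut stream = pin!(stream);
    while let Some(item) = stream.next().await {
        sink.send(item).await?;
    }
    Ok(())
}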
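
On the question of 'holding' the item: the single-item buffer mentioned above can be modeled without any async machinery. The sketch below is a simplified, std-only model and not the actual futures implementation; ToySink, ToyForward and demo_forward_buffer are hypothetical names, there is no waker or Context, and the flush/close steps a real Forward performs on the sink are left out. It only shows that when the sink is full, the item already taken from the stream stays in the buffer until the next poll.

```rust
use std::collections::VecDeque;
use std::task::Poll;

// Hypothetical bounded sink: accepts items only while below `capacity`.
struct ToySink {
    items: Vec<u32>,
    capacity: usize,
}

impl ToySink {
    fn poll_ready(&self) -> Poll<()> {
        if self.items.len() < self.capacity {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }

    fn start_send(&mut self, item: u32) {
        self.items.push(item);
    }
}

// Model of a forwarding future: a source, a sink, and one buffered item.
struct ToyForward {
    source: VecDeque<u32>,
    sink: ToySink,
    buffered: Option<u32>,
}

impl ToyForward {
    // One poll: Ready once the source is exhausted and nothing is buffered.
    fn poll(&mut self) -> Poll<()> {
        loop {
            if let Some(item) = self.buffered.take() {
                if self.sink.poll_ready().is_pending() {
                    // Sink is full: keep holding the item until the next poll.
                    self.buffered = Some(item);
                    return Poll::Pending;
                }
                self.sink.start_send(item);
            }
            match self.source.pop_front() {
                Some(item) => self.buffered = Some(item),
                None => return Poll::Ready(()),
            }
        }
    }
}

// Demo: returns (item held while the sink was full,
//                items the receiver drained, items sent afterwards).
fn demo_forward_buffer() -> (Option<u32>, Vec<u32>, Vec<u32>) {
    let mut fwd = ToyForward {
        source: VecDeque::from([1, 2, 3]),
        sink: ToySink { items: Vec::new(), capacity: 2 },
        buffered: None,
    };

    // First poll fills the sink and then stalls with one item in hand.
    assert!(fwd.poll().is_pending());
    let held = fwd.buffered;

    // Simulate the receiving side making room in the channel.
    let drained: Vec<u32> = fwd.sink.items.drain(..).collect();

    // Second poll delivers the held item and finishes.
    assert!(fwd.poll().is_ready());
    (held, drained, fwd.sink.items.clone())
}
```

In the hand-written loop with dst.send(x).await, the same role is played by the send future itself, which owns x while it waits for space.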
