Context: I want to use (stream.forward(...)), passing in a Sender from async_channel::bounded(...) as the sink.
When sending needs to wait for space in the channel, you need to put the waker for the waiter somewhere. Currently, the channel puts it in the future returned by send
. However, the sink API would require you to store the waker inside the sender struct instead.
So I'm guessing they didn't want to add an additional field that's only used when you use it as a sink.
As a comparison, the Tokio mpsc channel doesn't implement Sink either, but it provides a wrapper type in tokio-util that has an extra field to store the waker and similar, so you can use the wrapper to get a sink.
I don't know if async-channel has a similar wrapper, but you could write one by storing the send future in the wrapper.
tokio::spawn(
async move {
loop {
match src.next().await {
Some(x) => {
dst.send(x).await;
},
_ => break
}
}
}
);
Does this have something to do with 'holding' the x, while waiting for dst.send(x).await ?
I assume you are talking about the forward
method from the futures::StreamExt
extension trait. the async-channel
crate is part of the smol
ecosystem. it is perfectly ok to be used with other libraries like futures
or tokio
, it is however designed with smol
family of crates in mind, notably the futures-lite
crate. by design, as the crate name suggested, it only provide basic combinators as building blocks and the user is supposed to use the them to build their own abstractions.
futures-lite
doesn't provide a forward
on the StreamExt
trait, nor does it have a Sink
trait like futures
does, but you can emulate the forward
yourself. the Forward
future, when polled, just polls the next item from the stream and send it to the sink, with an single item buffer in between.
// emulate the Forward future
type SinkError = async_channel::SendError;
async fn forward<S: Stream, T>(stream: S, sink: Sender<T>) -> Result<(), SinkError> {
while let Some(item) = stream.next().await {
sink.send(item).await?;
}
Ok(())
}