I've been trying for hours, but to no avail. Every stream returned through the async_stream macro has to be pinned, either with pin_mut! or Box::pin, only to become completely useless as soon as you need to listen for anything other than that one stream with tokio::select!.
let listener = Listener::new(socket).await?;
let stream = listener.stream().await;
pin_mut!(stream);
while let Some(data) = stream.next().await {
    // great, the data is owned, and you can do
    // whatever needs to be done with it
}
But the following completely messes up any plans you might have about using it:
let (t, mut s) = join(G::tracer(), G::signals()).await;
let listener = Listener::new(socket).await?;
let stream = listener.stream().await;
pin_mut!(stream);
loop {
    tokio::select! {
        // s is a Tokio WatchStream, working great
        Some(Signal::Interruption) = s.next() => {
            trace!(t, "interrupted");
            break;
        }
        // stream is an "-> impl Stream<Item = ...>",
        // created through the async_stream::stream! macro
        Some(data) = stream.next() => {
            // data is behind a shared reference,
            // best of luck trying to use it
        }
    }
}
I tried to use the regular Tokio wrappers and spent a ton of time mapping the channel's data through the futures crate, only to discover that it creates an additional mess of nested types along the lines of AndThen<futures::Stream<AndThen<tokio_stream::Stream<..., which is also unusable, as it doesn't satisfy the trait bounds that are supposed to be satisfied for the stream to be used as an Iterator.
What am I doing wrong? I'm beyond any frustration at this point.
the method `poll` exists for struct `std::pin::Pin<&mut tokio_stream::stream_ext::next::Next<'_, impl tokio_stream::Stream>>`, but its trait bounds were not satisfied
the following trait bounds were not satisfied:
`impl tokio_stream::Stream: std::marker::Unpin`
which is required by `tokio_stream::stream_ext::next::Next<'_, impl tokio_stream::Stream>: futures::Future`
and
`std::future::from_generator::GenFuture<[static generator@/home/.cargo/registry/src/github.com-1ecc6299db9ec823/async-stream-0.3.2/src/lib.rs:201:9: 201:67]>` cannot be unpinned
consider using `Box::pin`
required because it appears within the type `impl futures::Future`
required because it appears within the type `impl tokio_stream::Stream`
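For what it's worth, the `Unpin` part of that first error is what the "consider using `Box::pin`" hint is about: the generator-backed stream is `!Unpin`, but a pinned pointer to it is `Unpin` again. Below is a minimal std-only sketch of just that point. `NotUnpin` and `requires_unpin` are hypothetical stand-ins (not anything from tokio or async-stream), and `std::pin::pin!` is used here in place of the `pin_mut!` macro from the thread.

```rust
use std::marker::PhantomPinned;
use std::pin::{pin, Pin};

// Hypothetical stand-in for the `impl Stream` returned by `stream!`:
// `PhantomPinned` makes the type `!Unpin`, like a compiler-generated generator.
struct NotUnpin {
    _pinned: PhantomPinned,
}

// Only compiles when `T: Unpin`; returns `true` so the result can be asserted.
fn requires_unpin<T: Unpin>(_: &T) -> bool {
    true
}

// Heap pinning: `Pin<Box<T>>` is `Unpin` even though `T` is not.
fn boxed_is_unpin() -> bool {
    let boxed: Pin<Box<NotUnpin>> = Box::pin(NotUnpin { _pinned: PhantomPinned });
    requires_unpin(&boxed)
}

// Stack pinning: `Pin<&mut T>` is `Unpin` as well.
fn stack_pinned_is_unpin() -> bool {
    let pinned: Pin<&mut NotUnpin> = pin!(NotUnpin { _pinned: PhantomPinned });
    requires_unpin(&pinned)
}

fn main() {
    // `requires_unpin(&NotUnpin { _pinned: PhantomPinned })` would not compile.
    assert!(boxed_is_unpin());
    assert!(stack_pinned_is_unpin());
}
```

So once the stream is pinned either way, calling `.next()` on the pinned handle satisfies the bound; the second error below is a separate problem.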
cannot move out of a shared reference
select.rs[522, 39]: Actual error occurred here
select.rs[560, 9]: Error originated from macro call here
select.rs[575, 9]: Error originated from macro call here
select.rs[575, 9]: Error originated from macro call here
select.rs[599, 9]: Error originated from macro call here
server.rs[42, 10]: data moved here
// ^ the source line "Some(mut data) = stream.next() => {"
error[E0507]: cannot move out of a shared reference
--> src/server.rs:37:3
|
37 | / tokio::select! {
38 | | Some(Signal::Interruption) = s.next() => {
39 | | trace!(t, "interrupted");
40 | | break;
41 | | }
42 | | Some(mut data) = stream.next() => {
| | --------
| | |
| | data moved here
| | move occurs because `data` has type `WebSocketStream<tokio_rustls::server::TlsStream<tokio::net::TcpStream>>`, which does not implement the `Copy` trait
... |
45 | | }
46 | | }
| |___^
|
= note: this error originates in the macro `$crate::select` (in Nightly builds, run with -Z macro-backtrace for more info)
That works, but I can't do much with an immutable borrow. Why does this work just fine?
let listener = Listener::new(socket).await?;
let stream = listener.stream().await;
pin_mut!(stream);
while let Some(mut data) = stream.next().await {
    // no problem
}
Jesus, was that all that was required? For the love of all that is holy, THANK YOU. That was much easier than I could have ever thought. Where does this come from, though? And how am I able to mutably access something that's supposed to be behind an immutable reference?
I'll file a bug report right away. And thank you a ton once again.
It's not actually behind an immutable reference. The macro is making an immutable reference to it somewhere internally and something went wrong with that reference - not sure what.
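One plausible way this can happen, sketched with plain std types: if a macro first tests the user's pattern against a shared reference to the output and only afterwards hands over the owned value, then a pattern without `mut` binds by reference and is harmless, while `mut` forces a by-value binding and trips E0507. This is an assumption about the mechanism, not something confirmed in this thread; `Conn` and `check_then_take` are hypothetical names for illustration.

```rust
// Hypothetical stand-in for the non-`Copy` item type from the error message.
struct Conn(String);

// Mimics a macro that checks the pattern against `&out` first
// and only then moves the owned value into the handler.
fn check_then_take(out: Option<Conn>) -> Option<String> {
    // Matching through a shared reference: `c` is bound as `&Conn`,
    // so nothing is moved out of `out`.
    if let Some(c) = &out {
        let _peek: &Conn = c;
    }

    // Under edition 2021 the following is rejected with E0507,
    // because `mut` makes the binding by-value:
    //
    //     if let Some(mut c) = &out {}

    // Matching the owned value directly accepts `mut` without complaint.
    match out {
        Some(mut c) => {
            c.0.push('!');
            Some(c.0)
        }
        None => None,
    }
}

fn main() {
    let got = check_then_take(Some(Conn(String::from("hi"))));
    assert_eq!(got, Some(String::from("hi!")));
}
```

That would also explain why the plain `while let Some(mut data) = stream.next().await` loop is fine: there the pattern is matched against the owned value, with no reference in between.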
Changing mutability of something you have ownership of is always possible.
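A small sketch of that last point, assuming the workaround in question is to bind the value without `mut` and then rebind it inside the block. The function names are made up for the example; only std is used.

```rust
// `data` arrives owned but bound immutably; rebinding moves it
// into a new mutable binding. Nothing is copied or borrowed.
fn shout(data: String) -> String {
    let mut data = data;
    data.push('!');
    data
}

// The same idea inside a pattern: bind without `mut`, then rebind in the body.
fn shout_if_some(item: Option<String>) -> Option<String> {
    if let Some(data) = item {
        let mut data = data;
        data.push('!');
        Some(data)
    } else {
        None
    }
}

fn main() {
    assert_eq!(shout(String::from("hi")), "hi!");
    assert_eq!(shout_if_some(Some(String::from("ok"))), Some(String::from("ok!")));
}
```

Mutability in Rust is a property of the binding, not of the value, which is why the owner is always free to do this.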