How do you use tokio::select with async_stream?

I've been trying for hours, but to no avail. Every stream returned by the async_stream macro has to be pinned, either with pin_mut! or Box::pin, only to become completely useless whenever you need to listen for something other than that one stream with tokio::select!.

let listener = Listener::new(socket).await?;
let stream = listener.stream().await;
pin_mut!(stream);

while let Some(data) = stream.next().await {
  // great, the data is owned, and you can do
  // whatever needs to be done with it
}

But the following completely messes up any plans you might have about using it:

let (t, mut s) = join(G::tracer(), G::signals()).await;

let listener = Listener::new(socket).await?;
let stream = listener.stream().await;
pin_mut!(stream);

loop {
  tokio::select! {
    // s is a Tokio WatchStream, working great
    Some(Signal::Interruption) = s.next() => {
      trace!(t, "interrupted");
      break;
    }
    // stream is an "-> impl Stream<Item = ...>",
    // created through the async_stream::stream! macro
    Some(data) = stream.next() => {
      // data is behind a shared reference,
      // best of luck trying to use it
    }
  }
}

I tried the regular Tokio wrappers and spent a ton of time mapping the channel's data through the futures crate, only to discover that it creates an additional mess along the lines of AndThen<futures::Stream<AndThen<tokio_stream::Stream<..., which is also unusable, as it doesn't satisfy the traits required for the stream to be used like an iterator.

What am I doing wrong? I'm beyond any frustration at this point.

Can you please post the error you get from your select code? I don't understand how you ended up with data being behind an immutable reference.

Exactly my question as well. Here you go:

Without "pin_mut!":
let (t, mut s) = join(G::tracer(), G::signals()).await;

let listener = Listener::new(socket).await?;
let stream = listener.stream().await;
// pin_mut!(stream);

loop {
  tokio::select! {
    Some(Signal::Interruption) = s.next() => {
      trace!(t, "interrupted");
      break;
    }
    Some(data) = stream.next() => {
      //  
    }
  }
}

Errors:

the method `poll` exists for struct `std::pin::Pin<&mut tokio_stream::stream_ext::next::Next<'_, impl tokio_stream::Stream>>`, but its trait bounds were not satisfied
the following trait bounds were not satisfied:
`impl tokio_stream::Stream: std::marker::Unpin`
which is required by `tokio_stream::stream_ext::next::Next<'_, impl tokio_stream::Stream>: futures::Future`

and

`std::future::from_generator::GenFuture<[static generator@/home/.cargo/registry/src/github.com-1ecc6299db9ec823/async-stream-0.3.2/src/lib.rs:201:9: 201:67]>` cannot be unpinned
consider using `Box::pin`
required because it appears within the type `impl futures::Future`
required because it appears within the type `impl tokio_stream::Stream`
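
For reference, the `Unpin` requirement in these errors can be reproduced with the standard library alone. This is only a sketch: `is_unpin`, `produce`, `boxed`, and `stack_pinned_is_unpin` are hypothetical helpers, and `std::pin::pin!` is used as a rough stand-in for `pin_mut!`. It shows that a future from an `async fn` is not `Unpin` by itself, while a pinned handle to it is.

```rust
use std::future::Future;
use std::pin::Pin;

// Accepts only values whose type is `Unpin`; otherwise the call does not compile.
fn is_unpin<T: Unpin>(_value: &T) -> bool {
    true
}

// Hypothetical async function; the future it returns is not `Unpin`.
async fn produce() -> u32 {
    7
}

// Heap-pinning: `Pin<Box<_>>` is `Unpin` regardless of what it points to.
fn boxed() -> Pin<Box<dyn Future<Output = u32>>> {
    Box::pin(produce())
}

// Stack-pinning: `pin!` yields a `Pin<&mut _>`, which is also `Unpin`.
fn stack_pinned_is_unpin() -> bool {
    let fut = std::pin::pin!(produce());
    is_unpin(&fut)
}

fn main() {
    // is_unpin(&produce()); // would be rejected: the bare future is not `Unpin`
    println!("boxed: {}", is_unpin(&boxed()));
    println!("stack-pinned: {}", stack_pinned_is_unpin());
}
```

The same reasoning is why the pinned stream can be polled through `next()`, while the unpinned one cannot.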

And the other case:

With "pin_mut!":
let (t, mut s) = join(G::tracer(), G::signals()).await;

let listener = Listener::new(socket).await?;
let stream = listener.stream().await;
pin_mut!(stream);

loop {
  tokio::select! {
    Some(Signal::Interruption) = s.next() => {
      trace!(t, "interrupted");
      break;
    }
    Some(mut data) = stream.next() => {
      // nope
    }
  }
}

Error:

cannot move out of a shared reference
select.rs[522, 39]: Actual error occurred here
select.rs[560, 9]: Error originated from macro call here
select.rs[575, 9]: Error originated from macro call here
select.rs[575, 9]: Error originated from macro call here
select.rs[599, 9]: Error originated from macro call here
server.rs[42, 10]: data moved here 
// ^ the source line "Some(mut data) = stream.next() => {"

Please post the full error as printed by cargo build.

Here you go:

error[E0507]: cannot move out of a shared reference
  --> src/server.rs:37:3
   |
37 | /   tokio::select! {
38 | |     Some(Signal::Interruption) = s.next() => {
39 | |       trace!(t, "interrupted");
40 | |       break;
41 | |     }
42 | |     Some(mut data) = stream.next() => {
   | |          --------
   | |          |
   | |          data moved here
   | |          move occurs because `data` has type `WebSocketStream<tokio_rustls::server::TlsStream<tokio::net::TcpStream>>`, which does not implement the `Copy` trait
...  |
45 | |     }
46 | |   }
   | |___^
   |
   = note: this error originates in the macro `$crate::select` (in Nightly builds, run with -Z macro-backtrace for more info)

I'm confused, but try removing the mut marker?

That works, but I can't do much with an immutable borrow. Why does this work just fine?

let listener = Listener::new(socket).await?;
let stream = listener.stream().await;
pin_mut!(stream);

while let Some(mut data) = stream.next().await {
  // no problem
}

You can add this line as the first thing inside that select! branch:

let mut data = data;

Please file a bug about this.

Jesus, was that all that was required? For the love of all that is holy, THANK YOU. That was much easier than I could have ever thought. Where does this come from, though? And how am I able to mutably access something that's supposed to be behind an immutable reference?

I'll file a bug report right away. And thank you a ton once again.

It's not actually behind an immutable reference. The macro is making an immutable reference to it somewhere internally and something went wrong with that reference - not sure what.
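
One plausible mechanism, not confirmed anywhere in this thread, is that the pattern gets checked against a reference to the branch result. A standard-library sketch of that situation follows; `matches_by_ref` is a hypothetical stand-in and is not taken from Tokio's source.

```rust
// Hypothetical stand-in for a check performed on a reference to a branch result.
fn matches_by_ref(out: &Option<String>) -> bool {
    match out {
        // Through a shared reference, `data` is bound as `&String`; nothing moves.
        Some(data) => !data.is_empty(),
        // Writing `Some(mut data)` here instead is rejected by the compiler
        // (on the 2021 edition as E0507, "cannot move out of a shared reference"),
        // because `mut` asks for a by-value binding.
        None => false,
    }
}

fn main() {
    let out = Some(String::from("frame"));
    println!("matched: {}", matches_by_ref(&out));

    // `out` itself was never borrowed for long, so it is still owned here
    // and can be moved into a mutable binding.
    let mut data = out.unwrap();
    data.push('!');
    println!("{data}");
}
```

If something like this happens inside the macro, it would explain why the error points at the `mut data` pattern even though the value handed to the branch body is owned.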

Changing mutability of something you have ownership of is always possible.
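
A minimal sketch of that rule, using only the standard library. `rebind_and_mutate` and the `Vec<u8>` payload are made-up stand-ins for the value received in the branch.

```rust
// Hypothetical stand-in for handling an owned item received from a stream.
fn rebind_and_mutate(data: Vec<u8>) -> Vec<u8> {
    // `data` is an immutable binding, but the value is owned,
    // so rebinding it as `mut` is just a move into a new binding.
    let mut data = data;
    data.push(42);
    data
}

fn main() {
    let out = rebind_and_mutate(vec![1, 2, 3]);
    println!("{:?}", out);
}
```

No copy or clone is involved; the rebinding only changes what the compiler lets you do with the name.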
