Futures::stream_select! macro doesn't satisfy stream trait

I'm going through some challenge problems from protohackers and came across an issue where the stream_select macro doesn't return a struct that impliments stream. I'm confused, since I'm pretty sure all of the arguments are streams. One guess I have is that it's something to do with unpin, but I'm not sure how that works. any help would be appreciated.

relevant code snippet:

    let tickets = rx.map(|v| ControlFlow::Ticket(v));
    let messages = stream::unfold(tcp_rx, |mut tcp_rx| async move {
        match ClientMsg::read_from(&mut tcp_rx).await {
            Err(e) => Some((ControlFlow::Err(format!("{e}")), tcp_rx)),
            Ok(msg) => Some((ControlFlow::Msg(msg), tcp_rx)),
        }
    });
    let (heartbeats, heart_tx) = heartbeat_stream();
    let mut events = futures::stream_select!(tickets, messages, heartbeats);

    //main event loop
    tokio::spawn(async move { while let Some(event) = events.next().await {} });

here's the code repository if you need full context.

Error msg:

error[E0599]: the method `next` exists for struct `StreamSelect<futures::stream::Map<futures::futures_channel::mpsc::Receiver<Ticket>, [closure@src/main.rs:122:26: 122:29]>, futures::stream::Unfold<impl AsyncReadExt + Unpin + Send, [closure@src/main.rs:123:43: 123:55], impl futures::Future<Output = Option<(ControlFlow, impl AsyncReadExt + Unpin + Send)>>>, impl Stream<Item = ControlFlow>>`, but its trait bounds were not satisfied
   --> src/main.rs:133:62
    |
130 |     let mut events = futures::stream_select!(tickets, messages, heartbeats);
    |                      ------------------------------------------------------
    |                      |
    |                      method `next` not found for this struct
    |                      doesn't satisfy `_: StreamExt`
    |                      doesn't satisfy `_: Stream`
...
133 |     tokio::spawn(async move { while let Some(event) = events.next().await {} });
    |                                                              ^^^^ method cannot be called on `StreamSelect<futures::stream::Map<futures::futures_channel::mpsc::Receiver<Ticket>, [closure@src/main.rs:122:26: 122:29]>, futures::stream::Unfold<impl AsyncReadExt + Unpin + Send, [closure@src/main.rs:123:43: 123:55], impl futures::Future<Output = Option<(ControlFlow, impl AsyncReadExt + Unpin + Send)>>>, impl Stream<Item = ControlFlow>>` due to unsatisfied trait bounds
    |
    = note: the following trait bounds were not satisfied:
            `StreamSelect<futures::stream::Map<futures::futures_channel::mpsc::Receiver<Ticket>, [closure@src/main.rs:122:26: 122:29]>, futures::stream::Unfold<impl AsyncReadExt + Unpin + Send, [closure@src/main.rs:123:43: 123:55], impl futures::Future<Output = Option<(ControlFlow, impl AsyncReadExt + Unpin + Send)>>>, impl Stream<Item = ControlFlow>>: Stream`
            which is required by `StreamSelect<futures::stream::Map<futures::futures_channel::mpsc::Receiver<Ticket>, [closure@src/main.rs:122:26: 122:29]>, futures::stream::Unfold<impl AsyncReadExt + Unpin + Send, [closure@src/main.rs:123:43: 123:55], impl futures::Future<Output = Option<(ControlFlow, impl AsyncReadExt + Unpin + Send)>>>, impl Stream<Item = ControlFlow>>: StreamExt`
note: the following trait must be implemented
   --> /home/andrew/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-core-0.3.25/src/stream.rs:27:1
    |
27  | pub trait Stream {
    | ^^^^^^^^^^^^^^^^

For more information about this error, try `rustc --explain E0599`.
error: could not compile `speed_daemon_06` due to previous error

Some data structure doesn't meet trait bounds.
A trick can be this to manually check and get some more information:

fn check_stream<T: Stream>(_: &T) {}

let mut events = futures::stream_select!(tickets, messages, heartbeats);
check_stream(&events);
error[E0277]: `[async block@src/main.rs:143:64: 163:7]` cannot be unpinned
   --> src/main.rs:120:18
    |
120 |     check_stream(&events);
    |     ------------ ^^^^^^^ within `futures_util::unfold_state::_::__Origin<'_, HeartbeatState, [async block@src/main.rs:143:64: 163:7]>`, the trait `Unpin` is not implemented for `[async block@src/main.rs:143:64: 163:7]`
    |     |
    |     required by a bound introduced by this call
    |
    = note: consider using `Box::pin`
    = help: the trait `Stream` is implemented for `StreamSelect<_0, _1, _2>`
    = note: required because it appears within the type `futures_util::unfold_state::_::__Origin<'_, HeartbeatState, [async block@src/main.rs:143:64: 163:7]>`
    = note: required for `futures_util::unfold_state::UnfoldState<HeartbeatState, [async block@src/main.rs:143:64: 163:7]>` to implement `Unpin`
    = note: required because it appears within the type `futures::stream::unfold::_::__Origin<'_, HeartbeatState, [closure@src/main.rs:143:56: 143:63], [async block@src/main.rs:143:64: 163:7]>`
    = note: required for `futures::stream::Unfold<HeartbeatState, [closure@src/main.rs:143:56: 143:63], [async block@src/main.rs:143:64: 163:7]>` to implement `Unpin`

Then we can Box & Pin the futures:

use futures::Stream;
let mut events = futures::stream_select!(tickets.boxed(), messages.boxed(), heartbeats.boxed());

This time the error is clear:

error[E0310]: the parameter type `impl AsyncReadExt + Unpin + Send` may not live long enough
   --> src/main.rs:118:63
    |
118 |     let mut events = futures::stream_select!(tickets.boxed(), messages.boxed(), heartbeats.boxed());
    |                                                               ^^^^^^^^^^^^^^^^ ...so that the type `impl AsyncReadExt + Unpin + Send` will meet its required lifetime bounds
    |
help: consider adding an explicit lifetime bound...
    |
71  |                                mut tcp_rx: impl AsyncReadExt + Unpin + Send + 'static, roads: Vec<Road>,
    |                                                                             +++++++++
async fn dispatcher_connection(mut tcp_tx: impl AsyncWriteExt + Send + Unpin,
                               mut tcp_rx: impl AsyncReadExt + Unpin + Send + 'static, // here + 'static
                               roads: Vec<Road>, dispatchers: Dispatchers,
                               pending_tickets: PendingTickets) {

Code pass!

3 Likes

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.