I'm going through some challenge problems from protohackers and came across an issue where the stream_select macro doesn't return a struct that impliments stream. I'm confused, since I'm pretty sure all of the arguments are streams. One guess I have is that it's something to do with unpin, but I'm not sure how that works. any help would be appreciated.
relevant code snippet:
let tickets = rx.map(|v| ControlFlow::Ticket(v));
let messages = stream::unfold(tcp_rx, |mut tcp_rx| async move {
match ClientMsg::read_from(&mut tcp_rx).await {
Err(e) => Some((ControlFlow::Err(format!("{e}")), tcp_rx)),
Ok(msg) => Some((ControlFlow::Msg(msg), tcp_rx)),
}
});
let (heartbeats, heart_tx) = heartbeat_stream();
let mut events = futures::stream_select!(tickets, messages, heartbeats);
//main event loop
tokio::spawn(async move { while let Some(event) = events.next().await {} });
here's the code repository if you need full context.
Error msg:
error[E0599]: the method `next` exists for struct `StreamSelect<futures::stream::Map<futures::futures_channel::mpsc::Receiver<Ticket>, [closure@src/main.rs:122:26: 122:29]>, futures::stream::Unfold<impl AsyncReadExt + Unpin + Send, [closure@src/main.rs:123:43: 123:55], impl futures::Future<Output = Option<(ControlFlow, impl AsyncReadExt + Unpin + Send)>>>, impl Stream<Item = ControlFlow>>`, but its trait bounds were not satisfied
--> src/main.rs:133:62
|
130 | let mut events = futures::stream_select!(tickets, messages, heartbeats);
| ------------------------------------------------------
| |
| method `next` not found for this struct
| doesn't satisfy `_: StreamExt`
| doesn't satisfy `_: Stream`
...
133 | tokio::spawn(async move { while let Some(event) = events.next().await {} });
| ^^^^ method cannot be called on `StreamSelect<futures::stream::Map<futures::futures_channel::mpsc::Receiver<Ticket>, [closure@src/main.rs:122:26: 122:29]>, futures::stream::Unfold<impl AsyncReadExt + Unpin + Send, [closure@src/main.rs:123:43: 123:55], impl futures::Future<Output = Option<(ControlFlow, impl AsyncReadExt + Unpin + Send)>>>, impl Stream<Item = ControlFlow>>` due to unsatisfied trait bounds
|
= note: the following trait bounds were not satisfied:
`StreamSelect<futures::stream::Map<futures::futures_channel::mpsc::Receiver<Ticket>, [closure@src/main.rs:122:26: 122:29]>, futures::stream::Unfold<impl AsyncReadExt + Unpin + Send, [closure@src/main.rs:123:43: 123:55], impl futures::Future<Output = Option<(ControlFlow, impl AsyncReadExt + Unpin + Send)>>>, impl Stream<Item = ControlFlow>>: Stream`
which is required by `StreamSelect<futures::stream::Map<futures::futures_channel::mpsc::Receiver<Ticket>, [closure@src/main.rs:122:26: 122:29]>, futures::stream::Unfold<impl AsyncReadExt + Unpin + Send, [closure@src/main.rs:123:43: 123:55], impl futures::Future<Output = Option<(ControlFlow, impl AsyncReadExt + Unpin + Send)>>>, impl Stream<Item = ControlFlow>>: StreamExt`
note: the following trait must be implemented
--> /home/andrew/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-core-0.3.25/src/stream.rs:27:1
|
27 | pub trait Stream {
| ^^^^^^^^^^^^^^^^
For more information about this error, try `rustc --explain E0599`.
error: could not compile `speed_daemon_06` due to previous error