Hello, I wonder why stream has a method called close() when it seemingly can't be used under any circumstances.
I tried stream.by_ref().split(), but that fails because you can't borrow mutably more than once.
let (mut stream, _) = connect_async(&base).await.unwrap();
let (write, read) = stream.split();
stream.close(Some(CloseFrame { code: CloseCode::Normal, reason: String::from("").into() })).await.unwrap();
--> src\main.rs:58:5
|
54 | let (mut stream, _) = connect_async(&base).await.unwrap();
| ---------- move occurs because `stream` has type `WebSocketStream<async_tungstenite::stream::Stream<TokioAdapter<tokio::net::TcpStream>, TokioAdapter<tokio_native_tls::TlsStream<tokio::net::TcpStream>>>>`, which does not implement the `Copy` trait
55 |
56 | let (write, read) = stream.split();
| ------- `stream` moved due to this method call
57 |
58 | stream.close(Some(CloseFrame { code: CloseCode::Normal, reason: String::from("").into() })).await.unwrap();
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ value borrowed here after move
|
note: `futures::StreamExt::split` takes ownership of the receiver `self`, which moves `stream`
You can call (&mut stream).split() and drop the halves later to free stream back up, if the borrow checker allows it. (This may be equivalent to what you attempted with by_ref(), I'm not sure.)
Wow, this is very unintuitive. You first have to split to even use the stream, then reunite so that its methods can be used again.
Seems to work so far. But very confusing.
let (stream, _) = connect_async(&base).await.unwrap();
let (write, read) = stream.split();
let mut x = write.reunite(read).unwrap();
x.close(Some(CloseFrame { code: CloseCode::Normal, reason: String::from("").into() })).await.unwrap();
Now, when I use read before reunite, it gets moved again.
56 | let (write, read) = stream.split();
| ---- move occurs because `read` has type `SplitStream<WebSocketStream<async_tungstenite::stream::Stream<TokioAdapter<tokio::net::TcpStream>, TokioAdapter<tokio_native_tls::TlsStream<tokio::net::TcpStream>>>>>`, which does not implement the `Copy` trait
...
60 | read.for_each(|message| async {
| __________-
61 | |
62 | | dbg!(&message);
63 | |
... |
84 | |
85 | | }).await;
| |______- `read` moved due to this method call
86 |
87 | let mut x = write.reunite(read).unwrap();
| ^^^^ value used here after move
|
note: `futures::StreamExt::for_each` takes ownership of the receiver `self`, which moves `read`
This told me that if S: Stream + Unpin + ?Sized, then &mut S: Stream as well. That's what clued me into one way to not consume your base stream. It comes up with iterators too, for example.
let mut iter = [/* ... */].into_iter();
for item in &mut iter {
if some_condition() { break; }
}
// Can still use `iter` here
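For reference, here is a runnable version of that iterator pattern using only the standard library. It is just a sketch: the function name sum_until_over and the sample values are made up for illustration, and the break condition stands in for some_condition() above.

```rust
/// Sums items until the first one greater than `limit`,
/// then returns the sum together with whatever is left in the iterator.
/// (Hypothetical helper, only here to illustrate `for item in &mut iter`.)
fn sum_until_over(values: Vec<i32>, limit: i32) -> (i32, Vec<i32>) {
    let mut iter = values.into_iter();
    let mut sum = 0;

    // Looping over `&mut iter` borrows the iterator instead of moving it.
    for item in &mut iter {
        if item > limit {
            // `item` itself has been consumed, but the rest stays in `iter`.
            break;
        }
        sum += item;
    }

    // `iter` is still usable here because it was only borrowed above.
    (sum, iter.collect())
}

fn main() {
    let (sum, rest) = sum_until_over(vec![1, 2, 10, 3, 4], 5);
    println!("sum = {sum}, rest = {rest:?}");
}
```

Writing `for item in iter` instead would move iter into the loop, and the final iter.collect() would fail with the same kind of "use after move" error as in the posts above.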
So, read and &mut read both implement Stream and we want to call
<&mut TheReadType as StreamExt>::for_each(&mut read, |_| ...)
and not
<TheReadType as StreamExt>::for_each(read, |_| ...)
With read.for_each(|_| ...), method resolution tries TheReadType as the first candidate receiver type, and so <TheReadType as StreamExt>::for_each is selected. But with this code:
(&mut read).for_each(|_| ...)
&mut TheReadType is the first candidate, and so <&mut TheReadType as StreamExt>::for_each is selected. (And like @mdHMUpeyf8yluPfXI said, you need the parentheses for precedence.)
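The same receiver selection can be tried out with plain std iterators, since Iterator::take and Iterator::for_each also take self by value and &mut I implements Iterator too. This is only an analogy sketch, not the websocket code itself: the function name first_two_then_rest is hypothetical, and I'm assuming StreamExt behaves the same way, as described above.

```rust
/// Takes the first two items through a mutable borrow of the iterator,
/// then collects the remainder from the original iterator.
/// (Hypothetical helper used only to show which receiver gets picked.)
fn first_two_then_rest(values: Vec<i32>) -> (Vec<i32>, Vec<i32>) {
    let mut iter = values.into_iter();
    let mut head = Vec::new();

    // The receiver expression has type `&mut IntoIter<i32>`, so the by-value
    // methods `take` and `for_each` consume only that reference.
    (&mut iter).take(2).for_each(|v| head.push(v));

    // Writing `iter.take(2)` instead would have moved `iter`,
    // and the next line would not compile.
    let rest: Vec<i32> = iter.collect();
    (head, rest)
}

fn main() {
    let (head, rest) = first_two_then_rest(vec![1, 2, 3, 4]);
    println!("head = {head:?}, rest = {rest:?}");
}
```

Without the parentheses, `&mut iter.take(2)` would parse as `&mut (iter.take(2))`, which moves iter first and only then takes a reference to the result.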
Alternatively you could do this, which is arguably cleaner.