How to prevent StreamExt split() from moving a value?

Hello, I wonder why stream has a method called close() when it can't be used under any circumstance.
Tried doing stream.by_ref().split() but it results that you can't borrow mutably multiple times.

    let (mut stream, _) = connect_async(&base).await.unwrap();

    let (write, read) = stream.split();

    stream.close(Some(CloseFrame { code: CloseCode::Normal, reason: String::from("").into() })).await.unwrap();

    --> src\main.rs:58:5
     |
54   |     let (mut stream, _) = connect_async(&base).await.unwrap();
     |          ---------- move occurs because `stream` has type `WebSocketStream<async_tungstenite::stream::Stream<TokioAdapter<tokio::net::TcpStream>, TokioAdapter<tokio_native_tls::TlsStream<tokio::net::TcpStream>>>>`, which 
does not implement the `Copy` trait
55   |
56   |     let (write, read) = stream.split();
     |                                ------- `stream` moved due to this method call
57   |
58   |     stream.close(Some(CloseFrame { code: CloseCode::Normal, reason: String::from("").into() })).await.unwrap();
     |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ value borrowed here after move
     |
note: `futures::StreamExt::split` takes ownership of the receiver `self`, which moves `stream`

You can just not split the stream.

You can reuinite the stream.

You can (&mut stream).split() and drop the halves later to free stream back up, if things like borrow check allow it. (This may be equivalent what you attempted with by_ref(), I'm not sure.)

1 Like

Wow this is very unintuitive. First split to even use the stream then reunite so that its methods can be used again.
Seems to work so far. But very confusing.

    let (stream, _) = connect_async(&base).await.unwrap();

    let (write, read) = stream.split();

    let mut x = write.reunite(read).unwrap();

    x.close(Some(CloseFrame { code: CloseCode::Normal, reason: String::from("").into() })).await.unwrap();
    

now when using read before reunite, it moves again.

56   |       let (write, read) = stream.split();
     |                   ---- move occurs because `read` has type `SplitStream<WebSocketStream<async_tungstenite::stream::Stream<TokioAdapter<tokio::net::TcpStream>, TokioAdapter<tokio_native_tls::TlsStream<tokio::net::TcpStream>>>>>`, which does not implement the `Copy` trait
...
60   |       read.for_each(|message| async {
     |  __________-
61   | |
62   | |         dbg!(&message);
63   | |
...    |
84   | |
85   | |     }).await;
     | |______- `read` moved due to this method call
86   |
87   |       let mut x = write.reunite(read).unwrap();
     |                                 ^^^^ value used here after move
     |
note: `futures::StreamExt::for_each` takes ownership of the receiver `self`, which moves `read`

Perhaps this?

let (write, mut read) = stream.split();
(&mut read).for_each(|message| async { /* ... */ }).await;
let mut x = write.reuinte(read).unwrap();

I avoided the split and used "by_ref()" for the loop. This is just part of some small study to get used to this language. thank you.

could you explain how this (&mut read) pattern works? Why are the parentheses important when doing this?

Because otherwise it's parsed like this: &mut (read.foreach ...). It's a matter of operator precedence.

1 Like

This told me that if S: Stream + Unpin + ?Sized, then &mut S: Stream as well. That's what clued me into one way to not consume your base stream. It comes up with iterators too, for example.

let mut iter = [/* ... */].into_iter();
for item in &mut iter {
    if some_condition() { break; }
}
// Can still use `iter` here

So, read and &mut read both implement Stream and we want to call

<&mut TheReadType as StreamExt>::for_each(&mut read, |_| ...)

and not

<TheReadType as StreamExt>::for_each(read, |_| ...)

so that we don't give away ownership of read. If you read up on how method resolution works, you'll find that with this code:

read.for_each(|_| ...)

TheReadType is the first candidate, and so <TheReadType as StreamExt>::for_each is selected. But with this code:

(&mut read).for_each(|_| ...)

&mut TheReadType is the first candidate, and so <&mut TheReadType as StreamExt>::for_each is selected. (And like @jendrikw said, you need the parenthesis for precedence.)

Alternatively you could do this, which is arguably cleaner.

    StreamExt::for_each(&mut read, |_msg| async {}).await;

In this case, the compiler infers you wanted <&mut TheReadType as StreamExt>::for_each from the &mut read you pass in.

1 Like

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.