Tokio: How to pass a SplitStream into a function and fix "move occurs because ..., which does not implement the Copy trait"

I'm using tokio_tungstenite to connect to a websocket. My code is based on the example at tokio-tungstenite/client.rs at master · snapview/tokio-tungstenite · GitHub, but simplified. I would like to implement handling logic such as on_message and on_connect in an event-driven way, so I want to pass the SplitStream (the read variable below) into a function.

I'm confused and not sure how to solve this.
Please suggest how to pass a SplitStream, or as I understand it, any type that doesn't implement the Copy trait, into a function when a move occurs. Thanks in advance.

use futures_util::{future, pin_mut, StreamExt, SinkExt, stream::SplitStream };
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message, WebSocketStream, MaybeTlsStream};

type WssStream = WebSocketStream<MaybeTlsStream<TcpStream>>;

#[tokio::main]
async fn main() {
    let url = url::Url::parse("wss://some-url").unwrap();
    // expect() will get Ok value and panic if error
    let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");
    println!("Connected to websocket");

    let (mut write, read) = ws_stream.split();

    // subscribe to certain topic
    write.send(Message::Text(r#"{"op": "subscribe", "args": ["topic1"]}\n"#.to_string())).await.unwrap();
    println!("subscribed to trade topic");

    let task1 = tokio::task::spawn(read_new_message(&read));
    
    task1.await;
}

async fn read_new_message(read_sink: &SplitStream<WssStream>) {
    let read_future = read_sink.for_each(|message| async {    // <------ this line
        let mut data = message.unwrap().into_data();
        data.append(&mut "\n".as_bytes().to_vec());
        tokio::io::stdout().write_all(&data).await.unwrap();
    });

    read_future.await;
}

The compiler reports this error:

error[E0507]: cannot move out of `*read_sink` which is behind a shared reference
...
^ move occurs because `*read_sink` has type `SplitStream<WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>>`, which does not implement the `Copy` trait
...

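You cannot use a sink or stream given only an immutable reference: for_each takes the stream by value, and next needs mutable access. Change the function to take it by ownership instead, and pass read rather than &read at the call site: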

async fn read_new_message(read_sink: SplitStream<WssStream>) {
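
As a rough analogy using only the standard library, the same ownership rule shows up with plain iterators: Iterator::for_each takes self by value, while next only needs &mut self. The function names below are hypothetical and are not part of the thread's code; this is a sketch of the rule, not of the websocket client itself.

```rust
// Analogy with std iterators: `for_each` consumes `self`, `next` borrows mutably.

// Owning the iterator lets us call the consuming `for_each`.
fn sum_by_value(numbers: std::vec::IntoIter<u32>) -> u32 {
    let mut total = 0;
    numbers.for_each(|n| total += n);
    total
}

// A mutable borrow is enough when we only call `next`.
fn sum_by_mut_ref(numbers: &mut std::vec::IntoIter<u32>) -> u32 {
    let mut total = 0;
    while let Some(n) = numbers.next() {
        total += n;
    }
    total
}

// A shared reference allows neither. Uncommenting this function is expected
// to be rejected with a "cannot move out of `*numbers`" style error (E0507),
// because `for_each` would have to move the iterator out of the reference.
//
// fn sum_by_shared_ref(numbers: &std::vec::IntoIter<u32>) -> u32 {
//     let mut total = 0;
//     numbers.for_each(|n| total += n);
//     total
// }

fn main() {
    println!("{}", sum_by_value(vec![1, 2, 3].into_iter())); // prints 6

    let mut borrowed = vec![4, 5].into_iter();
    println!("{}", sum_by_mut_ref(&mut borrowed)); // prints 9
}
```

In the websocket code, taking the stream by value also suits tokio::task::spawn, which requires the spawned future to be 'static and so cannot hold a borrow of a local variable in main.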

Also, I generally recommend avoiding for_each when you can just use a loop. I also simplified the append, which was over-complicated on quite a few levels.

async fn read_new_message(read_sink: SplitStream<WssStream>) {
    while let Some(message) = read_sink.next().await {
        let mut data = message.unwrap().into_data();
        data.push(b'\n');
        tokio::io::stdout().write_all(&data).await.unwrap();
    }
}

Thank you very much! That's quite straightforward.
Your simplified version looks cleaner as well. Thanks!

As an aside, it's best to reuse the return value of stdout().
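
A minimal sketch of what reusing the handle can look like. To keep it runnable without external crates, it uses the synchronous std::io API as a stand-in, which is an assumption and not the thread's code; write_messages is a hypothetical helper. In the async function the same shape would be to bind `let mut stdout = tokio::io::stdout();` once before the `while let` loop and call `stdout.write_all(&data).await` inside it.

```rust
use std::io::{self, Write};

// Hypothetical helper: appends a newline to each message and writes it to `out`.
// Taking `out` as a parameter means the caller creates the handle only once.
fn write_messages<W: Write>(messages: &[&[u8]], out: &mut W) -> io::Result<()> {
    for message in messages {
        let mut data = message.to_vec();
        data.push(b'\n');
        out.write_all(&data)?;
    }
    out.flush()
}

fn main() -> io::Result<()> {
    // Obtain the handle once, outside the loop, and reuse it for every write.
    let mut stdout = io::stdout();
    let messages: [&[u8]; 2] = [b"first", b"second"];
    write_messages(&messages, &mut stdout)
}
```

Being generic over Write is a design choice for this sketch only: it lets the same function write into a Vec<u8> when checking the output.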

Yeah, I can see that as well. I'll do it. Thanks.

Just for the record, and for new users reading this code later: read_sink needs to be declared mut. It's a minor point; the solution already works great!

So it would be

async fn read_new_message(mut read_sink: SplitStream<WssStream>)
