I'm using tokio_tungstenite
to connect to websocket. I'm basing on example code at tokio-tungstenite/client.rs at master · snapview/tokio-tungstenite · GitHub but simplify it. The thing is I would like to implement handling logic for on_message
, on_connect
, etc just like event-driven way, so I would like to pass the SplitStream
(as seen as read
variable) around into function.
I'm confusing and not sure what to do to solve this situation.
Please suggest what I should be doing here to pass SplitStream
or as of my understanding, any types which don't implement Copy
trait when move occurs into the function. Thanks in advance.
use futures_util::{future, pin_mut, StreamExt, SinkExt, stream::SplitStream };
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message, WebSocketStream, MaybeTlsStream};
type WssStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
#[tokio::main]
async fn main() {
let url = url::Url::parse("wss://some-url").unwrap();
// expect() will get Ok value and panic if error
let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");
println!("Connected to websocket");
let (mut write, read) = ws_stream.split();
// subscribe to certain topic
write.send(Message::Text(r#"{"op": "subscribe", "args": ["topic1"]}\n"#.to_string())).await.unwrap();
println!("subscribed to trade topic");
let task1 = tokio::task::spawn(read_new_message(&read));
task1.await;
}
async fn read_new_message(read_sink: &SplitStream<WssStream>) {
let read_future = read_sink.for_each(|message| async { // <------ this line
let mut data = message.unwrap().into_data();
data.append(&mut "\n".as_bytes().to_vec());
tokio::io::stdout().write_all(&data).await.unwrap();
});
read_future.await;
}
The compiler shouts out error as of
error[E0507]: cannot move out of `*read_sink` which is behind a shared reference
...
^ move occurs because `*read_sink` has type `SplitStream<WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>>`, which does not implement the `Copy` trait
...