I'm using tokio and tungstenite (specifically the package tokio-tungstenite) and I'm struggling to understand the proper way to try reconnecting a websocket without destroying channels. Here's a minimal example that works fine for an initial connection:
async fn connect(rx: UnboundedReceiver<String>) -> () {
use futures::stream::StreamExt;
match connect_async("ws://localhost:8080").await {
Ok((ws_stream, _response)) => {
let (write, mut read) = ws_stream.split();
let mpsc_to_ws = rx.map(WSMessage::from).map(Ok).forward(write);
let ws_to_mpsc = async {
while let Some(message) = read.next().await {
log::debug!("Received {:?}", message);
}
};
pin_mut!(mpsc_to_ws, ws_to_mpsc);
select(mpsc_to_ws, ws_to_mpsc).await;
}
Err(error) => {
log::error!("Unhandled WS msg: {:?}", error);
}
}
log::error!("Connection failed");
}
let (tx, rx) = futures::channel::mpsc::unbounded::<String>();
tokio::spawn(async {
connect(rx).await;
});
tx.unbounded_send(msg);
Now, if connect(rx).await returns an error I'd like to call it again and reuse the same rx but it seems that rx is still owned by this function. Is that because of pin_mut!? What am I missing here?