Trying to understand websocket reconnection with pin_mut! and channels

I'm using tokio and tungstenite (specifically the package tokio-tungstenite) and I'm struggling to understand the proper way to try reconnecting a websocket without destroying channels. Here's a minimal example that works fine for an initial connection:

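// Imports this snippet appears to rely on; WSMessage is assumed to be an alias for tungstenite's Message.
use futures::{channel::mpsc::UnboundedReceiver, future::select, pin_mut, stream::StreamExt};
use tokio_tungstenite::{connect_async, tungstenite::Message as WSMessage};

async fn connect(rx: UnboundedReceiver<String>) {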
  match connect_async("ws://localhost:8080").await {
    Ok((ws_stream, _response)) => {
      let (write, mut read) = ws_stream.split();
      let mpsc_to_ws = rx.map(WSMessage::from).map(Ok).forward(write);
      let ws_to_mpsc = async {
        while let Some(message) = read.next().await {
          log::debug!("Received {:?}", message);
        }
      };

      pin_mut!(mpsc_to_ws, ws_to_mpsc);
      select(mpsc_to_ws, ws_to_mpsc).await;
    }
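    Err(error) => {
      // This branch is a failed connection attempt, not an unhandled message.
      log::error!("Connection failed: {:?}", error);
    }
  }
  // Reached after a failed attempt and also after an established connection ends.
  log::error!("Connection closed");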
}

let (tx, rx) = futures::channel::mpsc::unbounded::<String>();
tokio::spawn(async {
  connect(rx).await;
});
tx.unbounded_send(msg);

Now, if connect(rx).await returns because the connection failed, I'd like to call it again and reuse the same rx, but the compiler treats rx as moved into the first call. Is that because of pin_mut!? What am I missing here?

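Which failure would you like to handle? pin_mut! only shadows the variable with a pinned mutable reference to it, so it is not what takes rx away from you. rx is gone after the first call because connect takes it by value, and rx.map(...) then moves it into the forwarding future.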

If the connection is successful it hits the Ok branch of the match and spends time there processing new messages. So any time this function returns it means there has been an irrecoverable error and we need to try again. So in pseudo code:

let (tx, rx) = futures::channel::mpsc::unbounded::<String>();
tokio::spawn(async {
  loop {
    connect(rx).await;
  }
});

would work fine if I can figure out the correct ownership of rx to allow it.
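
One way to get that ownership, sketched under assumptions: have connect borrow the receiver instead of taking it by value, so the loop still owns it after each call. The sketch below uses std::sync::mpsc as a stand-in so that it runs without any crates; the budget parameter and the run_sessions and demo helpers are hypothetical and only simulate a connection that drops after a few messages. In the async version the same idea would probably look like connect(rx: &mut UnboundedReceiver<String>) with rx.by_ref().map(WSMessage::from).map(Ok).forward(write) inside, since a mutable reference to an Unpin stream is itself a stream.

```rust
use std::sync::mpsc::{channel, Receiver};

/// Stand-in for one websocket session (hypothetical): borrows the receiver,
/// forwards at most `budget` queued messages, then returns as if the
/// connection had dropped.
fn connect(rx: &Receiver<String>, budget: usize) -> Vec<String> {
    let mut forwarded = Vec::new();
    // try_iter() only borrows `rx`, so nothing is moved out of the caller.
    for msg in rx.try_iter().take(budget) {
        forwarded.push(msg);
    }
    forwarded
}

/// Reconnect loop: the same receiver is lent to every session in turn.
fn run_sessions(rx: &Receiver<String>, sessions: usize, budget: usize) -> Vec<Vec<String>> {
    (0..sessions).map(|_| connect(rx, budget)).collect()
}

/// Queues three messages, then runs two sessions of two messages each.
fn demo() -> Vec<Vec<String>> {
    let (tx, rx) = channel::<String>();
    for m in ["a", "b", "c"] {
        tx.send(m.to_string()).unwrap();
    }
    run_sessions(&rx, 2, 2)
}
```

Here the message that the first session did not get to is still in the channel when the second session starts, which is the behaviour the reconnect loop needs from rx.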

I think the easiest solution would be to just use async-channel, which allows you to clone the receiver.
