I have a process that connects to a websocket. It handles two kinds of things: RPC requests that require a response, and pushed-to-client data. The process takes an mpsc::Sender
for the pushed data and mpsc::Receiver
for RPC requests. RpcRequests contain a oneshot::Sender
to reply on when the matching message is found.
One thing I'd like for it to do is to detect when the socket connection fails and try to reconnect. This should be invisible to code calling this (i.e. they can continue just using the channels they have). Here's what I did:
pub async fn main_loop(url: String, creds: Creds, recv_rpc: mpsc::Receiver<RpcRequest>, send_push: mpsc::Sender<PushedValue>) {
let state_mutex = Arc::new(Mutex::new(WebsocketState {
token: creds.token.to_string(),
email: creds.email.to_string(),
is_ready: false,
open_rpc_requests: HashMap::new()
}));
loop {
match tokio_tungstenite::connect_async(url.to_string()).await {
Ok((wsg_stream, _)) => {
{
let mut state = state_mutex.lock().await;
state.is_ready = true;
}
let (write_ws_stream, read_ws_stream) = wsg_stream.split();
// buffer channels
let (send_outbound, recv_outbound) = mpsc::channel(10);
let (send_inbound, recv_inbound) = mpsc::channel(100);
// handle various tasks
tokio::select! {
// tie the read stream to the read buffer
_ = link_writes(write_ws_stream, recv_outbound) => {
eprintln!("read buffer exited");
},
// tie the write stream to the write buffer
_ = link_reads(read_ws_stream, send_inbound) => {
eprintln!("write buffer exited");
},
// transmit rpc requests to the server
_ = rpc_loop(state_mutex.clone(), recv_rpc, send_outbound.clone()) => {
eprintln!("rpc loop exited");
},
// receive ws messages and handle them, both rpc and pushed
_ = receive_loop(state_mutex.clone(), recv_inbound, send_push.clone()) => {
eprintln!("receive loop exited");
},
// ping regularly to detect if the link is down, exit to force reconnect it is
_ = ping_loop(state_mutex.clone(), send_outbound.clone()) => {
eprintln!("ping loop exited");
},
}
{
let mut state = state_mutex.lock().await;
state.is_ready = false;
}
},
Err(_) => {
let reconnect_timeout = 5;
eprintln!("Failed to connect to websocket. Reconnecting in {} seconds", reconnect_timeout);
time::sleep(Duration::from_secs(reconnect_timeout)).await;
}
}
println!("Reconnecting to websocket");
}
}
This doesn't work because recv_rpc
gets moved into rpc_loop
. Then it's not available for the second iteration of the loop, so this doesn't compile (it works fine without the loop, but obviously doesn't actually reconnect on failures). I totally get why that happens, but what I'm not sure of is how to fix it.
Ideas I tried:
- just use broadcast instead mpsc for the rpc channel. This seems like weird fit to begin with, but it doesn't work for me anyway, because the RpcRequest contains the reply
oneshot::Sender,
and thus can't be Clone - Have
rpc_loop
move the receiver back to the main loop. I don't know how to do do this, though, because insiderpc_loop
because one of the other branches of the select! can trigger first, and I won't get anything back to move
I suspect I'm simply approaching this wrong, or misunderstanding something important. What should I do to fix this?