Moving a Receiver back out of a task

I have a process that connects to a websocket. It handles two kinds of things: RPC requests that require a response, and pushed-to-client data. The process takes an mpsc::Sender for the pushed data and an mpsc::Receiver for RPC requests. Each RpcRequest contains a oneshot::Sender to reply on when the matching message is found.

One thing I'd like for it to do is to detect when the socket connection fails and try to reconnect. This should be invisible to code calling this (i.e. they can continue just using the channels they have). Here's what I did:

pub async fn main_loop(url: String, creds: Creds, recv_rpc: mpsc::Receiver<RpcRequest>, send_push: mpsc::Sender<PushedValue>) {

    let state_mutex = Arc::new(Mutex::new(WebsocketState {
        token: creds.token.to_string(),
        is_ready: false,
        open_rpc_requests: HashMap::new(),
    }));

    loop {
        match tokio_tungstenite::connect_async(url.to_string()).await {
            Ok((ws_stream, _)) => {
                // scoped so the lock is released before the tasks start
                {
                    let mut state = state_mutex.lock().await;
                    state.is_ready = true;
                }

                let (write_ws_stream, read_ws_stream) = ws_stream.split();

                // buffer channels
                let (send_outbound, recv_outbound) = mpsc::channel(10);
                let (send_inbound, recv_inbound) = mpsc::channel(100);

                // handle various tasks; the first one to exit cancels the rest
                tokio::select! {

                    // tie the write stream to the outbound buffer
                    _ = link_writes(write_ws_stream, recv_outbound) => {
                        eprintln!("write buffer exited");
                    }

                    // tie the read stream to the inbound buffer
                    _ = link_reads(read_ws_stream, send_inbound) => {
                        eprintln!("read buffer exited");
                    }

                    // transmit rpc requests to the server
                    _ = rpc_loop(state_mutex.clone(), recv_rpc, send_outbound.clone()) => {
                        eprintln!("rpc loop exited");
                    }

                    // receive ws messages and handle them, both rpc and pushed
                    _ = receive_loop(state_mutex.clone(), recv_inbound, send_push.clone()) => {
                        eprintln!("receive loop exited");
                    }

                    // ping regularly to detect if the link is down; exit to force a reconnect if it is
                    _ = ping_loop(state_mutex.clone(), send_outbound.clone()) => {
                        eprintln!("ping loop exited");
                    }
                }

                let mut state = state_mutex.lock().await;
                state.is_ready = false;
            }
            Err(_) => {
                let reconnect_timeout = 5;
                eprintln!("Failed to connect to websocket. Reconnecting in {} seconds", reconnect_timeout);
                tokio::time::sleep(Duration::from_secs(reconnect_timeout)).await;
            }
        }

        println!("Reconnecting to websocket");
    }
}

This doesn't work because recv_rpc gets moved into rpc_loop. Then it's not available for the second iteration of the loop, so this doesn't compile (it works fine without the loop, but obviously doesn't actually reconnect on failures). I totally get why that happens, but what I'm not sure of is how to fix it.

Ideas I tried:

  1. Just use broadcast instead of mpsc for the rpc channel. This seems like a weird fit to begin with, but it doesn't work for me anyway, because the RpcRequest contains the reply oneshot::Sender and thus can't be Clone
  2. Have rpc_loop move the receiver back to the main loop when it finishes. I don't know how to do this, though, because one of the other branches of the select! can trigger first, in which case the rpc_loop future is dropped and I won't get anything back to move

I suspect I'm simply approaching this wrong, or misunderstanding something important. What should I do to fix this?

If you want to use a value from several different places, you can't do that by value. You'll have to access it by reference. For accessing by-ref across threads/coroutines, you'll in turn need synchronization, and since mpsc::Receiver is Send but not Sync, you'll have to use a Mutex or something similar for synchronizing concurrent accesses.

Does it work better if you tokio::spawn() all those loops? I'm not clear exactly what error semantics you want here but that would potentially make it easier to shuffle things around while trying things?

@H2CO3 Ok, thanks. I was worried this was the only answer.

TBC, I only need it from one place at a time. But since the task that reads from it needs to be recreated (in this case, so that it can relay the messages to a different upstream sender), I don't quite know how to get it back.

@simonbuchan I don't think tokio::spawn() will help. The problem is basically that I'm tying a receiver to some process, ending the process, and trying to reuse the receiver. Since tokio::spawn() would also require me to move the receiver into the closure I pass in, it will have the same issue.

So here's what I ended up doing:

  1. In a parent method, create Arc<Mutex<Option<...>>>s to hold both the sender and receiver for the RPC requests. Pass both to main_loop
  2. inside that main loop method, create the RPC channel and set the sender and receiver on the mutex
  3. pass the receiver mutex to rpc_loop. rpc_loop will lock the mutex for its entire duration, but when it exits, it will release the lock and allow the next iteration of main_loop to reacquire the lock and swap in a new value
  4. When the loop exits, set the mutexes back to None
  5. The callers of this whole shebang have the sender, which is now wrapped in a mutex. This is actually quite nice, because if the link is down, the mutexes will (usually) contain None
