Need Help with Implementing Looping Async Logic

Hi. I'm Hassan. I am building a video streaming server with GStreamer. The video server communicates with a signalling server over WebSockets to exchange SDP messages and to register and maintain its session. For a bit of context, it works something like this.

The video server connects to the signalling server over WebSockets and registers itself with an ID. It then waits for clients to connect, which the signalling server notifies it of. A client connects by requesting a room (ID) from the signalling server and then asks the video server to join that room, i.e. it creates a conference/meeting session. Any other client with the meeting ID can join this room and take part in the meeting.

This is the part that I'm struggling with. Once the meeting is done, i.e. all the clients have left, I want the video server to loop back to its initial state, i.e. wait for clients once again. The relevant code is as follows:

async fn async_main() -> Result<(), anyhow::Error> {
    let (mut ws, _) = async_tungstenite::async_std::connect_async(&args.server).await?;

    let connect = async {
        println!("Registering id {} with signalling server", our_id);
        ws.send(WsMessage::Text(format!("Hello {}", our_id)))
            .await?;

        let msg = ws
            .next()
            .await
            .ok_or_else(|| anyhow!("didn't receive anything"))??;

        if msg != WsMessage::Text("Hello".into()) {
            bail!("server didn't say Hello");
        }

        Ok(())
    };
    connect.await?;

    println!("Waiting for peer request");
    while let Some(Ok(msg)) = ws.next().await {
        match msg {
            WsMessage::Text(text) => {
                // strip_prefix avoids a slicing panic if the message is exactly "SERVER_JOIN"
                if let Some(rest) = text.strip_prefix("SERVER_JOIN ") {
                    let mut split = rest.splitn(2, ' ');
                    let room_id = split
                        .next()
                        .and_then(|s| str::parse::<u32>(s).ok())
                        .ok_or_else(|| anyhow!("Cannot parse room id"))?;
                    ws.send(WsMessage::Text(format!("ROOM {}", room_id)))
                        .await?;
                    // break;
                }
                if text.starts_with("ROOM_OK") {
                    println!("Joined ROOM {}\n", &args.room_id);
                    break;
                }
            }
            _ => (),
        }
    }
    if let Err(err) = run(_, _, ws).await {
        println!(
            "\nrun() has returned with Error: {}\nProgram is going to exit",
            err
        );
    }
    Ok(())
}

fn main() -> Result<(), anyhow::Error> {
    task::block_on(async_main())
}
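As an aside, the text handling in the waiting loop could be factored into a small parser so that the loop only matches on an enum. This is a minimal sketch, not part of my actual code: the Signal enum and parse_signal() are names I made up for illustration, and only the "Hello", "SERVER_JOIN <room id>" and "ROOM_OK" messages come from the code above.

```rust
/// Hypothetical enum for the text messages the video server reacts to.
#[derive(Debug, PartialEq)]
enum Signal {
    Hello,
    ServerJoin { room_id: u32 },
    RoomOk,
    /// Anything this sketch does not recognise.
    Other(String),
}

/// Hypothetical helper: turn one text message into a Signal.
/// A SERVER_JOIN with an unparsable room id is reported as an error.
fn parse_signal(text: &str) -> Result<Signal, String> {
    if text == "Hello" {
        return Ok(Signal::Hello);
    }
    // strip_prefix returns None when the prefix is missing, so a bare
    // "SERVER_JOIN" without a room id falls through instead of panicking.
    if let Some(rest) = text.strip_prefix("SERVER_JOIN ") {
        let room_id = rest
            .split(' ')
            .next()
            .and_then(|s| s.parse::<u32>().ok())
            .ok_or_else(|| format!("Cannot parse room id in {:?}", text))?;
        return Ok(Signal::ServerJoin { room_id });
    }
    if text.starts_with("ROOM_OK") {
        return Ok(Signal::RoomOk);
    }
    Ok(Signal::Other(text.to_string()))
}

fn main() {
    for text in ["Hello", "SERVER_JOIN 42", "ROOM_OK", "SERVER_JOIN abc"] {
        println!("{:?} -> {:?}", text, parse_signal(text));
    }
}
```

With something like this, the waiting loop would match on Signal::ServerJoin and Signal::RoomOk rather than on string prefixes.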

The run() function takes ownership of the WebSocketStream (ws) and starts the video server's message handling loop, which uses futures::select!() inside a loop. It receives messages over an unbounded channel that the application creates and sends them over the websocket to the signalling server, which then relays them to the relevant clients.

Since run() takes ownership of the WebSocketStream, I cannot simply call it in a loop: ws is moved into run() on the first iteration, and I still need it afterwards because ws.send() requires &mut self and I have to send multiple messages.

Currently, when all clients leave, the signalling server sends a message to the video server notifying it. The message is handled by returning an Err(), and the program exits. I want to implement something so that, instead of exiting, the logic loops back to the "Waiting for peer request" part.
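To make the control flow I am after more concrete, here is a minimal, synchronous sketch. It is not my real code, and everything in it is an assumption made for illustration: FakeSocket stands in for the WebSocketStream, SessionEnd, wait_for_room() and serve() are made-up names, and "ALL_CLIENTS_LEFT" is a placeholder for whatever message the signalling server actually sends. The idea is that run() borrows the socket (&mut) instead of taking ownership, and reports "all clients left" as an Ok value rather than an Err, so an outer loop can go back to waiting.

```rust
use std::collections::VecDeque;

/// Hypothetical: why a session ended without a fatal error.
#[derive(Debug, PartialEq)]
enum SessionEnd {
    AllClientsLeft,
    SocketClosed,
}

/// Stand-in for the WebSocketStream: a queue of incoming text messages
/// and a log of everything that was sent.
struct FakeSocket {
    incoming: VecDeque<String>,
    sent: Vec<String>,
}

impl FakeSocket {
    fn next(&mut self) -> Option<String> {
        self.incoming.pop_front()
    }
    fn send(&mut self, msg: String) {
        self.sent.push(msg);
    }
}

/// Wait until a room has been joined.
/// Returns Ok(false) if the socket closed while waiting.
fn wait_for_room(ws: &mut FakeSocket) -> Result<bool, String> {
    while let Some(text) = ws.next() {
        if let Some(rest) = text.strip_prefix("SERVER_JOIN ") {
            let room_id: u32 = rest
                .split(' ')
                .next()
                .and_then(|s| s.parse().ok())
                .ok_or("Cannot parse room id")?;
            ws.send(format!("ROOM {}", room_id));
        } else if text.starts_with("ROOM_OK") {
            return Ok(true);
        }
    }
    Ok(false)
}

/// Session loop. It only borrows the socket, so the caller still owns it
/// when the session ends. "ALL_CLIENTS_LEFT" is a placeholder message.
fn run(ws: &mut FakeSocket) -> Result<SessionEnd, String> {
    while let Some(text) = ws.next() {
        if text == "ALL_CLIENTS_LEFT" {
            return Ok(SessionEnd::AllClientsLeft);
        }
        // SDP/ICE handling and relaying would go here.
    }
    Ok(SessionEnd::SocketClosed)
}

/// Outer loop: wait, run a session, wait again.
/// Returns the number of sessions that were started.
fn serve(ws: &mut FakeSocket) -> Result<u32, String> {
    let mut sessions = 0;
    loop {
        if !wait_for_room(ws)? {
            return Ok(sessions);
        }
        sessions += 1;
        match run(ws)? {
            SessionEnd::AllClientsLeft => continue, // back to waiting
            SessionEnd::SocketClosed => return Ok(sessions),
        }
    }
}

fn main() {
    let script = [
        "SERVER_JOIN 7", "ROOM_OK", "SDP offer", "ALL_CLIENTS_LEFT",
        "SERVER_JOIN 9", "ROOM_OK", "ALL_CLIENTS_LEFT",
    ];
    let mut ws = FakeSocket {
        incoming: script.iter().map(|s| s.to_string()).collect(),
        sent: Vec::new(),
    };
    println!("sessions: {:?}, sent: {:?}", serve(&mut ws), ws.sent);
}
```

In the async version I imagine the same shape would apply, with run() taking &mut WebSocketStream and real errors still propagating through Err, but I have not verified that against my select!() loop.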

Please note that this is my first time creating a post. If I have missed any information, please let me know and I will provide it.

Any help would be greatly appreciated.